Re: Re: group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-30 文章 刘大龙
这个是是非窗口下agg的优化,目前窗口聚合agg还不支持mini batch


> -原始邮件-
> 发件人: "Tianwang Li" 
> 发送时间: 2020-09-30 14:32:39 (星期三)
> 收件人: user-zh@flink.apache.org
> 抄送: 
> 主题: Re: group agg 开启了mini batch之后,state ttl不生效的问题
> 
> 这种有窗口统计没有影响吧?
> 
> 
> 刘建刚  于2020年9月30日周三 下午2:25写道:
> 
> > 修复方案参考https://github.com/apache/flink/pull/11830
> >
> > kandy.wang  于2020年9月30日周三 下午2:19写道:
> >
> > > group agg 开启了mini batch之后,state ttl不生效的问题:
> > >
> > >
> > > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
> > > 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到
> > 十几万。
> > >
> > >
> > > sql-client-defaults.yaml对应的参数应该是这2个吧:
> > > # minimum idle state retention in ms
> > > min-idle-state-retention: 0
> > > # maximum idle state retention in ms
> > > max-idle-state-retention: 0
> > > 这个现在进展如何了,这个社区打算什么时候支持
> > >
> > >
> > >
> > >
> >
> 
> 
> -- 
> **
>  tivanli
> **


--
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281


Re:flinksql注册udtf使用ROW类型做为输出输出时出错

2020-09-30 文章 chenxuying
上面最后说的splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), 
DataTypes.BIGINT()])需要改成这个地方splitStr = udtf(SplitStr(), DataTypes.STRING(), 
[DataTypes.STRING(), DataTypes.STRING()])udtf的第三个参数, 
好像只要是能跟sink的字段类型对应就能运行, 但是第二个参数并不能跟source字段对应却能运行就感觉有点奇怪
在 2020-09-30 19:07:06,"chenxuying"  写道:
>版本:
>pyflink==1.0
>apache-flink==1.11.2
>代码如下:
>env = StreamExecutionEnvironment.get_execution_environment()
>env.set_parallelism(1)
>t_env = StreamTableEnvironment.create(env)
>t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
> 'true')
>
>
>class SplitStr(TableFunction):
>def eval(self, data):
>for row in data:
>yield row[0], row[1]
>splitStr = udtf(
>SplitStr(),
>DataTypes.ARRAY(
>DataTypes.ROW(
>[
>DataTypes.FIELD("name", DataTypes.STRING()),
>DataTypes.FIELD("id", DataTypes.STRING())
>]
>)
>),
>DataTypes.ROW(
>[
>DataTypes.FIELD("name", DataTypes.STRING()),
>DataTypes.FIELD("id", DataTypes.STRING())
>]
>)
>)
>t_env.register_function("splitStr", splitStr)
>
>
>t_env.sql_update("""
>CREATE TABLE mySource (
>
>id varchar,
>data array> 
>) WITH ( 
>'connector' = 'kafka',
>'topic' = 'mytesttopic',
>'properties.bootstrap.servers' = '172.17.0.2:9092',
>'properties.group.id' = 'flink-test-cxy',
>'scan.startup.mode' = 'latest-offset',
>'format' = 'json' 
>) 
>""")
>t_env.sql_update("""
>CREATE TABLE mysqlsink (
>id varchar
>,name varchar
>,age  varchar
>) 
>with (
>'connector' = 'print'
>)
>""")
>t_env.sql_update("insert into mysqlsink select id,name,age from mySource 
>,LATERAL TABLE(splitStr(data)) as T(name, age)")
>t_env.execute("test")
>
>
>最终报错
>TypeError: Invalid result_type: result_type should be DataType but contains 
>RowField(name, VARCHAR)
>报错的地方是
>File 
>"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
> line 264, in __init__
>
>
>def __init__(self, func, input_types, result_types, deterministic=None, 
>name=None):
>super(UserDefinedTableFunctionWrapper, self).__init__(
>func, input_types, deterministic, name)
>
>
>if not isinstance(result_types, collections.Iterable):
>result_types = [result_types]
>
>
>for result_type in result_types:
>if not isinstance(result_type, DataType):
>raise TypeError(
>"Invalid result_type: result_type should be DataType but contains {}".format(
>result_type))
>
>
>self._result_types = result_types
>self._judtf_placeholder = None
>
>
>断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗
>
>
>另外的,假如我在
>上面在创建udtf的时候,如果这样写
>splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), 
>DataTypes.BIGINT()])
>却可以正常运行,但是显然类型跟我实际运行的不对应


flinksql注册udtf使用ROW类型做为输出输出时出错

2020-09-30 文章 chenxuying
版本:
pyflink==1.0
apache-flink==1.11.2
代码如下:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
 'true')


class SplitStr(TableFunction):
def eval(self, data):
for row in data:
yield row[0], row[1]
splitStr = udtf(
SplitStr(),
DataTypes.ARRAY(
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
),
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
)
t_env.register_function("splitStr", splitStr)


t_env.sql_update("""
CREATE TABLE mySource ( 
   
id varchar,
data array> 
) WITH ( 
'connector' = 'kafka',
'topic' = 'mytesttopic',
'properties.bootstrap.servers' = '172.17.0.2:9092',
'properties.group.id' = 'flink-test-cxy',
'scan.startup.mode' = 'latest-offset',
'format' = 'json' 
) 
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
id varchar
,name varchar
,age  varchar
) 
with (
'connector' = 'print'
)
""")
t_env.sql_update("insert into mysqlsink select id,name,age from mySource 
,LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")


最终报错
TypeError: Invalid result_type: result_type should be DataType but contains 
RowField(name, VARCHAR)
报错的地方是
File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
 line 264, in __init__


def __init__(self, func, input_types, result_types, deterministic=None, 
name=None):
super(UserDefinedTableFunctionWrapper, self).__init__(
func, input_types, deterministic, name)


if not isinstance(result_types, collections.Iterable):
result_types = [result_types]


for result_type in result_types:
if not isinstance(result_type, DataType):
raise TypeError(
"Invalid result_type: result_type should be DataType but contains {}".format(
result_type))


self._result_types = result_types
self._judtf_placeholder = None


断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗


另外的,假如我在
上面在创建udtf的时候,如果这样写
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), 
DataTypes.BIGINT()])
却可以正常运行,但是显然类型跟我实际运行的不对应

Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 Yang Peng
感谢回复,这个任务重启了之后看不到这个in/out指标数据, 我们能查到这个任务依赖的redis的连接查询次数也降低了,好像是任务假死一样
一直在消费数据但是就是不处理数据 没有和redis进行交互

tison  于2020年9月30日周三 下午5:34写道:

> 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...
>
> 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
> 有问题,比如可能依赖了外部环境或者内部积累错误等等。
>
> Best,
> tison.
>
>
> Yang Peng  于2020年9月30日周三 下午5:26写道:
>
> > 感谢回复,是的,之前确实怀疑是业务逻辑导致的
> > 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
> >
> > tison  于2020年9月30日周三 下午2:33写道:
> >
> > > Hi Yang,
> > >
> > > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
> > >
> > > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Yang Peng  于2020年9月30日周三 上午10:29写道:
> > >
> > > > 感谢回复,我们看了consumer的lag很小
> > > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> > > > 而且任务重启了没法jstack判断了
> > > >
> > > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
> > > >
> > > > >
> > > > >
> > > > >
> > > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > > > > 也可以 jstack 采下堆栈看下,GC等看下。
> > > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > > > > Best,
> > > > > Hailong Wang
> > > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> > > > >
> > > > >
> > > >
> > >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > > > > >flinkkafkaconsumer消费的并行度也是90
> > 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > > > > >
> > > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > > > > >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Hi Yang Peng:
> > > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > > > > >> Best,
> > > > > >> Hailong Wang
> > > > > >>
> > > > > >> 在 2020-09-29 19:44:44,"Yang Peng" 
> 写道:
> > > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > > > > >> >kafka消费没有积压,也没有反压,
> > > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > > > > >>
> > > > >
> > > >
> > >
> >
>


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 Yang Peng
感谢回复,是的 就是从输出降低的时间点开始重新消费,是输出变少了 有些没有输出 运行任务的机器的网卡流量也变小了,监控系统是没有问题
其他任务的监控都正常

tison  于2020年9月30日周三 下午5:37写道:

>
> 故障点的意思是从开始跌的地方重新消费吗?如果是这样那就是有问题,可以看看之前输出变少是正确数据输出慢了还是有些没输出了,慢了就得看看当时的环境,应该还是会有什么网络或者负载有波动的,没有可能就要怀疑监控系统有问题了;少输出了就是错了,可能是依赖的外部环境不稳定等等。
>
> Best,
> tison.
>
>
> tison  于2020年9月30日周三 下午5:33写道:
>
> > 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...
> >
> > 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
> > 有问题,比如可能依赖了外部环境或者内部积累错误等等。
> >
> > Best,
> > tison.
> >
> >
> > Yang Peng  于2020年9月30日周三 下午5:26写道:
> >
> >> 感谢回复,是的,之前确实怀疑是业务逻辑导致的
> >> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
> >>
> >> tison  于2020年9月30日周三 下午2:33写道:
> >>
> >> > Hi Yang,
> >> >
> >> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
> >> >
> >> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
> >> >
> >> > Best,
> >> > tison.
> >> >
> >> >
> >> > Yang Peng  于2020年9月30日周三 上午10:29写道:
> >> >
> >> > > 感谢回复,我们看了consumer的lag很小
> >> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> >> > > 而且任务重启了没法jstack判断了
> >> > >
> >> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
> >> > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> >> > > > 也可以 jstack 采下堆栈看下,GC等看下。
> >> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> >> > > > Best,
> >> > > > Hailong Wang
> >> > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> >> > > > >flinkkafkaconsumer消费的并行度也是90
> >> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> >> > > > >
> >> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> >> > > > >
> >> > > > >>
> >> > > > >>
> >> > > > >>
> >> > > > >> Hi Yang Peng:
> >> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> >> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> >> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> >> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> >> > > > >> Best,
> >> > > > >> Hailong Wang
> >> > > > >>
> >> > > > >> 在 2020-09-29 19:44:44,"Yang Peng" 
> 写道:
> >> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1
> 线上kafka集群为1.1.1
> >> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> >> > > > >> >kafka消费没有积压,也没有反压,
> >> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> >> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> >
>


有docker jobmanager HA的 1.11.2 docker-entrypoint.sh

2020-09-30 文章 marble.zh...@coinflex.com.INVALID
大家好, 有有docker jobmanager HA的 1.11.2 docker-entrypoint.sh脚本吗? 在官方的github里没有看到。
另外,有没有docker-compose.yml的配HA的例子吗? 谢谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 tison
故障点的意思是从开始跌的地方重新消费吗?如果是这样那就是有问题,可以看看之前输出变少是正确数据输出慢了还是有些没输出了,慢了就得看看当时的环境,应该还是会有什么网络或者负载有波动的,没有可能就要怀疑监控系统有问题了;少输出了就是错了,可能是依赖的外部环境不稳定等等。

Best,
tison.


tison  于2020年9月30日周三 下午5:33写道:

> 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...
>
> 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
> 有问题,比如可能依赖了外部环境或者内部积累错误等等。
>
> Best,
> tison.
>
>
> Yang Peng  于2020年9月30日周三 下午5:26写道:
>
>> 感谢回复,是的,之前确实怀疑是业务逻辑导致的
>> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
>>
>> tison  于2020年9月30日周三 下午2:33写道:
>>
>> > Hi Yang,
>> >
>> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
>> >
>> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > Yang Peng  于2020年9月30日周三 上午10:29写道:
>> >
>> > > 感谢回复,我们看了consumer的lag很小
>> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
>> > > 而且任务重启了没法jstack判断了
>> > >
>> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
>> > >
>> > > >
>> > > >
>> > > >
>> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
>> > > > 也可以 jstack 采下堆栈看下,GC等看下。
>> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
>> > > > Best,
>> > > > Hailong Wang
>> > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
>> > > >
>> > > >
>> > >
>> >
>> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
>> > > > >flinkkafkaconsumer消费的并行度也是90
>> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
>> > > > >
>> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
>> > > > >
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> Hi Yang Peng:
>> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
>> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
>> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
>> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
>> > > > >> Best,
>> > > > >> Hailong Wang
>> > > > >>
>> > > > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
>> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
>> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
>> > > > >> >kafka消费没有积压,也没有反压,
>> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
>> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
>> > > > >>
>> > > >
>> > >
>> >
>>
>


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 tison
那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...

照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
有问题,比如可能依赖了外部环境或者内部积累错误等等。

Best,
tison.


Yang Peng  于2020年9月30日周三 下午5:26写道:

> 感谢回复,是的,之前确实怀疑是业务逻辑导致的
> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
>
> tison  于2020年9月30日周三 下午2:33写道:
>
> > Hi Yang,
> >
> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
> >
> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
> >
> > Best,
> > tison.
> >
> >
> > Yang Peng  于2020年9月30日周三 上午10:29写道:
> >
> > > 感谢回复,我们看了consumer的lag很小
> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> > > 而且任务重启了没法jstack判断了
> > >
> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
> > >
> > > >
> > > >
> > > >
> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > > > 也可以 jstack 采下堆栈看下,GC等看下。
> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > > > Best,
> > > > Hailong Wang
> > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> > > >
> > > >
> > >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > > > >flinkkafkaconsumer消费的并行度也是90
> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > > > >
> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > > > >
> > > > >>
> > > > >>
> > > > >>
> > > > >> Hi Yang Peng:
> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > > > >> Best,
> > > > >> Hailong Wang
> > > > >>
> > > > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > > > >> >kafka消费没有积压,也没有反压,
> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > > > >>
> > > >
> > >
> >
>


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 Yang Peng
感谢回复,是的,之前确实怀疑是业务逻辑导致的
但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题

tison  于2020年9月30日周三 下午2:33写道:

> Hi Yang,
>
> 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
>
> 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
>
> Best,
> tison.
>
>
> Yang Peng  于2020年9月30日周三 上午10:29写道:
>
> > 感谢回复,我们看了consumer的lag很小
> > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> > 而且任务重启了没法jstack判断了
> >
> > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
> >
> > >
> > >
> > >
> > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > > 也可以 jstack 采下堆栈看下,GC等看下。
> > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > > Best,
> > > Hailong Wang
> > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> > >
> > >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > > >
> > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > > >
> > > >>
> > > >>
> > > >>
> > > >> Hi Yang Peng:
> > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > > >> Best,
> > > >> Hailong Wang
> > > >>
> > > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > > >> >kafka消费没有积压,也没有反压,
> > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > > >>
> > >
> >
>


回复:flink任务yarn perjob 提交任务如何设置job name

2020-09-30 文章 宁吉浩
-m yarn-cluster -ynm name
这个可以设置 yarn-application 的名字


--
发件人:丁浩浩 <18579099...@163.com>
发送时间:2020年9月30日(星期三) 15:44
收件人:user-zh@flink.apache.org 
主 题:flink任务yarn perjob 提交任务如何设置job name

如题,我需要设置flink提交到yarn的job name应该怎么设置呢?

Re: flink任务yarn perjob 提交任务如何设置job name

2020-09-30 文章 tison
代码里 env.execute("你的作业名")

Best,
tison.


丁浩浩 <18579099...@163.com> 于2020年9月30日周三 下午3:44写道:

> 如题,我需要设置flink提交到yarn的job name应该怎么设置呢?


flink任务yarn perjob 提交任务如何设置job name

2020-09-30 文章 丁浩浩
如题,我需要设置flink提交到yarn的job name应该怎么设置呢?

回复:Flink on K8s statebackend 配置

2020-09-30 文章 superainbower
补充一下,我的错误日志
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'hdfs'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded. For a full list of supported file systems, please 
seehttps://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


应该是没有Hadoop的路径,这个在K8s下面 该怎么去配置呢
| |


superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制


在2020年09月30日 14:33,superainbower 写道:
Hi,all
请教下,哪个朋友知道Flink on K8s上做 statebackend 配置,除了将下列配置写到flink-conf.yml里,还需要作哪些工作?
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:8020/flink/checkpoints
state.savepoints.dir: hdfs://master:8020/flink/savepoints
state.backend.incremental: true


| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制



Flink on K8s statebackend 配置

2020-09-30 文章 superainbower
Hi,all
   请教下,哪个朋友知道Flink on K8s上做 statebackend 配置,除了将下列配置写到flink-conf.yml里,还需要作哪些工作?
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:8020/flink/checkpoints
state.savepoints.dir: hdfs://master:8020/flink/savepoints
state.backend.incremental: true


| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制



Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 tison
Hi Yang,

你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?

如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。

Best,
tison.


Yang Peng  于2020年9月30日周三 上午10:29写道:

> 感谢回复,我们看了consumer的lag很小
> 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> 而且任务重启了没法jstack判断了
>
> hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
>
> >
> >
> >
> > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > 也可以 jstack 采下堆栈看下,GC等看下。
> > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > Best,
> > Hailong Wang
> > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > >
> > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > >
> > >>
> > >>
> > >>
> > >> Hi Yang Peng:
> > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > >> Best,
> > >> Hailong Wang
> > >>
> > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > >> >kafka消费没有积压,也没有反压,
> > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > >>
> >
>


Re: group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-30 文章 Tianwang Li
这种有窗口统计没有影响吧?


刘建刚  于2020年9月30日周三 下午2:25写道:

> 修复方案参考https://github.com/apache/flink/pull/11830
>
> kandy.wang  于2020年9月30日周三 下午2:19写道:
>
> > group agg 开启了mini batch之后,state ttl不生效的问题:
> >
> >
> > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
> > 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到
> 十几万。
> >
> >
> > sql-client-defaults.yaml对应的参数应该是这2个吧:
> > # minimum idle state retention in ms
> > min-idle-state-retention: 0
> > # maximum idle state retention in ms
> > max-idle-state-retention: 0
> > 这个现在进展如何了,这个社区打算什么时候支持
> >
> >
> >
> >
>


-- 
**
 tivanli
**


Re: group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-30 文章 刘建刚
修复方案参考https://github.com/apache/flink/pull/11830

kandy.wang  于2020年9月30日周三 下午2:19写道:

> group agg 开启了mini batch之后,state ttl不生效的问题:
>
>
> 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
> 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。
>
>
> sql-client-defaults.yaml对应的参数应该是这2个吧:
> # minimum idle state retention in ms
> min-idle-state-retention: 0
> # maximum idle state retention in ms
> max-idle-state-retention: 0
> 这个现在进展如何了,这个社区打算什么时候支持
>
>
>
>


group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-30 文章 kandy.wang
group agg 开启了mini batch之后,state ttl不生效的问题:


现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink 
算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。


sql-client-defaults.yaml对应的参数应该是这2个吧:
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
这个现在进展如何了,这个社区打算什么时候支持