Re: 邮件列表地址错误

2020-09-06 文章 黄潇
你好,这个并不是邮件地址的错误。
在订阅邮件列表 listn...@flink.apache.org 时,需要发送一封邮件到
listname-subscr...@flink.apache.org
(这里的listname可以是dev、user、user-zh等等)。
具体可以参考 https://flink.apache.org/zh/community.html#section

darion  于2020年9月7日周一 下午1:00写道:

> http://apache-flink.147419.n8.nabble.com/ 看到的邮件列表地址是
> user-zh@flink.apache.org
> 但是我订阅的是 user-zh-subscr...@flink.apache.org  是不是更新了?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 消费kafka数据乱序问题

2020-09-06 文章 taochanglian
业务端根据用户ID 
hash发送到kafka,保证每一个用户的所有操作在kafka的同一个partition内,并且在发送端保证操作有序。


至于flink消费kafka后,乱序,不太可能,或者说可能性极小,毕竟都是按照offset来消费。

在 2020/9/4 18:59, Xiao Xu 写道:

两个方法
1. kafka 里面可以 keyby, partition 里面都是有序的, 所以每个用户处理都是有序的
2. 就是你说的在 flink 里面做乱序处理

宁吉浩  于2020年9月4日周五 下午5:56写道:


我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格;
我的解决办法是把迟到数据丢弃,然后进行业务计算;
另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算;
之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次;


--
发件人:smq <374060...@qq.com>
发送时间:2020年9月4日(星期五) 17:35
收件人:wwj ; user-zh 
主 题:回复:消费kafka数据乱序问题


换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100

---原始邮件---
发件人: "wwj"



邮件列表地址错误

2020-09-06 文章 darion
http://apache-flink.147419.n8.nabble.com/ 看到的邮件列表地址是 user-zh@flink.apache.org 
但是我订阅的是 user-zh-subscr...@flink.apache.org  是不是更新了?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-06 文章 Yun Tang
Hi

首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]

可以排查的思路

  1.  你的state是否开启了TTL呢
  2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
  3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么

[1] 
https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

祝好
唐云

From: Liu Rising 
Sent: Sunday, September 6, 2020 17:45
To: user-zh@flink.apache.org 
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2> {

private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

private final ParameterTool params;
private transient ListState unmatchedProbesState;

...

FlinkKeyedProcessFunction(ParameterTool params) {
this.params = params;
}

@Override
public void open(Configuration parameters) {

ListStateDescriptor descriptor = new
ListStateDescriptor<>(
"unmatchedProbes", TypeInformation.of(ObjectNode.class)
);
unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

List unmatchedProbes =
mapMatching.getUnMatchedProbes(id);
unmatchedProbesState.clear();

if (unmatchedProbes.size() > 0) {
try {
unmatchedProbesState.addAll(unmatchedProbes);
} catch (Exception e) {
LOG.warn("Continue processing although failed to add
unmatchedProbes to ListState. ID: " + id, e);
}
}

   ...

以下是从state读取的code

for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
LOG.info("Processing unmatched probe: " +
unmatchedProbe);
matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
}


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL 任务提交后 任务名称问题

2020-09-06 文章 LittleFall
谢谢两位,我尝试一下。

也希望 flink sql client 本身支持这样的模式。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-06 文章 Xingbo Huang
Hi,
你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。

你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
[2] 来读取一个dataframe。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html

Best,
Xingbo

whh_960101  于2020年9月7日周一 上午11:22写道:

> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
> dic = {1:'a',2:'b'}
> 此时定义udf如下:
>
> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
> def func(dic,f):
>..
>return L
> st_env.register_function("func", func)
> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 16:02:56,"Xingbo Huang"  写道:
> >Hi,
> >
> >推荐你使用ddl来声明你上下游用的connector
> >
> >```
> >table_env.execute_sql("""
> >CREATE TABLE output (
> >data STRING ARRAY
> >) WITH (
> > 'connector' = 'filesystem',   -- required: specify the connector
> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> > 'format' = 'json',
> > 'json.fail-on-missing-field' = 'false',
> > 'json.ignore-parse-errors' = 'true'
> >)
> >""")
> >
>
> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
> >```
> >
> >Best,
> >Xingbo
> >
> >
> >
> >whh_960101  于2020年9月4日周五 下午3:46写道:
> >
> >> 您好,我是想让输出insert_into到目标表中,具体如下:
> >> st_env=StreamExecutionEnvironment.get_execution_environment()
> >> st_env.connect了一个source table(table包含a字段),
> >> 然后
> >> | st_env.connect(FileSystem().path('tmp')) \ |
> >> | | .with_format(OldCsv() |
> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> >> | | .with_schema(Schema() |
> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> >> | | .create_temporary_table('sink') |
> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> >> 然后我定义了一个udf
> >>
> >>
> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> def func(a):
> >> rec_list = a.split(',')
> >> res_arr = np.arrary(rec_list,dtype=str)
> >> return res_arr
> >> st_env.register_function("func", func)
> >> st_env.from_path("source").select("func(a)").insert_into("sink")
> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> >> res_arr[0],tmp文件里面的字符串就是正确。
> >> 我想要得到array,该怎么解决?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
> >> >Hi,
> >> >
> >> >你是调试的时候想看结果吗?
> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >> >
> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >> >
> >> >```
> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >> >
> >> >@udf(input_types=DataTypes.STRING(),
> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >def func(a):
> >> > return np.array([a, a, a], dtype=str)
> >> >
> >> >table_env.register_function("func", func)
> >> >
> >> >table.select("func(b)").to_pandas()
> >> >```
> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >> >
> >> >whh_960101  于2020年9月4日周五 下午2:50写道:
> >> >
> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> >> 我的udf输出了一个numpy.array(dtype = str),
> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >> >>
> >> >>
> >>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> >> 请问这个问题该怎么解决?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang"  写道:
> >> >> >Hi,
> >> >> >
> >> >>
> >> >>
> >>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> >whh_960101  于2020年9月4日周五 上午9:26写道:
> >> >> >
> >> >> >>
> >> >> >>
> >> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> >> udf定义如下:
> >> >> >>
> >> >> >>
> >> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> >> def fun(data):
> >> >> >>  b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >> >>
> >> >> >>
> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> 

Re:Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-06 文章 whh_960101
您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
dic = {1:'a',2:'b'}
此时定义udf如下:
@udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
def func(dic,f):
   ..
   return L
st_env.register_function("func", func)
st_env.from_path("source").select("func(dic,t)").insert_into("sink") 
#这时我在外部定义好的数据类型dic字典如何作为参数传进来
这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑











在 2020-09-04 16:02:56,"Xingbo Huang"  写道:
>Hi,
>
>推荐你使用ddl来声明你上下游用的connector
>
>```
>table_env.execute_sql("""
>CREATE TABLE output (
>data STRING ARRAY
>) WITH (
> 'connector' = 'filesystem',   -- required: specify the connector
> 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> 'format' = 'json',
> 'json.fail-on-missing-field' = 'false',
> 'json.ignore-parse-errors' = 'true'
>)
>""")
>
>table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>```
>
>Best,
>Xingbo
>
>
>
>whh_960101  于2020年9月4日周五 下午3:46写道:
>
>> 您好,我是想让输出insert_into到目标表中,具体如下:
>> st_env=StreamExecutionEnvironment.get_execution_environment()
>> st_env.connect了一个source table(table包含a字段),
>> 然后
>> | st_env.connect(FileSystem().path('tmp')) \ |
>> | | .with_format(OldCsv() |
>> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
>> | | .with_schema(Schema() |
>> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
>> | | .create_temporary_table('sink') |
>> connect了一个sink表,format、schema都是DataTypes.ARRAY()
>> 然后我定义了一个udf
>>
>> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> def func(a):
>> rec_list = a.split(',')
>> res_arr = np.arrary(rec_list,dtype=str)
>> return res_arr
>> st_env.register_function("func", func)
>> st_env.from_path("source").select("func(a)").insert_into("sink")
>> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return
>> res_arr[0],tmp文件里面的字符串就是正确。
>> 我想要得到array,该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
>> >Hi,
>> >
>> >你是调试的时候想看结果吗?
>> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
>> >
>> >个人觉得to_pandas最简单,比如你可以试试下面的例子
>> >
>> >```
>> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>> >
>> >@udf(input_types=DataTypes.STRING(),
>> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >def func(a):
>> > return np.array([a, a, a], dtype=str)
>> >
>> >table_env.register_function("func", func)
>> >
>> >table.select("func(b)").to_pandas()
>> >```
>> >然后,你可以看看官方文档[1],让你快速上手PyFlink
>> >
>> >Best,
>> >Xingbo
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>> >
>> >whh_960101  于2020年9月4日周五 下午2:50写道:
>> >
>> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> >> 我的udf输出了一个numpy.array(dtype = str),
>> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>> >>
>> >>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> >> 请问这个问题该怎么解决?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-04 10:35:03,"Xingbo Huang"  写道:
>> >> >Hi,
>> >> >
>> >>
>> >>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> >whh_960101  于2020年9月4日周五 上午9:26写道:
>> >> >
>> >> >>
>> >> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> >> udf定义如下:
>> >> >>
>> >> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> >> def fun(data):
>> >> >>  b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >> >>
>> >> >>
>> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> >> 希望您能给我提供好的解决办法,万分感谢!
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang"  写道:
>> >> >> >Hi,
>> >> >> >
>> >> >> >我觉得你从头详细描述一下你的表结构。
>> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >>
>> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >> >
>> >> >> >[1]
>> >> >> >
>> >> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >> >
>> >> >> >Best,
>> >> >> >Xingbo
>> >> >> >
>> >> >> > 于2020年9月3日周四 下午9:45写道:
>> >> >> >
>> >> >> >>
>> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >> >>
>> >> 

flink-spillable-statebackend????????????

2020-09-06 文章 jiafu
??
flink-1.11??spillable-statebackendOOM
//spillableStateBackend
env.setStateBackend(new SpillableStateBackend(path));

-yD jobmanager.memory.process.size=4096m \
-yD taskmanager.memory.process.size=4096m \
-yD taskmanager.memory.managed.size=512m \
-yD state.backend.spillable.chunk-size=128m \
-yD state.backend.spillable.gc-time.threshold=1s \
-yD state.backend.spillable.trigger-interval=1s \
-yD state.backend.spillable.heap-status.check-interval=1s \

??

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-06 文章 Jingsong Li
另外,可能和使用本地文件系统有关?换成HDFS试试?

On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li  wrote:

> Hi,
>
> 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里?
>
> On Sat, Sep 5, 2020 at 11:11 PM Peihui He  wrote:
>
>> Hi, all
>>
>> 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。
>> 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。
>>
>> 请问有什么好的解决方式没呢?
>>
>> Best Wishes.
>>
>> Peihui He  于2020年9月4日周五 下午6:25写道:
>>
>>> Hi, all
>>>
>>> 当指定partition的时候这个问题通过path 也没法解决了
>>>
>>> CREATE TABLE MyUserTable (
>>>   column_name1 INT,
>>>   column_name2 STRING,  dt string,) PARTITIONED BY (dt) WITH (
>>>   'connector' = 'filesystem',   -- required: specify the connector
>>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>>   'format' = 'json', -- required: file system connector)
>>>
>>>
>>> select  * from  MyUserTable  limit 10;
>>>
>>> job 会一直卡在一个地方
>>> [image: image.png]
>>>
>>> 这种改怎么解决呢?
>>>
>>> Peihui He  于2020年9月4日周五 下午6:02写道:
>>>
 hi, all
 我这边用flink sql client 创建表的时候

 CREATE TABLE MyUserTable (
   column_name1 INT,
   column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
   'connector' = 'filesystem',   -- required: specify the connector
   'path' = 'file:///path/to/whatever',  -- required: path to a directory
   'format' = 'json', -- required: file system 
 connector)

 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
 sql client 提交job会很慢,最后会报错

 Caused by: org.apache.flink.runtime.rest.util.RestClientException:
 [Internal server error., >>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
 already been submitted. at
 org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
 at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 at
 org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
 akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
 scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
 akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
 scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
 scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
 scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
 akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
 akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
 akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
 akka.actor.ActorCell.invoke(ActorCell.scala:561) at
 akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
 akka.dispatch.Mailbox.run(Mailbox.scala:225) at
 akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
 akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
 akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
 akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 End of exception on server side>] at
 org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
 at
 org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
 at
 java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
 at
 java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)


 flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。

 这种情况不知道有没有遇到过?

 Best Wishes.



>>>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-06 文章 Jingsong Li
Hi,

可以在JobMaster里面看一下jstack吗?看下具体卡在哪里?

On Sat, Sep 5, 2020 at 11:11 PM Peihui He  wrote:

> Hi, all
>
> 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。
> 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。
>
> 请问有什么好的解决方式没呢?
>
> Best Wishes.
>
> Peihui He  于2020年9月4日周五 下午6:25写道:
>
>> Hi, all
>>
>> 当指定partition的时候这个问题通过path 也没法解决了
>>
>> CREATE TABLE MyUserTable (
>>   column_name1 INT,
>>   column_name2 STRING,  dt string,) PARTITIONED BY (dt) WITH (
>>   'connector' = 'filesystem',   -- required: specify the connector
>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>   'format' = 'json', -- required: file system connector)
>>
>>
>> select  * from  MyUserTable  limit 10;
>>
>> job 会一直卡在一个地方
>> [image: image.png]
>>
>> 这种改怎么解决呢?
>>
>> Peihui He  于2020年9月4日周五 下午6:02写道:
>>
>>> hi, all
>>> 我这边用flink sql client 创建表的时候
>>>
>>> CREATE TABLE MyUserTable (
>>>   column_name1 INT,
>>>   column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
>>>   'connector' = 'filesystem',   -- required: specify the connector
>>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>>   'format' = 'json', -- required: file system connector)
>>>
>>> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
>>> sql client 提交job会很慢,最后会报错
>>>
>>> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
>>> [Internal server error., >> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>> already been submitted. at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
>>> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498) at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
>>> akka.actor.ActorCell.invoke(ActorCell.scala:561) at
>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
>>> akka.dispatch.Mailbox.run(Mailbox.scala:225) at
>>> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> End of exception on server side>] at
>>> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>>> at
>>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>>> at
>>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>>> at
>>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>>>
>>>
>>> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。
>>>
>>> 这种情况不知道有没有遇到过?
>>>
>>> Best Wishes.
>>>
>>>
>>>
>>

-- 
Best, Jingsong Lee


flink任务运行一段时间checkpoint超时,任务挂掉

2020-09-06 文章 jordan95225
Hi,
我现在有一个flink任务,运行一段时间后checkpoint会超时,INFO信息如下:
checkpoint xxx of job xxx expired before completing.
Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Excedded checkpoint toerable
failure threshold.
然后我查看了taskmanager日志,在报错之前的日志有一条WARN:
WARN  akka.remote.Remoting [] -
Association to [akka.tcp://flink@hadoop43:38839] with unknown UID is
irrecoverably failed. Address cannot be quarantined without knowing the UID,
gating instead for 50 ms.
这条WARN之后task就开始Attempting to cancel task Source,不知道是因为什么原因,期望收到各位的回复
Best



--
Sent from: http://apache-flink.147419.n8.nabble.com/


?????? ????StreamingFileSink??hive metadata??????????????????

2020-09-06 文章 MuChen
hi,jingsong:


??
https://s1.ax1x.com/2020/09/07/wn1CFg.png


checkpoint??
2020-09-04 17:17:59
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)





----
??: 
   "user-zh"



Re: 使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-06 文章 Jingsong Li
失败的图没有呢。。具体什么异常?

On Mon, Sep 7, 2020 at 10:23 AM MuChen <9329...@qq.com> wrote:

> hi, all:
> 麻烦大佬们帮看个问题,多谢!
>
> 处理逻辑如下
> 1. 使用DataStream API读取kafka中的数据,写入DataStream ds1中
> 2. 新建一个tableEnv,并注册hive catalog:
> tableEnv.registerCatalog(catalogName, catalog);
> tableEnv.useCatalog(catalogName);
> 3. 声明以ds1为数据源的table
> Table sourcetable = tableEnv.fromDataStream(ds1);
> String souceTableName = "music_source";
> tableEnv.createTemporaryView(souceTableName, sourcetable);
> 4. 创建一张hive表:
>
> CREATE TABLE `dwd_music_copyright_test`(
>   `url` string COMMENT 'url',
>   `md5` string COMMENT 'md5',
>   `utime` bigint COMMENT '时间',
>   `title` string COMMENT '歌曲名',
>   `singer` string COMMENT '演唱者',
>   `company` string COMMENT '公司',
>   `level` int COMMENT '置信度.0是标题切词,1是acrcloud返回的结果,3是人工标准')
> PARTITIONED BY (
>   `dt` string,
>   `hour` string)ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS 
> INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION
>   'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
> TBLPROPERTIES (
>   'connector'='HiveCatalog',
>   'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
>   'sink.partition-commit.delay'='1 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.rolling-policy.check-interval'='30s',
>   'sink.rolling-policy.rollover-interval'='1min',
>   'sink.rolling-policy.file-size'='1MB');
>
>
> 5. 将step3表中的数据插入dwd_music_copyright_test
>
> 环境
>
> flink:1.11
> kafka:1.1.1
> hadoop:2.6.0
> hive:1.2.0
>
> 问题
> 程序运行后,发现hive catalog部分分区未成功创建,如下未成功创建hour=02和hour=03分区:
>
> show partitions rt_dwd.dwd_music_copyright_test;
>
> | dt=2020-08-29/hour=00  |
> | dt=2020-08-29/hour=01  |
> | dt=2020-08-29/hour=04  |
> | dt=2020-08-29/hour=05  |
>
>  但是hdfs目录下有文件生成:
>
> $ hadoop fs -du -h 
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/4.5 K   
> 13.4 K  
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=002.0
>  K   6.1 K   
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=011.7
>  K   5.1 K   
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=021.3
>  K   3.8 K   
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=033.1
>  K   9.2 K   
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04
>
>
> 且手动add partition后可以正常读取数据。
>
> 通过flink WebUI可以看到,过程中有checkpoint在StreamingFileCommitter时失败的情况发生:
>
>
>
>
>
> 请问:
>
> 1. exactly-once只能保证写sink文件,不能保证更新catalog吗?
> 2. 是的话有什么方案解决这个问题吗?
> 3.
> EXACTLY_ONCE有没有必要指定kafka参数isolation.level=read_committed和enable.auto.commit=false?是不是有了如下设置就可以保证EXACTLY_ONCE?
>
> streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE);
>
>

-- 
Best, Jingsong Lee


????StreamingFileSink??hive metadata??????????????????

2020-09-06 文章 MuChen
hi, all??




1. DataStream APIkafka??DataStream ds1??
2. tableEnvhive catalog??
  tableEnv.registerCatalog(catalogName, catalog);
  tableEnv.useCatalog(catalogName);
3. ??ds1??table
  Table sourcetable = tableEnv.fromDataStream(ds1);
  String souceTableName = "music_source";
  tableEnv.createTemporaryView(souceTableName, sourcetable);
4. hive
CREATE TABLE `dwd_music_copyright_test`(   `url` string COMMENT 'url',   `md5` 
string COMMENT 'md5',   `utime` bigint COMMENT '',   `title` string COMMENT 
'??',   `singer` string COMMENT '??',   `company` string COMMENT 
'',   `level` int COMMENT 
'??.0??,1??acrcloud??,3??') PARTITIONED BY (   `dt` 
string,   `hour` string) ROW FORMAT SERDE   
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS 
INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION   
'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test' 
TBLPROPERTIES (   'connector'='HiveCatalog',   
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',   
'sink.partition-commit.delay'='1 min',   
'sink.partition-commit.policy.kind'='metastore,success-file',   
'sink.partition-commit.trigger'='partition-time',   
'sink.rolling-policy.check-interval'='30s',   
'sink.rolling-policy.rollover-interval'='1min',   
'sink.rolling-policy.file-size'='1MB');


5. ??step3??dwd_music_copyright_test



flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0

  hive 
catalog??hour=02??hour=03??
show partitions rt_dwd.dwd_music_copyright_test; | dt=2020-08-29/hour=00  | | 
dt=2020-08-29/hour=01  | | dt=2020-08-29/hour=04  | | dt=2020-08-29/hour=05  |
hdfs??
$ hadoop fs -du -h 
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/ 4.5 K   
13.4 K  
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00 
2.0 K   6.1 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01 
1.7 K   5.1 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02 
1.3 K   3.8 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03 
3.1 K   9.2 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04 


??add partition


flink 
WebUI??checkpoint??StreamingFileCommitter??









??


1. exactly-once??sink??catalog
2. 
3. 
EXACTLY_ONCE??kafkaisolation.level=read_committed??enable.auto.commit=false??EXACTLY_ONCE??
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE);

Re: Flink 1.9.1的报错信息,在启动flink on yran的时候出现问题

2020-09-06 文章 Yang Wang
你把-yn 2这个参数去了看一下,这个参数很早就不能生效了
TM都是动态申请和释放的

Best,
Yang

xzw0223 <747305...@qq.com> 于2020年9月7日周一 上午9:50写道:

> 这是启动日志报错的信息  通过flink on yarn模式进行提交的
>
> [root@node01 flink-1.9.1]# bin/flink run -m yarn-cluster -yn 2
> ./examples/batch/WordCount.jar
>
> 2020-09-06 14:30:00,803 INFO  org.apache.hadoop.yarn.client.RMProxy
>
> - Connecting to ResourceManager at /0.0.0.0:8032
> 2020-09-06 14:30:00,938 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> - No path for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-09-06 14:30:00,938 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> - No path for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-09-06 14:30:00,947 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> - The argument yn is deprecated in will be ignored.
> 2020-09-06 14:30:00,947 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> - The argument yn is deprecated in will be ignored.
> 2020-09-06 14:30:01,105 INFO  org.apache.hadoop.conf.Configuration
>
> - resource-types.xml not found
> 2020-09-06 14:30:01,105 INFO
> org.apache.hadoop.yarn.util.resource.ResourceUtils- Unable to
> find 'resource-types.xml'.
> 2020-09-06 14:30:01,136 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster
> specification: ClusterSpecification{masterMemoryMB=1024,
> taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
> 2020-09-06 14:30:01,193 WARN
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The file
> system scheme is 'file'. This indicates that the specified Hadoop
> configuration path is wrong and the system is using the default Hadoop
> configuration values.The Flink YARN client needs to store its files in a
> distributed file system
> 2020-09-06 14:30:01,196 WARN
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The
> configuration directory ('/export/servers/flink-1.9.1/conf') contains both
> LOG4J and Logback configuration files. Please delete or rename one of them.
> 2020-09-06 14:30:01,600 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting
> application master application_1599371603539_0004
> 2020-09-06 14:30:01,635 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1599371603539_0004
> 2020-09-06 14:30:01,635 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for
> the cluster to be allocated
> 2020-09-06 14:30:01,644 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying
> cluster, current state ACCEPTED
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn session cluster
> at
>
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:251)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by:
>
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
> The YARN application unexpectedly switched to state FAILED during
> deployment.
> Diagnostics from YARN: Application application_1599371603539_0004 failed 2
> times due to AM Container for appattempt_1599371603539_0004_02 exited
> with  exitCode: 1
> For more detailed output, check application tracking
> page:http://node01:8088/cluster/app/application_1599371603539_0004Then,
> click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_1599371603539_0004_02_01
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
> at org.apache.hadoop.util.Shell.run(Shell.java:482)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
> at
>
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
> at
>
> 

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-06 文章 Liu Rising
Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2> {

private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

private final ParameterTool params;
private transient ListState unmatchedProbesState;

...

FlinkKeyedProcessFunction(ParameterTool params) {
this.params = params;
}

@Override
public void open(Configuration parameters) {

ListStateDescriptor descriptor = new
ListStateDescriptor<>(
"unmatchedProbes", TypeInformation.of(ObjectNode.class)
);
unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

List unmatchedProbes =
mapMatching.getUnMatchedProbes(id);
unmatchedProbesState.clear();

if (unmatchedProbes.size() > 0) {
try {
unmatchedProbesState.addAll(unmatchedProbes);
} catch (Exception e) {
LOG.warn("Continue processing although failed to add
unmatchedProbes to ListState. ID: " + id, e);
}
}

   ...

以下是从state读取的code

for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
LOG.info("Processing unmatched probe: " +
unmatchedProbe);
matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
}


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.9.1的报错信息,在启动flink on yran的时候出现问题

2020-09-06 文章 xzw0223
具体的报错信息


[root@node01 flink-1.9.1]# bin/flink run -m yarn-cluster -yn 2
./examples/batch/WordCount.jar
2020-09-06 14:30:00,803 INFO  org.apache.hadoop.yarn.client.RMProxy 
   
- Connecting to ResourceManager at /0.0.0.0:8032
2020-09-06 14:30:00,938 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- No path for the flink jar passed. Using the location of class
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-09-06 14:30:00,938 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- No path for the flink jar passed. Using the location of class
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-09-06 14:30:00,947 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- The argument yn is deprecated in will be ignored.
2020-09-06 14:30:00,947 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- The argument yn is deprecated in will be ignored.
2020-09-06 14:30:01,105 INFO  org.apache.hadoop.conf.Configuration  
   
- resource-types.xml not found
2020-09-06 14:30:01,105 INFO 
org.apache.hadoop.yarn.util.resource.ResourceUtils- Unable to
find 'resource-types.xml'.
2020-09-06 14:30:01,136 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster
specification: ClusterSpecification{masterMemoryMB=1024,
taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2020-09-06 14:30:01,193 WARN 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The file
system scheme is 'file'. This indicates that the specified Hadoop
configuration path is wrong and the system is using the default Hadoop
configuration values.The Flink YARN client needs to store its files in a
distributed file system
2020-09-06 14:30:01,196 WARN 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The
configuration directory ('/export/servers/flink-1.9.1/conf') contains both
LOG4J and Logback configuration files. Please delete or rename one of them.
2020-09-06 14:30:01,600 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting
application master application_1599371603539_0004
2020-09-06 14:30:01,635 INFO 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
application application_1599371603539_0004
2020-09-06 14:30:01,635 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for
the cluster to be allocated
2020-09-06 14:30:01,644 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying
cluster, current state ACCEPTED


 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
deploy Yarn session cluster
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:251)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment. 
Diagnostics from YARN: Application application_1599371603539_0004 failed 2
times due to AM Container for appattempt_1599371603539_0004_02 exited
with  exitCode: 1
For more detailed output, check application tracking
page:http://node01:8088/cluster/app/application_1599371603539_0004Then,
click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1599371603539_0004_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
at org.apache.hadoop.util.Shell.run(Shell.java:482)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at 

Flink 1.9.1的报错信息,在启动flink on yran的时候出现问题

2020-09-06 文章 xzw0223
这是启动日志报错的信息  通过flink on yarn模式进行提交的

[root@node01 flink-1.9.1]# bin/flink run -m yarn-cluster -yn 2
./examples/batch/WordCount.jar

2020-09-06 14:30:00,803 INFO  org.apache.hadoop.yarn.client.RMProxy 
   
- Connecting to ResourceManager at /0.0.0.0:8032
2020-09-06 14:30:00,938 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- No path for the flink jar passed. Using the location of class
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-09-06 14:30:00,938 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- No path for the flink jar passed. Using the location of class
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-09-06 14:30:00,947 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- The argument yn is deprecated in will be ignored.
2020-09-06 14:30:00,947 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   
- The argument yn is deprecated in will be ignored.
2020-09-06 14:30:01,105 INFO  org.apache.hadoop.conf.Configuration  
   
- resource-types.xml not found
2020-09-06 14:30:01,105 INFO 
org.apache.hadoop.yarn.util.resource.ResourceUtils- Unable to
find 'resource-types.xml'.
2020-09-06 14:30:01,136 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster
specification: ClusterSpecification{masterMemoryMB=1024,
taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2020-09-06 14:30:01,193 WARN 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The file
system scheme is 'file'. This indicates that the specified Hadoop
configuration path is wrong and the system is using the default Hadoop
configuration values.The Flink YARN client needs to store its files in a
distributed file system
2020-09-06 14:30:01,196 WARN 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The
configuration directory ('/export/servers/flink-1.9.1/conf') contains both
LOG4J and Logback configuration files. Please delete or rename one of them.
2020-09-06 14:30:01,600 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting
application master application_1599371603539_0004
2020-09-06 14:30:01,635 INFO 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
application application_1599371603539_0004
2020-09-06 14:30:01,635 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for
the cluster to be allocated
2020-09-06 14:30:01,644 INFO 
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying
cluster, current state ACCEPTED


 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
deploy Yarn session cluster
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:251)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment. 
Diagnostics from YARN: Application application_1599371603539_0004 failed 2
times due to AM Container for appattempt_1599371603539_0004_02 exited
with  exitCode: 1
For more detailed output, check application tracking
page:http://node01:8088/cluster/app/application_1599371603539_0004Then,
click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1599371603539_0004_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
at org.apache.hadoop.util.Shell.run(Shell.java:482)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at 

Re: flink sql多层view嵌套,字段not found

2020-09-06 文章 Leonard Xu
Hi, linhou

使用方式没问题,这个看起来像是个bug, 你可以去社区[1]建一个issue, issue上贴个可以复现这个问题的demo

Best
Leonard
[1] https://issues.apache.org/jira/projects/FLINK/summary 


> 在 2020年9月3日,18:40,Lin Hou  写道:
> 
>  activity_property



Re: Flink on k8s

2020-09-06 文章 陈韬
同样的问题遇到过一次,最后查出还是job的问题。job写的不正当,运行一段会不停的restart,最后导致TM重启。
可以从job的角度查一下


Best,
TonyChen

> 2020年9月1日 下午6:50,superainbower  写道:
> 
> hi,我在尝试flink on k8s部署,搭建了minikube,按照官方文档部署之后
> 1.在flink ui界面 看不到taskmanger的信息,任务提交的时候会提示没有slot,像是taskmanger没有注册到jobmanger
> 2.k8s dashboard上,taskmanger过不了一会就重启
> 请问下大家有遇过吗?不知道该怎么排查解决



Re: 请教:用flink实现实时告警的功能

2020-09-06 文章 刘建刚
针对规则改变,要想实时生效,有两种建议:

   1. 利用维表join的功能来join数据库中的规则,flink内部可以配置一定的缓存策略。可以查看下Lookup的实现。
   2. 也可以把规则打到kafka的表里,然后通过broadcast来广播最新的规则。


李军  于2020年9月4日周五 下午5:46写道:

> 您好!
>
>
>可以使用Flink+drools 做。drools可以实时更新规则
> 2020-9-4
> | |
> 李军
> |
> |
> hold_li...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年8月6日 10:26,samuel@ubtrobot.com 写道:
> 由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢!
>
> 告警有分两部分:
> 一是告警规则的设置,数据存放在mysql,存储的格式是json
> {"times":5}  ---就是事件发生大于5次就发出告警;
> {"temperature": 80} ---就是温度大于80就告警;
> 二是告警实现
> 1)上报的数据写入到kafka
> 2)flink读取kafka的数据,然后通过翻滚窗口进行计算,如果满足规则就生产告警。
>
>
> 现在遇到的问题是:
> 1. 当规则变更时,如何及时生效?
> 2.如果用flink CEP来是实现,对于同一数据源,能否在一个作业里让多个规则同时生效?
> 3.这一功能有最佳实践吗?
>
> 希望哪位解答一下,谢谢!
>
>
>
>


flink-sql1.11????mysql????????

2020-09-06 文章 ??????
flink-sql1.11insert??connect??mysql??flink 
tablemysql??id??1.11upsertmysql??idflink
 
tablemysqlPS:??ods??intvate
 join??flink 
table??mysql??upsert