Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
1.10.1最近正在准备发布,还有几个blocker的issue,应该快了。
1.11的话,应该还比较久,现在都还没有feature freeze。

如果你可以在master上复现这个问题的话,可以建一个issue。

111  于2020年4月16日周四 上午11:32写道:

> Hi,
> 是的,我都有修改.
> 那我去jira里面重新开个issue?
>
>
> 另外,1.10.1或者1.11大概什么时间发布呢?我已经合并了很多PR,现在的版本有点乱了。
> Best,
> Xinghalo



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi,
是的,我都有修改.
那我去jira里面重新开个issue?


另外,1.10.1或者1.11大概什么时间发布呢?我已经合并了很多PR,现在的版本有点乱了。
Best,
Xinghalo

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
https://issues.apache.org/jira/browse/FLINK-16068
https://issues.apache.org/jira/browse/FLINK-16345
上面这两个issue的修改都加到了1.10上了么?如果是的话,那这可能是还有其他的bug。
如果你可以在1.10和或者master分支的最新代码上复现这个问题的话,可以建一个issue来跟踪下这个问题。

111  于2020年4月16日周四 上午10:46写道:

> Hi,
> 基于1.10 源码按照jira里面的PR修改不行么?
> 跟hbase的ddl关系应该不大,就发一个kafka的吧。
>
>
> //代码占位符
> Flink SQL> CREATE TABLE kafka_test1 (
> //代码占位符
> Flink SQL> CREATE TABLE kafka_test1 (
> >   id varchar,
> >   a varchar,
> >   b int,
> >   ts as PROCTIME()
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = '0.11',
> >   'connector.topic' = 'test',
> >   'connector.properties.zookeeper.connect' = 'localnode2:2181',
> >   'connector.properties.bootstrap.servers' = 'localnode2:9092',
> >   'connector.properties.group.id' = 'testGroup',
> >   'connector.startup-mode' = 'latest-offset',
> >   'format.type' = 'json'
> > )
> > ;
> [INFO] Table has been created.
>
>
> Flink SQL> select a.*,b.* from kafka_test1 a join hbase_test1 FOR
> SYSTEM_TIME AS OF a.ts as b on a.id = b.rowkey;
>
>
> 异常信息:
> //代码占位符
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT
> NULL ts, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey,
> RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a,
> INTEGER b) f) NOT NULL
> converted type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIME
> ATTRIBUTE(PROCTIME) NOT NULL ts, VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE" rowkey, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER
> SET "UTF-16LE" a, INTEGER b) f) NOT NULL
> rel:
> LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5])
>   LogicalCorrelate(correlation=[$cor1], joinType=[inner],
> requiredColumns=[{0, 3}])
> LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()])
>   LogicalTableScan(table=[[tgou, collie, kafka_test1, source:
> [Kafka011TableSource(id, a, b)]]])
> LogicalFilter(condition=[=($cor1.id, $0)])
>   LogicalSnapshot(period=[$cor1.ts])
> LogicalTableScan(table=[[tgou, collie, hbase_test1, source:
> [HBaseTableSource[schema=[rowkey, f], projectFields=null)
>
>
> Best,
> Xinghalo



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi,
基于1.10 源码按照jira里面的PR修改不行么?
跟hbase的ddl关系应该不大,就发一个kafka的吧。


//代码占位符
Flink SQL> CREATE TABLE kafka_test1 (
//代码占位符
Flink SQL> CREATE TABLE kafka_test1 (
>   id varchar,
>   a varchar,
>   b int,
>   ts as PROCTIME()
> ) WITH (
>   'connector.type' = 'kafka',   
>   'connector.version' = '0.11',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localnode2:2181',
>   'connector.properties.bootstrap.servers' = 'localnode2:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'latest-offset',
>   'format.type' = 'json'
> )
> ;
[INFO] Table has been created.


Flink SQL> select a.*,b.* from kafka_test1 a join hbase_test1 FOR SYSTEM_TIME 
AS OF a.ts as b on a.id = b.rowkey;


异常信息:
//代码占位符
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT NULL ts, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, 
INTEGER b) f) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" a, INTEGER b, TIME ATTRIBUTE(PROCTIME) NOT NULL ts, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, 
INTEGER b) f) NOT NULL
rel:
LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5])
  LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 
3}])
LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()])
  LogicalTableScan(table=[[tgou, collie, kafka_test1, source: 
[Kafka011TableSource(id, a, b)]]])
LogicalFilter(condition=[=($cor1.id, $0)])
  LogicalSnapshot(period=[$cor1.ts])
LogicalTableScan(table=[[tgou, collie, hbase_test1, source: 
[HBaseTableSource[schema=[rowkey, f], projectFields=null)


Best,
Xinghalo

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
Hi,

你提到的这两个issue都是在1.10.1版本中才会修复,但是现在还没有release1.10.1版本。
你现在是用release-1.10 branch编译的么?
此外,是否方便也贴一下完整的DDL以及query呢?

111  于2020年4月16日周四 上午8:22写道:

> Hi,
> 更正一下,我的问题跟这个类似,遇到的问题也在评论中:
>
> https://issues.apache.org/jira/browse/FLINK-16345?jql=text%20~%20%22Caused%20by%3A%20java.lang.AssertionError%3A%20Conversion%20to%20relational%20algebra%20failed%20to%20preserve%20datatypes%3A%22
> Best,
> Xinghalo
>
>
> 在2020年04月16日 08:18,111 写道:
> Hi,
> 我的时间字段就是proctime()产生的...因为当时有个time关键字的bug,所以按照这个confluence进行了修正。
> 后来使用时间字段的时候,就出了现在的问题。
> https://issues.apache.org/jira/browse/FLINK-16068
> Best,
> Xinghalo
>
>
> 在2020年04月15日 21:21,Benchao Li 写道:
> 这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>
> 111  于2020年4月15日周三 下午9:08写道:
>
> Hi,
> 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime
> 会报类型不匹配问题…timestamp(3)和time attribute 不匹配.
>
>
> 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table
> xxx的语法来使用。
>
>
> Best,
> Xinghalo
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 Benchao Li
我感觉双流join如果要保证结果是一致的,需要用事件时间,而不是处理时间或者是摄入时间。
如果可能,建议尝试下基于事件时间的双流join。

xue...@outlook.com  于2020年4月16日周四 上午9:15写道:

> 双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的
>
> 发送自 Windows 10 版邮件应用
>
> 发件人: tison
> 发送时间: 2020年4月15日 22:26
> 收件人: user-zh
> 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题
>
> FYI
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html
>
> IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
> window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
> EventTime,在 Watermark
> 不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。
>
> Best,
> tison.
>
>
> tison  于2020年4月15日周三 下午10:18写道:
>
> > IngestionTime 多次运行结果不一样很正常啊,试试 event time?
> >
> > Best,
> > tison.
> >
> >
> > xuefli  于2020年4月15日周三 下午10:10写道:
> >
> >> 遇到一个非常头痛的问题
> >>
> >> Flink1.10的集群,用hdfs做backend
> >>
> >> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> >> 如果如下操作
> >>
> >> 我遇到一个问题 双流Join
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
> 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后
> 再对cStream进行keyBy-->timeWindow-->sum.
> >> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> >> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> >> 但数据量很大时,就会这样。
> >>
> >>
> >> 每次计算的结果不一样,这个对业务系统挑战巨大
> >>
> >>
> >> 发送自 Windows 10 版邮件应用
> >>
> >>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink cep 匹配一段时间类A,B,C事件发生

2020-04-15 文章 Dian Fu
类似于这样?

AA follow by BB follow by CC

AA定义成A or B or C
BB定义成(A or B or C)and BB.type != AA.type
CC定义成(A or B or C)and CC.type != AA.type and CC.type != BB.type

> 在 2020年4月16日,上午8:40,Peihui He  写道:
> 
> hello,all
> 
>我这个边需要匹配一段时间内A,B,C事件同时发生,但是不要求A,B,C事件的顺序,flink cep有什么好的方式不?
> 
> 有个方案是 定义多个模式组,每个模式组是A,B,C事件的一次排列组合,但是这样比较麻烦,如果事件个数多的话,需要写太多组合。
> 
> 
> best wish



求依赖包

2020-04-15 文章 samuel....@ubtrobot.com
大家好,有哪位大神有现成的包,非常感谢!

flink-connector-elasticsearch7_2.11



深圳市优必选科技股份有限公司 | 平台软件部
邱钺 Samuel Qiu
手机/微信: +0086 150 1356 8368
Email: samuel@ubtrobot.com
UBTECH Robotics | www.ubtrobot.com 
广东省深圳市南山区平山路鸿莱科创楼13栋3楼优必选
 
From: samuel@ubtrobot.com
Date: 2020-04-15 17:37
To: user-zh
Subject: flink-sql-connector-elasticsearch7_2.11-1.10.0.jar
在提交job后,发现不成功,这个问题要怎么解决?
版本:Flink1.10.0  elasticsearch:7.6.0

看了源码,确实是没这个类的:

Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
at 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
at 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.script.mustache.SearchTemplateRequest
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more


谢谢!


回复: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 xue...@outlook.com
双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的

发送自 Windows 10 版邮件应用

发件人: tison
发送时间: 2020年4月15日 22:26
收件人: user-zh
主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

FYI

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html

IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
EventTime,在 Watermark
不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。

Best,
tison.


tison  于2020年4月15日周三 下午10:18写道:

> IngestionTime 多次运行结果不一样很正常啊,试试 event time?
>
> Best,
> tison.
>
>
> xuefli  于2020年4月15日周三 下午10:10写道:
>
>> 遇到一个非常头痛的问题
>>
>> Flink1.10的集群,用hdfs做backend
>>
>> 一个流aStream准备了10亿的数据,另外一个流bStream百万
>> 如果如下操作
>>
>> 我遇到一个问题 双流Join 
>> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
>>  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
>> 再对cStream进行keyBy-->timeWindow-->sum.
>> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
>> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
>> 但数据量很大时,就会这样。
>>
>>
>> 每次计算的结果不一样,这个对业务系统挑战巨大
>>
>>
>> 发送自 Windows 10 版邮件应用
>>
>>



回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi,
我的时间字段就是proctime()产生的...因为当时有个time关键字的bug,所以按照这个confluence进行了修正。
后来使用时间字段的时候,就出了现在的问题。
https://issues.apache.org/jira/browse/FLINK-16068
Best,
Xinghalo


在2020年04月15日 21:21,Benchao Li 写道:
这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html

111  于2020年4月15日周三 下午9:08写道:

Hi,
现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime
会报类型不匹配问题…timestamp(3)和time attribute 不匹配.


所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table
xxx的语法来使用。


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复:flink1.9,后台提交job失败

2020-04-15 文章 胡泽康
是不是没有执行操作啊。例如print、collect等方法

--原始邮件--
发件人:"guanyq "

回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi,
更正一下,我的问题跟这个类似,遇到的问题也在评论中:
https://issues.apache.org/jira/browse/FLINK-16345?jql=text%20~%20%22Caused%20by%3A%20java.lang.AssertionError%3A%20Conversion%20to%20relational%20algebra%20failed%20to%20preserve%20datatypes%3A%22
Best,
Xinghalo


在2020年04月16日 08:18,111 写道:
Hi,
我的时间字段就是proctime()产生的...因为当时有个time关键字的bug,所以按照这个confluence进行了修正。
后来使用时间字段的时候,就出了现在的问题。
https://issues.apache.org/jira/browse/FLINK-16068
Best,
Xinghalo


在2020年04月15日 21:21,Benchao Li 写道:
这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html

111  于2020年4月15日周三 下午9:08写道:

Hi,
现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime
会报类型不匹配问题…timestamp(3)和time attribute 不匹配.


所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table
xxx的语法来使用。


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re:flink1.9,后台提交job失败

2020-04-15 文章 guanyq
ok 找到原因了!不好意思!
在 2020-04-16 08:03:29,"guanyq"  写道:
>代码里面是有env.execute,提交job出现以下错误,可能时什么原因?
>The program didn't contain a Flink job. Perhaps you forgot to call execute() 
>on the execution environment.


flink1.9,后台提交job失败

2020-04-15 文章 guanyq
代码里面是有env.execute,提交job出现以下错误,可能时什么原因?
The program didn't contain a Flink job. Perhaps you forgot to call execute() on 
the execution environment.

Re: flink 1.7.2 YARN Session模式提交任务问题求助

2020-04-15 文章 tison
注意环境变量和 fs.hdfs.hdfsdefault 要配置成 HDFS 路径或 YARN
集群已知的本地路径,不要配置成客户端的路径。因为实际起作用是在拉起 TM 的那台机器上解析拉取的。

Best,
tison.


Chief  于2020年4月15日周三 下午7:40写道:

> hi Yangze Guo
> 您说的环境变量已经在当前用户的环境变量文件里面设置了,您可以看看我的问题描述,现在如果checkpoint的路径设置不是namenode
> ha的nameservice就不会报错,checkpoint都正常。
>
>
>
>
> --原始邮件--
> 发件人:"Yangze Guo" 发送时间:2020年4月15日(星期三) 下午3:00
> 收件人:"user-zh"
> 主题:Re: flink 1.7.2 YARN Session模式提交任务问题求助
>
>
>
> Flink需要设置hadoop相关conf位置的环境变量 YARN_CONF_DIR or HADOOP_CONF_DIR [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html
>
> Best,
> Yangze Guo
>
> On Mon, Apr 13, 2020 at 10:52 PM Chief  
>  大家好
>  目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs
> namenode配置了ha模式,提交任务的时候报以下错误,系统环境变量中已经设置了HADOOP_HOME,YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CLASSPATH,在flink_conf.yaml中配置了fs.hdfs.hadoopconf
> 
> 
>  2020-04-10 19:12:02,908 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Connecting to ResourceManager akka.tcp://flink@trusfortpoc1
> :23584/user/resourcemanager()
>  2020-04-10 19:12:02,909 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}]
>  2020-04-10 19:12:02,911 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Resolved ResourceManager address, beginning registration
>  2020-04-10 19:12:02,911 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Registration at ResourceManager attempt 1 (timeout=100ms)
>  2020-04-10 19:12:02,912 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}]
>  2020-04-10 19:12:02,913 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Registering job manager
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
>  2020-04-10 19:12:02,917 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Registered job manager
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
>  2020-04-10 19:12:02,919 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - JobManager successfully registered at ResourceManager, leader
> id: .
>  2020-04-10 19:12:02,919 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Requesting new slot
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
>  2020-04-10 19:12:02,920 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}.
>  2020-04-10 19:12:02,921 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Requesting new slot
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
>  2020-04-10 19:12:02,924 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Requesting new TaskExecutor container with resources
>   2020-04-10 19:12:02,926 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{37dd666a18040bf63ffbf2e022b2ea9b}.
>  2020-04-10 19:12:06,531 INFOnbsp;
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImplnbsp; nbsp;
> nbsp; nbsp; nbsp;- Received new token for :
> trusfortpoc3:35206
>  2020-04-10 19:12:06,543 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Received new container:
> 

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 tison
FYI

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html

IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
EventTime,在 Watermark
不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。

Best,
tison.


tison  于2020年4月15日周三 下午10:18写道:

> IngestionTime 多次运行结果不一样很正常啊,试试 event time?
>
> Best,
> tison.
>
>
> xuefli  于2020年4月15日周三 下午10:10写道:
>
>> 遇到一个非常头痛的问题
>>
>> Flink1.10的集群,用hdfs做backend
>>
>> 一个流aStream准备了10亿的数据,另外一个流bStream百万
>> 如果如下操作
>>
>> 我遇到一个问题 双流Join 
>> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
>>  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
>> 再对cStream进行keyBy-->timeWindow-->sum.
>> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
>> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
>> 但数据量很大时,就会这样。
>>
>>
>> 每次计算的结果不一样,这个对业务系统挑战巨大
>>
>>
>> 发送自 Windows 10 版邮件应用
>>
>>


Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 tison
IngestionTime 多次运行结果不一样很正常啊,试试 event time?

Best,
tison.


xuefli  于2020年4月15日周三 下午10:10写道:

> 遇到一个非常头痛的问题
>
> Flink1.10的集群,用hdfs做backend
>
> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> 如果如下操作
>
> 我遇到一个问题 双流Join 
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
>  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
> 再对cStream进行keyBy-->timeWindow-->sum.
> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> 但数据量很大时,就会这样。
>
>
> 每次计算的结果不一样,这个对业务系统挑战巨大
>
>
> 发送自 Windows 10 版邮件应用
>
>


双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 xuefli
遇到一个非常头痛的问题

Flink1.10的集群,用hdfs做backend

一个流aStream准备了10亿的数据,另外一个流bStream百万
如果如下操作
我遇到一个问题 双流Join 
带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
再对cStream进行keyBy-->timeWindow-->sum.
我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
但数据量很大时,就会这样。


每次计算的结果不一样,这个对业务系统挑战巨大


发送自 Windows 10 版邮件应用



Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 tao wang
多谢两位  Yangze and LakeShen,我研究一下。

Yangze Guo  于2020年4月15日周三 下午3:45写道:

> 1. flink会去读YARN_CONF_DIR or HADOOP_CONF_DIR这两个环境变量
> 2. 我理解这和你flink运行的集群是解耦的,只要你dir的路径不变,就会从那个dir找checkpoint恢复
>
> Best,
> Yangze Guo
>
> On Wed, Apr 15, 2020 at 3:38 PM tao wang  wrote:
> >
> > 多谢回复, 还有几个问题请教:
> > 1、外部集群的hdfs-site, core-site 这些怎么配置?
> > 2、另外一个角度, 如果我把任务迁移到另外一个集群,如何让它从老的集群的checkpoint 恢复。
> >
> > Yangze Guo  于2020年4月15日周三 下午2:44写道:
> >>
> >> checkpoint的目录设置key为state.checkpoints.dir
> >>
> >> 你可以这样设置
> >> state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/
> >>
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Wed, Apr 15, 2020 at 1:45 PM tao wang  wrote:
> >> >
> >> > 现在有个场景, 集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。
> >> >
> >> > 但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。
> >> >
> >> > 谢谢!!
>


Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html

111  于2020年4月15日周三 下午9:08写道:

> Hi,
> 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime
> 会报类型不匹配问题…timestamp(3)和time attribute 不匹配.
>
>
> 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table
> xxx的语法来使用。
>
>
> Best,
> Xinghalo
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi,
现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime 
会报类型不匹配问题…timestamp(3)和time attribute 不匹配.


所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table 
xxx的语法来使用。


Best,
Xinghalo



Re: flink-1.10-sql 维表问题

2020-04-15 文章 Zhenghua Gao
JDBC connector 支持作为维表,DDL无需特殊字段指定。部分可选的参数可以控制temporary join行为[1]。
用作维表join时,需要使用特殊的join语法 [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins

*Best Regards,*
*Zhenghua Gao*


On Wed, Apr 15, 2020 at 7:48 PM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:

> hi 大家
> 想问下flink-1.10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀?
>
>
>
> guaishushu1...@163.com
>


Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
Hi,

维表创建的DDL跟普通的source没有区别,主要是在使用的时候,需要使用维表join专有的语法。

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rateFROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency


guaishushu1...@163.com  于2020年4月15日周三 下午7:48写道:

> hi 大家
> 想问下flink-1.10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀?
>
>
>
> guaishushu1...@163.com
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


flink-1.10-sql 维表问题

2020-04-15 文章 guaishushu1...@163.com
hi 大家
想问下flink-1.10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀?



guaishushu1...@163.com


?????? flink 1.7.2 YARN Session????????????????????

2020-04-15 文章 Chief
hi Yangze Guo
??checkpoint??namenode
 ha??nameservicecheckpoint




----
??:"Yangze Guo"https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html

Best,
Yangze Guo

On Mon, Apr 13, 2020 at 10:52 PM Chief 

Re: 关于状态TTL

2020-04-15 文章 Benchao Li
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  于2020年4月15日周三 下午5:40写道:

>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink-sql-connector-elasticsearch7_2.11-1.10.0.jar

2020-04-15 文章 Benchao Li
Hi,

这个是个已知问题[1],已经在1.10.1和master上修复了。你可以尝试下~

[1] https://issues.apache.org/jira/browse/FLINK-16170

samuel@ubtrobot.com  于2020年4月15日周三 下午5:37写道:

> 在提交job后,发现不成功,这个问题要怎么解决?
> 版本:Flink1.10.0  elasticsearch:7.6.0
>
> 看了源码,确实是没这个类的:
>
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
> at
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
> at
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.
> elasticsearch7.shaded.org
> .elasticsearch.script.mustache.SearchTemplateRequest
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 14 more
>
>
> 谢谢!
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


关于状态TTL

2020-04-15 文章 酷酷的浑蛋


我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a
当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明

flink-sql-connector-elasticsearch7_2.11-1.10.0.jar

2020-04-15 文章 samuel....@ubtrobot.com
在提交job后,发现不成功,这个问题要怎么解决?
版本:Flink1.10.0  elasticsearch:7.6.0

看了源码,确实是没这个类的:

Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
at 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
at 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.script.mustache.SearchTemplateRequest
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more


谢谢!


Re: FlinkSQL构建流式应用checkpoint设置

2020-04-15 文章 godfrey he
Hi Even,

1. 目前 SQL CLI 支持通过在 sql-client-default.yaml 里设置 parallelism
和 max-parallelism 来控制 CLI 任务的默认并发。或者通过 set 命令,如 set
execution.parallelism=10;放方式动态设置。例外,对于如果使用 blink
planner,可以用 table.exec.resource.default-parallelism 来配置默认并发。[1]
另外 SQL CLI 还不支持 checkpoint 的设置。
2. 目前 SQL CLI 默认是 in-memory catalog,在每个SQL CLI的独立进程中,不会共享。如果SQL
CLI挂掉,in-memory catalog 也会消失。你可以配置你的catalog为 hive catalog [1], 这样你创建的表会持久化到
hive catalog 中,多个SQL CLI使用同一个hive catalog,可以达到你说期望的共享。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#environment-files

Best,
Godfrey

Even <452232...@qq.com> 于2020年4月15日周三 下午3:35写道:

> Hi!
> 请教两个问题:
> 1、 Flink SQL CLI 纯文本方式构建一个流式应用,在DDL语句中如何设置checkpoint和并行度这些参数?
> 2、 Flink SQL CLI
> 纯文本方式构建的流式应用创建的那些表,我在另外一个CLI中是无法找到这些table的,这是为什么?如果任务挂掉了,应该怎么重启,还是必须重新再构建?


Re: 关于FLINK PYTHON UDF

2020-04-15 文章 Xingbo Huang
Hi,
我刚刚在本地完全模拟了你的数据和核心的代码,是可以在sink里拿到结果的。
我把我的测试代码放到附件里面了,
你可以参考一下,如果还是不行的话,可以提供下你的代码再帮你看一下

Best,
Xingbo


秦寒  于2020年4月15日周三 下午3:16写道:

> 你好
>
>我在使用kafka produce数据后,在python中使用UDF做一个add function,但是最后的sink
> 文件里面没有任何数据,
>
> 如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG很久也不清楚是什么原因是否能帮忙分下
>
>
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
>
>
> *测试结果*
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
> st_env.from_path("source")\
> .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
> *.select("add(b1,c1)") \ **无任何输出*
> .insert_into("result_tab")
>
> *无任何输出*
>
>
>
>
>
> st_env.from_path("source")\
> .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
> *.select("c1")\*   #正常输出
>
>
> .insert_into("result_tab")
>
> *正确输出*
>
>
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
import os
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, FileSystem, OldCsv
from pyflink.table.udf import udf


def test_udf():
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create(s_env)

result_file = "/tmp/scalar_func_basic.csv"
if os.path.exists(result_file):
os.remove(result_file)

st_env.register_table_sink("Results",
   CsvTableSink(['a'],
[DataTypes.BIGINT()],
result_file))

st_env.register_function("add",
 udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
 DataTypes.BIGINT()))
st_env \
.connect(  # declare the external system to connect to
Kafka()
.version("0.11")
.topic("user")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
) \
.with_format(  # declare a format for this system
Json()
.fail_on_missing_field(True)
.json_schema(
"{"
"  type: 'object',"
"  properties: {"
"a: {"
"  type: 'string'"
"},"
"b: {"
"  type: 'string'"
"},"
"c: {"
"  type: 'string'"
"}"
"  }"
"}"
)
) \
.with_schema(  # declare the schema of the table
Schema()
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
) \
.in_append_mode() \
.register_table_source("source")

st_env.from_path("source") \
.select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
.select("add(b1, c1)") \
.insert_into("Results")
st_env.execute("test")


if __name__ == '__main__':
test_udf()


Re: 关于flink检查点

2020-04-15 文章 half coke
是的,根据任务负载的变化自动调整checkpoint的间隔,或者可以通过用户写的逻辑调整检查点。
刚开始学习flink,想请教一下。

Congxian Qiu  于2020年4月15日周三 下午12:33写道:

> hi
>
> 你说的间隔自适应是指什么呢?是指做 checkpoint 的间隔自动调整吗?
>
> Best,
> Congxian
>
>
> half coke  于2020年4月15日周三 下午12:24写道:
>
> > 请问下为什么flink没有支持自适应检查点间隔呢?是出于什么样的考虑吗?
> >
>


Re: flinksql如何控制结果输出的频率

2020-04-15 文章 Benchao Li
非常开心能够帮助到你~

刘建刚  于2020年4月15日周三 下午3:57写道:

> 感谢 Benchao,问题应解决了!
>
> 2020年4月15日 下午3:38,Benchao Li  写道:
>
> Hi 建刚,
>
> 现在Emit的原理是这样子的:
> - *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*;
> - 当定时器到了的时候,
>   - 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
>  - 如果有变化,就发送-[old], +[new] 两条结果到下游;
>  - 如果是*没有变化,则不做任何处理*;
>   - 再次注册一个新的emit delay之后的处理时间定时器。
>
> 你可以根据这个原理,再对照下你的数据,看看是否符合预期。
>
> 刘建刚  于2020年4月15日周三 下午3:32写道:
>
>>
>> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
>>
>> public class EarlyEmitter {
>>public static void main(String[] args) throws Exception {
>>   StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>   env.setParallelism(1);
>>
>>   EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance().useBlink
>>
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
>> settings);
>>
>> tEnv.getConfig().getConfiguration().setBoolean(
>> "table.exec.emit.early-fire.enabled", true);
>> tEnv.getConfig().getConfiguration().setString(
>> "table.exec.emit.early-fire.delay", "1000 ms");
>>
>> Table table = tEnv.fromDataStream(
>> env.addSource(new SourceData()), "generate_time, name, city, id,
>> event_time.proctime");
>> tEnv.createTemporaryView("person", table);
>>
>> String emit =
>> "SELECT name, COUNT(DISTINCT id)" +
>> "FROM person " +
>> "GROUP BY TUMBLE(event_time, interval '10' second), name";
>>
>> Table result = tEnv.sqlQuery(emit);
>> tEnv.toRetractStream(result, Row.class).print();
>>
>> env.execute("IncrementalGrouping");
>> }
>>
>> private static final class SourceData implements
>> SourceFunction> {
>> @Override
>> public void run(SourceContext> ctx) throws
>> Exception {
>> while (true) {
>> long time = System.currentTimeMillis();
>> ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
>> }
>> }
>>
>> @Override
>> public void cancel() {
>>
>> }
>> }
>> }
>>
>>
>>
>>
>>
>> 2020年3月27日 下午3:23,Benchao Li  写道:
>>
>> Hi,
>>
>> 对于第二个场景,可以尝试一下fast emit:
>> table.exec.emit.early-fire.enabled = true
>> table.exec.emit.early-fire.delay = 5min
>>
>> PS:
>> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
>> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>>
>> Jingsong Li  于2020年3月27日周五 下午2:51写道:
>>
>> Hi,
>>
>> For #1:
>> 创建级联的两级window:
>> - 1分钟窗口
>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>
>> Best,
>> Jingsong Lee
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com ; libenc...@pku.edu.cn
>>
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flinksql如何控制结果输出的频率

2020-04-15 文章 刘建刚
感谢 Benchao,问题应解决了!

> 2020年4月15日 下午3:38,Benchao Li  写道:
> 
> Hi 建刚,
> 
> 现在Emit的原理是这样子的:
> - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器;
> - 当定时器到了的时候,
>   - 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
>  - 如果有变化,就发送-[old], +[new] 两条结果到下游;
>  - 如果是没有变化,则不做任何处理;
>   - 再次注册一个新的emit delay之后的处理时间定时器。
> 
> 你可以根据这个原理,再对照下你的数据,看看是否符合预期。
> 
> 刘建刚 mailto:liujiangangp...@gmail.com>> 
> 于2020年4月15日周三 下午3:32写道:
> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
> 
> public class EarlyEmitter {
>public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
> 
>   EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlink
>   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> settings);
> 
>   
> tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",
>  true);
>   
> tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay",
>  "1000 ms");
> 
>   Table table = tEnv.fromDataStream(
> env.addSource(new SourceData()), "generate_time, name, city, id, 
> event_time.proctime");
>   tEnv.createTemporaryView("person", table);
> 
>   String emit =
> "SELECT name, COUNT(DISTINCT id)" +
>   "FROM person " +
>   "GROUP BY TUMBLE(event_time, interval '10' second), name";
> 
>   Table result = tEnv.sqlQuery(emit);
>   tEnv.toRetractStream(result, Row.class).print();
> 
>   env.execute("IncrementalGrouping");
>}
> 
>private static final class SourceData implements 
> SourceFunction> {
>   @Override
>   public void run(SourceContext> ctx) 
> throws Exception {
>  while (true) {
> long time = System.currentTimeMillis();
> ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
>  }
>   }
> 
>   @Override
>   public void cancel() {
> 
>   }
>}
> }
> 
> 
> 
> 
>> 2020年3月27日 下午3:23,Benchao Li > > 写道:
>> 
>> Hi,
>> 
>> 对于第二个场景,可以尝试一下fast emit:
>> table.exec.emit.early-fire.enabled = true
>> table.exec.emit.early-fire.delay = 5min
>> 
>> PS:
>> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
>> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>> 
>> Jingsong Li mailto:jingsongl...@gmail.com>> 
>> 于2020年3月27日周五 下午2:51写道:
>> 
>>> Hi,
>>> 
>>> For #1:
>>> 创建级联的两级window:
>>> - 1分钟窗口
>>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>> 
>>> Best,
>>> Jingsong Lee
>>> 
>> 
>> 
>> -- 
>> 
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com ; 
>> libenc...@pku.edu.cn 
> 
> 
> 
> -- 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com ; libenc...@pku.edu.cn 
> 


Re: Flink On Yarn , ResourceManager is HA , if active ResourceManager changed,what is flink task status ?

2020-04-15 文章 Xintong Song
Normally, Yarn RM switch should not cause any problem to the running Flink
instance. Unless the RM switch takes too long and Flink happens to request
new containers during that time, it might lead to resource allocation
timeout.

Thank you~

Xintong Song



On Wed, Apr 15, 2020 at 3:49 PM LakeShen  wrote:

> Hi community,
>
> I have a question about flink on yarn ha , if active resourcemanager
> changed, what is the flink task staus. Is flink task running normally?
> Should I must restart my flink task to run?
>
> Thanks to your reply.
>
> Best,
> LakeShen
>


Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 Yangze Guo
1. flink会去读YARN_CONF_DIR or HADOOP_CONF_DIR这两个环境变量
2. 我理解这和你flink运行的集群是解耦的,只要你dir的路径不变,就会从那个dir找checkpoint恢复

Best,
Yangze Guo

On Wed, Apr 15, 2020 at 3:38 PM tao wang  wrote:
>
> 多谢回复, 还有几个问题请教:
> 1、外部集群的hdfs-site, core-site 这些怎么配置?
> 2、另外一个角度, 如果我把任务迁移到另外一个集群,如何让它从老的集群的checkpoint 恢复。
>
> Yangze Guo  于2020年4月15日周三 下午2:44写道:
>>
>> checkpoint的目录设置key为state.checkpoints.dir
>>
>> 你可以这样设置
>> state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Wed, Apr 15, 2020 at 1:45 PM tao wang  wrote:
>> >
>> > 现在有个场景, 集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。
>> >
>> > 但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。
>> >
>> > 谢谢!!


Re: flinksql如何控制结果输出的频率

2020-04-15 文章 Benchao Li
Hi 建刚,

现在Emit的原理是这样子的:
- *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*;
- 当定时器到了的时候,
  - 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
 - 如果有变化,就发送-[old], +[new] 两条结果到下游;
 - 如果是*没有变化,则不做任何处理*;
  - 再次注册一个新的emit delay之后的处理时间定时器。

你可以根据这个原理,再对照下你的数据,看看是否符合预期。

刘建刚  于2020年4月15日周三 下午3:32写道:

>
> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
>
> public class EarlyEmitter {
>public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>
>   EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlink
>
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings)
> ;
>
> tEnv.getConfig().getConfiguration().setBoolean(
> "table.exec.emit.early-fire.enabled", true);
> tEnv.getConfig().getConfiguration().setString(
> "table.exec.emit.early-fire.delay", "1000 ms");
>
> Table table = tEnv.fromDataStream(
> env.addSource(new SourceData()), "generate_time, name, city, id,
> event_time.proctime");
> tEnv.createTemporaryView("person", table);
>
> String emit =
> "SELECT name, COUNT(DISTINCT id)" +
> "FROM person " +
> "GROUP BY TUMBLE(event_time, interval '10' second), name";
>
> Table result = tEnv.sqlQuery(emit);
> tEnv.toRetractStream(result, Row.class).print();
>
> env.execute("IncrementalGrouping");
> }
>
> private static final class SourceData implements
> SourceFunction> {
> @Override
> public void run(SourceContext> ctx) throws
> Exception {
> while (true) {
> long time = System.currentTimeMillis();
> ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
> }
> }
>
> @Override
> public void cancel() {
>
> }
> }
> }
>
>
>
>
>
> 2020年3月27日 下午3:23,Benchao Li  写道:
>
> Hi,
>
> 对于第二个场景,可以尝试一下fast emit:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 5min
>
> PS:
> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>
> Jingsong Li  于2020年3月27日周五 下午2:51写道:
>
> Hi,
>
> For #1:
> 创建级联的两级window:
> - 1分钟窗口
> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>
> Best,
> Jingsong Lee
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com ; libenc...@pku.edu.cn
>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 LakeShen
Hi tao wang,

你可以在你的 flink-conf.yaml 里面配置 Checkpoint 的目录,就像楼上 Yangze 所说

state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/

Best,
LakeShen

Yangze Guo  于2020年4月15日周三 下午2:44写道:

> checkpoint的目录设置key为state.checkpoints.dir
>
> 你可以这样设置
> state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/
>
>
> Best,
> Yangze Guo
>
> On Wed, Apr 15, 2020 at 1:45 PM tao wang  wrote:
> >
> > 现在有个场景, 集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。
> >
> > 但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。
> >
> > 谢谢!!
>


FlinkSQL????????????checkpoint????

2020-04-15 文章 Even
Hi??
??
1?? Flink SQL CLI 
??DDL??checkpoint??
2?? Flink SQL CLI 
??CLItable??

Re: flinksql如何控制结果输出的频率

2020-04-15 文章 刘建刚
我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:

public class EarlyEmitter {
   public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);

  EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink
  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
settings);

  
tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",
 true);
  
tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay",
 "1000 ms");

  Table table = tEnv.fromDataStream(
env.addSource(new SourceData()), "generate_time, name, city, id, 
event_time.proctime");
  tEnv.createTemporaryView("person", table);

  String emit =
"SELECT name, COUNT(DISTINCT id)" +
  "FROM person " +
  "GROUP BY TUMBLE(event_time, interval '10' second), name";

  Table result = tEnv.sqlQuery(emit);
  tEnv.toRetractStream(result, Row.class).print();

  env.execute("IncrementalGrouping");
   }

   private static final class SourceData implements SourceFunction> {
  @Override
  public void run(SourceContext> ctx) 
throws Exception {
 while (true) {
long time = System.currentTimeMillis();
ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
 }
  }

  @Override
  public void cancel() {

  }
   }
}




> 2020年3月27日 下午3:23,Benchao Li  写道:
> 
> Hi,
> 
> 对于第二个场景,可以尝试一下fast emit:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 5min
> 
> PS:
> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
> 
> Jingsong Li  于2020年3月27日周五 下午2:51写道:
> 
>> Hi,
>> 
>> For #1:
>> 创建级联的两级window:
>> - 1分钟窗口
>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>> 
>> Best,
>> Jingsong Lee
>> 
> 
> 
> -- 
> 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn



关于FLINK PYTHON UDF

2020-04-15 文章 秦寒
你好

   我在使用kafka produce数据后,在python中使用UDF做一个add function,但
是最后的sink文件里面没有任何数据,

如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG
很久也不清楚是什么原因是否能帮忙分下

 

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}

 



 



 

 

测试结果

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}



 

 

 

st_env.from_path("source")\
.select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
.select("add(b1,c1)") \ 无任何输出
.insert_into("result_tab")

无任何输出



 

 

st_env.from_path("source")\
.select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
.select("c1")\   #正常输出

.insert_into("result_tab")

正确输出





Re: flink java.util.concurrent.TimeoutException

2020-04-15 文章 Yangze Guo
日志上看是Taskmanager心跳超时了,如果tm还在,是不是网络问题呢?尝试把heartbeat.timeout调大一些试试?

Best,
Yangze Guo

On Mon, Apr 13, 2020 at 10:40 AM 欧阳苗  wrote:
>
> job运行了两天就挂了,然后抛出如下异常,但是taskManager没有挂,其他的job还能正常在上面跑,请问这个问题是什么原因导致的,有什么好的解决办法吗
>
>
> 2020-04-13 06:20:31.379 ERROR 1 --- [ent-IO-thread-3] 
> org.apache.flink.runtime.rest.RestClient.parseResponse:393 : Received 
> response was neither of the expected type ([simple type, class 
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) 
> nor an error. 
> Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"2d2a0b4efc8c3d973e2e9490b7b3b2f1","application-status":"FAILED","accumulator-results":{},"net-runtime":217272900,"failure-cause":{"class":"java.util.concurrent.TimeoutException","stack-trace":"java.util.concurrent.TimeoutException:
>  Heartbeat of TaskManager with id 0a4ea651244982ef4b4b7092d18de776 timed 
> out.\n\tat 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1656)\n\tat
>  
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)\n\tat
>  
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)\n\tat
>  akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)\n\tat 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)\n\tat
>  scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
>  
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
>  
> 

Re: flink 1.7.2 YARN Session模式提交任务问题求助

2020-04-15 文章 Yangze Guo
Flink需要设置hadoop相关conf位置的环境变量 YARN_CONF_DIR or HADOOP_CONF_DIR [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html

Best,
Yangze Guo

On Mon, Apr 13, 2020 at 10:52 PM Chief  wrote:
>
> 大家好
> 目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs 
> namenode配置了ha模式,提交任务的时候报以下错误,系统环境变量中已经设置了HADOOP_HOME,YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CLASSPATH,在flink_conf.yaml中配置了fs.hdfs.hadoopconf
>
>
> 2020-04-10 19:12:02,908 INFO 
> org.apache.flink.runtime.jobmaster.JobMaster
>  - Connecting to ResourceManager 
> akka.tcp://flink@trusfortpoc1:23584/user/resourcemanager()
> 2020-04-10 19:12:02,909 INFO 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool   
>   - Cannot serve slot request, no ResourceManager connected. 
> Adding as pending request [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}]
> 2020-04-10 19:12:02,911 INFO 
> org.apache.flink.runtime.jobmaster.JobMaster
>  - Resolved ResourceManager address, 
> beginning registration
> 2020-04-10 19:12:02,911 INFO 
> org.apache.flink.runtime.jobmaster.JobMaster
>  - Registration at ResourceManager attempt 
> 1 (timeout=100ms)
> 2020-04-10 19:12:02,912 INFO 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool   
>   - Cannot serve slot request, no ResourceManager connected. 
> Adding as pending request [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}]
> 2020-04-10 19:12:02,913 INFO 
> org.apache.flink.yarn.YarnResourceManager 
>  - Registering job manager 
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
>  for job 24691b33c18d7ad73b1f52edb3d68ae4.
> 2020-04-10 19:12:02,917 INFO 
> org.apache.flink.yarn.YarnResourceManager 
>  - Registered job manager 
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
>  for job 24691b33c18d7ad73b1f52edb3d68ae4.
> 2020-04-10 19:12:02,919 INFO 
> org.apache.flink.runtime.jobmaster.JobMaster
>  - JobManager successfully registered at 
> ResourceManager, leader id: .
> 2020-04-10 19:12:02,919 INFO 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool   
>   - Requesting new slot 
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile 
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
> 2020-04-10 19:12:02,920 INFO 
> org.apache.flink.yarn.YarnResourceManager 
>  - Request slot with profile 
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
> nativeMemoryInMB=0, networkMemoryInMB=0} for job 
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id 
> AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}.
> 2020-04-10 19:12:02,921 INFO 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool   
>   - Requesting new slot 
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile 
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
> 2020-04-10 19:12:02,924 INFO 
> org.apache.flink.yarn.YarnResourceManager 
>  - Requesting new TaskExecutor 
> container with resources  1.
> 2020-04-10 19:12:02,926 INFO 
> org.apache.flink.yarn.YarnResourceManager 
>  - Request slot with profile 
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
> nativeMemoryInMB=0, networkMemoryInMB=0} for job 
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id 
> AllocationID{37dd666a18040bf63ffbf2e022b2ea9b}.
> 2020-04-10 19:12:06,531 INFO 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl   
>  - Received new token for : trusfortpoc3:35206
> 2020-04-10 19:12:06,543 INFO 
> org.apache.flink.yarn.YarnResourceManager 
>  - Received new container: 
> container_1586426824930_0006_01_02 - Remaining pending container 
> requests: 1
> 2020-04-10 19:12:06,543 INFO 
> org.apache.flink.yarn.YarnResourceManager 
>  - Removing container request 
> Capability[ 0.
> 2020-04-10 19:12:06,568 ERROR org.apache.flink.yarn.YarnResourceManager 
>  - Could 
> not start TaskManager in container container_1586426824930_0006_01_02.
> java.lang.IllegalArgumentException: java.net.UnknownHostException: 
> hdfsClusterForML
> at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient. at org.apache.hadoop.hdfs.DFSClient. at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)
> at 
> 

Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 Yangze Guo
checkpoint的目录设置key为state.checkpoints.dir

你可以这样设置
state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/


Best,
Yangze Guo

On Wed, Apr 15, 2020 at 1:45 PM tao wang  wrote:
>
> 现在有个场景, 集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。
>
> 但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。
>
> 谢谢!!