Reply: Does Flink SQL plan to support a MySQL catalog?

2021-10-11 Post by Roc Marshal
Hi 旭晨,
This feature is already being worked on.
Reviews, discussion, and improvements are welcome: https://github.com/apache/flink/pull/16962

Best,
Roc.
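For context, this is how a JdbcCatalog is registered from SQL today (per the Flink docs for the PostgreSQL-backed catalog); every option value below is a placeholder, and a MySQL-backed catalog would presumably be registered the same way once the PR above lands:

```sql
-- Registering a JDBC catalog from SQL; all values are placeholders.
CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'default-database' = 'mydb',
    'username' = 'user',
    'password' = 'secret',
    'base-url' = 'jdbc:postgresql://localhost:5432'
);
USE CATALOG my_catalog;
```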







---- Replied Message ----
| From | 赵旭晨 |
| Date | 2021-10-12 10:17 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Does Flink SQL plan to support a MySQL catalog? |
Right now Flink's JdbcCatalog only supports PostgreSQL; is there a plan to support MySQL? Our company standardizes on MySQL for metadata storage, so introducing PostgreSQL as well is unlikely. Or, asking the other way around: why hasn't the Flink community supported a MySQL catalog so far? Are there any concerns?

Re:Re: Flink Sql 1.13 UDF ERROR

2021-07-11 Post by Roc Marshal
Hi, Jingsong.


The latest type inference is stricter than in previous versions, and its NOT NULL constraint checks on schemas are more thorough.
In the example mentioned earlier, using primitive types as UDF parameters
means the columns tied to the UDF parameters must be NOT NULL, while each column type of a created view is nullable by default (its NOT NULL constraint defaults to false), which is why the problem described earlier appeared.
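For illustration, a minimal sketch (not from the original thread) of the usual remedy: declare the accumulate parameters with boxed types instead of primitives, so the inferred argument types are nullable and accept the view's nullable columns. It is meant as a drop-in replacement for the class in the quoted code below (AccumulatorBean and the remaining methods as defined there; the import is org.apache.flink.table.functions.AggregateFunction):

```java
public static class UDFAggregateFunction
        extends AggregateFunction<Double, AccumulatorBean> {

    @Override
    public Double getValue(AccumulatorBean acc) {
        return acc.totalPrice / acc.totalNum;
    }

    @Override
    public AccumulatorBean createAccumulator() {
        return new AccumulatorBean();
    }

    // Boxed Double/Long instead of double/long: nullable columns are accepted.
    public void accumulate(AccumulatorBean acc, Double price, Long num) {
        if (price == null || num == null) {
            return; // skip null rows instead of failing SQL validation
        }
        acc.totalPrice += price * num;
        acc.totalNum += num;
    }
}
```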







Best,
Roc.








On 2021-06-29 11:02:55, "Jingsong Li" wrote:
>Hi,
>
>You could create a JIRA ticket and have Timo take a look; the UDAF picked up the new type inference, and there may be a problem there
>
>Best,
>Jingsong
>
>On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal  wrote:
>
>>
>>
>> Hi, All.
>>
>>
>> A question I ran into while evaluating the API upgrade to the latest 1.13.1, thanks everyone:
>>
>>
>> Version: 1.13.1
>> Run mode: IDE-application
>> ---
>> About the udf definition...
>>
>>
>> public static class UDFAggregateFunction extends
>> AggregateFunction<Double, AccumulatorBean> {
>>
>>
>> // return the final result
>> @Override
>> public Double getValue(AccumulatorBean acc) {
>> return acc.totalPrice / acc.totalNum;
>> }
>>
>>
>> // create the object holding intermediate results
>> @Override
>> public AccumulatorBean createAccumulator() {
>> return new AccumulatorBean();
>> }
>>
>>
>> // subtract the values being retracted
>> public void retract(AccumulatorBean acc, double price, long num) {
>> acc.totalPrice -= price * num;
>> acc.totalNum -= num;
>> }
>>
>>
>> // take the data from each partition and merge it
>> public void merge(AccumulatorBean acc, Iterable<AccumulatorBean>
>> it) {
>>
>>
>> Iterator<AccumulatorBean> iter = it.iterator();
>> while (iter.hasNext()) {
>> AccumulatorBean a = iter.next();
>> this.accumulate(acc, a.totalPrice, a.totalNum);
>> }
>> }
>>
>>
>> // called when the in-memory value is reset
>> public void resetAccumulator(AccumulatorBean acc) {
>> acc.totalNum = 0;
>> acc.totalPrice = 0;
>> }
>>
>>
>> // logic combining the incoming data
>> public void accumulate(AccumulatorBean acc, double price, long
>> num) {
>> acc.totalPrice += price * num;
>> acc.totalNum += num;
>> }
>> }
>>
>>
>>
>> 
>> About main calling
>> // TODO unified stream/batch Table API
>> TableEnvironment tableEnvironment =
>> TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
>> List<Row> dataList = new ArrayList<>();
>> dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
>> dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
>> dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
>> Table table = tableEnvironment.fromValues(DataTypes.ROW(
>> DataTypes.FIELD("user", DataTypes.STRING()),
>> DataTypes.FIELD("name", DataTypes.STRING()),
>> DataTypes.FIELD("price", DataTypes.DOUBLE()),
>> DataTypes.FIELD("num", DataTypes.BIGINT())
>> ),
>> dataList);
>> tableEnvironment.createTemporaryView("orders", table);
>>
>>
>> tableEnvironment.createTemporaryFunction("c_agg", new
>> UDFAggregateFunction());
>>
>>
>> tableEnvironment.executeSql("select user, c_agg(price, num) as
>> udf_field from orders group by user").print();
>>
>>
>>
>>
>>
>>
>>
>> Exception stack -
>>
>>
>>
>>
>> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
>> at
>> com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
>> Caused by: org.apache.flink.table.api.ValidationException: Invalid
>> function call:
>> default_c

Re: After running for a while, the job exceeds physical memory, the container is killed, and the job restarts

2021-07-07 Post by Roc Marshal
Hi,
You could first check the threshold settings for the virtual-to-physical memory ratio of YARN containers (yarn-site.xml).
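The relevant yarn-site.xml properties, for reference; the values shown are YARN's defaults, not recommendations, so raise the ratio or disable a check only once the job's memory profile is understood:

```xml
<!-- yarn-site.xml: container memory-check knobs (values are YARN's defaults) -->
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>2.1</value> <!-- virtual memory allowed per unit of physical memory -->
</property>
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>true</value> <!-- the physical-memory check that killed the container -->
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>true</value>
</property>
```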


   

Best, Roc.














On 2021-07-08 10:44:20, "黄志高" wrote:
>Flink version: 1.11.0
>Deployment mode: yarn per-job
>State backend: env.setStateBackend(new FsStateBackend("ckPath"))
>Each TaskManager gets 8 GB of memory and 2 slots
>A checkpoint every 10 minutes; average checkpoint size 400 KB
>Job logic: source(kafka)->keyBy->timeWindow->reduce count->redis
> source(kafka)->sink(s3 files)
>
>
>The problem: every day the job restarts because a container gets killed
>Container [pid=26148,containerID=container_e02_1622516404559_0038_01_08] 
>is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB 
>physical memory used; 9.8 GB of 40 GB virtual memory used. Killing container
>
>
>My understanding is that there shouldn't be that much buffered data, so how can it hit the physical memory limit? The window operation should hold one value per key, there aren't many keys, and the state should only track that value. The window also uses a reduce, processing each record incrementally as it arrives, rather than a processFunction that collects a batch and processes it at once.
>Hoping someone can take a look, thanks
>
>
>


Flink SQL MySQL schema feature question

2021-07-07 Post by Roc Marshal
Hi,
   Does the current Flink SQL support automatically pulling and parsing all of a table's column information when creating a source table?


   Thanks.


Best, Roc.

Flink Sql 1.13 UDF ERROR

2021-06-28 Post by Roc Marshal


Hi, All.


A question I ran into while evaluating the API upgrade to the latest 1.13.1, thanks everyone:


Version: 1.13.1
Run mode: IDE-application
---
About the udf definition...


public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> {


// return the final result
@Override
public Double getValue(AccumulatorBean acc) {
return acc.totalPrice / acc.totalNum;
}


// create the object holding intermediate results
@Override
public AccumulatorBean createAccumulator() {
return new AccumulatorBean();
}


// subtract the values being retracted
public void retract(AccumulatorBean acc, double price, long num) {
acc.totalPrice -= price * num;
acc.totalNum -= num;
}


// take the data from each partition and merge it
public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {


Iterator<AccumulatorBean> iter = it.iterator();
while (iter.hasNext()) {
AccumulatorBean a = iter.next();
this.accumulate(acc, a.totalPrice, a.totalNum);
}
}


// called when the in-memory value is reset
public void resetAccumulator(AccumulatorBean acc) {
acc.totalNum = 0;
acc.totalPrice = 0;
}


// logic combining the incoming data
public void accumulate(AccumulatorBean acc, double price, long num) {
acc.totalPrice += price * num;
acc.totalNum += num;
}
}



About main calling
// TODO unified stream/batch Table API
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
List<Row> dataList = new ArrayList<>();
dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
Table table = tableEnvironment.fromValues(DataTypes.ROW(
DataTypes.FIELD("user", DataTypes.STRING()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("price", DataTypes.DOUBLE()),
DataTypes.FIELD("num", DataTypes.BIGINT())
),
dataList);
tableEnvironment.createTemporaryView("orders", table);


tableEnvironment.createTemporaryFunction("c_agg", new 
UDFAggregateFunction());


tableEnvironment.executeSql("select user, c_agg(price, num) as 
udf_field from orders group by user").print();






Exception stack -




default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function 
call:
default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
at 
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at 


Re: Unsubscribe

2021-05-26 Post by Roc Marshal
Hi 张斌: To unsubscribe, send a message to user-zh-unsubscr...@flink.apache.org and follow the prompts to complete the remaining steps. Best, flinker.
On 2021-05-26 17:05:59, "张斌" wrote:
>
>
>Unsubscribe
>张斌
>herobin1...@163.com
>


Reply: Unsubscribe

2021-01-27 Post by Roc Marshal
Hi, Tang.
Please send a short message to user-zh-unsubscr...@flink.apache.org if you want
to unsubscribe from the mailing list.


Best, Roc.


Roc Marshal
flin...@126.com


On 2021-01-27 16:41, 唐军亮 wrote:
Unsubscribe

FlinkSQL window usage question

2020-10-21 Post by Roc Marshal
Hi,




SELECT

TUMBLE_START(ts, INTERVAL '1' day) as window_start,

TUMBLE_END(ts, INTERVAL '1' day) as window_end,

c1,

sum(c2) as sc2

FROM sourcetable

GROUP BY TUMBLE(ts, INTERVAL '1' day), c1

ORDER BY window_start, sc2 desc limit 10


This SQL is meant to compute over one-day (tumbling) windows,
grouping by c1 and summing c2 as sc2, then sorting by sc2 within each window. But judging by the results, the sc2 column within a window is not ordered (descending or ascending).
Based on my requirement and this SQL, can anyone diagnose where the problem is? Or give some advice so I can locate where I'm misusing flinksql?


Thanks!


Best Roc.

Re: Re: Question about window computation with StreamSQL

2020-10-21 Post by Roc Marshal
SELECT

TUMBLE_START(ts, INTERVAL '1' day) as window_start,

TUMBLE_END(ts, INTERVAL '1' day) as window_end,

c1,

sum(c2) as sc2

FROM target_kafka_source_converted

GROUP BY TUMBLE(ts, INTERVAL '1' day), c1

ORDER BY window_start, sc2 desc limit 10


With this SQL I want to compute over one-day windows,
grouping by c1 and summing c2 as sc2, then sorting by sc2 within each window. But judging by the results, the sc2 column within a window is not ordered (descending or ascending).
Can you diagnose where the problem is based on this requirement and SQL? Or give some advice so I can locate where I'm misusing flinksql?


Thank you!


Best Roc.



















On 2020-10-21 17:21:47, "hailongwang" <18868816...@163.com> wrote:
>Hi Roc,
>  Currently SQL does not support specifying an offset; a 1-day window can only start at midnight.
>There is an issue tracking this:
>https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17767?filter=allopenissues
>
>
>Best,
>Hailong Wang
>
>On 2020-10-21 16:09:29, "Roc Marshal" wrote:
>>Hi,
>>
>>
>>For a tumbling window one day long that aggregates a column, how do I specify in SQL the time at which the window starts rolling? For example, I'd like each window to run from 2 a.m. (window start) to 2 a.m. the next day (window end). What syntax does this?
>>
>>
>>Thanks.
>>
>>
>>Best Roc
>>
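Since the SQL TUMBLE function offered no offset here, a workaround sketch using the DataStream API's window offset parameter (a real parameter of the tumbling window assigners; the (c1, c2) records are hypothetical stand-ins, and processing time is used for brevity, the same offset exists for TumblingEventTimeWindows):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class OffsetWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the (c1, c2) columns from the thread.
        DataStream<Tuple2<String, Long>> source =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

        source.keyBy(t -> t.f0)
                // One-day tumbling windows shifted by +2h: boundaries fall at
                // 02:00 instead of midnight (mind the timezone of the timestamps).
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(2)))
                .sum(1)
                .print();

        env.execute("offset-window-sketch");
    }
}
```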


Question about window computation with StreamSQL

2020-10-21 Post by Roc Marshal
Hi,


For a tumbling window one day long that aggregates a column, how do I specify in SQL the time at which the window starts rolling? For example, I'd like each window to run from 2 a.m. (window start) to 2 a.m. the next day (window end). What syntax does this?


Thanks.


Best Roc



Re: Re: Does flink sql ddl support mapping multi-level json?

2020-10-21 Post by Roc Marshal
For a depth of three levels or more, is it the same kind of nested syntax, or written some other way?


Thanks

Best Roc.





On 2020-09-24 20:53:12, "Benchao Li" wrote:
>This case is supported now; you can write something like:
>```SQL
>CREATE TABLE MyTable (
>  a11 INT,
>  a12 VARCHAR,
>  a13 ROW<a21 INT, a22 INT, a23 VARCHAR>
>) WITH (...)
>```
>
>Roc Marshal wrote on Thu, Sep 24, 2020 at 7:54 PM:
>
>> A question: when flink sql connects to kafka in streaming mode and the message format is multi-level json, how do I map a field nested deeper than one level?
>> {
>> "a11":1,
>> "a12":"1",
>> "a13":{
>> "a21":1,
>> "a22":1,
>> "a23":"1"}
>> }
>>
>>
>> For a format like this, how do I map the fields starting with a2? If the current version doesn't support this feature, could support for it be considered?
>>
>>
>> Thanks
>
>
>
>-- 
>
>Best,
>Benchao Li
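On the three-level question above: ROW types nest, so a third level is just another ROW inside the second. A sketch extending the thread's example with hypothetical field names:

```sql
CREATE TABLE MyTable (
  a11 INT,
  a12 VARCHAR,
  a13 ROW<
    a21 INT,
    a22 INT,
    a23 VARCHAR,
    a24 ROW<a31 INT, a32 VARCHAR>  -- third level: nest another ROW
  >
) WITH (...)
```

Nested fields are then addressed with dot syntax in queries, e.g. a13.a24.a31.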


Does flink sql ddl support mapping multi-level json?

2020-09-24 Post by Roc Marshal
A question: when flink sql connects to kafka in streaming mode and the message format is multi-level json, how do I map a field nested deeper than one level?
{
"a11":1,
"a12":"1",
"a13":{
"a21":1,
"a22":1,
"a23":"1"}
}


For a format like this, how do I map the fields starting with a2? If the current version doesn't support this feature, could support for it be considered?


Thanks

flink-OOME_Java heap space

2020-08-06 Post by Roc Marshal
Hi, all.
A question, as follows.
Scenario: jdk-oracle-1.8, flink-release-1.10.0,
flink on yarn in session mode. Data is read from kafka and processed with sql.
JVM Heap Size: 638 MB
Flink Managed Memory: 635 MB; the exception below occurs.
The statebackend is filesystem->hadoop.
The task goes straight from deploying->failed.
Other reference information was in the attached images.
Any advice?
Thanks.





Flink-1.10 on yarn TaskManager startup parameter question

2020-07-27 Post by Roc Marshal
Hi, all.


Is the JVM GC collector used by TaskManagers started with Flink-1.10 on yarn G1 by default?
Basic cluster environment: hadoop-2.7.5, flink-1.10, jdk-1.8_61, with no JVM-related parameters explicitly set.





Thanks.






Best,
Roc Marshal.

Re:Flink 1.11 submit job timed out

2020-07-15 Post by Roc Marshal
Hi, SmileSmile.
I've run into a similar host-resolution problem before; you could troubleshoot it from the angle of the k8s pod network/hostname mapping.
Hope this helps.


Best,
Roc Marshal
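A hedged sketch of that troubleshooting angle (the pod name is illustrative): the warnings below come from failed reverse DNS lookups, so check whether the TaskManager IPs resolve back to hostnames from inside the cluster.

```sh
# Compare pod IPs against the IPs in the warnings.
kubectl get pods -o wide

# From inside the JobManager pod, try a reverse lookup of one TM IP
# (10.32.160.7 is taken from the log below; nslookup may need installing).
kubectl exec -it <jobmanager-pod> -- nslookup 10.32.160.7
```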











On 2020-07-15 17:04:18, "SmileSmile" wrote:
>
>Hi
>
>使用版本Flink 1.11,部署方式 kubernetes session。 TM个数30个,每个TM 4个slot。 job 
>并行度120.提交作业的时候出现大量的No hostname could be resolved for the IP address,JM time 
>out,作业提交失败。web ui也会卡主无响应。
>
>用wordCount,并行度只有1提交也会刷,no hostname的日志会刷个几条,然后正常提交,如果并行度一上去,就会超时。
>
>
>Partial logs:
>
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.32.160.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.44.224.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,461 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.40.32.9, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
>heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
>Disconnect job manager 
>0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
> for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.
>
>
>How should I deal with this?
>
>
>Best!
>
>a511955993
>a511955...@163.com


Re: [Help] Flink Hadoop dependency problem

2020-07-15 Post by Roc Marshal



Hi Z-Z,

You can try downloading the matching uber jar from
https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/
and placing the downloaded jar file under ${FLINK_HOME}/lib in the flink image, then start the orchestrated containers.
Best,
Roc Marshal.
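A sketch of one way to wire that into the compose file quoted below, mounting the downloaded jar into /opt/flink/lib via an extra volume entry; the 2.8.3-10.0 version is illustrative, pick the build matching your Hadoop line:

```yaml
services:
  jobmanager:
    image: flink:1.11.0-scala_2.12
    volumes:
      - ./jobmanager/conf:/opt/flink/conf
      - ./data:/data
      # Extra mount: the uber jar lands in the image's lib directory.
      - ./flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
```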











On 2020-07-15 10:47:39, "Z-Z" wrote:
>I'm using Flink 1.11.0, set up with docker-compose; the docker-compose file is as follows:
>version: "2.1"
>services:
> jobmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6123"
>  ports:
>   - "8081:8081"
>  command: jobmanager
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>   - 
>HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
>  volumes:
>   - ./jobmanager/conf:/opt/flink/conf
>   - ./data:/data
>
>
> taskmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6121"
>   - "6122"
>  depends_on:
>   - jobmanager
>  command: taskmanager
>  links:
>   - "jobmanager:jobmanager"
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>  volumes:
>   - ./taskmanager/conf:/opt/flink/conf
>networks:
> default:
>  external:
>   name: flink-network
>
>
>
>hadoop-2.9.2 is already in the data directory, and HADOOP_CLASSPATH has been added to the environment variables of both jobmanager and taskmanager, but whether submitting via the cli or the webui, the jobmanager still reports Could
> not find a file system implementation for scheme 'hdfs'. Does anyone know what's going on?


Re: [Flink transformations]

2020-06-29 Post by Roc Marshal
Hi 忝忝向仧.
There is currently no such mapping table archived in the Flink documentation.
But you can observe the returned types at the API level.


Best,
Roc Marshal
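A minimal sketch of that API-level observation (not an official mapping table): each operator call returns a DataStream whose underlying Transformation class can simply be printed via getTransformation().

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationPeek {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> src = env.fromElements("a", "bb");
        DataStream<Integer> mapped = src.map(String::length).returns(Types.INT);
        DataStream<Integer> filtered = mapped.filter(n -> n > 0);

        // Prints the concrete Transformation subclass behind each operator,
        // e.g. SourceTransformation / OneInputTransformation (names vary by version).
        System.out.println(src.getTransformation().getClass().getSimpleName());
        System.out.println(mapped.getTransformation().getClass().getSimpleName());
        System.out.println(filtered.getTransformation().getClass().getSimpleName());
    }
}
```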



On 2020-06-29 22:29:21, "忝忝向仧" <153488...@qq.com> wrote:
>Hi,all:
>
>
>A question: a Flink program is first converted into a logical mapping, i.e. transformations. I see that the org.apache.flink.streaming.api.transformations package currently has 17 Transformation classes (SourceTransformation, SplitTransformation, TwoInputTransformation, etc.). Is there a mapping list saying which operators or operations in a program (e.g. map, flatmap, filter, connect, select) correspond to which Transformation class?
>
>
>Thanks.


Re:flink1.9 on yarn

2020-06-27 Post by Roc Marshal
Hi, guanyq.

On question 1: when submitting with ./bin/flink run -m yarn-cluster, how to keep the appid from incrementing?
The appid increment policy is not generated by Flink. If necessary, you can investigate hadoop-yarn and draw your own conclusion.



On question 2: after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how to submit to the same appid again?
May I take this to mean that a flink
yarn-session cluster better fits your jobs? The submission mode mentioned in the question is per-job, and once the job is closed, Flink shuts the cluster down.
See:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-flink-session
Best,
Roc Marshal
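A hedged sketch of that session workflow (flags as in the docs linked above; the application id below is the one from the question, purely illustrative):

```sh
# Start a long-running detached session once; it keeps a single YARN application id.
./bin/yarn-session.sh -jm 1g -tm 4g -s 4 -d

# Later submissions attach to that session instead of spawning a new application
# (the session writes a .yarn-properties file that `flink run` picks up);
# the target can also be named explicitly with -yid.
./bin/flink run -yid application_1567067657620_0254 ./examples/streaming/WordCount.jar
```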

On 2020-06-28 09:09:43, "guanyq" wrote:
>Question 1
>
>./bin/flink run -m
>yarn-cluster produces a new APP-ID on every submission, e.g. application_1567067657620_0254
>
>After yarn application -kill application_1567067657620_0254,
>
>when submitting with ./bin/flink run -m yarn-cluster again, how do I keep this appid from incrementing?
>
>Question 2
>
>After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I submit to this appid again?
>
> 


Re: Re: Suspected Flink JOB_MANAGER_LEADER_PATH znode leak

2020-06-27 Post by Roc Marshal
Yes.


Best,
Roc Marshal.

















On 2020-06-28 10:10:20, "林恬" wrote:
>Do you mean that these empty leader/${job_id} ZNodes left behind by cancelled jobs need to be cleaned up periodically by the user?
>
>
>
>
>
>
>
>--Original--
>From: "Roc Marshal"
>Date: Sun, Jun 28, 2020 10:07 AM
>To: "FLINK中国"
>Subject: Re: Suspected Flink JOB_MANAGER_LEADER_PATH znode leak
>
>
>
>Hi 林恬.
>First, thanks for the feedback.
>On cleaning up entries under the corresponding zk paths: put simply, Flink depends on the zk component only within the scope of the functionality it needs. It does not maintain consistency between the whole cluster (or a given path) and the Flink
>job information, i.e. it does not clean up stale entries, because under HA the conditions for judging a path invalid are much more complex.
>
>
>
>
>Best,
>Roc Marshal.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>On 2020-06-28 09:12:41, "林恬" wrote:
>Hi all:
>I'm currently using Flink 1.9.2 with ZK for HA. I've noticed that the /leader/${job_id}
>znodes on ZK are not cleaned up even when a job is cancelled, so after running for a while there are many empty job_id ZNodes under /leader/. When is this supposed to be cleaned up? Or is this un-cleaned behavior a bug in 1.9.2?
>
>


Re: Suspected Flink JOB_MANAGER_LEADER_PATH znode leak

2020-06-27 Post by Roc Marshal
Hi 林恬.
First, thanks for the feedback.
On cleaning up entries under the corresponding zk paths: put simply, Flink depends on the zk component only within the scope of the functionality it needs. It does not maintain consistency between the whole cluster (or a given path) and the Flink
job information, i.e. it does not clean up stale entries, because under HA the conditions for judging a path invalid are much more complex.




Best,
Roc Marshal.
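Since the thread concludes that users must prune these leftovers themselves, a hedged Curator-based cleanup sketch: the connection string and HA root path are illustrative (match them to your high-availability.zookeeper.* settings), and a znode should only be deleted after verifying its job is really gone.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderZnodeCleanup {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk-host:2181", new ExponentialBackoffRetry(1000, 3)); // illustrative quorum
        client.start();

        String leaderPath = "/flink/default/leader"; // depends on HA root and cluster-id
        for (String jobId : client.getChildren().forPath(leaderPath)) {
            String path = leaderPath + "/" + jobId;
            byte[] data = client.getData().forPath(path);
            // Only prune znodes that are empty leftovers of cancelled jobs.
            if (client.getChildren().forPath(path).isEmpty()
                    && (data == null || data.length == 0)) {
                client.delete().forPath(path); // caution: confirm the job is not running
            }
        }
        client.close();
    }
}
```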

















On 2020-06-28 09:12:41, "林恬" wrote:
>Hi all:
>  I'm currently using Flink 1.9.2 with ZK for HA. I've noticed that the /leader/${job_id}
>znodes on ZK are not cleaned up even when a job is cancelled, so after running for a while there are many empty job_id ZNodes under /leader/. When is this supposed to be cleaned up? Or is this un-cleaned behavior a bug in 1.9.2?
>
>
>


Re: Why the flink checkpoint Checkpoint Duration (Async) phase is so slow

2020-06-27 Post by Roc Marshal
Hi 立志.
Could you provide some more information, such as the exception messages, to give a better picture of this case's background?


Thanks.


Best,
Roc Marshal














在 2020-06-28 09:52:10,"张立志"  写道:
>flink 版本1.8
>部署集群yarn
>
>
>配置代码:
>StreamExecutionEnvironment.stateBackend(new 
>FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
>业务代码相对比较简单,内存占用较大
>超过10分钟后开始报错,state 大概在1.5G时,开始耗时开始变长
>
>
>
>
>


Re: Reply: flinksql error when reading hbase data

2020-06-22 Post by Roc Marshal
MuChen,
1. About the zk log: "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState
 - Authentication failed"; JobManager Web Interface:
http://uhadoop-op3raf-core24:42976
2. About the HBase error: "Caused by:
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task
'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] ;
SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70,
 source: [HBaseTableSource[schema=[key, cf1], projectFields=[0,
fields=[key]) ; SinkConversionToTuple2 ; Sink: SQL Client Stream Collect Sink':
Configuring the input format (null) failed: Cannot create connection to
HBase." This points at the HBase connection itself, so it is worth checking whether HBase is reachable with the configured connection settings. Best, Roc
Marshal.
On 2020-06-23 11:05:43, "MuChen" <9329...@qq.com> wrote:
>Hi,Roc Marshal:
>
>
>
>Best,
>MuChen.
>
>
>
>
>----Original----
>From: "Roc Marshal"
>Date: Tue, Jun 23, 2020 10:27 AM
>To: "user-zh"
>Subject: Re: flinksql error when reading hbase data
>
>
>
>MuChen, from the error it looks worth checking whether Flink can create the
>Source connection to HBase via zk. Best, Roc Marshal.
>On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>Hi, All:
>
>
>An error occurs when flinksql reads hbase data.
>
>
>
>
>
>
>Environment: flink runs on the hadoop master nodes.
>
>Start a yarn-session:
>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli
>2>&1 & # output
>[admin@uhadoop-op3raf-master2 flink10]$ 2020-06-23 09:30:56,402 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState -
>Authentication failed 2020-06-23 09:30:56,515 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState -
>Authentication failed JobManager Web Interface:
>http://uhadoop-op3raf-core24:42976
>Start the sql-client:
>bin/sql-client.sh embedded
>Create the hbase table in flinksql:
># CREATE TABLE hbase_video_pic_title_q70 ( key
>string, cf1 ROW<title string, q70 string> ) WITH ( 'connector.type' = 'hbase', 'connector.version' =
>'1.4.3', 'connector.table-name' =
>'hbase_video_pic_title_q70', 'connector.zookeeper.quorum' =
>'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
> 'connector.zookeeper.znode.parent' = '/hbase',
>'connector.write.buffer-flush.max-size' = '10mb',
>'connector.write.buffer-flush.max-rows' = '1000',
>'connector.write.buffer-flush.interval' = '2s' );
>Query:
>select key from hbase_video_pic_title_q70;
>HBase error:
>[ERROR] Could not execute SQL statement. Reason:
>org.apache.flink.runtime.rest.util.RestClientException: [Internal server
>error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
>job. at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at 
>java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at 
>java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at 
>java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
>akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at 
>akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
>akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
>Caused by: java.lang.RuntimeException: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: 
>Could not set up JobManager at 
>org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init> at
>org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)

Re: flinksql error when reading hbase data

2020-06-22 Post by Roc Marshal
MuChen, from the error it looks worth checking whether the meta information on HBase's zk is intact and
whether Flink can create the Source connection to HBase via zk. Best, Roc
Marshal.
On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>Hi, All:
>
>
>An error occurs when flinksql reads hbase data.
>
>
>
>
>
>
>Environment: flink runs on the hadoop master nodes.
>
>Start a yarn-session:
>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1
> & # output [admin@uhadoop-op3raf-master2
>flink10]$ 2020-06-23 09:30:56,402 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
>Authentication failed 2020-06-23 09:30:56,515 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
>Authentication failed JobManager Web Interface:
>http://uhadoop-op3raf-core24:42976
>Start the sql-client:
>bin/sql-client.sh embedded
>Create the hbase table in flinksql:
>#  CREATE TABLE hbase_video_pic_title_q70 (   key string,   cf1 ROW<title
>string, q70 string> ) WITH (   'connector.type' = 'hbase',
>'connector.version' = '1.4.3',   'connector.table-name' =
>'hbase_video_pic_title_q70',   'connector.zookeeper.quorum' =
>'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
>   'connector.zookeeper.znode.parent' = '/hbase',
>'connector.write.buffer-flush.max-size' = '10mb',
>'connector.write.buffer-flush.max-rows' = '1000',
>'connector.write.buffer-flush.interval' = '2s' );
>Query:
>select key from hbase_video_pic_title_q70;
>HBase error:
>[ERROR] Could not execute SQL statement. Reason:
>org.apache.flink.runtime.rest.util.RestClientException: [Internal server
>error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>   at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at 
>java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)   
>  at 
>java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at 
>java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)  
>   at 
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
>   at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)  
>   at 
>akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
>Caused by: java.lang.RuntimeException: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more Caused by: 
>org.apache.flink.runtime.client.JobExecutionException: Could not set up 
>JobManager at 
>org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init> at
>org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at 
>org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at 
>org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more Caused by: 
>org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
>'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] - 
>SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70,
> source: [HBaseTableSource[schema=[key, cf1], projectFields=[0, 
>fields=[key]) - SinkConversionToTuple2 - Sink: SQL Client Stream 
>Collect Sink': Configuring the input format (null) failed: Cannot create 
>connection to HBase. at 
>org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
> at 
>org.apache.flink.runtime.scheduler.SchedulerBase.