Re: Does Flink SQL plan to support a MySQL catalog?
Hi Xuchen,

This feature is already in progress. Reviews, discussion, and improvements are welcome: https://github.com/apache/flink/pull/16962

Best,
Roc.

-------- Original message --------
From: 赵旭晨
Date: 2021-10-12 10:17
To: user-zh@flink.apache.org
Subject: Does Flink SQL plan to support a MySQL catalog?

Currently Flink's JdbcCatalog only supports PostgreSQL. Are there plans to support MySQL? Our company's metadata storage is standardized on MySQL, so introducing PostgreSQL is not really an option. Or, to ask the question the other way around: why has the Flink community not supported a MySQL catalog so far? Are there any particular concerns?
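For reference, this is how the existing JDBC catalog is registered from SQL; a sketch only, since until the PR above is merged the JDBC catalog ships with a PostgreSQL implementation, so the MySQL `base-url` here is aspirational and the database/credential names are made up:

```sql
CREATE CATALOG my_mysql_catalog WITH (
  'type' = 'jdbc',
  'default-database' = 'metastore',            -- hypothetical database name
  'username' = 'flink',
  'password' = '***',
  'base-url' = 'jdbc:mysql://mysql-host:3306'  -- works only once MySQL support lands
);
USE CATALOG my_mysql_catalog;
```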
Re:Re: Flink Sql 1.13 UDF ERROR
Hi, Jingsong.

The latest type inference is stricter than that of previous versions, and its validation of the schema's NOT NULL constraints is more thorough. In the example mentioned earlier, using Java primitive types as UDF parameters means the columns passed to the UDF must be NOT NULL, whereas each column of the view created here is nullable by default, which produces the problem described earlier.

Best,
Roc.

On 2021-06-29 11:02:55, "Jingsong Li" wrote:
>Hi,
>
>You could open a JIRA and let Timo take a look; the UDAF introduced new type inference, and there may be a problem there.
>
>Best,
>Jingsong
>
>On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal wrote:
>
>> Hi, All.
>>
>> A question I ran into while evaluating the API upgrade to the latest 1.13.1; thanks, everyone:
>>
>> Version: 1.13.1
>> Run mode: IDE application
>> ---
>> UDF definition:
>>
>> public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> {
>>
>>     // Return the final result
>>     @Override
>>     public Double getValue(AccumulatorBean acc) {
>>         return acc.totalPrice / acc.totalNum;
>>     }
>>
>>     // Create the object that holds the intermediate result
>>     @Override
>>     public AccumulatorBean createAccumulator() {
>>         return new AccumulatorBean();
>>     }
>>
>>     // Subtract values that are retracted
>>     public void retract(AccumulatorBean acc, double price, long num) {
>>         acc.totalPrice -= price * num;
>>         acc.totalNum -= num;
>>     }
>>
>>     // Merge the partial results of the individual partitions
>>     public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {
>>         Iterator<AccumulatorBean> iter = it.iterator();
>>         while (iter.hasNext()) {
>>             AccumulatorBean a = iter.next();
>>             this.accumulate(acc, a.totalPrice, a.totalNum);
>>         }
>>     }
>>
>>     // Called when the in-memory value is reset
>>     public void resetAccumulator(AccumulatorBean acc) {
>>         acc.totalNum = 0;
>>         acc.totalPrice = 0;
>>     }
>>
>>     // Aggregation logic applied to each incoming record
>>     public void accumulate(AccumulatorBean acc, double price, long num) {
>>         acc.totalPrice += price * num;
>>         acc.totalNum += num;
>>     }
>> }
>>
>> Main method:
>>
>> // Unified batch/stream Table API
>> TableEnvironment tableEnvironment = TableEnvironment.create(
>>         EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
>> List<Row> dataList = new ArrayList<>();
>> dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
>> dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
>> dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
>> Table table = tableEnvironment.fromValues(
>>         DataTypes.ROW(
>>                 DataTypes.FIELD("user", DataTypes.STRING()),
>>                 DataTypes.FIELD("name", DataTypes.STRING()),
>>                 DataTypes.FIELD("price", DataTypes.DOUBLE()),
>>                 DataTypes.FIELD("num", DataTypes.BIGINT())),
>>         dataList);
>> tableEnvironment.createTemporaryView("orders", table);
>>
>> tableEnvironment.createTemporaryFunction("c_agg", new UDFAggregateFunction());
>>
>> tableEnvironment.executeSql("select user, c_agg(price, num) as udf_field from orders group by user").print();
>>
>> Exception stack trace:
>>
>> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
>>     at com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
>> Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
>> default_c
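Based on the explanation above, two possible fixes; this is a minimal sketch, not verified against every 1.13.x build:

```java
// Option 1: use boxed parameter types so the UDF also accepts nullable columns.
// (Apply the same change to retract().)
public void accumulate(AccumulatorBean acc, Double price, Long num) {
    if (price == null || num == null) {
        return; // skip rows with NULL inputs
    }
    acc.totalPrice += price * num;
    acc.totalNum += num;
}
```

Option 2: keep the primitive signature and make the columns NOT NULL when building the view schema, e.g. `DataTypes.FIELD("price", DataTypes.DOUBLE().notNull())` and `DataTypes.FIELD("num", DataTypes.BIGINT().notNull())`.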
Re: After the job runs for a while, physical memory is exceeded, the container is killed, and the job restarts
Hi,

You can start by checking the threshold parameter for the ratio of virtual to physical container memory in YARN (yarn-site.xml).

Best, Roc.

On 2021-07-08 10:44:20, "黄志高" wrote:
>Flink version: 1.11.0
>Deployment mode: yarn per-job
>State backend: env.setStateBackend(new FsStateBackend("ckPath"))
>Each TaskManager is given 8 GB of memory and 2 slots.
>A checkpoint is taken every 10 minutes; the average checkpoint size is 400 KB.
>Job logic: source(kafka) -> keyBy -> timeWindow -> reduce count -> redis
>           source(kafka) -> sink(s3 files)
>
>The problem is that the container gets killed every day, causing the job to restart:
>Container [pid=26148,containerID=container_e02_1622516404559_0038_01_08] is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory used; 9.8 GB of 40 GB virtual memory used. Killing container
>
>My understanding is that the buffered data should not be anywhere near that large, so how can it hit the physical memory limit? In my window operation there should be one value per key, there are not many keys, and the state should only record that value. Moreover, the window uses a reduce, processing each record incrementally as it arrives, rather than a ProcessFunction that accumulates a batch and processes it at once.
>Could anyone take a look? Thanks.
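For reference, these are the yarn-site.xml properties governing that check (the values shown are the YARN defaults):

```xml
<!-- Allowed ratio of virtual to physical memory per container. -->
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>2.1</value>
</property>
<!-- Whether the physical / virtual memory checks are enforced. -->
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>true</value>
</property>
```

Note that the quoted kill message reports the physical-memory check firing (8.0 GB of 8 GB used), so besides the ratio it is worth leaving headroom between Flink's configured memory and the container size.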
Flink SQL MySQL schema feature question
Hi,

Does the current Flink SQL support automatically pulling and parsing the complete column information of a table when creating a source table?

Thanks.

Best,
Roc.
Flink Sql 1.13 UDF ERROR
Hi, All.

A question I ran into while evaluating the API upgrade to the latest 1.13.1; thanks, everyone:

Version: 1.13.1
Run mode: IDE application
---
UDF definition:

public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> {

    // Return the final result
    @Override
    public Double getValue(AccumulatorBean acc) {
        return acc.totalPrice / acc.totalNum;
    }

    // Create the object that holds the intermediate result
    @Override
    public AccumulatorBean createAccumulator() {
        return new AccumulatorBean();
    }

    // Subtract values that are retracted
    public void retract(AccumulatorBean acc, double price, long num) {
        acc.totalPrice -= price * num;
        acc.totalNum -= num;
    }

    // Merge the partial results of the individual partitions
    public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {
        Iterator<AccumulatorBean> iter = it.iterator();
        while (iter.hasNext()) {
            AccumulatorBean a = iter.next();
            this.accumulate(acc, a.totalPrice, a.totalNum);
        }
    }

    // Called when the in-memory value is reset
    public void resetAccumulator(AccumulatorBean acc) {
        acc.totalNum = 0;
        acc.totalPrice = 0;
    }

    // Aggregation logic applied to each incoming record
    public void accumulate(AccumulatorBean acc, double price, long num) {
        acc.totalPrice += price * num;
        acc.totalNum += num;
    }
}

Main method:

// Unified batch/stream Table API
TableEnvironment tableEnvironment = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
List<Row> dataList = new ArrayList<>();
dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
Table table = tableEnvironment.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("user", DataTypes.STRING()),
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("price", DataTypes.DOUBLE()),
                DataTypes.FIELD("num", DataTypes.BIGINT())),
        dataList);
tableEnvironment.createTemporaryView("orders", table);

tableEnvironment.createTemporaryFunction("c_agg", new UDFAggregateFunction());

tableEnvironment.executeSql("select user, c_agg(price, num) as udf_field from orders group by user").print();

Exception stack trace:

default_catalog.default_database.c_agg(DOUBLE, BIGINT)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
default_catalog.default_database.c_agg(DOUBLE, BIGINT)
    at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
    at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
    at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
    at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
    at
Re: Unsubscribe
Hi Zhang Bin,

To unsubscribe, send a message to user-zh-unsubscr...@flink.apache.org and follow the prompts to complete the remaining steps.

Best,
flinker.

On 2021-05-26 17:05:59, "张斌" wrote:
>Unsubscribe
>张斌
>herobin1...@163.com
Re: Unsubscribe
Hi, Tang.

Please send a message to user-zh-unsubscr...@flink.apache.org if you want to unsubscribe from this mailing list.

Best,
Roc.

Roc Marshal | flin...@126.com

On 2021-01-27 16:41, 唐军亮 wrote:
>Unsubscribe
FlinkSQL window usage question
Hi,

SELECT
  TUMBLE_START(ts, INTERVAL '1' DAY) AS window_start,
  TUMBLE_END(ts, INTERVAL '1' DAY)   AS window_end,
  c1,
  SUM(c2) AS sc2
FROM sourcetable
GROUP BY TUMBLE(ts, INTERVAL '1' DAY), c1
ORDER BY window_start, sc2 DESC
LIMIT 10

The intent of this SQL is to compute over one-day (tumbling) windows, group by c1, sum column c2 (as sc2), and order by sc2 within each window. But from the results, the sc2 column within a window is not actually ordered (descending or ascending).

Based on my requirement and the way the SQL is written, could you diagnose where the problem lies, or give some advice so I can pinpoint my misunderstanding of Flink SQL?

Thanks!

Best Roc.
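In streaming SQL, a global ORDER BY ... LIMIT does not produce a per-window ranking. Per-window ordering is usually expressed with the Top-N pattern instead; a sketch, assuming the goal is the top 10 sc2 values per window and the blink planner's Top-N support:

```sql
SELECT window_start, window_end, c1, sc2
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY window_start ORDER BY sc2 DESC) AS rn
  FROM (
    SELECT
      TUMBLE_START(ts, INTERVAL '1' DAY) AS window_start,
      TUMBLE_END(ts, INTERVAL '1' DAY)   AS window_end,
      c1,
      SUM(c2) AS sc2
    FROM sourcetable
    GROUP BY TUMBLE(ts, INTERVAL '1' DAY), c1
  ) agg
) ranked
WHERE rn <= 10
```

Leaving rn out of the outer SELECT lets the planner apply its Top-N optimization.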
Re: Re: Question about window computation in StreamSQL
SELECT
  TUMBLE_START(ts, INTERVAL '1' DAY) AS window_start,
  TUMBLE_END(ts, INTERVAL '1' DAY)   AS window_end,
  c1,
  SUM(c2) AS sc2
FROM target_kafka_source_converted
GROUP BY TUMBLE(ts, INTERVAL '1' DAY), c1
ORDER BY window_start, sc2 DESC
LIMIT 10

My intent with this SQL is to compute over one-day windows, group by c1, sum column c2 (as sc2), and order by sc2 within each window. But from the results, the sc2 column within a window is not actually ordered (descending or ascending).

Based on my requirement and the way the SQL is written, could you diagnose where the problem lies, or give some advice so I can pinpoint my misunderstanding of Flink SQL?

Thank you!

Best Roc.

On 2020-10-21 17:21:47, "hailongwang" <18868816...@163.com> wrote:
>Hi Roc,
> SQL currently does not support specifying an offset; only a one-day window starting at 00:00 is possible.
>There is an issue tracking this:
>https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17767?filter=allopenissues
>
>Best,
>Hailong Wang
>
>On 2020-10-21 16:09:29, "Roc Marshal" wrote:
>>Hi,
>>
>>For a tumbling window (one day long) taking an aggregate of some column, how do I specify in SQL the time at which the window starts rolling? For example, I would like windows running from 2 a.m. each day (as the window start) to 2 a.m. the next day (as the window end). How is this expressed?
>>
>>Thanks.
>>
>>Best Roc
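Until FLINK-17767 lands, a common workaround is to shift the event time before windowing and shift the reported window bounds back; a hedged sketch (whether the shifted expression still counts as a time attribute depends on the Flink version; if the planner complains, declare the shifted timestamp as the rowtime via a computed column in the source DDL instead):

```sql
-- Shift event time back 2 hours so the default midnight-aligned windows
-- correspond to 02:00-to-02:00 windows in the original timeline.
SELECT
  TUMBLE_START(ts_shifted, INTERVAL '1' DAY) + INTERVAL '2' HOUR AS window_start,
  TUMBLE_END(ts_shifted, INTERVAL '1' DAY)   + INTERVAL '2' HOUR AS window_end,
  c1,
  SUM(c2) AS sc2
FROM (
  SELECT ts - INTERVAL '2' HOUR AS ts_shifted, c1, c2
  FROM target_kafka_source_converted
) shifted
GROUP BY TUMBLE(ts_shifted, INTERVAL '1' DAY), c1
```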
Question about window computation in StreamSQL
Hi,

For a tumbling window (one day long) taking an aggregate of some column, how do I specify in SQL the time at which the window starts rolling? For example, I would like windows running from 2 a.m. each day (as the window start) to 2 a.m. the next day (as the window end). How is this expressed?

Thanks.

Best Roc
Re: Re: Does Flink SQL DDL support mapping multi-level JSON?
Is the syntax similarly nested when the depth is three levels or more, or is there a different way to write it?

Thanks

Best Roc.

On 2020-09-24 20:53:12, "Benchao Li" wrote:
>This case is supported today; you can use something like this:
>```SQL
>CREATE TABLE MyTable (
>  a11 INT,
>  a12 VARCHAR,
>  a13 ROW<a21 INT, a22 INT, a23 VARCHAR>
>) WITH (...)
>```
>
>Roc Marshal wrote on Thu, Sep 24, 2020 at 7:54 PM:
>
>> A question: when Flink SQL connects to Kafka in streaming mode and the message format is multi-level JSON, how do I map a field at depth greater than 1?
>> {
>>     "a11":1,
>>     "a12":"1",
>>     "a13":{
>>         "a21":1,
>>         "a22":1,
>>         "a23":"1"}
>> }
>>
>> For a format like this, how do I map the fields starting with a2? If the current version does not support this feature, could support for it be considered?
>>
>> Thanks
>
>--
>
>Best,
>Benchao Li
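For deeper nesting the ROW types simply nest; a sketch with a hypothetical third level (the field names a31/a32 are made up for illustration):

```sql
CREATE TABLE MyTable (
  a11 INT,
  a12 VARCHAR,
  a13 ROW<a21 INT, a22 INT, a23 ROW<a31 INT, a32 VARCHAR>>
) WITH (...)
```

Nested fields are then addressed with dot notation in queries, e.g. `SELECT a13.a23.a31 FROM MyTable`.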
Does Flink SQL DDL support mapping multi-level JSON?
A question: when Flink SQL connects to Kafka in streaming mode and the message format is multi-level JSON, how do I map a field at depth greater than 1?
{
    "a11":1,
    "a12":"1",
    "a13":{
        "a21":1,
        "a22":1,
        "a23":"1"}
}

For a format like this, how do I map the fields starting with a2? If the current version does not support this feature, could support for it be considered?

Thanks
Flink OOME: Java heap space
Hi, all.

A question:

Scenario: Oracle JDK 1.8, flink-release-1.10.0, Flink on YARN in session mode. Data is read from Kafka and processed with SQL.
JVM Heap Size: 638 MB; Flink Managed Memory: 635 MB. An OutOfMemoryError: Java heap space occurred.
State backend: filesystem -> hadoop (HDFS).
The job goes directly from DEPLOYING to FAILED.
Other reference information was attached as screenshots (images not preserved in this plain-text archive).

Could you give some advice?
Thanks.
Flink 1.10 on YARN TaskManager startup parameter question
Hi, all.

For Flink 1.10 on YARN, is G1 the default garbage collector in the TaskManager's JVM startup parameters?
Basic cluster environment: hadoop-2.7.5, flink-1.10, jdk-1.8_61, with none of the JVM-related parameters explicitly set.

Thanks.

Best,
Roc Marshal.
Re:Flink 1.11 submit job timed out
Hi, SmileSmile.

I have personally run into a similar host-resolution problem before; you can troubleshoot it from the angle of the k8s pod network and hostname mapping.
Hope this helps.

Best,
Roc Marshal

On 2020-07-15 17:04:18, "SmileSmile" wrote:
>
>Hi
>
>Flink version 1.11, deployed as a Kubernetes session. 30 TMs, 4 slots each, job parallelism 120. When the job is submitted, a flood of "No hostname could be resolved for the IP address" warnings appears, the JM times out, and the submission fails. The web UI also hangs and becomes unresponsive.
>
>With WordCount at parallelism 1, only a few of the "no hostname" lines are logged and the submission succeeds; once the parallelism goes up, it times out.
>
>Part of the log:
>
>2020-07-15 16:58:46,460 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.32.160.7, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,460 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.44.224.7, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,461 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.40.32.9, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
>
>2020-07-15 16:59:10,236 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
>2020-07-15 16:59:10,236 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.
>
>How should this be dealt with?
>
>Best!
>
>a511955993
>Email: a511955...@163.com
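A quick way to check, as the reply suggests, is to test reverse resolution from inside the cluster; a sketch (the pod name is a placeholder, and this assumes nslookup is available in the image):

```sh
# Reverse-resolve one of the unresolvable TaskManager IPs from the JobManager pod.
kubectl exec -it flink-jobmanager-<pod-suffix> -- nslookup 10.32.160.7

# Map the IP back to a pod to confirm which TaskManager it belongs to.
kubectl get pods -o wide | grep 10.32.160.7
```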
Re: [Help] Flink Hadoop dependency problem
Hi Z-Z,

You can try downloading the matching uber jar from https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/ and putting the downloaded jar file under ${FLINK_HOME}/lib in the Flink image, then starting the composed containers.

Best,
Roc Marshal.

On 2020-07-15 10:47:39, "Z-Z" wrote:
>I am using Flink 1.11.0, set up with docker-compose as follows:
>version: "2.1"
>services:
>  jobmanager:
>    image: flink:1.11.0-scala_2.12
>    expose:
>      - "6123"
>    ports:
>      - "8081:8081"
>    command: jobmanager
>    environment:
>      - JOB_MANAGER_RPC_ADDRESS=jobmanager
>      - HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
>    volumes:
>      - ./jobmanager/conf:/opt/flink/conf
>      - ./data:/data
>
>  taskmanager:
>    image: flink:1.11.0-scala_2.12
>    expose:
>      - "6121"
>      - "6122"
>    depends_on:
>      - jobmanager
>    command: taskmanager
>    links:
>      - "jobmanager:jobmanager"
>    environment:
>      - JOB_MANAGER_RPC_ADDRESS=jobmanager
>    volumes:
>      - ./taskmanager/conf:/opt/flink/conf
>networks:
>  default:
>    external:
>      name: flink-network
>
>hadoop-2.9.2 is already in the data directory, and HADOOP_CLASSPATH has been added to the environment variables of both jobmanager and taskmanager, but whether submitting via the CLI or the web UI, the jobmanager still reports "Could not find a file system implementation for scheme 'hdfs'". Does anyone know what is going on?
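One way to apply this inside the compose file from the original message is to mount the downloaded jar into the image's lib directory; a sketch, where the 2.8.3-10.0 build is only an example version (pick the one matching your Hadoop line):

```yaml
services:
  jobmanager:
    volumes:
      # Put the shaded Hadoop uber jar onto Flink's classpath.
      - ./flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
```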
Re: [Flink transformations]
Hi 忝忝向仧,

There is currently no such mapping table archived in the Flink documentation, but the mapping can be observed at the API level.

Best,
Roc Marshal

On 2020-06-29 22:29:21, "忝忝向仧" <153488...@qq.com> wrote:
>Hi, all:
>
>A question: a Flink program is first translated into a logical mapping, i.e. transformations. I see that there are currently 17 Transformation classes under the org.apache.flink.streaming.api.transformations package (SourceTransformation, SplitTransformation, TwoInputTransformation, etc.). Is there a mapping table, that is, which operators or operations in a program (such as map, flatmap, filter, connect, select, etc.) correspond to which Transformation class?
>
>Thanks.
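A minimal sketch of inspecting the mapping at the API level, as the reply suggests (the exact Transformation subclass printed can vary by version):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> doubled = env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .returns(Types.INT); // type hint for the lambda
        // Prints the concrete Transformation subclass produced by map(),
        // e.g. OneInputTransformation in the 1.x line.
        System.out.println(doubled.getTransformation().getClass().getName());
    }
}
```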
Re:flink1.9 on yarn
Hi, guanyq.

On question 1 (when submitting with ./bin/flink run -m yarn-cluster, how do I stop the app id from incrementing?): the app id's auto-increment strategy is not generated by Flink. If necessary, you can investigate hadoop-yarn and draw your own conclusion.

On question 2 (after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I submit to the same app id?): may I take this to mean that a Flink yarn-session cluster better fits your job requirements? The submission mode mentioned in the question is per-job, and once the job finishes, Flink shuts the cluster down.

See: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-flink-session

Best,
Roc Marshal

On 2020-06-28 09:09:43, "guanyq" wrote:
>Question 1
>
>Every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, such as application_1567067657620_0254.
>
>After running yarn application -kill application_1567067657620_0254,
>
>how do I keep the app id from incrementing on the next ./bin/flink run -m yarn-cluster submission?
>
>Question 2
>
>After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I submit to this app id again?
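A sketch of the session-mode flow that the linked page describes (the application id below is just the one from the question, standing in as a placeholder):

```sh
# Start a long-running detached session; YARN assigns a single application id to it.
./bin/yarn-session.sh -jm 1024m -tm 4096m -s 4 -d

# Later submissions attach to that session instead of creating new YARN apps;
# the target application id can also be passed explicitly with -yid.
./bin/flink run -yid application_1567067657620_0254 ./examples/streaming/WordCount.jar
```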
Re: Re: Suspected leak of JOB_MANAGER_LEADER_PATH znodes in Flink
Yes.

Best,
Roc Marshal.

On 2020-06-28 10:10:20, "林恬" wrote:
>Do you mean that these empty leader/${job_id} znodes left behind by cancelled jobs have to be cleaned up periodically by the user?
>
>-------- Original message --------
>From: "Roc Marshal"
>Date: Sun, Jun 28, 2020 10:07 AM
>To: "FLINK中国"
>Subject: Re: Suspected leak of JOB_MANAGER_LEADER_PATH znodes in Flink
>
>Hi, 林恬.
>First of all, thanks for the feedback.
>Regarding the cleanup of the corresponding ZooKeeper paths: you can simply think of Flink's dependency on the ZooKeeper component as limited to the scope of the functionality it relies on. It does not maintain consistency between the contents of the whole cluster, or of a given path, and the Flink job information; that is, it does not clean up stale entries, because in HA scenarios the criteria for judging a path stale are much more complex.
>
>Best,
>Roc Marshal.
>
>On 2020-06-28 09:12:41, "林恬" wrote:
>>Hi all:
>>I am currently on Flink 1.9.2 with ZooKeeper-based HA. I have found that the /leader/${job_id} znodes on ZK are not cleaned up even after a job is cancelled, so after running for a while there are many empty job_id znodes under /leader/. When is this supposed to be cleaned up? Or is this uncleaned behavior a bug in 1.9.2?
Re: Suspected leak of JOB_MANAGER_LEADER_PATH znodes in Flink
Hi, 林恬.

First of all, thanks for the feedback.
Regarding the cleanup of the corresponding ZooKeeper paths: you can simply think of Flink's dependency on the ZooKeeper component as limited to the scope of the functionality it relies on. It does not maintain consistency between the contents of the whole cluster, or of a given path, and the Flink job information; that is, it does not clean up stale entries, because in HA scenarios the criteria for judging a path stale are much more complex.

Best,
Roc Marshal.

On 2020-06-28 09:12:41, "林恬" wrote:
>Hi all:
>I am currently on Flink 1.9.2 with ZooKeeper-based HA. I have found that the /leader/${job_id} znodes on ZK are not cleaned up even after a job is cancelled, so after running for a while there are many empty job_id znodes under /leader/. When is this supposed to be cleaned up? Or is this uncleaned behavior a bug in 1.9.2?
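If periodic manual cleanup is acceptable, a sketch using ZooKeeper's CLI (the paths assume the default HA root /flink and cluster-id "default"; only delete entries for jobs that are confirmed terminated):

```sh
# List the leader znodes Flink left behind.
bin/zkCli.sh -server zk-host:2181 ls /flink/default/leader

# Remove the subtree of a job id that is confirmed cancelled.
bin/zkCli.sh -server zk-host:2181 rmr /flink/default/leader/<job_id>
```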
Re: Why is the Checkpoint Duration (Async) phase of Flink checkpoints so slow?
Hi, 立志.

Could you provide more information, such as the exception messages, so we can better understand the background of this case?
Thanks.

Best,
Roc Marshal

On 2020-06-28 09:52:10, "张立志" wrote:
>Flink version: 1.8
>Deployment: YARN cluster
>
>Configuration code:
>StreamExecutionEnvironment.stateBackend(new FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state")).checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
>The business code is relatively simple, but memory usage is large.
>Errors start after more than 10 minutes; once the state reaches about 1.5 GB, the time taken starts to grow.
Re: Reply: Flink SQL fails to read HBase table data
Hi, MuChen.

1. The ZooKeeper "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed" errors occur while the yarn-session starts; the session itself still comes up afterwards (JobManager Web Interface: http://uhadoop-op3raf-core24:42976).
2. The root cause of the query failure is "Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] ; SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70, source: [HBaseTableSource[schema=[key, cf1], projectFields=[0]]]], fields=[key]) ; SinkConversionToTuple2 ; Sink: SQL Client Stream Collect Sink': Configuring the input format (null) failed: Cannot create connection to HBase." This points at the HBase connection itself: verify the HBase cluster connection information and that the HBase configuration and client dependencies are visible to Flink.

Best,
Roc Marshal.

On 2020-06-23 11:05:43, "MuChen" <9329...@qq.com> wrote:
>Hi, Roc Marshal:
>…
>Best,
>MuChen.
>
>------------------ Original ------------------
>From: "Roc Marshal"
>Date: Tue, Jun 23, 2020 10:27 AM
>To: "user-zh"
>Subject: Re: Flink SQL fails to read HBase table data
>
>Hi, MuChen.
>You can check whether the ZooKeeper address that the Flink HBase source connects to is the one that holds HBase's meta information.
>Best,
>Roc Marshal.
>
>On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>>Hi, All:
>>
>>Reading an HBase table from Flink SQL fails.
>>
>>Flink is deployed on the Hadoop master node.
>>
>>yarn-session:
>>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1 &  # run in the background
>>[admin@uhadoop-op3raf-master2 flink10]$
>>2020-06-23 09:30:56,402 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
>>2020-06-23 09:30:56,515 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
>>JobManager Web Interface: http://uhadoop-op3raf-core24:42976
>>
>>sql-client:
>>bin/sql-client.sh embedded
>>
>>Create the HBase table in Flink SQL:
>>CREATE TABLE hbase_video_pic_title_q70 (
>>  key string,
>>  cf1 ROW<q70 string>
>>) WITH (
>>  'connector.type' = 'hbase',
>>  'connector.version' = '1.4.3',
>>  'connector.table-name' = 'hbase_video_pic_title_q70',
>>  'connector.zookeeper.quorum' = 'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
>>  'connector.zookeeper.znode.parent' = '/hbase',
>>  'connector.write.buffer-flush.max-size' = '10mb',
>>  'connector.write.buffer-flush.max-rows' = '1000',
>>  'connector.write.buffer-flush.interval' = '2s'
>>);
>>
>>Query:
>>select key from hbase_video_pic_title_q70;
>>
>>Reading HBase fails:
>>[ERROR] Could not execute SQL statement. Reason:
>>org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
>> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> ... 6 more
>>Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>> at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>
>> at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
>> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
Re: Flink SQL fails to read HBase table data
Hi, MuChen.

You can check whether the ZooKeeper address that the Flink HBase source connects to is the one that holds HBase's meta information.

Best,
Roc Marshal.

On 2020-06-23 10:17:35, "MuChen" <9329...@qq.com> wrote:
>Hi, All:
>
>Reading an HBase table from Flink SQL fails.
>
>Flink is deployed on the Hadoop master node.
>
>yarn-session:
>bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1 &  # run in the background
>[admin@uhadoop-op3raf-master2 flink10]$
>2020-06-23 09:30:56,402 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
>2020-06-23 09:30:56,515 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
>JobManager Web Interface: http://uhadoop-op3raf-core24:42976
>
>sql-client:
>bin/sql-client.sh embedded
>
>Create the HBase table in Flink SQL:
>CREATE TABLE hbase_video_pic_title_q70 (
>  key string,
>  cf1 ROW<q70 string>
>) WITH (
>  'connector.type' = 'hbase',
>  'connector.version' = '1.4.3',
>  'connector.table-name' = 'hbase_video_pic_title_q70',
>  'connector.zookeeper.quorum' = 'uhadoop-op3raf-master1:2181,uhadoop-op3raf-master2:2181,uhadoop-op3raf-core1:2181',
>  'connector.zookeeper.znode.parent' = '/hbase',
>  'connector.write.buffer-flush.max-size' = '10mb',
>  'connector.write.buffer-flush.max-rows' = '1000',
>  'connector.write.buffer-flush.interval' = '2s'
>);
>
>Query:
>select key from hbase_video_pic_title_q70;
>
>Reading HBase fails:
>[ERROR] Could not execute SQL statement. Reason:
>org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more
>Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>
> at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more
>Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: HBaseTableSource[schema=[key, cf1], projectFields=[0]] - SourceConversion(table=[default_catalog.default_database.hbase_video_pic_title_q70, source: [HBaseTableSource[schema=[key, cf1], projectFields=[0]]]], fields=[key]) - SinkConversionToTuple2 - Sink: SQL Client Stream Collect Sink': Configuring the input format (null) failed: Cannot create connection to HBase.
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
> at org.apache.flink.runtime.scheduler.SchedulerBase.