Re: Question about window functions not forming a closed interval
Hi Fei, Could you attach your SQL code? Which Flink version and which planner are you using? Also, could you describe the symptom "the current record cannot be counted until the next one arrives" in more detail? I don't quite understand it. Best, Jark On Wed, 1 Apr 2020 at 18:00, Fei Han wrote: > Hi everyone: > When doing window aggregations with count over and sum over, the current record cannot be counted; it only gets counted when the next record arrives. > Did I write a parameter wrong, or is there some other function to use? The incoming data should be handled like a closed interval, but right now it behaves like an open one. Any suggestions would be appreciated, thanks!
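For reference, a minimal runnable sketch of the kind of OVER aggregation under discussion (the table, fields, and values here are made up for illustration, assuming Flink 1.10 with the blink planner). With ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW the upper bound is inclusive, i.e. the current record is counted in its own result:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class OverWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStream<Tuple2<String, Long>> stream =
            env.fromElements(Tuple2.of("a", 10L), Tuple2.of("a", 20L), Tuple2.of("b", 5L));
        // Register the stream with a processing-time attribute 'pt'.
        Table src = tEnv.fromDataStream(stream, "user_id, amount, pt.proctime");
        tEnv.createTemporaryView("events", src);

        // ROWS ... AND CURRENT ROW includes the current record in the aggregate.
        Table result = tEnv.sqlQuery(
            "SELECT user_id, " +
            " COUNT(*) OVER (PARTITION BY user_id ORDER BY pt " +
            "   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cnt, " +
            " SUM(amount) OVER (PARTITION BY user_id ORDER BY pt " +
            "   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total " +
            "FROM events");
        tEnv.toAppendStream(result, Row.class).print();
        env.execute("over-window-sketch");
    }
}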
Re: Flink 1.10.0 HiveModule function issue
Hi, GenericUDTFExplode is a UDTF. The way to use a UDTF in Flink is the standard SQL syntax: "select x from db1.nested, lateral table(explode(a)) as T(x)" Please give that a try. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#table-functions Best, Jingsong Lee On Thu, Apr 2, 2020 at 11:22 AM Yaoting Gong wrote: > Hello everyone, > > Our project currently integrates HiveModule and we have run into some problems. Asking for your advice. > > Before integrating the Hive Module, substr and split could not be used at all. After the integration, both verify fine. > For example: select split('1,2,2,4',',') > > But when using the explode function, select explode(split('1,2,2,4',',')); > the following error occurs: > > > The main method caused an error: SQL validation failed. From line 1, column > 8 to line 1, column 36: *No match found for function signature explode(* > *) * > > *Full stack trace:* > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: SQL validation failed. From line 1, column 8 to line 1, > column 36: No match found for function signature explode() > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at > org.apache.flink.client.cli.CliFrontend$$Lambda$38/1205406622.call(Unknown > Source) > at > > org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/293907205.run(Unknown > Source) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > at > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. 
From line 1, column 8 to line 1, column 36: No match found for > function signature explode() > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > at > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > at > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) > at com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88) > at > > com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87) > at com.sui.bigdata.PlatformEngine$$$Lambda$765/623545006.apply(Unknown > Source) > at scala.collection.immutable.List.foreach(List.scala:388) > at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87) > at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > ... 13 more > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 8 to line 1, column 36: No match found for function signature > explode() > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1773) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:293) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219) > at > >
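To make the suggested syntax concrete, here is a minimal sketch in Java (an illustration only: it assumes Flink 1.10 with the blink planner and Hive jars matching the placeholder version string "2.3.4"; the table and column names are made up):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;

public class HiveExplodeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Make Hive built-in functions (split, explode, ...) resolvable;
        // the version string must match the Hive dependency on the classpath.
        tEnv.loadModule("hive", new HiveModule("2.3.4"));

        Table src = tEnv.fromDataStream(env.fromElements("1,2,2,4"), "line");
        tEnv.createTemporaryView("t", src);

        // explode is a UDTF, so it cannot appear in the SELECT list directly;
        // it has to be applied through LATERAL TABLE, as described above.
        Table result = tEnv.sqlQuery(
            "SELECT x FROM t, LATERAL TABLE(explode(split(line, ','))) AS T(x)");
        tEnv.toAppendStream(result, Row.class).print();
        env.execute("hive-explode-sketch");
    }
}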
Flink 1.10.0 HiveModule function issue
Hello everyone, Our project currently integrates HiveModule and we have run into some problems. Asking for your advice. Before integrating the Hive Module, substr and split could not be used at all. After the integration, both verify fine. For example: select split('1,2,2,4',',') But when using the explode function, select explode(split('1,2,2,4',',')); the following error occurs: The main method caused an error: SQL validation failed. From line 1, column 8 to line 1, column 36: *No match found for function signature explode(* *) * *Full stack trace:* The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 8 to line 1, column 36: No match found for function signature explode() at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at org.apache.flink.client.cli.CliFrontend$$Lambda$38/1205406622.call(Unknown Source) at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/293907205.run(Unknown Source) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 36: No match found for function signature explode() at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) at com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88) at com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87) at com.sui.bigdata.PlatformEngine$$$Lambda$765/623545006.apply(Unknown Source) at scala.collection.immutable.List.foreach(List.scala:388) at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87) at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) ... 
13 more Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 36: No match found for function signature explode() at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1773) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:293) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479) at
Reply: Flink SQL 1.10 with MySQL as a dimension table: data volume too large causes task manager timeout
Hi, Understood, now I know how to fix it. I'm using the sql-gateway here, so it looks like I will have to add a kind of table-definition syntax to the sql-gateway. Many thanks. Best, Xinghalo On Apr 2, 2020 at 10:52, Benchao Li wrote: What you wrote is not the dimension-table join syntax. Dimension-table joins are now implemented via temporal tables [1], which require a special join syntax: SELECT o.amount, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency Also, the JDBCTableSource you looked at is a plain bounded source, i.e. a batch source. The real dimension-table code is in JDBCLookupFunction. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins 111 wrote on Thu, Apr 2, 2020 at 10:33 AM: Hi, I tried it and it doesn't seem to work. My SQL: select s.*, item.product_name from ( select member_id, uuid, page, cast(SPLIT_INDEX(page, '10.pd.item-', 1) as string) as item_id, `time` from tgs_topic_t1 where page like '10.pd.item-%' ) s inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as string) = s.item_id where s.item_id is not null Looking at the implementation in JDBCTableSource: String query = dialect.getSelectFromStatement( options.getTableName(), returnType.getFieldNames(), new String[0]); When the SQL is built, conditionFields is hardcoded to an empty array, so no query conditions can ever take part. Later, when open() actually connects and queries, only the partition fields are handled; there are no query-condition fields. On Apr 2, 2020 at 10:11, Benchao Li wrote: Hi, Could you post your SQL as well? Normally a dimension-table join uses the join's equality conditions as the lookup conditions against MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely caused by a problem in how the dimension-table join condition is written. 111 wrote on Thu, Apr 2, 2020 at 9:55 AM: Hi, I'd like to confirm some implementation details of the MySQL JDBC Connector. My current understanding of the implementation: 1. Take the fields and table name from the query SQL and assemble select a, b, c from t. 2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx. 3. Execute the SQL and load the data into memory (not sure about the caching details afterwards). The problem we are hitting: we use MySQL as a dimension table (on the order of 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task). As a result only one partition queries the data, and the query SQL is select xxx from t (a full-table scan). Because of cluster limits the parallelism cannot be raised much, so how can we query on demand? Previously, with the DataStream API, we used the async API to look up a single row by some condition, which was very fast; now everything has to be loaded, which consumes far too many resources. Is there an elegant solution? Best, Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
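For readers following this thread, a sketch of the query above rewritten with the temporal-table join syntax (a hedged illustration, not verified against the poster's setup: it assumes tgs_topic_t1 exposes a processing-time attribute named proctime, that PROD_ITEM_MALL_ACTIVITY_PRODUCT is registered through the JDBC connector so the lookup goes through JDBCLookupFunction, and that its id column is BIGINT; the cast is moved to the probe side, since the lookup key should stay a plain column of the dimension table):

// Assuming a blink-planner StreamTableEnvironment tEnv with both tables
// already registered (DDL omitted; see the dimension-table sketch below).
Table joined = tEnv.sqlQuery(
    "SELECT s.*, item.product_name " +
    "FROM ( " +
    "  SELECT member_id, uuid, page, " +
    "         CAST(SPLIT_INDEX(page, '10.pd.item-', 1) AS STRING) AS item_id, " +
    "         `time`, proctime " +
    "  FROM tgs_topic_t1 " +
    "  WHERE page LIKE '10.pd.item-%' " +
    ") s " +
    "JOIN PROD_ITEM_MALL_ACTIVITY_PRODUCT FOR SYSTEM_TIME AS OF s.proctime AS item " +
    "  ON item.id = CAST(s.item_id AS BIGINT) " +
    "WHERE s.item_id IS NOT NULL");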
Re: Flink SQL 1.10 with MySQL as a dimension table: data volume too large causes task manager timeout
What you wrote is not the dimension-table join syntax. Dimension-table joins are now implemented via temporal tables [1], which require a special join syntax: SELECT o.amount, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency Also, the JDBCTableSource you looked at is a plain bounded source, i.e. a batch source. The real dimension-table code is in JDBCLookupFunction. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins 111 wrote on Thu, Apr 2, 2020 at 10:33 AM: > Hi, > I tried it and it doesn't seem to work. My SQL: > > > select s.*, item.product_name > from ( select member_id, uuid, page, cast(SPLIT_INDEX(page, > '10.pd.item-', 1) as string) as item_id, `time` from tgs_topic_t1 where > page like '10.pd.item-%' ) s > inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as > string) = s.item_id > where s.item_id is not null > > > Looking at the implementation in JDBCTableSource: > String query = dialect.getSelectFromStatement( options.getTableName(), > returnType.getFieldNames(), new String[0]); > When the SQL is built, conditionFields is hardcoded to an empty array, so no query conditions can ever take part. > Later, when open() actually connects and queries, only the partition fields are handled; there are no query-condition fields. > On Apr 2, 2020 at 10:11, Benchao Li wrote: > Hi, > > Could you post your SQL as well? > > Normally a dimension-table join uses the join's equality conditions as the lookup conditions against MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely caused by a problem in how the dimension-table join condition is written. > > 111 wrote on Thu, Apr 2, 2020 at 9:55 AM: > > Hi, > I'd like to confirm some implementation details of the MySQL JDBC Connector. My current understanding of the implementation: > 1. Take the fields and table name from the query SQL and assemble select a, b, c from t. > 2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx. > 3. Execute the SQL and load the data into memory (not sure about the caching details afterwards). > > > > The problem we are hitting: we use MySQL as a dimension table (on the order of 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task). > As a result only one partition queries the data, and the query SQL is select xxx from t (a full-table scan). > Because of cluster limits the parallelism cannot be raised much, so how can we query on demand? > > > Previously, with the DataStream API, we used the async API to look up a single row by some condition, which was very fast; now everything has to be loaded, which consumes far too many resources. > Is there an elegant solution? > > > Best, > Xinghalo > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Re: Exception when writing to Hive in real time with Flink
Yes, it is related; that umbrella issue is FLIP-115. Best, Jingsong Lee On Wed, Apr 1, 2020 at 10:57 PM 叶贤勋 wrote: > Hi jingsong, > I see that in this issue [1] you opened two PRs about supporting a Hive streaming sink. Is that code related to FLIP-115? > > > [1] https://issues.apache.org/jira/browse/FLINK-14255 > > > 叶贤勋 > yxx_c...@163.com > > > On Apr 1, 2020 at 16:28, 111 wrote: > Hi jingsong, > That's impressive; it's essentially a data-lake plugin built inside Flink. > Best, > Xinghalo -- Best, Jingsong Lee
Reply: Flink SQL 1.10 with MySQL as a dimension table: data volume too large causes task manager timeout
Hi, I tried it and it doesn't seem to work. My SQL: select s.*, item.product_name from ( select member_id, uuid, page, cast(SPLIT_INDEX(page, '10.pd.item-', 1) as string) as item_id, `time` from tgs_topic_t1 where page like '10.pd.item-%' ) s inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as string) = s.item_id where s.item_id is not null Looking at the implementation in JDBCTableSource: String query = dialect.getSelectFromStatement( options.getTableName(), returnType.getFieldNames(), new String[0]); When the SQL is built, conditionFields is hardcoded to an empty array, so no query conditions can ever take part. Later, when open() actually connects and queries, only the partition fields are handled; there are no query-condition fields. On Apr 2, 2020 at 10:11, Benchao Li wrote: Hi, Could you post your SQL as well? Normally a dimension-table join uses the join's equality conditions as the lookup conditions against MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely caused by a problem in how the dimension-table join condition is written. 111 wrote on Thu, Apr 2, 2020 at 9:55 AM: Hi, I'd like to confirm some implementation details of the MySQL JDBC Connector. My current understanding of the implementation: 1. Take the fields and table name from the query SQL and assemble select a, b, c from t. 2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx. 3. Execute the SQL and load the data into memory (not sure about the caching details afterwards). The problem we are hitting: we use MySQL as a dimension table (on the order of 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task). As a result only one partition queries the data, and the query SQL is select xxx from t (a full-table scan). Because of cluster limits the parallelism cannot be raised much, so how can we query on demand? Previously, with the DataStream API, we used the async API to look up a single row by some condition, which was very fast; now everything has to be loaded, which consumes far too many resources. Is there an elegant solution? Best, Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
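Since the per-key lookup path goes through JDBCLookupFunction rather than the scan path in JDBCTableSource, the dimension table can be declared with the 1.10 JDBC connector's lookup options, which also enable an in-memory cache. A hedged sketch (the connection values and the schema are placeholders):

// Assuming a blink-planner StreamTableEnvironment tEnv as in the sketches above.
tEnv.sqlUpdate(
    "CREATE TABLE PROD_ITEM_MALL_ACTIVITY_PRODUCT (" +
    "  id BIGINT, " +
    "  product_name STRING " +
    ") WITH (" +
    "  'connector.type' = 'jdbc', " +
    "  'connector.url' = 'jdbc:mysql://localhost:3306/db', " +
    "  'connector.table' = 'prod_item_mall_activity_product', " +
    "  'connector.username' = 'user', " +
    "  'connector.password' = 'pass', " +
    // Cache lookups so not every probe row hits MySQL.
    "  'connector.lookup.cache.max-rows' = '10000', " +
    "  'connector.lookup.cache.ttl' = '10min', " +
    "  'connector.lookup.max-retries' = '3' " +
    ")");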
Re: flink 1.10 createTemporaryTable loses proctime
registerTableSource is marked @Deprecated in flink 1.10. In my situation, should I keep using the deprecated API (registerTableSource)? Original message From: deadwind4 To: user-zh Sent: Thu, Apr 2, 2020 10:30 Subject: Re: flink 1.10 createTemporaryTable loses proctime Before the change: tEnv.connect().withFormat().withSchema( xxx.proctime() ).registerTableSource("foo"); After the change: tEnv.connect().withFormat().withSchema( xxx.proctime() ).createTemporaryTable("foo"); After the change, .proctime() no longer takes effect, so I cannot use proctime windows anymore. Original message From: deadwind4 To: user-zh Sent: Thu, Apr 2, 2020 10:22 Subject: Re: flink 1.10 createTemporaryTable loses proctime tEnv.connect().withFormat().withSchema().registerTableSource("foo"); tEnv.connect().withFormat().withSchema().createTemporaryTable("foo"); Original message From: Jark Wu To: user-zh Sent: Thu, Apr 2, 2020 10:18 Subject: Re: flink 1.10 createTemporaryTable loses proctime Hi, Could you describe your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView are different. Best, Jark > On Apr 1, 2020 at 23:13, deadwind4 wrote: > > I actually want to use a processing time window, but when I replaced the deprecated API registerTableSource with createTemporaryTable, proctime stopped working. How should I use it in this situation? Thank you, and sorry for the trouble. > > > Original message > From: Jark Wu > To: user-zh > Sent: Wed, Apr 1, 2020 21:37 > Subject: Re: flink 1.10 createTemporaryTable loses proctime > > > Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized when it is actually used (i.e., System.currentTimeMillis is read at that point). Could you describe what you want to do with createTemporaryTable? What currently cannot satisfy your needs? Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 wrote: > > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching to the deprecated registerTableSource works. > What should I do if I want to use createTemporaryTable? > Also, I debugged the createTemporaryTable source code and found no handling of proctime.
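One possible workaround while staying off the deprecated API, assuming the blink planner on 1.10, is to declare the table with DDL and a computed PROCTIME() column instead of the descriptor API; a hedged sketch with a made-up topic and fields:

// Assuming a blink-planner StreamTableEnvironment tEnv.
tEnv.sqlUpdate(
    "CREATE TABLE foo (" +
    "  user_id STRING, " +
    "  amount BIGINT, " +
    "  pt AS PROCTIME() " +   // processing-time attribute declared in DDL
    ") WITH (" +
    "  'connector.type' = 'kafka', " +
    "  'connector.version' = 'universal', " +
    "  'connector.topic' = 'foo', " +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092', " +
    "  'format.type' = 'json', " +
    "  'update-mode' = 'append' " +
    ")");
// 'pt' can then drive a processing-time window:
Table windowed = tEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS cnt " +
    "FROM foo " +
    "GROUP BY TUMBLE(pt, INTERVAL '1' MINUTE), user_id");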
Re: flink 1.10 createTemporaryTable loses proctime
Before the change: tEnv.connect().withFormat().withSchema( xxx.proctime() ).registerTableSource("foo"); After the change: tEnv.connect().withFormat().withSchema( xxx.proctime() ).createTemporaryTable("foo"); After the change, .proctime() no longer takes effect, so I cannot use proctime windows anymore. Original message From: deadwind4 To: user-zh Sent: Thu, Apr 2, 2020 10:22 Subject: Re: flink 1.10 createTemporaryTable loses proctime tEnv.connect().withFormat().withSchema().registerTableSource("foo"); tEnv.connect().withFormat().withSchema().createTemporaryTable("foo"); Original message From: Jark Wu To: user-zh Sent: Thu, Apr 2, 2020 10:18 Subject: Re: flink 1.10 createTemporaryTable loses proctime Hi, Could you describe your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView are different. Best, Jark > On Apr 1, 2020 at 23:13, deadwind4 wrote: > > I actually want to use a processing time window, but when I replaced the deprecated API registerTableSource with createTemporaryTable, proctime stopped working. How should I use it in this situation? Thank you, and sorry for the trouble. > > > Original message > From: Jark Wu > To: user-zh > Sent: Wed, Apr 1, 2020 21:37 > Subject: Re: flink 1.10 createTemporaryTable loses proctime > > > Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized when it is actually used (i.e., System.currentTimeMillis is read at that point). Could you describe what you want to do with createTemporaryTable? What currently cannot satisfy your needs? Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 wrote: > > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching to the deprecated registerTableSource works. > What should I do if I want to use createTemporaryTable? > Also, I debugged the createTemporaryTable source code and found no handling of proctime.
Re: flink 1.10 createTemporaryTable loses proctime
tEnv.connect().withFormat().withSchema().registerTableSource("foo"); tEnv.connect().withFormat().withSchema().createTemporaryTable("foo"); Original message From: Jark Wu To: user-zh Sent: Thu, Apr 2, 2020 10:18 Subject: Re: flink 1.10 createTemporaryTable loses proctime Hi, Could you describe your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView are different. Best, Jark > On Apr 1, 2020 at 23:13, deadwind4 wrote: > > I actually want to use a processing time window, but when I replaced the deprecated API registerTableSource with createTemporaryTable, proctime stopped working. How should I use it in this situation? Thank you, and sorry for the trouble. > > > Original message > From: Jark Wu > To: user-zh > Sent: Wed, Apr 1, 2020 21:37 > Subject: Re: flink 1.10 createTemporaryTable loses proctime > > > Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized when it is actually used (i.e., System.currentTimeMillis is read at that point). Could you describe what you want to do with createTemporaryTable? What currently cannot satisfy your needs? Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 wrote: > > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching to the deprecated registerTableSource works. > What should I do if I want to use createTemporaryTable? > Also, I debugged the createTemporaryTable source code and found no handling of proctime.
Reply: Flink SQL 1.10 with MySQL as a dimension table: data volume too large causes task manager timeout
Hi benchao, I see. On my side it was just a plain query; it didn't go through the join at all. I'll add the join condition and try again. Best, Xinghalo On Apr 2, 2020 at 10:11, Benchao Li wrote: Hi, Could you post your SQL as well? Normally a dimension-table join uses the join's equality conditions as the lookup conditions against MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely caused by a problem in how the dimension-table join condition is written. 111 wrote on Thu, Apr 2, 2020 at 9:55 AM: Hi, I'd like to confirm some implementation details of the MySQL JDBC Connector. My current understanding of the implementation: 1. Take the fields and table name from the query SQL and assemble select a, b, c from t. 2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx. 3. Execute the SQL and load the data into memory (not sure about the caching details afterwards). The problem we are hitting: we use MySQL as a dimension table (on the order of 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task). As a result only one partition queries the data, and the query SQL is select xxx from t (a full-table scan). Because of cluster limits the parallelism cannot be raised much, so how can we query on demand? Previously, with the DataStream API, we used the async API to look up a single row by some condition, which was very fast; now everything has to be loaded, which consumes far too many resources. Is there an elegant solution? Best, Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Re: flink 1.10 createTemporaryTable loses proctime
Hi, Could you describe your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView are different. Best, Jark > On Apr 1, 2020 at 23:13, deadwind4 wrote: > > I actually want to use a processing time window, but when I replaced the deprecated API > registerTableSource with createTemporaryTable, proctime stopped working. How should I use it in this situation? Thank you, and sorry for the trouble. > > > Original message > From: Jark Wu > To: user-zh > Sent: Wed, Apr 1, 2020 21:37 > Subject: Re: flink 1.10 createTemporaryTable loses proctime > > > Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized when it is actually used (i.e., > System.currentTimeMillis is read at that point). Could you describe what you want to do with createTemporaryTable? What currently cannot satisfy your needs? > Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 > wrote: > > > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching to the deprecated registerTableSource works. > > What should I do if I want to use createTemporaryTable? > > Also, I debugged the createTemporaryTable source code and found no handling of proctime.
Re: Flink SQL 1.10 with MySQL as a dimension table: data volume too large causes task manager timeout
Hi, Could you post your SQL as well? Normally a dimension-table join uses the join's equality conditions as the lookup conditions against MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely caused by a problem in how the dimension-table join condition is written. 111 wrote on Thu, Apr 2, 2020 at 9:55 AM: > Hi, > I'd like to confirm some implementation details of the MySQL JDBC Connector. My current understanding of the implementation: > 1. Take the fields and table name from the query SQL and assemble select a, b, c from t. > 2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx. > 3. Execute the SQL and load the data into memory (not sure about the caching details afterwards). > > > > The problem we are hitting: we use MySQL as a dimension table (on the order of 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task). > As a result only one partition queries the data, and the query SQL is select xxx from t (a full-table scan). > Because of cluster limits the parallelism cannot be raised much, so how can we query on demand? > > > Previously, with the DataStream API, we used the async API to look up a single row by some condition, which was very fast; now everything has to be loaded, which consumes far too many resources. > Is there an elegant solution? > > > Best, > Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Flink SQL 1.10 with MySQL as a dimension table: data volume too large causes task manager timeout
Hi, I'd like to confirm some implementation details of the MySQL JDBC Connector. My current understanding of the implementation: 1. Take the fields and table name from the query SQL and assemble select a, b, c from t. 2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx. 3. Execute the SQL and load the data into memory (not sure about the caching details afterwards). The problem we are hitting: we use MySQL as a dimension table (on the order of 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task). As a result only one partition queries the data, and the query SQL is select xxx from t (a full-table scan). Because of cluster limits the parallelism cannot be raised much, so how can we query on demand? Previously, with the DataStream API, we used the async API to look up a single row by some condition, which was very fast; now everything has to be loaded, which consumes far too many resources. Is there an elegant solution? Best, Xinghalo
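For the scan path in step 2, the 1.10 JDBC connector exposes partitioned-read options that generate exactly those BETWEEN predicates; without them there is only one split, which matches the single-slot behavior described. A hedged sketch (the connection values and bounds are placeholders; a roughly 15-million-row id space is assumed):

// Assuming a blink-planner StreamTableEnvironment tEnv.
tEnv.sqlUpdate(
    "CREATE TABLE t (" +
    "  id BIGINT, " +
    "  name STRING " +
    ") WITH (" +
    "  'connector.type' = 'jdbc', " +
    "  'connector.url' = 'jdbc:mysql://localhost:3306/db', " +
    "  'connector.table' = 't', " +
    // Split the scan into 10 ranges over 'id', one per parallel reader.
    "  'connector.read.partition.column' = 'id', " +
    "  'connector.read.partition.num' = '10', " +
    "  'connector.read.partition.lower-bound' = '1', " +
    "  'connector.read.partition.upper-bound' = '15000000' " +
    ")");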
Reply: Exception when writing to Hive in real time with Flink
Hi jingsong, I see that in this issue [1] you opened two PRs about supporting a Hive streaming sink. Is that code related to FLIP-115? [1] https://issues.apache.org/jira/browse/FLINK-14255 叶贤勋 yxx_c...@163.com On Apr 1, 2020 at 16:28, 111 wrote: Hi jingsong, That's impressive; it's essentially a data-lake plugin built inside Flink. Best, Xinghalo
Re: [Third-party Tool] Flink memory calculator
@Marta Thanks for the tip! I'll do that. Best, Yangze Guo On Wed, Apr 1, 2020 at 8:05 PM Marta Paes Moreira wrote: > > Hey, Yangze. > > I'd like to suggest that you submit this tool to Flink Community Pages [1]. > That way it can get more exposure and it'll be easier for users to find it. > > Thanks for your contribution! > > [1] https://flink-packages.org/ > > On Tue, Mar 31, 2020 at 9:09 AM Yangze Guo wrote: >> >> Hi, there. >> >> In the latest version, the calculator supports dynamic options. You >> could append all your dynamic options to the end of "bin/calculator.sh >> [-h]". >> Since "-tm" will be deprecated eventually, please replace it with >> "-Dtaskmanager.memory.process.size=". >> >> Best, >> Yangze Guo >> >> On Mon, Mar 30, 2020 at 12:57 PM Xintong Song wrote: >> > >> > Hi Jeff, >> > >> > I think the purpose of this tool is to allow users to play with the memory >> > configurations without needing to actually deploy the Flink cluster or >> > even have a job. For sanity checks, we currently have them in the start-up >> > scripts (for standalone clusters) and resource managers (on >> > K8s/Yarn/Mesos). >> > >> > I think it makes sense to do the checks earlier, i.e. on the client side. But >> > I'm not sure if JobListener is the right place. IIUC, JobListener is >> > invoked before submitting a specific job, while the mentioned checks >> > validate Flink's cluster level configurations. It might be okay for a job >> > cluster, but does not cover the scenarios of session clusters. >> > >> > Thank you~ >> > >> > Xintong Song >> > >> > >> > >> > On Mon, Mar 30, 2020 at 12:03 PM Yangze Guo wrote: >> >> >> >> Thanks for your feedback, @Xintong and @Jeff. >> >> >> >> @Jeff >> >> I think it would always be good to leverage existing logic in Flink, such >> >> as JobListener. However, this calculator does not only aim to check >> >> for conflicts; it also aims to provide the calculation result to >> >> users before the job is actually deployed, in case there is any >> >> unexpected configuration. It's a good point that we need to parse the >> >> dynamic configs. I prefer to parse the dynamic configs and cli >> >> commands in bash instead of adding a hook in JobListener. >> >> >> >> Best, >> >> Yangze Guo >> >> >> >> On Mon, Mar 30, 2020 at 10:32 AM Jeff Zhang wrote: >> >> > >> >> > Hi Yangze, >> >> > >> >> > Does this tool just parse the configuration in flink-conf.yaml? Maybe >> >> > it could be done in JobListener [1] (we should enhance it by adding a >> >> > hook before job submission), so that it could cover all the cases (e.g. >> >> > parameters coming from the command line) >> >> > >> >> > [1] >> >> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35 >> >> > >> >> > >> >> > Yangze Guo wrote on Mon, Mar 30, 2020 at 9:40 AM: >> >> >> >> >> >> Hi, Yun, >> >> >> >> >> >> I'm sorry that it currently could not handle it. But I think it is a >> >> >> really good idea and that feature would be added to the next version. >> >> >> >> >> >> Best, >> >> >> Yangze Guo >> >> >> >> >> >> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang wrote: >> >> >> > >> >> >> > Very interesting and convenient tool, just a quick question: could >> >> >> > this tool also handle deployment cluster commands like "-tm" mixed >> >> >> > with configuration in `flink-conf.yaml` ? 
>> >> >> > >> >> >> > Best >> >> >> > Yun Tang >> >> >> > >> >> >> > From: Yangze Guo >> >> >> > Sent: Friday, March 27, 2020 18:00 >> >> >> > To: user ; user-zh@flink.apache.org >> >> >> > >> >> >> > Subject: [Third-party Tool] Flink memory calculator >> >> >> > >> >> >> > Hi, there. >> >> >> > >> >> >> > In release-1.10, the memory setup of task managers has changed a lot. >> >> >> > I would like to provide here a third-party tool to simulate and get >> >> >> > the calculation result of Flink's memory configuration. >> >> >> > >> >> >> > Although there is already a detailed setup guide[1] and migration >> >> >> > guide[2] officially, the calculator could further allow users to: >> >> >> > - Verify if there is any conflict in their configuration. The >> >> >> > calculator is more lightweight than starting a Flink cluster, >> >> >> > especially when running Flink on Yarn/Kubernetes. User could make >> >> >> > sure >> >> >> > their configuration is correct locally before deploying it to >> >> >> > external >> >> >> > resource managers. >> >> >> > - Get all of the memory configurations before deploying. User may set >> >> >> > taskmanager.memory.task.heap.size and >> >> >> > taskmanager.memory.managed.size. >> >> >> > But they also want to know the total memory consumption of Flink. >> >> >> > With >> >> >> > this tool, users could get all of the memory configurations they are >> >> >> > interested in. If anything is unexpected, they would not need to >> >> >> > re-deploy a Flink cluster. >> >> >> > >> >> >> > The repo link of this tool is >> >> >> >
Re: Primary key issue when writing to PostgreSQL after GROUP BY in Flink SQL
Hi Longfei, Unfortunately this indeed cannot be supported at the moment, but the problem will be solved by the new TableSink interface provided by FLIP-95, hopefully in 1.11. Best, Jark On Wed, 1 Apr 2020 at 19:12, Longfei Zhou wrote: > Problem: > The SQL performs a GROUP BY aggregation on the time window and PRODUCT_ID, so the primary key of the PG table > must be set to WINDOW_START/WINDOW_END plus PRODUCT_ID, otherwise the data cannot be written in upsert mode. That, however, does not meet the business requirement: the business key should be RANK_ID > plus WINDOW_START/WINDOW_END. How can this requirement be implemented in Flink? > > > > Scenario: Top 3 popular products > > > Sample data: > ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME > 1,34,6005,4,2019-09-01 00:10:00 > 2,34,6003,1,2019-09-01 00:20:00 > 3,34,6005,4,2019-09-01 00:30:00 > 4,34,6006,3,2019-09-01 00:40:00 > 5,34,6001,6,2019-09-01 00:51:00 > 6,34,6005,1,2019-09-01 01:11:00 > > > > The SQL logic is as follows: > --source > CREATE TABLE ORDER_DATA( > ORDER_ID VARCHAR, > USER_ID VARCHAR, > PRODUCT_ID VARCHAR, > NUM BIGINT, > ORDER_TIME TIMESTAMP, > WATERMARK FOR ORDER_TIME AS ORDER_TIME > )WITH( > 'connector.type'='kafka', > 'connector.version'='0.10', > 'connector.topic'='orderData', > 'connector.startup-mode'='latest-offset', > 'connector.properties.zookeeper.connect'=':2181', > 'connector.properties.bootstrap.servers'=':9092', > 'connector.properties.group.id'='flink_sql', > 'format.type'='csv', > 'format.derive-schema'='true' > ); > > > --sink > CREATE TABLE PRODUCT_RANK( > RANK_ID BIGINT, > WINDOW_START TIMESTAMP(3), > WINDOW_END TIMESTAMP(3), > PRODUCT_ID VARCHAR, > TOTAL_NUM BIGINT > )WITH( > 'connector.type'='jdbc', > > 'connector.url'='jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8', > 'connector.driver'='org.postgresql.Driver', > 'connector.table'='product_rank', > 'connector.username'='x', > 'connector.password'='', > 'connector.write.flush.max-rows'='1' > ); > > > INSERT INTO PRODUCT_RANK > SELECT RANK_ID,WINDOW_START,WINDOW_END,PRODUCT_ID,TOTAL_NUM > FROM( > SELECT *, > ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY > TOTAL_NUM DESC) AS RANK_ID > FROM( > SELECT > TUMBLE_START(ORDER_TIME,INTERVAL '1' hour) > AS WINDOW_START, > TUMBLE_END(ORDER_TIME,INTERVAL '1' hour) > AS WINDOW_END, > SUM(NUM) AS TOTAL_NUM, > PRODUCT_ID > FROM ORDER_DATA > GROUP BY TUMBLE(ORDER_TIME,INTERVAL '1' > hour),PRODUCT_ID > ) > ) WHERE RANK_ID <=3; > >
Re: flink 1.10 createTemporaryTable loses proctime
Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized when it is actually used (i.e., System.currentTimeMillis is read at that point). Could you describe what you want to do with createTemporaryTable? What currently cannot satisfy your needs? Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 wrote: > > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching to the deprecated registerTableSource works. > What should I do if I want to use createTemporaryTable? > Also, I debugged the createTemporaryTable source code and found no handling of proctime.
Primary key issue when writing to PostgreSQL after GROUP BY in Flink SQL
Problem: The SQL performs a GROUP BY aggregation on the time window and PRODUCT_ID, so the primary key of the PG table must be set to WINDOW_START/WINDOW_END plus PRODUCT_ID, otherwise the data cannot be written in upsert mode. That, however, does not meet the business requirement: the business key should be RANK_ID plus WINDOW_START/WINDOW_END. How can this requirement be implemented in Flink? Scenario: Top 3 popular products Sample data: ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME 1,34,6005,4,2019-09-01 00:10:00 2,34,6003,1,2019-09-01 00:20:00 3,34,6005,4,2019-09-01 00:30:00 4,34,6006,3,2019-09-01 00:40:00 5,34,6001,6,2019-09-01 00:51:00 6,34,6005,1,2019-09-01 01:11:00 The SQL logic is as follows: --source CREATE TABLE ORDER_DATA( ORDER_ID VARCHAR, USER_ID VARCHAR, PRODUCT_ID VARCHAR, NUM BIGINT, ORDER_TIME TIMESTAMP, WATERMARK FOR ORDER_TIME AS ORDER_TIME )WITH( 'connector.type'='kafka', 'connector.version'='0.10', 'connector.topic'='orderData', 'connector.startup-mode'='latest-offset', 'connector.properties.zookeeper.connect'=':2181', 'connector.properties.bootstrap.servers'=':9092', 'connector.properties.group.id'='flink_sql', 'format.type'='csv', 'format.derive-schema'='true' ); --sink CREATE TABLE PRODUCT_RANK( RANK_ID BIGINT, WINDOW_START TIMESTAMP(3), WINDOW_END TIMESTAMP(3), PRODUCT_ID VARCHAR, TOTAL_NUM BIGINT )WITH( 'connector.type'='jdbc', 'connector.url'='jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8', 'connector.driver'='org.postgresql.Driver', 'connector.table'='product_rank', 'connector.username'='x', 'connector.password'='', 'connector.write.flush.max-rows'='1' ); INSERT INTO PRODUCT_RANK SELECT RANK_ID,WINDOW_START,WINDOW_END,PRODUCT_ID,TOTAL_NUM FROM( SELECT *, ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY TOTAL_NUM DESC) AS RANK_ID FROM( SELECT TUMBLE_START(ORDER_TIME,INTERVAL '1' hour) AS WINDOW_START, TUMBLE_END(ORDER_TIME,INTERVAL '1' hour) AS WINDOW_END, SUM(NUM) AS TOTAL_NUM, PRODUCT_ID FROM ORDER_DATA GROUP BY TUMBLE(ORDER_TIME,INTERVAL '1' hour),PRODUCT_ID ) ) WHERE RANK_ID <=3;
flink 1.10 createTemporaryTable loses proctime
With createTemporaryTable in 1.10 I find the proctime field is always null, but switching to the deprecated registerTableSource works. What should I do if I want to use createTemporaryTable? Also, I debugged the createTemporaryTable source code and found no handling of proctime.
Question about window functions not forming a closed interval
Hi everyone: When doing window aggregations with count over and sum over, the current record cannot be counted; it only gets counted when the next record arrives. Did I write a parameter wrong, or is there some other function to use? The incoming data should be handled like a closed interval, but right now it behaves like an open one. Any suggestions would be appreciated, thanks!
Re: Re: When restarting a job with rocksdb as the state backend, recovery fails with Could not restore keyed state backend for KeyedProcessOperator
Hi Congxian, I checked HDFS; /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty underneath, and there is no db directory level either. On 2020-04-01 16:50:13, "Congxian Qiu" wrote: >Hi >Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files. >From the TM log it looks like the download went wrong. You can check whether the file >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst >exists and double check it; if the download failed, you need to find out why it failed. > >Best, >Congxian > > >chenxyz wrote on Wed, Apr 1, 2020 at 3:02 PM: >> With rocksdb enabled as the state backend, the job often fails on abnormal restarts with Could not restore keyed state backend for >> KeyedProcessOperator. How can this problem be solved? >> >> Version: 1.10 standalone >> >> Configuration: >> >> state.backend: rocksdb >> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint >> >> state.backend.incremental: true >> >> jobmanager.execution.failover-strategy: region >> >> io.tmp.dirs: /data/flink1_10/tmp >> >> >> >> >> The job's checkpoint configuration: >> >> env.enableCheckpointing(2 * 60 * 1000); >> >> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); >> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000); >> >> env.getCheckpointConfig().setCheckpointTimeout(6); >> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); >> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12); >> >> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> >> >> >> >> Log output: >> >> >> >> >> 2020-04-01 11:13:03 >> >> java.lang.Exception: Exception while creating StreamOperatorStateContext. >> >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) >> >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) >> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >> >> at java.lang.Thread.run(Thread.java:748) >> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed >> state backend for >> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the >> 1 provided restore options. >> >> at >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) >> >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) >> >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) >> >> ... 
9 more >> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught >> unexpected exception. >> >> at >> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336) >> >> at >> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548) >> >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) >> >> at >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) >> >> at >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) >> >> ... 11 more >> >> Caused by: java.nio.file.NoSuchFileException: >> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst >> -> >> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst >> >> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) >> >> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) >> >> at >>
Re: When restarting a job with rocksdb as the state backend, recovery fails with Could not restore keyed state backend for KeyedProcessOperator
Hi Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files. From the TM log it looks like the download went wrong. You can check whether the file /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst exists and double check it; if the download failed, you need to find out why it failed. Best, Congxian chenxyz wrote on Wed, Apr 1, 2020 at 3:02 PM: > With rocksdb enabled as the state backend, the job often fails on abnormal restarts with Could not restore keyed state backend for > KeyedProcessOperator. How can this problem be solved? > > Version: 1.10 standalone > > Configuration: > > state.backend: rocksdb > > state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint > > state.backend.incremental: true > > jobmanager.execution.failover-strategy: region > > io.tmp.dirs: /data/flink1_10/tmp > > > > > The job's checkpoint configuration: > > env.enableCheckpointing(2 * 60 * 1000); > > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000); > > env.getCheckpointConfig().setCheckpointTimeout(6); > > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12); > > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > > > > Log output: > > > > > 2020-04-01 11:13:03 > > java.lang.Exception: Exception while creating StreamOperatorStateContext. > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for > KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the > 1 provided restore options. > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) > > ... 9 more > > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) > > ... 11 more > > Caused by: java.nio.file.NoSuchFileException: > /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst > -> > /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst > > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > > at > sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476) > > at java.nio.file.Files.createLink(Files.java:1086) > > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480) > > at >
> > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336) > > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548) > > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) > > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) > > ... 11 more > > Caused by: java.nio.file.NoSuchFileException: > /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst > -> > /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst > > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > > at > sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476) > > at java.nio.file.Files.createLink(Files.java:1086) > > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480) > > at >
Reply: Exception when writing to Hive in real time with Flink
Hi jingsong, That's impressive; it's essentially a data-lake plugin built inside Flink. Best, Xinghalo
Re: Exception when writing to Hive in real time with Flink
Hi 111, Although a data lake can extend some capabilities, streaming writes to Hive are also an important part of a Hive data warehouse. On the file-count problem: - It depends on the checkpoint interval; if a file can grow to 128MB within one checkpoint interval, that is already a very suitable file size for HDFS. - For streaming writes, features such as file compaction can also be introduced; this is also discussed in FLIP-115. Best, Jingsong Lee On Wed, Apr 1, 2020 at 4:06 PM 111 wrote: > > > Hi, > Streaming writes into Hive actually fall under the data lake concept. > Because streaming into Hive produces many small fragment files, which hurts HDFS performance, one generally does not write directly into Hive in streaming scenarios. > For details, look into Delta Lake or Hudi. > > > On Apr 1, 2020 at 15:05, sunfulin wrote: > Hi, > The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table was created in Hive. > I feel this scenario is quite common. I assumed it was supported, since you can create a Kafka table through the HiveCatalog; being able to create it but not write to it seems unreasonable. > OK then. Which release is FLIP-115 planned for? 1.11? > > > > > > > > > > > > > > > > > On 2020-04-01 15:01:32, "Jingsong Li" wrote: > > Hi, > > > Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode. > > > Could you describe the full stack trace, the use case, and the SQL? > > > Best, > Jingsong Lee > > > On Wed, Apr 1, 2020 at 2:56 PM sunfulin wrote: > > > > > > When I use batch mode, the following exception is thrown instead. It feels like a pitfall at every step... sigh > > > > org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not > enough rules to produce a node with desired properties > > > > > > > > > > > > > > > On 2020-04-01 14:49:41, "Jingsong Li" wrote: > Hi, > > The exception means the Hive sink does not support streaming mode yet; it can only be used in batch mode. The feature is under development [1] > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > > Best, > Jingsong Lee > > On Wed, Apr 1, 2020 at 2:32 PM sunfulin wrote: > > Hi, > I am using Flink to consume Kafka data and write it into Hive. The configuration and connection are all OK, but when actually executing insert into > xxx_table, the following exception is thrown. I can't tell what causes it; any guidance would be appreciated. > cc @Jingsong Li @Jark Wu > > > > > org.apache.flink.table.api.TableException: Stream Tables can only be > emitted by AppendStreamTableSink, RetractStreamTableSink, or > UpsertStreamTableSink. > > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136) > > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > > at > > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) > > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > at scala.collection.Iterator.foreach(Iterator.scala:937) > > at scala.collection.Iterator.foreach$(Iterator.scala:937) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at > > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > > at > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > > at > > com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87) > > at > > 
com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j > > > > -- > Best, Jingsong Lee > > > > > > > > > > > > -- > > Best, Jingsong Lee -- Best, Jingsong Lee
Reply: Exception when writing to Hive in real time with Flink
Hi, Streaming writes into Hive actually fall under the data lake concept. Because streaming into Hive produces many small fragment files, which hurts HDFS performance, one generally does not write directly into Hive in streaming scenarios. For details, look into Delta Lake or Hudi. On Apr 1, 2020 at 15:05, sunfulin wrote: Hi, The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table was created in Hive. I feel this scenario is quite common. I assumed it was supported, since you can create a Kafka table through the HiveCatalog; being able to create it but not write to it seems unreasonable. OK then. Which release is FLIP-115 planned for? 1.11? On 2020-04-01 15:01:32, "Jingsong Li" wrote: Hi, Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode. Could you describe the full stack trace, the use case, and the SQL? Best, Jingsong Lee On Wed, Apr 1, 2020 at 2:56 PM sunfulin wrote: When I use batch mode, the following exception is thrown instead. It feels like a pitfall at every step... sigh org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties On 2020-04-01 14:49:41, "Jingsong Li" wrote: Hi, The exception means the Hive sink does not support streaming mode yet; it can only be used in batch mode. The feature is under development [1] [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table Best, Jingsong Lee On Wed, Apr 1, 2020 at 2:32 PM sunfulin wrote: Hi, I am using Flink to consume Kafka data and write it into Hive. The configuration and connection are all OK, but when actually executing insert into xxx_table, the following exception is thrown. I can't tell what causes it; any guidance would be appreciated. cc @Jingsong Li @Jark Wu org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87) at com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j -- Best, Jingsong Lee -- Best, Jingsong Lee
Re: Re: Re: Exception when writing to Hive in real time with Flink
Unfortunately, Flink SQL has indeed never supported this. Yes, this is one of the important goals for 1.11. Best, Jingsong Lee On Wed, Apr 1, 2020 at 3:05 PM sunfulin wrote: > Hi, > The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table was created in Hive. > I feel this scenario is quite common. I assumed it was supported, since you can create a Kafka table through the HiveCatalog; being able to create it but not write to it seems unreasonable. > OK then. Which release is FLIP-115 planned for? 1.11? > > > > > > On 2020-04-01 15:01:32, "Jingsong Li" wrote: > > Hi, > > Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode. > > Could you describe the full stack trace, the use case, and the SQL? > > Best, > Jingsong Lee > > On Wed, Apr 1, 2020 at 2:56 PM sunfulin wrote: > >> >> When I use batch mode, the following exception is thrown instead. It feels like a pitfall at every step... sigh >> >> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not >> enough rules to produce a node with desired properties >> >> >> >> >> >> >> On 2020-04-01 14:49:41, "Jingsong Li" wrote: >> >Hi, >> > >> >The exception means the Hive sink does not support streaming mode yet; it can only be used in batch mode. The feature is under development [1] >> > >> >[1] >> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table >> > >> >Best, >> >Jingsong Lee >> > >> >On Wed, Apr 1, 2020 at 2:32 PM sunfulin wrote: >> > >> >> Hi, >> >> I am using Flink to consume Kafka data and write it into Hive. The configuration and connection are all OK, but when actually executing insert into >> >> xxx_table, the following exception is thrown. I can't tell what causes it; any guidance would be appreciated. >> >> cc @Jingsong Li @Jark Wu >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.api.TableException: Stream Tables can only be >> >> emitted by AppendStreamTableSink, RetractStreamTableSink, or >> >> UpsertStreamTableSink. >> >> >> >> at >> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136) >> >> >> >> at >> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >> >> >> >> at >> >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) >> >> >> >> at >> >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) >> >> >> >> at >> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >> >> >> >> at >> >> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) >> >> >> >> at >> >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) >> >> >> >> at scala.collection.Iterator.foreach(Iterator.scala:937) >> >> >> >> at scala.collection.Iterator.foreach$(Iterator.scala:937) >> >> >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) >> >> >> >> at scala.collection.IterableLike.foreach(IterableLike.scala:70) >> >> >> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:69) >> >> >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> >> >> >> at scala.collection.TraversableLike.map(TraversableLike.scala:233) >> >> >> >> at scala.collection.TraversableLike.map$(TraversableLike.scala:226) >> >> >> >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> >> >> >> at >> >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >> >> >> >> at >> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >> >> >> >> at >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >> >> >> >> at >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >> >> >> >> at >> >> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87) >> >> >> >> at >> >> 
com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j >> > >> > >> > >> >-- >> >Best, Jingsong Lee >> >> >> >> >> > > > -- > Best, Jingsong Lee > > > > > -- Best, Jingsong Lee
Re: Re: Re: Exception when writing to Hive in real time with Flink
Hi, The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table was created in Hive. I feel this scenario is quite common. I assumed it was supported, since you can create a Kafka table through the HiveCatalog; being able to create it but not write to it seems unreasonable. OK then. Which release is FLIP-115 planned for? 1.11? On 2020-04-01 15:01:32, "Jingsong Li" wrote: Hi, Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode. Could you describe the full stack trace, the use case, and the SQL? Best, Jingsong Lee On Wed, Apr 1, 2020 at 2:56 PM sunfulin wrote: When I use batch mode, the following exception is thrown instead. It feels like a pitfall at every step... sigh org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties On 2020-04-01 14:49:41, "Jingsong Li" wrote: >Hi, > >The exception means the Hive sink does not support streaming mode yet; it can only be used in batch mode. The feature is under development [1] > >[1] >https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > >Best, >Jingsong Lee > >On Wed, Apr 1, 2020 at 2:32 PM sunfulin wrote: > >> Hi, >> I am using Flink to consume Kafka data and write it into Hive. The configuration and connection are all OK, but when actually executing insert into >> xxx_table, the following exception is thrown. I can't tell what causes it; any guidance would be appreciated. >> cc @Jingsong Li @Jark Wu >> >> >> >> >> org.apache.flink.table.api.TableException: Stream Tables can only be >> emitted by AppendStreamTableSink, RetractStreamTableSink, or >> UpsertStreamTableSink. >> >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136) >> >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >> >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) >> >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) >> >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >> >> at >> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) >> >> at >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) >> >> at scala.collection.Iterator.foreach(Iterator.scala:937) >> >> at scala.collection.Iterator.foreach$(Iterator.scala:937) >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) >> >> at scala.collection.IterableLike.foreach(IterableLike.scala:70) >> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:69) >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> >> at scala.collection.TraversableLike.map(TraversableLike.scala:233) >> >> at scala.collection.TraversableLike.map$(TraversableLike.scala:226) >> >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> >> at >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >> >> at >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >> >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >> >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >> >> at >> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87) >> >> at >> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j > > > >-- >Best, Jingsong Lee
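For context, a minimal sketch of the setup described above, registering a HiveCatalog so that Kafka tables can be created in the Hive metastore (the catalog name, conf dir, and version string are placeholders; as the replies explain, streaming INSERT INTO a Hive table itself has to wait for FLIP-115):

import org.apache.flink.table.catalog.hive.HiveCatalog;

// Assuming a blink-planner StreamTableEnvironment tEnv.
HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.4");
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
// Tables created from now on (e.g. a Kafka source defined via DDL) are
// persisted in the Hive metastore and visible to other sessions.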
Job recovery fails with "Could not restore keyed state backend for KeyedProcessOperator" when restarting with RocksDB as the state backend
The job uses RocksDB as its state backend. When the job restarts after an exception, recovery frequently fails with "Could not restore keyed state backend for KeyedProcessOperator". How can this problem be resolved?

Version: 1.10 standalone

Configuration:
state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
state.backend.incremental: true
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /data/flink1_10/tmp

Checkpoint configuration of the job:
env.enableCheckpointing(2 * 60 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Log:
2020-04-01 11:13:03
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst -> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
    at java.nio.file.Files.createLink(Files.java:1086)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
    at
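(A side note for anyone reusing the checkpoint setup quoted above: the snippet below restates it with comments on units and effects. The values are unchanged from the report, but note that CheckpointConfig times are in milliseconds, so the 6 passed to setCheckpointTimeout as written means a 6 ms timeout, which looks like a typo.)

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupNotes {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 2 minutes (argument is milliseconds).
        env.enableCheckpointing(2 * 60 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 2 minutes between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
        // Also milliseconds: 6 as written means a 6 ms timeout, which would make
        // nearly every checkpoint expire; something like 10 * 60 * 1000 is more typical.
        env.getCheckpointConfig().setCheckpointTimeout(6);
        // Never run two checkpoints concurrently.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Tolerate up to 12 checkpoint failures before failing the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
        // Keep the externalized checkpoint on cancellation so the job can be
        // resubmitted from it manually later.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}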
Re: Re: Exception when writing to Hive in real time with Flink
Hi,
Using batch mode to support Kafka -> Hive is not recommended either; this kind of scenario can only be supported in streaming mode once FLIP-115 lands.
Could you share the full stack trace, the use case, and the SQL?

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:56 PM sunfulin wrote:
>
> When I use batch mode, the following exception is thrown instead; it feels like one pitfall after another... sigh
>
> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
> enough rules to produce a node with desired properties
Re: Re: Exception when writing to Hive in real time with Flink
When I use batch mode, the following exception is thrown instead; it feels like one pitfall after another... sigh

org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties

At 2020-04-01 14:49:41, "Jingsong Li" wrote:
>Hi,
>
>The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. The feature is under development [1].
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
Re: Exception when writing to Hive in real time with Flink
Hi,
The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. The feature is under development [1].

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:32 PM sunfulin wrote:
> Hi,
> I am consuming Kafka data with Flink and writing it into Hive. The configuration and connectivity are all fine, but the exception below is thrown when actually executing insert into xxx_table. I can't tell what is causing it; any advice would be much appreciated.
> cc @Jingsong Li @Jark Wu
>
> org.apache.flink.table.api.TableException: Stream Tables can only be
> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> UpsertStreamTableSink.
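(For context, "can only be used in batch mode" means creating the TableEnvironment in batch mode with the Blink planner, roughly as sketched below; the table names are hypothetical and this is not the reporter's code. Even in batch mode an unbounded Kafka source cannot be planned as a batch input, which is plausibly what the CannotPlanException earlier in this thread reflects.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchHiveSketch {
    public static void main(String[] args) throws Exception {
        // Blink planner in batch mode: the only mode in which the 1.10 Hive sink works.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // A bounded source such as another Hive table can be written to a Hive
        // sink in batch mode; both table names here are hypothetical.
        tEnv.sqlUpdate("INSERT INTO hive_target SELECT * FROM hive_source");
        tEnv.execute("hive-batch-write");
    }
}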
Exception when writing to Hive in real time with Flink
Hi,
I am consuming Kafka data with Flink and writing it into Hive. The configuration and connectivity are all fine, but the following exception is thrown when actually executing insert into xxx_table. I can't tell what is causing it; any advice would be much appreciated.
cc @Jingsong Li @Jark Wu

org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
    at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
    at com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
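(The report does not include the actual setup code, so below is a minimal sketch of the kind of pipeline described, assuming hypothetical catalog, database, and table names. On Flink 1.10 with the Blink planner in streaming mode, the final sqlUpdate is the call that raises the TableException in the stack trace above, because the 1.10 Hive sink is batch-only.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class Kafka2HiveSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode() // streaming mode, as in the report
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Register a HiveCatalog so that both the Kafka source table and the
        // partitioned Hive table are visible to SQL. The catalog name, conf
        // dir, and Hive version here are hypothetical.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // On 1.10 this fails with "Stream Tables can only be emitted by
        // AppendStreamTableSink, ..." because the Hive sink is batch-only;
        // streaming writes are what FLIP-115 adds.
        tEnv.sqlUpdate("INSERT INTO hive_part_table SELECT * FROM kafka_src");
        tEnv.execute("kafka-to-hive");
    }
}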