Re: Question about window functions not forming closed intervals

2020-04-01 Post by Jark Wu
Hi Fei,

Could you share your SQL? Which Flink version and which planner are you using?
Also, could you describe more concretely the symptom "the current data cannot be counted and only shows up in the next aggregation"? I don't quite understand it.

Best,
Jark

On Wed, 1 Apr 2020 at 18:00, Fei Han 
wrote:

> Hi everyone,
> When doing windowed aggregation with count over and sum over, the current record is not included in the result; it only gets counted in the next aggregation.
> Did I write a parameter wrong, or is there another function I should use? Incoming data should be handled like a closed interval, but right now it behaves like an open one. Any suggestions would be appreciated, thanks!
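
For reference, a minimal sketch of an OVER aggregation whose upper bound is CURRENT ROW, which makes the window a closed interval that includes the current record; the table and column names here are illustrative assumptions, not taken from this thread:

SELECT
  user_id,
  COUNT(*) OVER w AS cnt_last_hour,
  SUM(amount) OVER w AS sum_last_hour
FROM orders
WINDOW w AS (
  PARTITION BY user_id
  ORDER BY proctime
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)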


Re: Flink 1.10.0 HiveModule function issue

2020-04-01 Post by Jingsong Li
Hi,

GenericUDTFExplode is a UDTF.
In Flink, a UDTF is invoked with the standard SQL syntax:
"select x from db1.nested, lateral table(explode(a)) as T(x)"

Please give that a try.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#table-functions

Best,
Jingsong Lee
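
For reference, a minimal sketch of that lateral-table syntax applied to the query quoted below; the derived-table alias and the source table name are illustrative assumptions:

select x
from (select split('1,2,2,4', ',') as arr from db1.some_table) s,
     lateral table(explode(s.arr)) as T(x)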

On Thu, Apr 2, 2020 at 11:22 AM Yaoting Gong 
wrote:

> Hi all,
>
> Our project has integrated HiveModule and we have run into some issues. We'd like to ask for your advice.
>
> Before integrating the Hive module, substr and split were not usable. After integration, both have been verified to work.
> For example: select split('1,2,2,4',',')
>
> But when using the explode function, select explode(split('1,2,2,4',','));
> we get the following error:
>
>
> The main method caused an error: SQL validation failed. From line 1, column
> 8 to line 1, column 36: *No match found for function signature explode(*
> *)  *
>
> *Full stack trace:*
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: SQL validation failed. From line 1, column 8 to line 1,
> column 36: No match found for function signature explode()
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at
> org.apache.flink.client.cli.CliFrontend$$Lambda$38/1205406622.call(Unknown
> Source)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/293907205.run(Unknown
> Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation
> failed. From line 1, column 8 to line 1, column 36: No match found for
> function signature explode()
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> at
>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> at
>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> at com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88)
> at
>
> com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87)
> at com.sui.bigdata.PlatformEngine$$$Lambda$765/623545006.apply(Unknown
> Source)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87)
> at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> ... 13 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
> column 8 to line 1, column 36: No match found for function signature
> explode()
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1773)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:293)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
> at
>
> 

Flink 1.10.0 HiveModule function issue

2020-04-01 Post by Yaoting Gong
Hi all,

   Our project has integrated HiveModule and we have run into some issues. We'd like to ask for your advice.

Before integrating the Hive module, substr and split were not usable. After integration, both have been verified to work.
For example: select split('1,2,2,4',',')

But when using the explode function, select explode(split('1,2,2,4',','));
we get the following error:


The main method caused an error: SQL validation failed. From line 1, column
8 to line 1, column 36: *No match found for function signature explode(*
*)  *

*Full stack trace:*
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: SQL validation failed. From line 1, column 8 to line 1,
column 36: No match found for function signature explode()
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at
org.apache.flink.client.cli.CliFrontend$$Lambda$38/1205406622.call(Unknown
Source)
at
org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/293907205.run(Unknown
Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation
failed. From line 1, column 8 to line 1, column 36: No match found for
function signature explode()
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88)
at
com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine$$$Lambda$765/623545006.apply(Unknown
Source)
at scala.collection.immutable.List.foreach(List.scala:388)
at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87)
at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 13 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
column 8 to line 1, column 36: No match found for function signature
explode()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1773)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:293)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479)
at

Reply: Flink SQL 1.10 using MySQL as a dimension table, data volume too large causes task manager timeout

2020-04-01 Post by 111
Hi,
Got it, now I know how to solve it. I'm using sql-gateway here, so it looks like I'll need to add a table-definition syntax to the sql-gateway.
Many thanks!


Best,
Xinghalo
On 2020-04-02 10:52, Benchao Li wrote:
What you wrote is not the dimension-table join syntax. Dimension-table joins are now implemented via temporal tables [1] and require a special join syntax:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

Also, the JDBCTableSource you looked at is a plain bounded source, i.e. a batch
source. The actual dimension-table (lookup) code lives in JDBCLookupFunction.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

111 wrote on Thu, Apr 2, 2020 at 10:33 AM:

Hi,
I tried it and it doesn't seem to work. My SQL:


select s.*, item.product_name
from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page,
'10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where
page like '10.pd.item-%' ) s
inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as
string) = s.item_id
where s.item_id is not null


Looking at the implementation in JDBCTableSource:
String query = dialect.getSelectFromStatement( options.getTableName(),
returnType.getFieldNames(), new String[0]);
When the SQL is built, conditionFields is hard-coded as an empty array, so no query condition can ever be included.
Later, when open() actually connects and runs the query, only the partition fields are handled; there are no query-condition fields either.
On 2020-04-02 10:11, Benchao Li wrote:
Hi,

Could you also share your SQL?

Normally, a dimension-table join uses the equality conditions of the join as the lookup condition when querying MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely that the join condition on your dimension table is written incorrectly.

111 wrote on Thu, Apr 2, 2020 at 9:55 AM:

Hi,
I'd like to confirm the implementation details of the MySQL JDBC connector. My current understanding is:
1. Take the fields and table name from the query SQL and concatenate them into select a, b, c from t
2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx
3. Execute the SQL and load the result into memory (I'm not sure about the caching details after that)




The problem we're hitting: we use MySQL as a dimension table (roughly 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task).
As a result only one subtask queries the data, and the query is select xxx from t (a full-table scan).
But the parallelism cannot be raised much due to cluster limits, so how can we query on demand instead?


Previously, with the DataStream API, we used the async API to look up individual rows by a key, which was fast; now everything has to be loaded up front, which consumes far too many resources.
Is there a more elegant solution?


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink SQL 1.10 using MySQL as a dimension table, data volume too large causes task manager timeout

2020-04-01 Post by Benchao Li
What you wrote is not the dimension-table join syntax. Dimension-table joins are now implemented via temporal tables [1] and require a special join syntax:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

Also, the JDBCTableSource you looked at is a plain bounded source, i.e. a batch
source. The actual dimension-table (lookup) code lives in JDBCLookupFunction.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
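
As an illustration of the two points above, a sketch (Flink 1.10 DDL) of declaring the MySQL table as a lookup (dimension) table and joining it with FOR SYSTEM_TIME AS OF. The connection properties, the cache settings, the column types, and the proctime attribute on the streaming table are assumptions for the example, not taken from this thread:

CREATE TABLE PROD_ITEM_MALL_ACTIVITY_PRODUCT (
  id BIGINT,
  product_name VARCHAR
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/mall',
  'connector.table' = 'prod_item_mall_activity_product',
  'connector.username' = 'user',
  'connector.password' = 'password',
  -- cache recently looked-up keys in memory instead of loading the whole table
  'connector.lookup.cache.max-rows' = '10000',
  'connector.lookup.cache.ttl' = '10min'
);

-- FOR SYSTEM_TIME AS OF turns the join into a per-record point lookup by the join key
SELECT s.*, item.product_name
FROM tgs_topic_t1 AS s
JOIN PROD_ITEM_MALL_ACTIVITY_PRODUCT FOR SYSTEM_TIME AS OF s.proctime AS item
  ON item.id = s.item_id;

With this shape, JDBCLookupFunction issues a SELECT ... WHERE id = ? per key instead of the unconditioned full scan that JDBCTableSource builds.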

111 wrote on Thu, Apr 2, 2020 at 10:33 AM:

> Hi,
> I tried it and it doesn't seem to work. My SQL:
>
>
> select s.*, item.product_name
> from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page,
> '10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where
> page like '10.pd.item-%' ) s
> inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as
> string) = s.item_id
> where s.item_id is not null
>
>
> Looking at the implementation in JDBCTableSource:
> String query = dialect.getSelectFromStatement( options.getTableName(),
> returnType.getFieldNames(), new String[0]);
> When the SQL is built, conditionFields is hard-coded as an empty array, so no query condition can ever be included.
> Later, when open() actually connects and runs the query, only the partition fields are handled; there are no query-condition fields either.
> On 2020-04-02 10:11, Benchao Li wrote:
> Hi,
>
> Could you also share your SQL?
>
> Normally, a dimension-table join uses the equality conditions of the join as the lookup condition when querying MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely that the join condition on your dimension table is written incorrectly.
>
> 111 wrote on Thu, Apr 2, 2020 at 9:55 AM:
>
> Hi,
> I'd like to confirm the implementation details of the MySQL JDBC connector. My current understanding is:
> 1. Take the fields and table name from the query SQL and concatenate them into select a, b, c from t
> 2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx
> 3. Execute the SQL and load the result into memory (I'm not sure about the caching details after that)
>
>
>
>
> The problem we're hitting: we use MySQL as a dimension table (roughly 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task).
> As a result only one subtask queries the data, and the query is select xxx from t (a full-table scan).
> But the parallelism cannot be raised much due to cluster limits, so how can we query on demand instead?
>
>
> Previously, with the DataStream API, we used the async API to look up individual rows by a key, which was fast; now everything has to be loaded up front, which consumes far too many resources.
> Is there a more elegant solution?
>
>
> Best,
> Xinghalo
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Exception when writing to Hive in real time with Flink

2020-04-01 Post by Jingsong Li
Yes, they are related; that umbrella issue is FLIP-115.

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 10:57 PM 叶贤勋  wrote:

> Hi Jingsong,
> I see that in this issue [1] you opened two PRs about supporting a Hive streaming sink. Is that code related to FLIP-115?
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-14255
>
>
> 叶贤勋
> yxx_c...@163.com
>
>
> On 2020-04-01 16:28, 111 wrote:
> Hi Jingsong,
> That's impressive; it's essentially like Flink building a data-lake plugin internally.
> Best,
> Xinghalo



-- 
Best, Jingsong Lee


Reply: Flink SQL 1.10 using MySQL as a dimension table, data volume too large causes task manager timeout

2020-04-01 Post by 111
Hi,
I tried it and it doesn't seem to work. My SQL:


select s.*, item.product_name
from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page, '10.pd.item-', 1) 
as string) as item_id,  `time` from tgs_topic_t1  where page like 
'10.pd.item-%' ) s
inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as string) = 
s.item_id
where s.item_id is not null


Looking at the implementation in JDBCTableSource:
String query = dialect.getSelectFromStatement( options.getTableName(),
returnType.getFieldNames(), new String[0]);
When the SQL is built, conditionFields is hard-coded as an empty array, so no query condition can ever be included.
Later, when open() actually connects and runs the query, only the partition fields are handled; there are no query-condition fields either.
On 2020-04-02 10:11, Benchao Li wrote:
Hi,

Could you also share your SQL?
Normally, a dimension-table join uses the equality conditions of the join as the lookup condition when querying MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely that the join condition on your dimension table is written incorrectly.

111 wrote on Thu, Apr 2, 2020 at 9:55 AM:

Hi,
I'd like to confirm the implementation details of the MySQL JDBC connector. My current understanding is:
1. Take the fields and table name from the query SQL and concatenate them into select a, b, c from t
2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx
3. Execute the SQL and load the result into memory (I'm not sure about the caching details after that)



The problem we're hitting: we use MySQL as a dimension table (roughly 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task).
As a result only one subtask queries the data, and the query is select xxx from t (a full-table scan).
But the parallelism cannot be raised much due to cluster limits, so how can we query on demand instead?


Previously, with the DataStream API, we used the async API to look up individual rows by a key, which was fast; now everything has to be loaded up front, which consumes far too many resources.
Is there a more elegant solution?


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink 1.10 createTemporaryTable loses proctime

2020-04-01 Post by deadwind4
registerTableSource is marked @Deprecated in Flink 1.10. In my case, should I just keep using the deprecated API (registerTableSource)?


 Original message 
From: deadwind4
To: user-zh
Sent: Thu, Apr 2, 2020 10:30
Subject: Re: flink 1.10 createTemporaryTable loses proctime


Before the change:
tEnv.connect().withFormat().withSchema(
xxx.proctime()
).registerTableSource("foo");


After the change:
tEnv.connect().withFormat().withSchema(
xxx.proctime()
).createTemporaryTable("foo");


After the change, .proctime() no longer takes effect, so I can no longer use proctime windows either.


 Original message 
From: deadwind4
To: user-zh
Sent: Thu, Apr 2, 2020 10:22
Subject: Re: flink 1.10 createTemporaryTable loses proctime


tEnv.connect().withFormat().withSchema().registerTableSource("foo");
tEnv.connect().withFormat().withSchema().createTemporaryTable("foo");


 Original message 
From: Jark Wu
To: user-zh
Sent: Thu, Apr 2, 2020 10:18
Subject: Re: flink 1.10 createTemporaryTable loses proctime


Hi, could you show your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView are different. Best, Jark
> On Apr 1, 2020 at 23:13, deadwind4 wrote:
> What I actually want is a processing-time window, but after replacing the deprecated registerTableSource with createTemporaryTable, proctime stops working. How should I use it in this case? Thank you, and sorry for the trouble.
>
> Original message
> From: Jark Wu
> To: user-zh
> Sent: Wed, Apr 1, 2020 21:37
> Subject: Re: flink 1.10 createTemporaryTable loses proctime
>
> Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized (i.e. System.currentTimeMillis is read) when it is actually used. Could you describe what you want to do with createTemporaryTable, and what currently doesn't meet your needs? Best, Jark
> On Wed, 1 Apr 2020 at 18:56, deadwind4 wrote:
> > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching back to the deprecated registerTableSource works.
> > What should I do if I want to use createTemporaryTable?
> > Also, I stepped through the createTemporaryTable source code and did not find any handling of proctime.

Re: flink 1.10 createTemporaryTable loses proctime

2020-04-01 Post by deadwind4
Before the change:
tEnv.connect().withFormat().withSchema(
xxx.proctime()
).registerTableSource("foo");


After the change:
tEnv.connect().withFormat().withSchema(
xxx.proctime()
).createTemporaryTable("foo");


After the change, .proctime() no longer takes effect, so I can no longer use proctime windows either.


 Original message 
From: deadwind4
To: user-zh
Sent: Thu, Apr 2, 2020 10:22
Subject: Re: flink 1.10 createTemporaryTable loses proctime


tEnv.connect().withFormat().withSchema().registerTableSource("foo");
tEnv.connect().withFormat().withSchema().createTemporaryTable("foo");


 Original message 
From: Jark Wu
To: user-zh
Sent: Thu, Apr 2, 2020 10:18
Subject: Re: flink 1.10 createTemporaryTable loses proctime


Hi, could you show your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView are different. Best, Jark
> On Apr 1, 2020 at 23:13, deadwind4 wrote:
> What I actually want is a processing-time window, but after replacing the deprecated registerTableSource with createTemporaryTable, proctime stops working. How should I use it in this case? Thank you, and sorry for the trouble.
>
> Original message
> From: Jark Wu
> To: user-zh
> Sent: Wed, Apr 1, 2020 21:37
> Subject: Re: flink 1.10 createTemporaryTable loses proctime
>
> Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized (i.e. System.currentTimeMillis is read) when it is actually used. Could you describe what you want to do with createTemporaryTable, and what currently doesn't meet your needs? Best, Jark
> On Wed, 1 Apr 2020 at 18:56, deadwind4 wrote:
> > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching back to the deprecated registerTableSource works.
> > What should I do if I want to use createTemporaryTable?
> > Also, I stepped through the createTemporaryTable source code and did not find any handling of proctime.

Re: flink 1.10 createTemporaryTable loses proctime

2020-04-01 Post by deadwind4
tEnv.connect().withFormat().withSchema().registerTableSource("foo");
tEnv.connect().withFormat().withSchema().createTemporaryTable("foo");


 Original message 
From: Jark Wu
To: user-zh
Sent: Thu, Apr 2, 2020 10:18
Subject: Re: flink 1.10 createTemporaryTable loses proctime


Hi, could you show your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView are different. Best, Jark
> On Apr 1, 2020 at 23:13, deadwind4 wrote:
> What I actually want is a processing-time window, but after replacing the deprecated registerTableSource with createTemporaryTable, proctime stops working. How should I use it in this case? Thank you, and sorry for the trouble.
>
> Original message
> From: Jark Wu
> To: user-zh
> Sent: Wed, Apr 1, 2020 21:37
> Subject: Re: flink 1.10 createTemporaryTable loses proctime
>
> Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only materialized (i.e. System.currentTimeMillis is read) when it is actually used. Could you describe what you want to do with createTemporaryTable, and what currently doesn't meet your needs? Best, Jark
> On Wed, 1 Apr 2020 at 18:56, deadwind4 wrote:
> > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching back to the deprecated registerTableSource works.
> > What should I do if I want to use createTemporaryTable?
> > Also, I stepped through the createTemporaryTable source code and did not find any handling of proctime.

Reply: Flink SQL 1.10 using MySQL as a dimension table, data volume too large causes task manager timeout

2020-04-01 Post by 111


Hi Benchao,
I see; on my side I only issued a plain query and never went through the join.
I'll add the join condition and try again.
Best,
Xinghalo
On 2020-04-02 10:11, Benchao Li wrote:
Hi,

Could you also share your SQL?
Normally, a dimension-table join uses the equality conditions of the join as the lookup condition when querying MySQL, and any additional filters are applied on top of the joined result. If you see a full-table scan every time, it is very likely that the join condition on your dimension table is written incorrectly.

111 wrote on Thu, Apr 2, 2020 at 9:55 AM:

Hi,
I'd like to confirm the implementation details of the MySQL JDBC connector. My current understanding is:
1. Take the fields and table name from the query SQL and concatenate them into select a, b, c from t
2. Create parallel inputs; if a partition column is specified, split by the partition column and the parallelism to generate where conditions, e.g. where id between xxx and xxx
3. Execute the SQL and load the result into memory (I'm not sure about the caching details after that)



The problem we're hitting: we use MySQL as a dimension table (roughly 15 million rows, parallelism 10) without specifying a partition condition (only one slot runs the query; the others have no query task).
As a result only one subtask queries the data, and the query is select xxx from t (a full-table scan).
But the parallelism cannot be raised much due to cluster limits, so how can we query on demand instead?


Previously, with the DataStream API, we used the async API to look up individual rows by a key, which was fast; now everything has to be loaded up front, which consumes far too many resources.
Is there a more elegant solution?


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink 1.10 createTemporaryTable loses proctime

2020-04-01 Post by Jark Wu
Hi,

Could you show your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable
method, only a createTemporaryView method, and the parameters of registerTableSource and createTemporaryView
are different.

Best,
Jark

> On Apr 1, 2020 at 23:13, deadwind4 wrote:
> 
> What I actually want is a processing-time window, but after replacing the deprecated registerTableSource
> with createTemporaryTable, proctime stops working. How should I use it in this case? Thank you, and sorry for the trouble.
> 
> 
> Original message 
> From: Jark Wu
> To: user-zh
> Sent: Wed, Apr 1, 2020 21:37
> Subject: Re: flink 1.10 createTemporaryTable loses proctime
> 
> 
> Hi, proctime means machine time; it is not equivalent to the now() or current_timestamp() functions, and the field is only
> materialized (i.e. System.currentTimeMillis is read) when it is actually used. Could you describe what you want to do with
> createTemporaryTable, and what currently doesn't meet your needs?
> Best, Jark
> On Wed, 1 Apr 2020 at 18:56, deadwind4  wrote:
> > With createTemporaryTable in 1.10 I find the proctime field is always null, but switching back to the deprecated registerTableSource works.
> > What should I do if I want to use createTemporaryTable?
> > Also, I stepped through the createTemporaryTable source code and did not find any handling of proctime.



Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

2020-04-01 文章 Benchao Li
Hi,

能否把你的SQL也发出来呢?
正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。

111  于2020年4月2日周四 上午9:55写道:

> Hi,
> 想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
> 1 获取查询sql中的字段和表名,拼接成select a, b, c from t
> 2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
> 3 执行sql,加载到内存(不确定后面缓存的实现细节)
>
>
>
> 目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
> 结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
> 可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?
>
>
> 之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
> 不知道有没有什么优雅的解决方案?
>
>
> Best,
> Xinghalo



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

2020-04-01 文章 111
Hi,
想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
1 获取查询sql中的字段和表名,拼接成select a, b, c from t
2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
3 执行sql,加载到内存(不确定后面缓存的实现细节)


目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?


之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
不知道有没有什么优雅的解决方案?


Best,
Xinghalo

回复: Flink实时写入hive异常

2020-04-01 文章 叶贤勋
Hi jingsong,
我看这个issue[1] 你提了关于支持hive streaming sink的两个pr,这部分代码是否跟Flip-115相关?


[1] https://issues.apache.org/jira/browse/FLINK-14255


| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制


在2020年04月1日 16:28,111 写道:
Hi jingsong,
那厉害了,相当于Flink内部做了一个数据湖的插件了。
Best,
Xinghalo

Re: [Third-party Tool] Flink memory calculator

2020-04-01 Post by Yangze Guo
@Marta
Thanks for the tip! I'll do that.

Best,
Yangze Guo
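
For illustration, the dynamic-option usage described in the quoted messages below would look roughly like this; the memory value is a made-up example:

bin/calculator.sh -Dtaskmanager.memory.process.size=4096m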

On Wed, Apr 1, 2020 at 8:05 PM Marta Paes Moreira  wrote:
>
> Hey, Yangze.
>
> I'd like to suggest that you submit this tool to Flink Community Pages [1]. 
> That way it can get more exposure and it'll be easier for users to find it.
>
> Thanks for your contribution!
>
> [1] https://flink-packages.org/
>
> On Tue, Mar 31, 2020 at 9:09 AM Yangze Guo  wrote:
>>
>> Hi, there.
>>
>> In the latest version, the calculator supports dynamic options. You
>> could append all your dynamic options to the end of "bin/calculator.sh
>> [-h]".
>> Since "-tm" will be deprecated eventually, please replace it with
>> "-Dtaskmanager.memory.process.size=".
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Mar 30, 2020 at 12:57 PM Xintong Song  wrote:
>> >
>> > Hi Jeff,
>> >
>> > I think the purpose of this tool it to allow users play with the memory 
>> > configurations without needing to actually deploy the Flink cluster or 
>> > even have a job. For sanity checks, we currently have them in the start-up 
>> > scripts (for standalone clusters) and resource managers (on 
>> > K8s/Yarn/Mesos).
>> >
>> > I think it makes sense do the checks earlier, i.e. on the client side. But 
>> > I'm not sure if JobListener is the right place. IIUC, JobListener is 
>> > invoked before submitting a specific job, while the mentioned checks 
>> > validate Flink's cluster level configurations. It might be okay for a job 
>> > cluster, but does not cover the scenarios of session clusters.
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> >
>> > On Mon, Mar 30, 2020 at 12:03 PM Yangze Guo  wrote:
>> >>
>> >> Thanks for your feedbacks, @Xintong and @Jeff.
>> >>
>> >> @Jeff
>> >> I think it would always be good to leverage exist logic in Flink, such
>> >> as JobListener. However, this calculator does not only target to check
>> >> the conflict, it also targets to provide the calculating result to
>> >> user before the job is actually deployed in case there is any
>> >> unexpected configuration. It's a good point that we need to parse the
>> >> dynamic configs. I prefer to parse the dynamic configs and cli
>> >> commands in bash instead of adding hook in JobListener.
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Mon, Mar 30, 2020 at 10:32 AM Jeff Zhang  wrote:
>> >> >
>> >> > Hi Yangze,
>> >> >
>> >> > Does this tool just parse the configuration in flink-conf.yaml ?  Maybe 
>> >> > it could be done in JobListener [1] (we should enhance it via adding 
>> >> > hook before job submission), so that it could all the cases (e.g. 
>> >> > parameters coming from command line)
>> >> >
>> >> > [1] 
>> >> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35
>> >> >
>> >> >
>> >> >> Yangze Guo wrote on Mon, Mar 30, 2020 at 9:40 AM:
>> >> >>
>> >> >> Hi, Yun,
>> >> >>
>> >> >> I'm sorry that it currently could not handle it. But I think it is a
>> >> >> really good idea and that feature would be added to the next version.
>> >> >>
>> >> >> Best,
>> >> >> Yangze Guo
>> >> >>
>> >> >> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang  wrote:
>> >> >> >
>> >> >> > Very interesting and convenient tool, just a quick question: could 
>> >> >> > this tool also handle deployment cluster commands like "-tm" mixed 
>> >> >> > with configuration in `flink-conf.yaml` ?
>> >> >> >
>> >> >> > Best
>> >> >> > Yun Tang
>> >> >> > 
>> >> >> > From: Yangze Guo 
>> >> >> > Sent: Friday, March 27, 2020 18:00
>> >> >> > To: user ; user-zh@flink.apache.org 
>> >> >> > 
>> >> >> > Subject: [Third-party Tool] Flink memory calculator
>> >> >> >
>> >> >> > Hi, there.
>> >> >> >
>> >> >> > In release-1.10, the memory setup of task managers has changed a lot.
>> >> >> > I would like to provide here a third-party tool to simulate and get
>> >> >> > the calculation result of Flink's memory configuration.
>> >> >> >
>> >> >> >  Although there is already a detailed setup guide[1] and migration
>> >> >> > guide[2] officially, the calculator could further allow users to:
>> >> >> > - Verify if there is any conflict in their configuration. The
>> >> >> > calculator is more lightweight than starting a Flink cluster,
>> >> >> > especially when running Flink on Yarn/Kubernetes. User could make 
>> >> >> > sure
>> >> >> > their configuration is correct locally before deploying it to 
>> >> >> > external
>> >> >> > resource managers.
>> >> >> > - Get all of the memory configurations before deploying. User may set
>> >> >> > taskmanager.memory.task.heap.size and 
>> >> >> > taskmanager.memory.managed.size.
>> >> >> > But they also want to know the total memory consumption of Flink. 
>> >> >> > With
>> >> >> > this tool, users could get all of the memory configurations they are
>> >> >> > interested in. If anything is unexpected, they would not need to
>> >> >> > re-deploy a Flink cluster.
>> >> >> >
>> >> >> > The repo link of this tool is
>> >> >> > 

Re: Flink SQL: primary key issue when writing to PostgreSQL after GROUP BY

2020-04-01 Post by Jark Wu
Hi Longfei,

I'm sorry, this is indeed not supported at the moment. However, it will be addressed by the new TableSink interface introduced in FLIP-95, hopefully in 1.11.

Best,
Jark

On Wed, 1 Apr 2020 at 19:12, Longfei Zhou  wrote:

> Question:
> The SQL groups by the time window and PRODUCT_ID, so the primary key of the PG table has to be WINDOW_START/WINDOW_END plus PRODUCT_ID, otherwise the data cannot be written in upsert mode. But that does not match the business requirement: the business key should be RANK_ID
> plus WINDOW_START/WINDOW_END. How can this be achieved in Flink?
>
>
>
> Scenario: Top 3 popular products
>
>
> Sample data:
> ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
> 1,34,6005,4,2019-09-01 00:10:00
> 2,34,6003,1,2019-09-01 00:20:00
> 3,34,6005,4,2019-09-01 00:30:00
> 4,34,6006,3,2019-09-01 00:40:00
> 5,34,6001,6,2019-09-01 00:51:00
> 6,34,6005,1,2019-09-01 01:11:00
>
>
>
> The SQL logic is as follows:
> --source
> CREATE TABLE ORDER_DATA{
> ORDER_ID VARCHAR,
> USER_ID VARCHAR,
> PRODUCT_ID VARCHAR,
> NUM BIGINT,
> ORDER_TIME TIMESTAMP,
> WATERMARK FOR ORDER_TIME AS ORDER_TIME
> }WITH{
> 'connector.type'='kafka',
> 'connector.version'='0.10',
> 'connector.topic'='orderData',
> 'connector.start-mode'='latest-offset',
> 'connector.properties.zookeeper.connect'=':2181',
> 'connector.properties.boostrap.servers'=':9092',
> 'connector.properties.group.id'='flink_sql',
> 'format.type'='csv',
> 'format.derive-schema'='true'
> };
>
>
> --sink
> CREATE TABLE PRODUCT_RANK{
> RANK_ID BIGINT,
> WINDOW_START TIMESTAMP(3),
> WINDOW_END TIMESTAMP(3),
> PRODUCT_ID VARCHAR,
> TOTAL_NUM BIGINT
> }WITH{
> 'connector.type'='jdbc',
>
> 'connector.url'='jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
> 'connector.driver'='org.postgresql.Driver',
> 'connector.table'='product_rank',
> 'connector.username'='x',
> 'connector.password'='',
> 'connector.write.flush.max-rows'='1'
> };
>
>
> INSERT INTO PRODUCT_RANK
>   SELECT RANK_ID,WINDOW_START,WINDOW_END,PRODUCT_ID,TOTAL_NUM
> FROM(
> SELECT *,
> ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY
> TOTAL_NUM DESC) AS RANK_ID
> FROM(
> SELECT
> TUMBLE_START(ORDER_TIME,INTERVAL '1' hour)
> AS WINDOW_START,
> TUMBLE_END(ORDER_TIME,INTERVAL '1' hour)
> AS WINDOW_END,
> SUM(NUM) AS TOTAL_NUM,
> PRODUCT_ID
> FROM ORDER_DATA
> GROUP BY TUMBLE(ORDER_TIME,INTERVAL '1'
> hour),PRODUCT_ID
> )
> ) WHERE RANK_ID <=3;
>
>   


Re: flink 1.10 createTemporaryTable loses proctime

2020-04-01 Post by Jark Wu
Hi,

proctime means machine time; it is not equivalent to the now() or current_timestamp() functions. The field is only materialized (i.e.
System.currentTimeMillis is read) when it is actually used.

Could you describe what you want to do with createTemporaryTable? What currently doesn't meet your needs?

Best,
Jark
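
As an aside, a possible alternative sketch: in Flink 1.10 a processing-time attribute can also be declared directly in DDL as a computed column, which avoids registerTableSource entirely. The table name and connector properties below are illustrative assumptions, not taken from this thread:

CREATE TABLE foo (
  user_id VARCHAR,
  amount DOUBLE,
  -- processing-time attribute, usable in proctime windows
  proctime AS PROCTIME()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'foo',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
)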

On Wed, 1 Apr 2020 at 18:56, deadwind4  wrote:

>
> With createTemporaryTable in 1.10 I find the proctime field is always null, but switching back to the deprecated registerTableSource works.
> What should I do if I want to use createTemporaryTable?
> Also, I stepped through the createTemporaryTable source code and did not find any handling of proctime.


Flink SQL: primary key issue when writing to PostgreSQL after GROUP BY

2020-04-01 Post by Longfei Zhou
Question:
The SQL groups by the time window and PRODUCT_ID, so the primary key of the PG table has to be WINDOW_START/WINDOW_END plus PRODUCT_ID, otherwise the data cannot be written in upsert mode. But that does not match the business requirement: the business key should be RANK_ID
plus WINDOW_START/WINDOW_END. How can this be achieved in Flink?



Scenario: Top 3 popular products


Sample data:
ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
1,34,6005,4,2019-09-01 00:10:00
2,34,6003,1,2019-09-01 00:20:00
3,34,6005,4,2019-09-01 00:30:00
4,34,6006,3,2019-09-01 00:40:00
5,34,6001,6,2019-09-01 00:51:00
6,34,6005,1,2019-09-01 01:11:00



The SQL logic is as follows:
--source
CREATE TABLE ORDER_DATA{
ORDER_ID VARCHAR,
USER_ID VARCHAR,
PRODUCT_ID VARCHAR,
NUM BIGINT,
ORDER_TIME TIMESTAMP,
WATERMARK FOR ORDER_TIME AS ORDER_TIME
}WITH{
'connector.type'='kafka',
'connector.version'='0.10',
'connector.topic'='orderData',
'connector.start-mode'='latest-offset',
'connector.properties.zookeeper.connect'=':2181',
'connector.properties.boostrap.servers'=':9092',
'connector.properties.group.id'='flink_sql',
'format.type'='csv',
'format.derive-schema'='true'
};


--sink
CREATE TABLE PRODUCT_RANK{
RANK_ID BIGINT,
WINDOW_START TIMESTAMP(3),
WINDOW_END TIMESTAMP(3),
PRODUCT_ID VARCHAR,
TOTAL_NUM BIGINT
}WITH{
'connector.type'='jdbc',

'connector.url'='jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
'connector.driver'='org.postgresql.Driver',
'connector.table'='product_rank',
'connector.username'='x',
'connector.password'='',
'connector.write.flush.max-rows'='1'
};


INSERT INTO PRODUCT_RANK
  SELECT RANK_ID,WINDOW_START,WINDOW_END,PRODUCT_ID,TOTAL_NUM
FROM(
SELECT *,
ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY TOTAL_NUM 
DESC) AS RANK_ID
FROM(
SELECT
TUMBLE_START(ORDER_TIME,INTERVAL '1' hour) AS 
WINDOW_START,
TUMBLE_END(ORDER_TIME,INTERVAL '1' hour) AS 
WINDOW_END,
SUM(NUM) AS TOTAL_NUM,
PRODUCT_ID
FROM ORDER_DATA
GROUP BY TUMBLE(ORDER_TIME,INTERVAL '1' hour),PRODUCT_ID
)
) WHERE RANK_ID <=3;

  

flink 1.10 createTemporaryTable loses proctime

2020-04-01 Post by deadwind4
With createTemporaryTable in 1.10 I find that the proctime field is always null, but switching back to the deprecated registerTableSource works.
What should I do if I want to use createTemporaryTable?
Also, I stepped through the createTemporaryTable source code and did not find any handling of proctime.

Question about window functions not forming closed intervals

2020-04-01 Post by Fei Han
Hi everyone,
When doing windowed aggregation with count over and sum over, the current record is not included in the result; it only gets counted in the next aggregation.
Did I write a parameter wrong, or is there another function I should use? Incoming data should be handled like a closed interval, but right now it behaves like an open one. Any suggestions would be appreciated, thanks!

Re: Re: With RocksDB as the state backend, restoring the job after a restart fails: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Post by chenxyz
Hi, Congxian,
I checked HDFS; the directory
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty, and there is no db directory underneath it either.








On 2020-04-01 16:50:13, "Congxian Qiu" wrote:
>Hi
>Restore can be roughly split into two parts: 1) downloading the files; 2) restoring from the downloaded files.
>From the TM log it looks like the download failed. Please check whether
>/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>actually exists (double check). If the download failed, you need to find out why it failed.
>
>Best,
>Congxian
>
>
>chenxyz wrote on Wed, Apr 1, 2020 at 3:02 PM:
>
>> The job uses RocksDB as the state backend. When it restarts after an exception, it often fails with Could not restore keyed state backend for
>> KeyedProcessOperator. How can this be resolved?
>>
>> Version: 1.10 standalone
>>
>> Configuration:
>>
>> state.backend: rocksdb
>>
>> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>>
>> state.backend.incremental: true
>>
>> jobmanager.execution.failover-strategy: region
>>
>> io.tmp.dirs: /data/flink1_10/tmp
>>
>>
>>
>>
>> The job's checkpoint configuration:
>>
>> env.enableCheckpointing(2 * 60 * 1000);
>>
>>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>>
>> env.getCheckpointConfig().setCheckpointTimeout(6);
>>
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>
>> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>>
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>
>>
>>
>> Log output:
>>
>>
>>
>>
>> 2020-04-01 11:13:03
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>>
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>>
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for
>> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the
>> 1 provided restore options.
>>
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>>
>> ... 9 more
>>
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
>> unexpected exception.
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>>
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>
>> ... 11 more
>>
>> Caused by: java.nio.file.NoSuchFileException:
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>> ->
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>>
>> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>>
>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>
>> at
>> 

Re: With RocksDB as the state backend, restoring the job after a restart fails: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Post by Congxian Qiu
Hi
Restore can be roughly split into two parts: 1) downloading the files; 2) restoring from the downloaded files.
From the TM log it looks like the download failed. Please check whether
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
actually exists (double check). If the download failed, you need to find out why it failed.

Best,
Congxian


chenxyz wrote on Wed, Apr 1, 2020 at 3:02 PM:

> The job uses RocksDB as the state backend. When it restarts after an exception, it often fails with Could not restore keyed state backend for
> KeyedProcessOperator. How can this be resolved?
>
> Version: 1.10 standalone
>
> Configuration:
>
> state.backend: rocksdb
>
> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>
> state.backend.incremental: true
>
> jobmanager.execution.failover-strategy: region
>
> io.tmp.dirs: /data/flink1_10/tmp
>
>
>
>
> The job's checkpoint configuration:
>
> env.enableCheckpointing(2 * 60 * 1000);
>
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>
> env.getCheckpointConfig().setCheckpointTimeout(6);
>
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
>
> Log output:
>
>
>
>
> 2020-04-01 11:13:03
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the
> 1 provided restore options.
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>
> ... 9 more
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> ... 11 more
>
> Caused by: java.nio.file.NoSuchFileException:
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> ->
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>
> at
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>
> at java.nio.file.Files.createLink(Files.java:1086)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
>
> at
> 

Reply: Exception when writing to Hive in real time with Flink

2020-04-01 Post by 111
Hi Jingsong,
That's impressive; it's essentially like Flink building a data-lake plugin internally.
Best,
Xinghalo

Re: Exception when writing to Hive in real time with Flink

2020-04-01 Post by Jingsong Li
Hi 111,

Although a data lake can take on some of this, streaming writes to Hive are still an important part of a Hive data warehouse.

On the number-of-files issue:
- It depends on the checkpoint interval: if a 128 MB file can be written within one checkpoint interval, that is already a very suitable file size for HDFS (a rough arithmetic example follows below).
- For streaming writes, a file-compaction step can also be introduced; that is discussed in FLIP-115 as well.
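
For intuition, a rough back-of-the-envelope example (the throughput figure is a made-up assumption, not from this thread): if one sink subtask writes about 2 MB/s, a 60 s checkpoint interval produces files of roughly 2 MB/s * 60 s = 120 MB, close to the HDFS-friendly 128 MB mentioned above, whereas a 5 s interval would produce ~10 MB files and many more of them.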

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 4:06 PM 111  wrote:

>
>
> Hi,
> Streaming writes into Hive really belong to the data-lake problem space.
> Streaming into Hive produces many small files, which hurts HDFS performance, so people generally do not write directly into Hive in streaming scenarios.
> For details, have a look at Delta Lake or Hudi.
>
>
> On 2020-04-01 15:05, sunfulin wrote:
> Hi,
> The scenario is actually simple: use Flink to sync Kafka data into Hive in real time; a partitioned table was created in Hive.
> This seems like a very common scenario. I assumed it was supported, since a Kafka table can be created through the HiveCatalog; being able to create it but not write to it feels a bit unreasonable.
> OK then. Which release is FLIP-115 planned for? 1.11?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-04-01 15:01:32, "Jingsong Li" wrote:
>
> Hi,
>
>
> Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode.
>
>
> Could you describe the full stack trace, the use case, and the SQL?
>
>
> Best,
> Jingsong Lee
>
>
> On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:
>
>
>
>
>
> When I use batch mode, the following exception is thrown instead. It feels like one pitfall after another... sigh
>
>
>
> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
> enough rules to produce a node with desired properties
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-04-01 14:49:41, "Jingsong Li" wrote:
> Hi,
>
> The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. The feature is under development [1].
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
> Hi,
> I'm using Flink to consume Kafka data and write it into Hive. The connection configuration is fine, but when actually executing insert into
> xxx_table, the following exception is thrown. I can't tell what the cause is; any guidance is appreciated.
> cc  @Jingsong Li  @Jark Wu
>
>
>
>
> org.apache.flink.table.api.TableException: Stream Tables can only be
> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> UpsertStreamTableSink.
>
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
> at scala.collection.Iterator.foreach(Iterator.scala:937)
>
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>
> at
>
> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>
> at
>
> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>
>
>
> --
> Best, Jingsong Lee
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best, Jingsong Lee



-- 
Best, Jingsong Lee


Reply: Exception when writing to Hive in real time with Flink

2020-04-01 Post by 111


Hi,
Streaming writes into Hive really belong to the data-lake problem space.
Streaming into Hive produces many small files, which hurts HDFS performance, so people generally do not write directly into Hive in streaming scenarios.
For details, have a look at Delta Lake or Hudi.


On 2020-04-01 15:05, sunfulin wrote:
Hi,
The scenario is actually simple: use Flink to sync Kafka data into Hive in real time; a partitioned table was created in Hive.
This seems like a very common scenario. I assumed it was supported, since a Kafka table can be created through the HiveCatalog; being able to create it but not write to it feels a bit unreasonable.
OK then. Which release is FLIP-115 planned for? 1.11?
















On 2020-04-01 15:01:32, "Jingsong Li" wrote:

Hi,


Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode.


Could you describe the full stack trace, the use case, and the SQL?


Best,
Jingsong Lee


On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:





When I use batch mode, the following exception is thrown instead. It feels like one pitfall after another... sigh



org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough 
rules to produce a node with desired properties














On 2020-04-01 14:49:41, "Jingsong Li" wrote:
Hi,

The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. The feature is under development [1].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:

Hi,
I'm using Flink to consume Kafka data and write it into Hive. The connection configuration is fine, but when actually executing insert into
xxx_table, the following exception is thrown. I can't tell what the cause is; any guidance is appreciated.
cc  @Jingsong Li  @Jark Wu




org.apache.flink.table.api.TableException: Stream Tables can only be
emitted by AppendStreamTableSink, RetractStreamTableSink, or
UpsertStreamTableSink.

at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)

at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)

at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)

at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)

at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)

at
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)

at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

at scala.collection.Iterator.foreach(Iterator.scala:937)

at scala.collection.Iterator.foreach$(Iterator.scala:937)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)

at scala.collection.IterableLike.foreach(IterableLike.scala:70)

at scala.collection.IterableLike.foreach$(IterableLike.scala:69)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike.map(TraversableLike.scala:233)

at scala.collection.TraversableLike.map$(TraversableLike.scala:226)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)

at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

at
com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)

at
com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j



--
Best, Jingsong Lee











--

Best, Jingsong Lee

Re: Re: Re: Exception when writing to Hive in real time with Flink

2020-04-01 Post by Jingsong Li
Unfortunately, Flink SQL has indeed never supported this.
Yes, this is one of the important goals for 1.11.

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 3:05 PM sunfulin  wrote:

> Hi,
> The scenario is actually simple: use Flink to sync Kafka data into Hive in real time; a partitioned table was created in Hive.
> This seems like a very common scenario. I assumed it was supported, since a Kafka table can be created through the HiveCatalog; being able to create it but not write to it feels a bit unreasonable.
> OK then. Which release is FLIP-115 planned for? 1.11?
>
>
>
>
>
>
> On 2020-04-01 15:01:32, "Jingsong Li" wrote:
>
> Hi,
>
> Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode.
>
> Could you describe the full stack trace, the use case, and the SQL?
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:
>
>>
>> When I use batch mode, the following exception is thrown instead. It feels like one pitfall after another... sigh
>>
>> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
>> enough rules to produce a node with desired properties
>>
>>
>>
>>
>>
>>
>> On 2020-04-01 14:49:41, "Jingsong Li" wrote:
>> >Hi,
>> >
>> >The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. The feature is under development [1].
>> >
>> >[1]
>> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>> >
>> >Best,
>> >Jingsong Lee
>> >
>> >On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>> >
>> >> Hi,
>> >> I'm using Flink to consume Kafka data and write it into Hive. The connection configuration is fine, but when actually executing insert into
>> >> xxx_table, the following exception is thrown. I can't tell what the cause is; any guidance is appreciated.
>> >> cc  @Jingsong Li  @Jark Wu
>> >>
>> >>
>> >>
>> >>
>> >> org.apache.flink.table.api.TableException: Stream Tables can only be
>> >> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> >> UpsertStreamTableSink.
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> >>
>> >>  at
>> >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> >>
>> >>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>> >>
>> >>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> >>
>> >>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> >>
>> >>  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> >>
>> >>  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> >>
>> >>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >>
>> >>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> >>
>> >>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> >>
>> >>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> >>
>> >>  at
>> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> >>
>> >>  at
>> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> >>
>> >>  at
>> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> >>
>> >>  at
>> >> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>> >>
>> >>  at
>> >> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>> >
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>>
>>
>>
>>
>
>
> --
> Best, Jingsong Lee
>
>
>
>
>


-- 
Best, Jingsong Lee


Re: Re: Re: Exception when writing to Hive in real time with Flink

2020-04-01 Post by sunfulin
Hi,
The scenario is actually simple: use Flink to sync Kafka data into Hive in real time; a partitioned table was created in Hive.
This seems like a very common scenario. I assumed it was supported, since a Kafka table can be created through the HiveCatalog; being able to create it but not write to it feels a bit unreasonable.
OK then. Which release is FLIP-115 planned for? 1.11?
















On 2020-04-01 15:01:32, "Jingsong Li" wrote:

Hi,


Using batch mode to support Kafka -> Hive is not recommended either; only after FLIP-115 can this kind of scenario be supported in streaming mode.


Could you describe the full stack trace, the use case, and the SQL?


Best,
Jingsong Lee


On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:





When I use batch mode, the following exception is thrown instead. It feels like one pitfall after another... sigh



org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough 
rules to produce a node with desired properties














On 2020-04-01 14:49:41, "Jingsong Li" wrote:
>Hi,
>
>The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. The feature is under development [1].
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
>> Hi,
>> I'm using Flink to consume Kafka data and write it into Hive. The connection configuration is fine, but when actually executing insert into
>> xxx_table, the following exception is thrown. I can't tell what the cause is; any guidance is appreciated.
>> cc  @Jingsong Li  @Jark Wu
>>
>>
>>
>>
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>>
>>  at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>
>>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>>
>>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>
>>  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>
>>  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>
>>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>
>>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>
>>  at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>
>
>
>-- 
>Best, Jingsong Lee





 





--

Best, Jingsong Lee

Recovery fails when a job using RocksDB as the state backend restarts: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Posted by chenxyz
The job uses RocksDB as the state backend. When the job restarts after a failure, recovery often fails with "Could not restore keyed state backend for
KeyedProcessOperator". How can this problem be resolved?

Version: 1.10 standalone

Configuration:

state.backend: rocksdb

state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint

state.backend.incremental: true

jobmanager.execution.failover-strategy: region

io.tmp.dirs: /data/flink1_10/tmp




Checkpoint configuration of the job:

env.enableCheckpointing(2 * 60 * 1000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);

env.getCheckpointConfig().setCheckpointTimeout(6);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);




Log output:




2020-04-01 11:13:03

java.lang.Exception: Exception while creating StreamOperatorStateContext.

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from 
any of the 1 provided restore options.

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.

at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more

Caused by: java.nio.file.NoSuchFileException: 
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
 -> 
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst

at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)

at java.nio.file.Files.createLink(Files.java:1086)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

at 

Re: Re: Exception when writing to Hive in real time from Flink

2020-04-01 Posted by Jingsong Li
Hi,

Using batch mode to support Kafka -> Hive is not recommended either; this kind of scenario will only be supported in streaming mode once FLIP-115 lands.

Could you describe the full stack trace, the use case, and the SQL?

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:

>
> When I use batch mode, the following exception is thrown instead; it feels like one pitfall after another... sigh
>
> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
> enough rules to produce a node with desired properties
>
>
>
>
>
>
> At 2020-04-01 14:49:41, "Jingsong Li"  wrote:
> >Hi,
> >
> >The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. This feature is under development [1]
> >
> >[1]
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> >
> >Best,
> >Jingsong Lee
> >
> >On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
> >
> >> Hi,
> >> I am consuming Kafka data with Flink and writing it into Hive. The connection configuration is all fine, but when actually executing insert into
> >> xxx_table, the following exception is thrown. I cannot tell what causes it; any guidance would be appreciated.
> >> cc  @Jingsong Li  @Jark Wu
> >>
> >>
> >>
> >>
> >> org.apache.flink.table.api.TableException: Stream Tables can only be
> >> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> >> UpsertStreamTableSink.
> >>
> >>  at
> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
> >>
> >>  at
> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> >>
> >>  at
> >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> >>
> >>  at
> >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> >>
> >>  at
> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> >>
> >>  at
> >> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
> >>
> >>  at
> >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> >>
> >>  at scala.collection.Iterator.foreach(Iterator.scala:937)
> >>
> >>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
> >>
> >>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> >>
> >>  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> >>
> >>  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> >>
> >>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >>
> >>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> >>
> >>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> >>
> >>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >>
> >>  at
> >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> >>
> >>  at
> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> >>
> >>  at
> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >>
> >>  at
> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >>
> >>  at
> >> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
> >>
> >>  at
> >> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
> >
> >
> >
> >--
> >Best, Jingsong Lee
>
>
>
>
>


-- 
Best, Jingsong Lee


Re:Re: Exception when writing to Hive in real time from Flink

2020-04-01 Posted by sunfulin



When I use batch mode, the following exception is thrown instead; it feels like one pitfall after another... sigh



org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough 
rules to produce a node with desired properties
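(For reference, "batch mode" here means creating the TableEnvironment with batch settings, roughly as in the sketch below -- blink planner assumed, table names purely illustrative. The Kafka source is unbounded, which is presumably why the batch planner cannot produce a plan for it.)

// Rough sketch of the batch-mode setup on Flink 1.10; all names are placeholders.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchModeSketch {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()   // batch mode instead of streaming mode
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // The same Kafka -> Hive INSERT as in the streaming attempt; with an
        // unbounded Kafka source this is what surfaces the CannotPlanException above.
        tableEnv.sqlUpdate("INSERT INTO hive_sink SELECT user_id, event_body, dt FROM kafka_source");
        tableEnv.execute("kafka-to-hive-batch");
    }
}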














At 2020-04-01 14:49:41, "Jingsong Li"  wrote:
>Hi,
>
>The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. This feature is under development [1]
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
>> Hi,
>> I am consuming Kafka data with Flink and writing it into Hive. The connection configuration is all fine, but when actually executing insert into
>> xxx_table, the following exception is thrown. I cannot tell what causes it; any guidance would be appreciated.
>> cc  @Jingsong Li  @Jark Wu
>>
>>
>>
>>
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>>
>>  at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>
>>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>>
>>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>
>>  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>
>>  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>
>>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>
>>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>
>>  at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>
>>  at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>
>>  at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>>
>>  at
>> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>
>
>
>-- 
>Best, Jingsong Lee


Re: Exception when writing to Hive in real time from Flink

2020-04-01 Posted by Jingsong Li
Hi,

The exception means that the Hive sink does not yet support streaming mode and can only be used in batch mode. This feature is under development [1]

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:

> Hi,
> I am consuming Kafka data with Flink and writing it into Hive. The connection configuration is all fine, but when actually executing insert into
> xxx_table, the following exception is thrown. I cannot tell what causes it; any guidance would be appreciated.
> cc  @Jingsong Li  @Jark Wu
>
>
>
>
> org.apache.flink.table.api.TableException: Stream Tables can only be
> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> UpsertStreamTableSink.
>
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>
>  at
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>
>  at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>
>  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>
>  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>
>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
>  at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>
>  at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>
>  at
> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>
>  at
> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j



-- 
Best, Jingsong Lee


Exception when writing to Hive in real time from Flink

2020-04-01 Posted by sunfulin
Hi, 
I am consuming Kafka data with Flink and writing it into Hive. The connection configuration is all fine, but when actually executing insert into
xxx_table, the following exception is thrown. I cannot tell what causes it; any guidance would be appreciated.
cc  @Jingsong Li  @Jark Wu 
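Roughly, the job looks like the sketch below (a simplified stand-in, not the real code; the catalog name, Hive conf dir, Hive version, and table/column names are all placeholders):

// Simplified sketch of the job; every name, path and version below is a placeholder.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register the Hive metastore as a catalog; this part works fine.
        HiveCatalog hive = new HiveCatalog("myhive", "mydb", "/etc/hive/conf", "2.3.4");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // kafka_source and hive_sink stand for tables already defined in the catalog.
        // The sqlUpdate() call below is where the TableException is thrown.
        tableEnv.sqlUpdate("INSERT INTO hive_sink SELECT user_id, event_body, dt FROM kafka_source");
        tableEnv.execute("kafka-to-hive");
    }
}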




org.apache.flink.table.api.TableException: Stream Tables can only be emitted by 
AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)

 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)

 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)

 at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)

 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

 at scala.collection.Iterator.foreach(Iterator.scala:937)

 at scala.collection.Iterator.foreach$(Iterator.scala:937)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)

 at scala.collection.IterableLike.foreach(IterableLike.scala:70)

 at scala.collection.IterableLike.foreach$(IterableLike.scala:69)

 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

 at scala.collection.TraversableLike.map(TraversableLike.scala:233)

 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)

 at scala.collection.AbstractTraversable.map(Traversable.scala:104)

 at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)

 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)

 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

 at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)

 at 
com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j