Re: 求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 Jark Wu
在 Flink 1.11 中,你可以尝试这样:

CREATE TABLE mysql (
   time_str STRING,
   uv BIGINT,
   PRIMARY KEY (ts) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'myuv'
);

INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), COUNT(DISTINCT  user_id)
FROM user_behavior;

On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:

> 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> sink表这个样式
> tm uv
> 2020/06/17 13:46:00 1
> 2020/06/17 13:47:00 2
> 2020/06/17 13:48:00 3
>
>
> group by 日期的话,分钟如何获取
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年6月17日(星期三) 中午11:46
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907...@qq.com 于2020年6月17日周三 上午11:14写道:
>
>  需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
>  CREATE VIEW uv_per_10min AS
>  SELECTnbsp;
>  nbsp; MAX(DATE_FORMAT(proctimenbsp;, '-MM-dd
> HH:mm:00'))nbsp;OVER w
>  AS time_str,nbsp;
>  nbsp; COUNT(DISTINCT user_id) OVER w AS uv
>  FROM user_behavior
>  WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
>  CURRENT ROW);
> 
> 
>  想请教一下,应该如何处理?
>  PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理?
>  PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
>  多谢


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Jingsong Li
Congratulations Yu, well deserved!

Best,
Jingsong

On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei  wrote:

> Congrats, Yu!
>
> GXGX & well deserved!!
>
> Best Regards,
>
> Yuan
>
> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
> wrote:
>
>> Hi all,
>>
>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>> part of the Apache Flink Project Management Committee (PMC).
>>
>> Yu Li has been very active on Flink's Statebackend component, working on
>> various improvements, for example the RocksDB memory management for 1.10.
>> and keeps checking and voting for our releases, and also has successfully
>> produced two releases(1.10.0&1.10.1) as RM.
>>
>> Congratulations & Welcome Yu Li!
>>
>> Best,
>> Jincheng (on behalf of the Flink PMC)
>>
>

-- 
Best, Jingsong Lee


Re: Re: flink sql DDL Unsupported update-mode hbase

2020-06-16 文章 Jark Wu
是的。

On Wed, 17 Jun 2020 at 13:50, Zhou Zach  wrote:

> 那flink sql DDL的方式,读写,更新,删除hbase都是支持的吧
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-06-17 13:45:15, "Jark Wu"  wrote:
> >Hi,
> >
> >HBase connector 不用声明 update-mode 属性。 也不能声明。
> >
> >Best,
> >Jark
> >
> >On Wed, 17 Jun 2020 at 13:08, Zhou Zach  wrote:
> >
> >> The program finished with the following exception:
> >>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: Could not find a suitable table factory for
> >> 'org.apache.flink.table.factories.TableSinkFactory' in
> >> the classpath.
> >>
> >>
> >> Reason: No factory supports all properties.
> >>
> >>
> >> The matching candidates:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> Unsupported property keys:
> >> update-mode
> >>
> >>
> >> The following properties are requested:
> >> connector.table-name=user_hbase10
> >> connector.type=hbase
> >> connector.version=2.1.0
> >> connector.write.buffer-flush.interval=2s
> >> connector.write.buffer-flush.max-rows=1000
> >> connector.write.buffer-flush.max-size=10mb
> >> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> >> connector.zookeeper.znode.parent=/hbase
> >> schema.0.data-type=VARCHAR(2147483647)
> >> schema.0.name=rowkey
> >> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> >> `created_time` TIMESTAMP(3)
> >> schema.1.name=cf
> >> update-mode=upsert
> >>
> >>
> >> The following factories have been considered:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> >> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> >> at
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >> Could not find a suitable table factory for
> >> 'org.apache.flink.table.factories.TableSinkFactory' in
> >> the classpath.
> >>
> >>
> >> Reason: No factory supports all properties.
> >>
> >>
> >> The matching candidates:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> Unsupported property keys:
> >> update-mode
> >>
> >>
> >> The following properties are requested:
> >> connector.table-name=user_hbase10
> >> connector.type=hbase
> >> connector.version=2.1.0
> >> connector.write.buffer-flush.interval=2s
> >> connector.write.buffer-flush.max-rows=1000
> >> connector.write.buffer-flush.max-size=10mb
> >> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> >> connector.zookeeper.znode.parent=/hbase
> >> schema.0.data-type=VARCHAR(2147483647)
> >> schema.0.name=rowkey
> >> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> >> `created_time` TIMESTAMP(3)
> >> schema.1.name=cf
> >> update-mode=upsert
> >>
> >>
> >> The following factories have been considered:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> >> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> >> at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
> >> at
> >>
> 

Re:Re: flink sql DDL Unsupported update-mode hbase

2020-06-16 文章 Zhou Zach
那flink sql DDL的方式,读写,更新,删除hbase都是支持的吧

















At 2020-06-17 13:45:15, "Jark Wu"  wrote:
>Hi,
>
>HBase connector 不用声明 update-mode 属性。 也不能声明。
>
>Best,
>Jark
>
>On Wed, 17 Jun 2020 at 13:08, Zhou Zach  wrote:
>
>> The program finished with the following exception:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSinkFactory' in
>> the classpath.
>>
>>
>> Reason: No factory supports all properties.
>>
>>
>> The matching candidates:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> Unsupported property keys:
>> update-mode
>>
>>
>> The following properties are requested:
>> connector.table-name=user_hbase10
>> connector.type=hbase
>> connector.version=2.1.0
>> connector.write.buffer-flush.interval=2s
>> connector.write.buffer-flush.max-rows=1000
>> connector.write.buffer-flush.max-size=10mb
>> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
>> connector.zookeeper.znode.parent=/hbase
>> schema.0.data-type=VARCHAR(2147483647)
>> schema.0.name=rowkey
>> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
>> `created_time` TIMESTAMP(3)
>> schema.1.name=cf
>> update-mode=upsert
>>
>>
>> The following factories have been considered:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSinkFactory' in
>> the classpath.
>>
>>
>> Reason: No factory supports all properties.
>>
>>
>> The matching candidates:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> Unsupported property keys:
>> update-mode
>>
>>
>> The following properties are requested:
>> connector.table-name=user_hbase10
>> connector.type=hbase
>> connector.version=2.1.0
>> connector.write.buffer-flush.interval=2s
>> connector.write.buffer-flush.max-rows=1000
>> connector.write.buffer-flush.max-size=10mb
>> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
>> connector.zookeeper.znode.parent=/hbase
>> schema.0.data-type=VARCHAR(2147483647)
>> schema.0.name=rowkey
>> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
>> `created_time` TIMESTAMP(3)
>> schema.1.name=cf
>> update-mode=upsert
>>
>>
>> The following factories have been considered:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>> at
>> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>> at
>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>> at
>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>> at
>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>> at
>> 

?????? ??????FLINKSQL1.10????????????UV

2020-06-16 文章 x
??"??"UV??
sink??
tm uv
2020/06/17 13:46:00 1
2020/06/17 13:47:00 2
2020/06/17 13:48:00 3


group by ??


----
??:"Benchao Li"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

x <35907...@qq.com ??2020??6??17?? 11:14??

 
??0??UV??UV??
 CREATE VIEW uv_per_10min AS
 SELECTnbsp;
 nbsp; MAX(DATE_FORMAT(proctimenbsp;, '-MM-dd 
HH:mm:00'))nbsp;OVER w
 AS time_str,nbsp;
 nbsp; COUNT(DISTINCT user_id) OVER w AS uv
 FROM user_behavior
 WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
 CURRENT ROW);


 ??
 PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 
??
 PS??1.10??DDL??CREATE VIEW??
 

Re: flink sql DDL Unsupported update-mode hbase

2020-06-16 文章 Jark Wu
Hi,

HBase connector 不用声明 update-mode 属性。 也不能声明。

Best,
Jark

On Wed, 17 Jun 2020 at 13:08, Zhou Zach  wrote:

> The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in
> the classpath.
>
>
> Reason: No factory supports all properties.
>
>
> The matching candidates:
> org.apache.flink.addons.hbase.HBaseTableFactory
> Unsupported property keys:
> update-mode
>
>
> The following properties are requested:
> connector.table-name=user_hbase10
> connector.type=hbase
> connector.version=2.1.0
> connector.write.buffer-flush.interval=2s
> connector.write.buffer-flush.max-rows=1000
> connector.write.buffer-flush.max-size=10mb
> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> connector.zookeeper.znode.parent=/hbase
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=rowkey
> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> `created_time` TIMESTAMP(3)
> schema.1.name=cf
> update-mode=upsert
>
>
> The following factories have been considered:
> org.apache.flink.addons.hbase.HBaseTableFactory
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in
> the classpath.
>
>
> Reason: No factory supports all properties.
>
>
> The matching candidates:
> org.apache.flink.addons.hbase.HBaseTableFactory
> Unsupported property keys:
> update-mode
>
>
> The following properties are requested:
> connector.table-name=user_hbase10
> connector.type=hbase
> connector.version=2.1.0
> connector.write.buffer-flush.interval=2s
> connector.write.buffer-flush.max-rows=1000
> connector.write.buffer-flush.max-size=10mb
> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> connector.zookeeper.znode.parent=/hbase
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=rowkey
> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> `created_time` TIMESTAMP(3)
> schema.1.name=cf
> update-mode=upsert
>
>
> The following factories have been considered:
> org.apache.flink.addons.hbase.HBaseTableFactory
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> at
> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> at
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 

Re: sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 文章 Rui Li
是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:

> Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息
>
>
>
>
> 在2020年06月17日 10:27,Benchao Li 写道:
> 目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
> module了。
> 如果只是connector、format这些用老的版本,应该是没有问题的。
> 你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink
>
> Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道:
>
> > 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> > 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
> >
> >
> >
> >
> > 在2020年06月16日 18:38,Benchao Li 写道:
> > 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> > 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
> >
> > Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:
> >
> > > 我编译了1.11包
> > > 在sql-cli下查询hive的表报如下错误:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.NoClassDefFoundError:
> org/apache/flink/table/dataformat/BaseRow
> > >
> > >
> > > 查注册的kafka表报:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.ClassNotFoundException:
> > org.apache.flink.table.dataformat.BaseRow
> > >
> > >
> > > 依赖包是从1.10.1下面拷贝的
> > > | |
> > > Sun.Zhu
> > > |
> > > |
> > > 17626017...@163.com
> > > |
> > > 签名由网易邮箱大师定制
> > >
> > >
> > > 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> > > Got it!
> > > Thx,junbao
> > >
> > >
> > > | |
> > > Sun.Zhu
> > > |
> > > |
> > > 17626017...@163.com
> > > |
> > > 签名由网易邮箱大师定制
> > >
> > >
> > > 在2020年06月13日 09:32,zhangjunbao 写道:
> > > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > > https://issues.apache.org/jira/browse/FLINK-17189 <
> > > https://issues.apache.org/jira/browse/FLINK-17189>
> > >
> > > Best,
> > > Junbao Zhang
> > >
> > > 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
> > >
> > > hi,all
> > > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > > ddl如下:
> > > |
> > > CREATETABLE user_behavior (
> > > user_id BIGINT,
> > > item_id BIGINT,
> > > category_id BIGINT,
> > > behavior STRING,
> > > ts TIMESTAMP(3),
> > > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > > ) WITH (
> > > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > > 'connector.topic' = 'user_behavior',  -- kafka topic
> > > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> > zookeeper
> > > 地址
> > > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > > broker 地址
> > > 'format.type' = 'json'-- 数据源格式为 json
> > > );
> > > |
> > > 在查询时select * from user_behavior;报错如下:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.AssertionError: Conversion to relational algebra failed to
> > > preserve datatypes:
> > > validated type:
> > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > > converted type:
> > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> > NULL
> > > rel:
> > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > > behavior=[$3], ts=[$4], proctime=[$5])
> > > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > > SECOND)])
> > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> > >
> > >
> > > flink版本:1.10.1
> > > blink planner,streaming model
> > >
> > >
> > > Thx
> > > | |
> > > Sun.Zhu
> > > |
> > > |
> > > 17626017...@163.com
> > > |
> > > 签名由网易邮箱大师定制
> > >
> > >
> > >
> >
>


-- 
Best regards!
Rui Li


flink sql DDL Unsupported update-mode hbase

2020-06-16 文章 Zhou Zach
The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.


Reason: No factory supports all properties.


The matching candidates:
org.apache.flink.addons.hbase.HBaseTableFactory
Unsupported property keys:
update-mode


The following properties are requested:
connector.table-name=user_hbase10
connector.type=hbase
connector.version=2.1.0
connector.write.buffer-flush.interval=2s
connector.write.buffer-flush.max-rows=1000
connector.write.buffer-flush.max-size=10mb
connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
connector.zookeeper.znode.parent=/hbase
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=rowkey
schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT, `created_time` 
TIMESTAMP(3)
schema.1.name=cf
update-mode=upsert


The following factories have been considered:
org.apache.flink.addons.hbase.HBaseTableFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.


Reason: No factory supports all properties.


The matching candidates:
org.apache.flink.addons.hbase.HBaseTableFactory
Unsupported property keys:
update-mode


The following properties are requested:
connector.table-name=user_hbase10
connector.type=hbase
connector.version=2.1.0
connector.write.buffer-flush.interval=2s
connector.write.buffer-flush.max-rows=1000
connector.write.buffer-flush.max-size=10mb
connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
connector.zookeeper.znode.parent=/hbase
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=rowkey
schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT, `created_time` 
TIMESTAMP(3)
schema.1.name=cf
update-mode=upsert


The following factories have been considered:
org.apache.flink.addons.hbase.HBaseTableFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  

?????? flink sql ????????ROW??????????INT

2020-06-16 文章 Zhou Zach
??hbase??hbase




----
??:"Leonard Xu"

回复:sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 文章 Sun.Zhu
Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息




在2020年06月17日 10:27,Benchao Li 写道:
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道:

> 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
>
>
>
>
> 在2020年06月16日 18:38,Benchao Li 写道:
> 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
>
> Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:
>
> > 我编译了1.11包
> > 在sql-cli下查询hive的表报如下错误:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
> >
> >
> > 查注册的kafka表报:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
> >
> >
> > 依赖包是从1.10.1下面拷贝的
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> > Got it!
> > Thx,junbao
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 09:32,zhangjunbao 写道:
> > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
> >
> > hi,all
> > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > ddl如下:
> > |
> > CREATETABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > ) WITH (
> > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> zookeeper
> > 地址
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > broker 地址
> > 'format.type' = 'json'-- 数据源格式为 json
> > );
> > |
> > 在查询时select * from user_behavior;报错如下:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > flink版本:1.10.1
> > blink planner,streaming model
> >
> >
> > Thx
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> >
>


Re: 求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 Jark Wu
本超提的两个方案也是阿里内部解决这个问题最常用的方式,但是 1.10 会有 primary key 的限制,要等到 1.11 才行。
另外这两个方案在追数据时,都可能会有毛刺现象(有几分钟没有值,因为数据追太快,跳过了)。



On Wed, 17 Jun 2020 at 11:46, Benchao Li  wrote:

> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907...@qq.com> 于2020年6月17日周三 上午11:14写道:
>
> > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> > CREATE VIEW uv_per_10min AS
> > SELECT
> >  MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER
> w
> > AS time_str,
> >  COUNT(DISTINCT user_id) OVER w AS uv
> > FROM user_behavior
> > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> > CURRENT ROW);
> >
> >
> > 想请教一下,应该如何处理?
> > PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理?
> > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> > 多谢
>


Re: 求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 Benchao Li
Hi,
我感觉这种场景可以有两种方式,
1. 可以直接用group by + mini batch
2. window聚合 + fast emit

对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。
这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
用参数[2] 来打开。

对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 60 s

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

x <35907...@qq.com> 于2020年6月17日周三 上午11:14写道:

> 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> CREATE VIEW uv_per_10min AS
> SELECT
>  MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER w
> AS time_str,
>  COUNT(DISTINCT user_id) OVER w AS uv
> FROM user_behavior
> WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW);
>
>
> 想请教一下,应该如何处理?
> PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理?
> PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> 多谢


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Zhijiang
Congratulations Yu! Well deserved!

Best,
Zhijiang


--
From:Dian Fu 
Send Time:2020年6月17日(星期三) 10:48
To:dev 
Cc:Haibo Sun ; user ; user-zh 

Subject:Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

Congrats Yu!

Regards,
Dian

> 在 2020年6月17日,上午10:35,Jark Wu  写道:
> 
> Congratulations Yu! Well deserved!
> 
> Best,
> Jark
> 
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
> 
>> Congratulations Yu!
>> 
>> Best,
>> Haibo
>> 
>> 
>> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's Statebackend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> and keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.10.0&1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>> 
>> 



??????FLINKSQL1.10????????????UV

2020-06-16 文章 x
??0??UV??UV??
CREATE VIEW uv_per_10min AS
SELECT
 MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER w AS 
time_str,
 COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW);


??
PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') ??
PS??1.10??DDL??CREATE VIEW??


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Dian Fu
Congrats Yu!

Regards,
Dian

> 在 2020年6月17日,上午10:35,Jark Wu  写道:
> 
> Congratulations Yu! Well deserved!
> 
> Best,
> Jark
> 
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
> 
>> Congratulations Yu!
>> 
>> Best,
>> Haibo
>> 
>> 
>> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's Statebackend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> and keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.10.0&1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>> 
>> 



Re: flink sql 中怎么把ROW类型转换成INT

2020-06-16 文章 Leonard Xu

Hi,
> 在 2020年6月17日,09:33,Zhou Zach  写道:
> 
> 怎么把int转换成Integer呢或者把Integer转换成int

我理解 Integer 和 int 是相同的类型,INT 是SQL的类型,Integer 是java中的具体实现类,这个错误看起来是在读 
hbase中的数据时,数据的index不对,
能把 sql 和异常栈贴下吗?

Best,
Leonard Xu

Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Benchao Li
Congratulations Yu!

Jark Wu  于2020年6月17日周三 上午10:36写道:

> Congratulations Yu! Well deserved!
>
> Best,
> Jark
>
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
>
> > Congratulations Yu!
> >
> > Best,
> > Haibo
> >
> >
> > At 2020-06-17 09:15:02, "jincheng sun"  wrote:
> > >Hi all,
> > >
> > >On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> > >part of the Apache Flink Project Management Committee (PMC).
> > >
> > >Yu Li has been very active on Flink's Statebackend component, working on
> > >various improvements, for example the RocksDB memory management for
> 1.10.
> > >and keeps checking and voting for our releases, and also has
> successfully
> > >produced two releases(1.10.0&1.10.1) as RM.
> > >
> > >Congratulations & Welcome Yu Li!
> > >
> > >Best,
> > >Jincheng (on behalf of the Flink PMC)
> >
> >
>


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Jark Wu
Congratulations Yu! Well deserved!

Best,
Jark

On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:

> Congratulations Yu!
>
> Best,
> Haibo
>
>
> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
> >Hi all,
> >
> >On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> >part of the Apache Flink Project Management Committee (PMC).
> >
> >Yu Li has been very active on Flink's Statebackend component, working on
> >various improvements, for example the RocksDB memory management for 1.10.
> >and keeps checking and voting for our releases, and also has successfully
> >produced two releases(1.10.0&1.10.1) as RM.
> >
> >Congratulations & Welcome Yu Li!
> >
> >Best,
> >Jincheng (on behalf of the Flink PMC)
>
>


Re: 通过Kafka更新规则

2020-06-16 文章 Ruibin Xing
如果有逻辑上的变更,会导致Checkpoint不可用?之前没有从checkpoint恢复状态的经验,没考虑过可以从checkpoint中恢复,我看看相关资料,感谢!

Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:57写道:

> 为什么会重头消费规则呢?没有开启checkpoint吗?重启可以从checkpoint中的offset继续消费kafka中的规则吧
>
>
>
>
> 在2020年06月16日 11:57,Ruibin Xing 写道:
> 我们有一个Flink Job需要一些自定义的规则,希望能够动态添加、更新、删除。规则的数量在百到千条。目前设计的结构是RDB+ Kafka +
> Flink。
>
> RDB存储规则的完整快照,以展示给Web应用作增删改查。改动通过Kafka发送消息至Flink,通过BroadcastState传播规则。
>
> 目前有一个问题没有解决:如何使用Kafka来传递状态。我想了一下,大概有几种方案:
>
> 1. 消息标记Add、Upadte、Delete类型,在Flink中写逻辑来处理状态以和RDB中状态保持一致。
>   目前的问题是,每次重启Job,都需要从头读Kafka,来回放状态的更新。Kafka中的状态消息也需要持久保存。担心长期会堆积很多消息。
> 2. 使用Kafka Compact Log来解决1的问题。这个方案主要是之前没有使用过Compact Log,不清楚会不会有坑。
> 3.使用方案1,但是启动时Flink从RDB拉取全量规则。
> 4. 规则更新后Kafka消息发送全量规则,启动时Flink只拉取最新一条消息。
>
>   各位大佬是否有经验可以分享,怎么处理是比较科学的?不胜感激!
>


Re: sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 文章 Benchao Li
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道:

> 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
>
>
>
>
> 在2020年06月16日 18:38,Benchao Li 写道:
> 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
>
> Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:
>
> > 我编译了1.11包
> > 在sql-cli下查询hive的表报如下错误:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
> >
> >
> > 查注册的kafka表报:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
> >
> >
> > 依赖包是从1.10.1下面拷贝的
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> > Got it!
> > Thx,junbao
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 09:32,zhangjunbao 写道:
> > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
> >
> > hi,all
> > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > ddl如下:
> > |
> > CREATETABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > ) WITH (
> > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> zookeeper
> > 地址
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > broker 地址
> > 'format.type' = 'json'-- 数据源格式为 json
> > );
> > |
> > 在查询时select * from user_behavior;报错如下:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > flink版本:1.10.1
> > blink planner,streaming model
> >
> >
> > Thx
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> >
>


Re:[ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Haibo Sun
Congratulations Yu!


Best,
Haibo




At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>Hi all,
>
>On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>part of the Apache Flink Project Management Committee (PMC).
>
>Yu Li has been very active on Flink's Statebackend component, working on
>various improvements, for example the RocksDB memory management for 1.10.
>and keeps checking and voting for our releases, and also has successfully
>produced two releases(1.10.0&1.10.1) as RM.
>
>Congratulations & Welcome Yu Li!
>
>Best,
>Jincheng (on behalf of the Flink PMC)


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Leonard Xu
Congratulations Yu !

Best,
Leonard Xu

> 在 2020年6月17日,09:50,Yangze Guo  写道:
> 
> Congrats, Yu!
> Best,
> Yangze Guo
> 
> On Wed, Jun 17, 2020 at 9:35 AM Xintong Song  wrote:
>> 
>> Congratulations Yu, well deserved~!
>> 
>> Thank you~
>> 
>> Xintong Song
>> 
>> 
>> 
>> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun  
>> wrote:
>>> 
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's Statebackend component, working on 
>>> various improvements, for example the RocksDB memory management for 1.10. 
>>> and keeps checking and voting for our releases, and also has successfully 
>>> produced two releases(1.10.0&1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)



Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Yangze Guo
Congrats, Yu!
Best,
Yangze Guo

On Wed, Jun 17, 2020 at 9:35 AM Xintong Song  wrote:
>
> Congratulations Yu, well deserved~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun  wrote:
>>
>> Hi all,
>>
>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>> part of the Apache Flink Project Management Committee (PMC).
>>
>> Yu Li has been very active on Flink's Statebackend component, working on 
>> various improvements, for example the RocksDB memory management for 1.10. 
>> and keeps checking and voting for our releases, and also has successfully 
>> produced two releases(1.10.0&1.10.1) as RM.
>>
>> Congratulations & Welcome Yu Li!
>>
>> Best,
>> Jincheng (on behalf of the Flink PMC)


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 Xintong Song
Congratulations Yu, well deserved~!

Thank you~

Xintong Song



On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
wrote:

> Hi all,
>
> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> part of the Apache Flink Project Management Committee (PMC).
>
> Yu Li has been very active on Flink's Statebackend component, working on
> various improvements, for example the RocksDB memory management for 1.10.
> and keeps checking and voting for our releases, and also has successfully
> produced two releases(1.10.0&1.10.1) as RM.
>
> Congratulations & Welcome Yu Li!
>
> Best,
> Jincheng (on behalf of the Flink PMC)
>


?????? flink sql ????????ROW??????????INT

2020-06-16 文章 Zhou Zach
??.??
offset (0) + length (4) exceed the capacity of the array: 2
 ?? hbaseint??
??users.addColumn("cf", "age", classOf[Integer]) ??
??int??IntegerInteger??int





----
??:"Leonard Xu"

[ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 文章 jincheng sun
Hi all,

On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
part of the Apache Flink Project Management Committee (PMC).

Yu Li has been very active on Flink's Statebackend component, working on
various improvements, for example the RocksDB memory management for 1.10.
and keeps checking and voting for our releases, and also has successfully
produced two releases(1.10.0&1.10.1) as RM.

Congratulations & Welcome Yu Li!

Best,
Jincheng (on behalf of the Flink PMC)


回复:通过Kafka更新规则

2020-06-16 文章 Sun.Zhu
为什么会重头消费规则呢?没有开启checkpoint吗?重启可以从checkpoint中的offset继续消费kafka中的规则吧




在2020年06月16日 11:57,Ruibin Xing 写道:
我们有一个Flink Job需要一些自定义的规则,希望能够动态添加、更新、删除。规则的数量在百到千条。目前设计的结构是RDB+ Kafka +
Flink。

RDB存储规则的完整快照,以展示给Web应用作增删改查。改动通过Kafka发送消息至Flink,通过BroadcastState传播规则。

目前有一个问题没有解决:如何使用Kafka来传递状态。我想了一下,大概有几种方案:

1. 消息标记Add、Upadte、Delete类型,在Flink中写逻辑来处理状态以和RDB中状态保持一致。
  目前的问题是,每次重启Job,都需要从头读Kafka,来回放状态的更新。Kafka中的状态消息也需要持久保存。担心长期会堆积很多消息。
2. 使用Kafka Compact Log来解决1的问题。这个方案主要是之前没有使用过Compact Log,不清楚会不会有坑。
3.使用方案1,但是启动时Flink从RDB拉取全量规则。
4. 规则更新后Kafka消息发送全量规则,启动时Flink只拉取最新一条消息。

  各位大佬是否有经验可以分享,怎么处理是比较科学的?不胜感激!


回复:如何做Flink Stream的性能测试

2020-06-16 文章 Sun.Zhu
Hi
1.11 版本内置了DataGen、print、Blackhole的connector用来辅助功能测试,性能测试,线上观察,欢迎试用




在2020年06月16日 09:26,aven.wu 写道:
各位好;
  最近我想测试一下我的程序处理性能如何。请问有什么工具、或者应该通过什么方法来获得一个比较准确的测试结果。
   我的场景包含从kafka读取,flink 处理(有查询es做维表关联),处理结果输出到ES 和 Kafka。
Best
Aven


回复:sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 文章 Sun.Zhu
是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11 
还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?




在2020年06月16日 18:38,Benchao Li 写道:
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:

> 我编译了1.11包
> 在sql-cli下查询hive的表报如下错误:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
>
> 查注册的kafka表报:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
>
> 依赖包是从1.10.1下面拷贝的
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> Got it!
> Thx,junbao
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 09:32,zhangjunbao 写道:
> 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> https://issues.apache.org/jira/browse/FLINK-17189 <
> https://issues.apache.org/jira/browse/FLINK-17189>
>
> Best,
> Junbao Zhang
>
> 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
>
> hi,all
> 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> ddl如下:
> |
> CREATETABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> ) WITH (
> 'connector.type' = 'kafka',  -- 使用 kafka connector
> 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper
> 地址
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> broker 地址
> 'format.type' = 'json'-- 数据源格式为 json
> );
> |
> 在查询时select * from user_behavior;报错如下:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> flink版本:1.10.1
> blink planner,streaming model
>
>
> Thx
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
>


Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-16 文章 Jark Wu
如果更新非常频繁,又要保证关联的准确性,又要保证吞吐,那么最佳的解决方案我觉得只能是关联 changelog 了,
只是 Flink 目前还没有原生支持维表关联一个 changelog,会在Flink SQL 1.12中去支持。

当前版本下的话,可以尝试 keyby+localcache+异步IO。

Best,
Jark

On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:

> 或者采用redis做维表存储介质。
>
> > 在 2020年6月16日,下午10:10,wangxiangyan  写道:
> >
> > hi,大家
> > 维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?
>


Re: pyflink连接elasticsearch5.4问题

2020-06-16 文章 Jark Wu
Hi,

据我所知,Flink 1.10 官方没有支持Elasticsearch 5.x 版本的 sql connector。

Best,
Jark

On Tue, 16 Jun 2020 at 16:08, Dian Fu  wrote:

> 可以发一下完整的异常吗?
>
> 在 2020年6月16日,下午3:45,jack  写道:
>
> 连接的版本部分我本地已经修改为 5了,发生了下面的报错;
>
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("5")
> >> .host("localhost", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
>
>
>
>
>
>
> 在 2020-06-16 15:38:28,"Dian Fu"  写道:
> >I guess it's because the ES version specified in the job is `6`, however, 
> >the jar used is `5`.
> >
> >> 在 2020年6月16日,下午1:47,jack  写道:
> >>
> >> 我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 
> >> flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
> >> 连接es的时候报错,findAndCreateTableSink   failed。
> >> 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
> >>
> >> Caused by Could not find a suitable  factory for   
> >> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
> >> Reason: Required context properties mismatch
> >>
> >>
> >>
> >> from pyflink.datastream import StreamExecutionEnvironment, 
> >> TimeCharacteristic
> >> from pyflink.table import StreamTableEnvironment, DataTypes, 
> >> EnvironmentSettings
> >> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
> >> Elasticsearch
> >>
> >>
> >> def area_cnts():
> >> s_env = StreamExecutionEnvironment.get_execution_environment()
> >> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> >> s_env.set_parallelism(1)
> >>
> >> # use blink table planner
> >> st_env = StreamTableEnvironment \
> >> .create(s_env, environment_settings=EnvironmentSettings
> >> .new_instance()
> >> .in_streaming_mode()
> >> .use_blink_planner().build())
> >>
> >> # register source and sink
> >> register_rides_source(st_env)
> >> register_cnt_sink(st_env)
> >>
> >> # query
> >> st_env.from_path("source")\
> >> .group_by("taxiId")\
> >> .select("taxiId, count(1) as cnt")\
> >> .insert_into("sink")
> >>
> >> # execute
> >> st_env.execute("6-write_with_elasticsearch")
> >>
> >>
> >> def register_rides_source(st_env):
> >> st_env \
> >> .connect(  # declare the external system to connect to
> >> Kafka()
> >> .version("universal")
> >> .topic("Rides")
> >> .start_from_earliest()
> >> .property("zookeeper.connect", "zookeeper:2181")
> >> .property("bootstrap.servers", "kafka:9092")) \
> >> .with_format(  # declare a format for this system
> >> Json()
> >> .fail_on_missing_field(True)
> >> .schema(DataTypes.ROW([
> >> DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> >> DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> >> DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> >> DataTypes.FIELD("lon", DataTypes.FLOAT()),
> >> DataTypes.FIELD("lat", DataTypes.FLOAT()),
> >> DataTypes.FIELD("psgCnt", DataTypes.INT()),
> >> DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> >> .with_schema(  # declare the schema of the table
> >> Schema()
> >> .field("rideId", DataTypes.BIGINT())
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("isStart", DataTypes.BOOLEAN())
> >> .field("lon", DataTypes.FLOAT())
> >> .field("lat", DataTypes.FLOAT())
> >> .field("psgCnt", DataTypes.INT())
> >> .field("rideTime", DataTypes.TIMESTAMP())
> >> .rowtime(
> >> Rowtime()
> >> .timestamps_from_field("eventTime")
> >> .watermarks_periodic_bounded(6))) \
> >> .in_append_mode() \
> >> .register_table_source("source")
> >>
> >>
> >> def register_cnt_sink(st_env):
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("6")
> >> .host("elasticsearch", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
> >> .with_schema(
> >> Schema()
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("cnt", DataTypes.BIGINT())) \
> >> .with_format(
> >>Json()
> >>.derive_schema()) \
> >> .in_upsert_mode() \
> >> .register_table_sink("sink")
> >>
> >>
> >> if __name__ == '__main__':
> >> area_cnts()
> >>
> >
>
>
>


Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-16 文章 李奇
频繁是什么级别的?可以加缓存。然后再定期更新。

> 在 2020年6月16日,下午10:10,wangxiangyan  写道:
> 
> hi,大家
> 维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?


对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-16 文章 wangxiangyan
hi,大家
维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?

?????? Re: flink sql read hbase sink mysql data type not match

2020-06-16 文章 Zhou Zach
2020-06-16 21:01:09,756 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka version: unknown
2020-06-16 21:01:09,757 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
 - [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): 
user_behavior-0
2020-06-16 21:01:09,765 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - Cluster 
ID: cAT_xBISQNWghT9kR5UuIw
2020-06-16 21:01:09,766 WARN 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig
 - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2020-06-16 21:01:09,766 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka version: unknown
2020-06-16 21:01:09,767 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka commitId: unknown
2020-06-16 21:01:09,768 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher
 - [Consumer clientId=consumer-7, groupId=null] Resetting offset for partition 
user_behavior-0 to offset 43545.
2020-06-16 21:01:35,904 INFO 
org.apache.flink.addons.hbase.HBaseLookupFunction
  - start close ...
2020-06-16 21:01:35,906 INFO 
org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient
  - Close zookeeper connection 0x72d39885 to 
cdh1:2181,cdh2:2181,cdh3:2181
2020-06-16 21:01:35,908 INFO 
org.apache.flink.addons.hbase.HBaseLookupFunction
  - end close.
2020-06-16 21:01:35,908 INFO org.apache.zookeeper.ZooKeeper  
   
   - Session: 0x172b776fac80ae4 closed
2020-06-16 21:01:35,909 INFO org.apache.zookeeper.ClientCnxn  
   
  - EventThread shut down
2020-06-16 21:01:35,911 INFO 
org.apache.flink.runtime.taskmanager.Task 
 - Source: KafkaTableSource(uid, 
phoneType, clickCount, time) - 
SourceConversion(table=[default_catalog.default_database.user_behavior, source: 
[KafkaTableSource(uid, phoneType, clickCount, time)]], fields=[uid, phoneType, 
clickCount, time]) - Calc(select=[uid, time]) - 
LookupJoin(table=[HBaseTableSource[schema=[rowkey, cf], projectFields=null]], 
joinType=[InnerJoin], async=[false], lookup=[rowkey=uid], select=[uid, time, 
rowkey, cf]) - Calc(select=[CAST(time) AS time, cf.age AS age]) - 
SinkConversionToTuple2 - Sink: JDBCUpsertTableSink(time, age) (1/2) 
(e45989f173dc35aefc52413349db7f30) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity 
of the array: 2
at 
org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
at 
org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:55)
at 
org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
at 
org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
at LookupFunction$12.flatMap(Unknown Source)
at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at StreamExecCalc$7.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SourceConversion$6.processElement(Unknown Source)
  

Re: flink sql 中怎么把ROW类型转换成INT

2020-06-16 文章 Leonard Xu
Hi
 Hbase connector中,除 rowkey 字段外,所有列簇 在FLINK中对应的类型都是 符合类型ROW(),这是因为ROW中可以包括多个 
field 能够和 hbase 中的一个列簇可以包含多个列很好地对应。贴个文档,你一看就懂:

CREATE TABLE hTable (
 rowkey INT,
 family1 ROW,
 family2 ROW,
 family3 ROW,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (...);
 
-- scan data from the HBase table
SELECT rowkey, family1.q1, family3.q4, family3.q6 FROM hTable;
 

这个文档正在编写当中,很快就可以在官网上看到。

祝好,
Leonard Xu

> 在 2020年6月16日,19:16,Yichao Yang <1048262...@qq.com> 写道:
> 
> Hi
> 
> 
> row类型是不能强转int的,可以找一下阿里云flink sql的文档,其中有介绍哪些数据类型可以互转。
> 
> 
> Best,
> Yichao Yang
> 
> 
> 
> 发自我的iPhone
> 
> 
> -- 原始邮件 --
> 发件人: Zhou Zach  发送时间: 2020年6月16日 19:10
> 收件人: user-zh  主题: 回复:flink sql 中怎么把ROW类型转换成INT
> 
> 
> 
> flink sql从HBase中读取的类型为ROW,怎么把ROW类型转换成INT
> select cast(cf as Int) cf from hbase_table
> 直接这样转换不成功



Re: Re: flink sql read hbase sink mysql data type not match

2020-06-16 文章 Benchao Li
不能直接cast,ROW类型是一个复合类型,要获取其中的某个字段,可以用`.`来获取。
比如你现在这个场景,就是 SELECT rowkey, cf.age FROM users

Zhou Zach  于2020年6月16日周二 下午6:59写道:

> flink sql 怎么将ROW<`age` INT转换成INT啊
>
>
> streamTableEnv.sqlUpdate(
>   """
> |
> |insert into  user_age
> |SELECT rowkey, cast(cf as int) as age
> |FROM
> |  users
> |
> |""".stripMargin)这样尝试报错了


回复:异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 文章 Yichao Yang
Hi


使用到缓存的话大多数情况下都要用到keyby,并且在keyby之后的算子使用缓存,保证相同key只会访问一个缓存,否则缓存命中率一般情况下都会很低。


Best,
Yichao Yang



发自我的iPhone


-- 原始邮件 --
发件人: Benchao Li 

回复:Re:关于keyby算子的疑问,如果本身数据比较分散,还有keyby的必要吗

2020-06-16 文章 Yichao Yang
Hi


个人理解一般情况下都是业务需要才会做keyby操作,比如想统计一个用户一分钟pv按照userid 
keyby。如果你的任务没有这样的业务需求完全不用考虑使用这些算子的。


Best,
Yichao Yang






-- 原始邮件 --
发件人: Michael Ran 

回复:flink sql 中怎么把ROW类型转换成INT

2020-06-16 文章 Yichao Yang
Hi


row类型是不能强转int的,可以找一下阿里云flink sql的文档,其中有介绍哪些数据类型可以互转。


Best,
Yichao Yang



发自我的iPhone


-- 原始邮件 --
发件人: Zhou Zach 

flink sql ????????ROW??????????INT

2020-06-16 文章 Zhou Zach
flink sql??HBase??ROWROW??INT
select cast(cf as Int) cf from hbase_table
??

Re:关于keyby算子的疑问,如果本身数据比较分散,还有keyby的必要吗

2020-06-16 文章 Michael Ran
默认会有中hash 吧,看做啥操作
在 2020-06-16 18:28:51,"hdxg1101300...@163.com"  写道:
>
>您好:
>如果我的数据本身比较分散,重复的ID很少,还有必要进行keyby操作吗
>谢谢!
>
>
>hdxg1101300...@163.com


Re:Re: flink sql read hbase sink mysql data type not match

2020-06-16 文章 Zhou Zach
flink sql ??ROW<`age` INT??INT??


streamTableEnv.sqlUpdate(
  """
|
|insert into  user_age
|SELECT rowkey, cast(cf as int) as age
|FROM
|  users
|
|""".stripMargin)??

Re: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 文章 Benchao Li
是否有观察过缓存命中率呢,如果缓存命中率不高,性能也不会太好的。

hdxg1101300...@163.com  于2020年6月16日周二 下午6:29写道:

> 目前使用guava的cache做了缓存但是效果不是很好
>
>
>
> hdxg1101300...@163.com
>
> 发件人: Benchao Li
> 发送时间: 2020-06-16 17:40
> 收件人: user-zh
> 主题: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题
> 感觉像这种IO Bound的场景,一种是可以尝试攒个小batch去请求;一种是加一个cache来降低请求的数量。
> 要不然就是优化提升外部系统的吞吐。
>
> hdxg1101300...@163.com  于2020年6月16日周二 下午5:35写道:
>
> > 您好:
> > 采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
> > flink1.10.0
> >
> >
> >
> > hdxg1101300...@163.com
> >
>


Re: sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 文章 Benchao Li
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:

> 我编译了1.11包
> 在sql-cli下查询hive的表报如下错误:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
>
> 查注册的kafka表报:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
>
> 依赖包是从1.10.1下面拷贝的
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> Got it!
> Thx,junbao
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 09:32,zhangjunbao 写道:
> 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> https://issues.apache.org/jira/browse/FLINK-17189 <
> https://issues.apache.org/jira/browse/FLINK-17189>
>
> Best,
> Junbao Zhang
>
> 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
>
> hi,all
> 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> ddl如下:
> |
> CREATETABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> ) WITH (
> 'connector.type' = 'kafka',  -- 使用 kafka connector
> 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper
> 地址
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> broker 地址
> 'format.type' = 'json'-- 数据源格式为 json
> );
> |
> 在查询时select * from user_behavior;报错如下:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> flink版本:1.10.1
> blink planner,streaming model
>
>
> Thx
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
>


Re: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 文章 hdxg1101300...@163.com
目前使用guava的cache做了缓存但是效果不是很好



hdxg1101300...@163.com
 
发件人: Benchao Li
发送时间: 2020-06-16 17:40
收件人: user-zh
主题: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题
感觉像这种IO Bound的场景,一种是可以尝试攒个小batch去请求;一种是加一个cache来降低请求的数量。
要不然就是优化提升外部系统的吞吐。
 
hdxg1101300...@163.com  于2020年6月16日周二 下午5:35写道:
 
> 您好:
> 采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
> flink1.10.0
>
>
>
> hdxg1101300...@163.com
>


关于keyby算子的疑问,如果本身数据比较分散,还有keyby的必要吗

2020-06-16 文章 hdxg1101300...@163.com

您好:
如果我的数据本身比较分散,重复的ID很少,还有必要进行keyby操作吗
谢谢!


hdxg1101300...@163.com


回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 文章 Sun.Zhu
我编译了1.11包
在sql-cli下查询hive的表报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow


查注册的kafka表报:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 


Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制




Re: flink sql read hbase sink mysql data type not match

2020-06-16 文章 Benchao Li
Hi,

上面的错误提示已经比较明确了,说的是你的query的schema跟sink table的schema对不上。
query的schema是:[rowkey: STRING, cf: ROW<`age` INT>]
而sink的schema是:[rowkey: STRING, age: INT]

你可以调整一下你的sink的schema;或者调整一下你的query语句。

Zhou Zach  于2020年6月16日周二 下午5:51写道:

>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Field types of query result and registered
> TableSink default_catalog.default_database.user_age do not match.
> Query schema: [rowkey: STRING, cf: ROW<`age` INT>]
> Sink schema: [rowkey: STRING, age: INT]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink
> default_catalog.default_database.user_age do not match.
>
>
>
>
>
>
>
>
> query:
>
>
>
>
> val users = new HBaseTableSource(hConf, "user_hbase5")
> users.setRowKey("rowkey", classOf[String]) // currency as the primary key
> users.addColumn("cf", "age", classOf[Integer])
>
> streamTableEnv.registerTableSource("users", users)
>
>
> streamTableEnv.sqlUpdate(
> """
> |
> |CREATE TABLE user_age (
> |`rowkey` VARCHAR,
> |age INT
> |) WITH (
> |'connector.type' = 'jdbc',
> |'connector.write.flush.max-rows' = '1'
> |)
> |""".stripMargin)
>
> streamTableEnv.sqlUpdate(
> """
> |
> |insert into  user_age
> |SELECT *
> |FROM
> |  users
> |
> |""".stripMargin)


flink sql read hbase sink mysql data type not match

2020-06-16 文章 Zhou Zach


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Field types of query result and registered TableSink 
default_catalog.default_database.user_age do not match.
Query schema: [rowkey: STRING, cf: ROW<`age` INT>]
Sink schema: [rowkey: STRING, age: INT]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.ValidationException: Field types of query 
result and registered TableSink default_catalog.default_database.user_age do 
not match.








query:




val users = new HBaseTableSource(hConf, "user_hbase5")
users.setRowKey("rowkey", classOf[String]) // currency as the primary key
users.addColumn("cf", "age", classOf[Integer])

streamTableEnv.registerTableSource("users", users)


streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_age (
|`rowkey` VARCHAR,
|age INT
|) WITH (
|'connector.type' = 'jdbc',
|'connector.write.flush.max-rows' = '1'
|)
|""".stripMargin)

streamTableEnv.sqlUpdate(
"""
|
|insert into  user_age
|SELECT *
|FROM
|  users
|
|""".stripMargin)

Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 文章 Benchao Li
感觉像这种IO Bound的场景,一种是可以尝试攒个小batch去请求;一种是加一个cache来降低请求的数量。
要不然就是优化提升外部系统的吞吐。

hdxg1101300...@163.com  于2020年6月16日周二 下午5:35写道:

> 您好:
> 采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
> flink1.10.0
>
>
>
> hdxg1101300...@163.com
>


异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 文章 hdxg1101300...@163.com
您好:
采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
flink1.10.0



hdxg1101300...@163.com


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
看了下issue,跟我描述的问题很相似,我尝试切到1.10.1试试看,谢谢您的解惑

Kurt Young  于2020年6月16日周二 下午5:15写道:

> 应该是这个: https://issues.apache.org/jira/browse/FLINK-16068
>
> Best,
> Kurt
>
>
> On Tue, Jun 16, 2020 at 5:09 PM zilong xiao  wrote:
>
> > 我看了下1.10.1的release note,您说的应该就是这个issue:
> > https://issues.apache.org/jira/browse/FLINK-16345
> > ,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
> > DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?
> >
> > Benchao Li  于2020年6月16日周二 下午5:00写道:
> >
> > > 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
> > >
> > > zilong xiao  于2020年6月16日周二 下午4:56写道:
> > >
> > >> 如题,在SQL
> > >>
> >
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> > >> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> > >> 代码如下图:
> > >> [image: image.png]
> > >> 异常堆栈:
> > >>
> > >
> >
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 Kurt Young
应该是这个: https://issues.apache.org/jira/browse/FLINK-16068

Best,
Kurt


On Tue, Jun 16, 2020 at 5:09 PM zilong xiao  wrote:

> 我看了下1.10.1的release note,您说的应该就是这个issue:
> https://issues.apache.org/jira/browse/FLINK-16345
> ,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
> DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?
>
> Benchao Li  于2020年6月16日周二 下午5:00写道:
>
> > 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
> >
> > zilong xiao  于2020年6月16日周二 下午4:56写道:
> >
> >> 如题,在SQL
> >>
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> >> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> >> 代码如下图:
> >> [image: image.png]
> >> 异常堆栈:
> >>
> >
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
我看了下1.10.1的release note,您说的应该就是这个issue:
https://issues.apache.org/jira/browse/FLINK-16345
,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?

Benchao Li  于2020年6月16日周二 下午5:00写道:

> 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
>
> zilong xiao  于2020年6月16日周二 下午4:56写道:
>
>> 如题,在SQL
>> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
>> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
>> 代码如下图:
>> [image: image.png]
>> 异常堆栈:
>>
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
是用的1.10.0版本,我尝试切到1.10.1试试看,请问这个有对应的issue吗?想深入了解下这个问题

Benchao Li  于2020年6月16日周二 下午5:00写道:

> 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
>
> zilong xiao  于2020年6月16日周二 下午4:56写道:
>
>> 如题,在SQL
>> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
>> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
>> 代码如下图:
>> [image: image.png]
>> 异常堆栈:
>>
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 Benchao Li
你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。

zilong xiao  于2020年6月16日周二 下午4:56写道:

> 如题,在SQL
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> 代码如下图:
> [image: image.png]
> 异常堆栈:
>


Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
如题,在SQL
ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
代码如下图:
[image: image.png]
异常堆栈:


?????? flink1.11 ??????(???? DDL ??????(???? Table ????))

2020-06-16 文章 kcz
??




----
??:""https://developer.aliyun.com/live/2894?accounttraceid=07deb589f50c4c1abbcbed103e534316qnxq
04:17:00??

Kurt Young https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
 
 
  Best,
  Yichao Yang
 
 
 
 
  --nbsp;nbsp;--
  ??:nbsp;"Kurt Young"

Re: pyflink连接elasticsearch5.4问题

2020-06-16 文章 Dian Fu
可以发一下完整的异常吗?

> 在 2020年6月16日,下午3:45,jack  写道:
> 
> 连接的版本部分我本地已经修改为 5了,发生了下面的报错;
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("5")
> >> .host("localhost", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
> 
> 
> 
> 
> 
> 在 2020-06-16 15:38:28,"Dian Fu"  写道:
> >I guess it's because the ES version specified in the job is `6`, however, 
> >the jar used is `5`.
> >
> >> 在 2020年6月16日,下午1:47,jack  写道:
> >> 
> >> 我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 
> >> flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
> >> 连接es的时候报错,findAndCreateTableSink   failed。 
> >> 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
> >> 
> >> Caused by Could not find a suitable  factory for   
> >> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
> >> Reason: Required context properties mismatch
> >> 
> >> 
> >> 
> >> from pyflink.datastream import StreamExecutionEnvironment, 
> >> TimeCharacteristic
> >> from pyflink.table import StreamTableEnvironment, DataTypes, 
> >> EnvironmentSettings
> >> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
> >> Elasticsearch
> >> 
> >> 
> >> def area_cnts():
> >> s_env = StreamExecutionEnvironment.get_execution_environment()
> >> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> >> s_env.set_parallelism(1)
> >> 
> >> # use blink table planner
> >> st_env = StreamTableEnvironment \
> >> .create(s_env, environment_settings=EnvironmentSettings
> >> .new_instance()
> >> .in_streaming_mode()
> >> .use_blink_planner().build())
> >> 
> >> # register source and sink
> >> register_rides_source(st_env)
> >> register_cnt_sink(st_env)
> >> 
> >> # query
> >> st_env.from_path("source")\
> >> .group_by("taxiId")\
> >> .select("taxiId, count(1) as cnt")\
> >> .insert_into("sink")
> >> 
> >> # execute
> >> st_env.execute("6-write_with_elasticsearch")
> >> 
> >> 
> >> def register_rides_source(st_env):
> >> st_env \
> >> .connect(  # declare the external system to connect to
> >> Kafka()
> >> .version("universal")
> >> .topic("Rides")
> >> .start_from_earliest()
> >> .property("zookeeper.connect", "zookeeper:2181")
> >> .property("bootstrap.servers", "kafka:9092")) \
> >> .with_format(  # declare a format for this system
> >> Json()
> >> .fail_on_missing_field(True)
> >> .schema(DataTypes.ROW([
> >> DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> >> DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> >> DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> >> DataTypes.FIELD("lon", DataTypes.FLOAT()),
> >> DataTypes.FIELD("lat", DataTypes.FLOAT()),
> >> DataTypes.FIELD("psgCnt", DataTypes.INT()),
> >> DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> >> .with_schema(  # declare the schema of the table
> >> Schema()
> >> .field("rideId", DataTypes.BIGINT())
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("isStart", DataTypes.BOOLEAN())
> >> .field("lon", DataTypes.FLOAT())
> >> .field("lat", DataTypes.FLOAT())
> >> .field("psgCnt", DataTypes.INT())
> >> .field("rideTime", DataTypes.TIMESTAMP())
> >> .rowtime(
> >> Rowtime()
> >> .timestamps_from_field("eventTime")
> >> .watermarks_periodic_bounded(6))) \
> >> .in_append_mode() \
> >> .register_table_source("source")
> >> 
> >> 
> >> def register_cnt_sink(st_env):
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("6")
> >> .host("elasticsearch", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
> >> .with_schema(
> >> Schema()
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("cnt", DataTypes.BIGINT())) \
> >> .with_format(
> >>Json()
> >>.derive_schema()) \
> >> .in_upsert_mode() \
> >> .register_table_sink("sink")
> >> 
> >> 
> >> if __name__ == '__main__':
> >> area_cnts()
> >> 
> >



Re: Re: 如何做checkpoint的灾备

2020-06-16 文章 dixingxin...@163.com
@Congxian  感谢你的回复,我们会参考你的思路。



Best,
Xingxing Di
 
Sender: Congxian Qiu
Send Time: 2020-06-15 09:55
Receiver: user-zh
cc: zhangyingchen; pengxingbo
Subject: Re: Re: 如何做checkpoint的灾备
正常的流程来说,能找到 checkpoint meta 文件,checkpoint 就是完整的。但是也可能会出现其他的一些异常(主要可能会有
FileNotFound 等异常),那些异常如果需要提前知道的话,可以再 JM 端通过遍历 checkpoint meta 文件来进行判断。
 
对于希望从 checkpoint 恢复的场景来说,可以考虑下能否在你们的场景中把 checkpoint meta
统一存储到某个地方,这样后续直接从这个地方读取即可。
 
Best,
Congxian
 
 
dixingxin...@163.com  于2020年6月15日周一 上午12:06写道:
 
> @Congxian Qiu 感谢你的回复!
> 1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11
> 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。
> 2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。
> 3.checkpoint双写:
> 感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。
>
> 异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况
> 关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口
> @唐云  感谢你的恢复!
> 我们试一下你说的方式,感谢。
>
> 题外话,我还有个疑问,就是如何判断checkpoint是否可用?
>
> 我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。
>
> 目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。
> PS:我们用的是flink 1.9.2
>
>
>
> Best,
> Xingxing Di
>
> 发件人: Yun Tang
> 发送时间: 2020-06-14 00:48
> 收件人: user-zh
> 主题: Re: 如何做checkpoint的灾备
> Hi Xingxing
>
> 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1]
> 来忽略失败的拷贝(例如FileNotFoundException)
> 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options
>
> 祝好
> 唐云
>
> 
> From: Congxian Qiu 
> Sent: Saturday, June 13, 2020 16:54
> To: user-zh 
> Subject: Re: 如何做checkpoint的灾备
>
> Hi
>
> 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
> 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。
>
> 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
> 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
> checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
> 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
> 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。
>
> 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
> savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
> checkpoint 慢很多
>
> [1]
>
> https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
> [2] https://issues.apache.org/jira/browse/FLINK-5763
>
> Best,
> Congxian
>
>
> dixingxin...@163.com  于2020年6月11日周四 下午8:21写道:
>
> > Hi Flink社区,
> > 目前我们在调研checkpoint
> > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
> >
> >
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
> >
> >
> >
> > Best,
> > Xingxing Di
> >
>


Re:Re: pyflink连接elasticsearch5.4问题

2020-06-16 文章 jack
连接的版本部分我本地已经修改为 5了,发生了下面的报错;
>> st_env.connect(
>> Elasticsearch()
>> .version("5")
>> .host("localhost", 9200, "http")
>> .index("taxiid-cnts")
>> .document_type('taxiidcnt')
>> .key_delimiter("$")) \














在 2020-06-16 15:38:28,"Dian Fu"  写道:
>I guess it's because the ES version specified in the job is `6`, however, the 
>jar used is `5`.
>
>> 在 2020年6月16日,下午1:47,jack  写道:
>> 
>> 我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 
>> flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
>> 连接es的时候报错,findAndCreateTableSink   failed。 
>> 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
>> 
>> Caused by Could not find a suitable  factory for   
>> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
>> Reason: Required context properties mismatch
>> 
>> 
>> 
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, DataTypes, 
>> EnvironmentSettings
>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
>> Elasticsearch
>> 
>> 
>> def area_cnts():
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> s_env.set_parallelism(1)
>> 
>> # use blink table planner
>> st_env = StreamTableEnvironment \
>> .create(s_env, environment_settings=EnvironmentSettings
>> .new_instance()
>> .in_streaming_mode()
>> .use_blink_planner().build())
>> 
>> # register source and sink
>> register_rides_source(st_env)
>> register_cnt_sink(st_env)
>> 
>> # query
>> st_env.from_path("source")\
>> .group_by("taxiId")\
>> .select("taxiId, count(1) as cnt")\
>> .insert_into("sink")
>> 
>> # execute
>> st_env.execute("6-write_with_elasticsearch")
>> 
>> 
>> def register_rides_source(st_env):
>> st_env \
>> .connect(  # declare the external system to connect to
>> Kafka()
>> .version("universal")
>> .topic("Rides")
>> .start_from_earliest()
>> .property("zookeeper.connect", "zookeeper:2181")
>> .property("bootstrap.servers", "kafka:9092")) \
>> .with_format(  # declare a format for this system
>> Json()
>> .fail_on_missing_field(True)
>> .schema(DataTypes.ROW([
>> DataTypes.FIELD("rideId", DataTypes.BIGINT()),
>> DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
>> DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
>> DataTypes.FIELD("lon", DataTypes.FLOAT()),
>> DataTypes.FIELD("lat", DataTypes.FLOAT()),
>> DataTypes.FIELD("psgCnt", DataTypes.INT()),
>> DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
>> .with_schema(  # declare the schema of the table
>> Schema()
>> .field("rideId", DataTypes.BIGINT())
>> .field("taxiId", DataTypes.BIGINT())
>> .field("isStart", DataTypes.BOOLEAN())
>> .field("lon", DataTypes.FLOAT())
>> .field("lat", DataTypes.FLOAT())
>> .field("psgCnt", DataTypes.INT())
>> .field("rideTime", DataTypes.TIMESTAMP())
>> .rowtime(
>> Rowtime()
>> .timestamps_from_field("eventTime")
>> .watermarks_periodic_bounded(6))) \
>> .in_append_mode() \
>> .register_table_source("source")
>> 
>> 
>> def register_cnt_sink(st_env):
>> st_env.connect(
>> Elasticsearch()
>> .version("6")
>> .host("elasticsearch", 9200, "http")
>> .index("taxiid-cnts")
>> .document_type('taxiidcnt')
>> .key_delimiter("$")) \
>> .with_schema(
>> Schema()
>> .field("taxiId", DataTypes.BIGINT())
>> .field("cnt", DataTypes.BIGINT())) \
>> .with_format(
>>Json()
>>.derive_schema()) \
>> .in_upsert_mode() \
>> .register_table_sink("sink")
>> 
>> 
>> if __name__ == '__main__':
>> area_cnts()
>> 
>


Re:Re: Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
有输出的

















在 2020-06-16 15:24:29,"王松"  写道:
>那你在命令行执行:hadoop classpath,有hadoop的classpath输出吗?
>
>Zhou Zach  于2020年6月16日周二 下午3:22写道:
>
>>
>>
>>
>>
>>
>>
>> 在/etc/profile下,目前只加了
>> export HADOOP_CLASSPATH=`hadoop classpath`
>> 我是安装的CDH,没找到sbin这个文件。。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-16 15:05:12,"王松"  写道:
>> >你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗?
>> >
>> >export HADOOP_HOME=/usr/local/hadoop-2.7.2
>> >export HADOOP_CLASSPATH=`hadoop classpath`
>> >export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
>> >
>> >Zhou Zach  于2020年6月16日周二 下午2:53写道:
>> >
>> >> flink/lib/下的jar:
>> >> flink-connector-hive_2.11-1.10.0.jar
>> >> flink-dist_2.11-1.10.0.jar
>> >> flink-jdbc_2.11-1.10.0.jar
>> >> flink-json-1.10.0.jar
>> >> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
>> >> flink-sql-connector-kafka_2.11-1.10.0.jar
>> >> flink-table_2.11-1.10.0.jar
>> >> flink-table-blink_2.11-1.10.0.jar
>> >> hbase-client-2.1.0.jar
>> >> hbase-common-2.1.0.jar
>> >> hive-exec-2.1.1.jar
>> >> mysql-connector-java-5.1.49.jar
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-06-16 14:48:43,"Zhou Zach"  写道:
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >high-availability.storageDir: hdfs:///flink/ha/
>> >> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
>> >> >state.backend: filesystem
>> >> >state.checkpoints.dir:
>> hdfs://nameservice1:8020//user/flink10/checkpoints
>> >> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
>> >> >high-availability.zookeeper.path.root: /flink
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >在 2020-06-16 14:44:02,"王松"  写道:
>> >> >>你的配置文件中ha配置可以贴下吗
>> >> >>
>> >> >>Zhou Zach  于2020年6月16日周二 下午1:49写道:
>> >> >>
>> >> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException:
>> Failed
>> >> to
>> >> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>> >> >>>
>> >> >>> Caused by: java.io.IOException: Could not create FileSystem for
>> highly
>> >> >>> available storage path
>> (hdfs:/flink/ha/application_1592215995564_0027)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>> >> >>>
>> >> >>> at java.security.AccessController.doPrivileged(Native Method)
>> >> >>>
>> >> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>> >> >>>
>> >> >>> ... 2 more
>> >> >>>
>> >> >>> Caused by:
>> >> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> >> >>> Could not find a file system implementation for scheme 'hdfs'. The
>> >> scheme
>> >> >>> is not directly supported by Flink and no Hadoop file system to
>> support
>> >> >>> this scheme could be loaded.
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>> >> >>>
>> >> >>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>> >> >>>
>> >> >>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>> >> >>>
>> >> >>> at
>> >> >>>
>> >>
>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>> >> >>>
>> >> >>> ... 13 more
>> >> >>>
>> >> >>> Caused 

Re: pyflink连接elasticsearch5.4问题

2020-06-16 文章 Dian Fu
I guess it's because the ES version specified in the job is `6`, however, the 
jar used is `5`.

> 在 2020年6月16日,下午1:47,jack  写道:
> 
> 我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 
> flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
> 连接es的时候报错,findAndCreateTableSink   failed。 
> 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
> 
> Caused by Could not find a suitable  factory for   
> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
> Reason: Required context properties mismatch
> 
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, DataTypes, 
> EnvironmentSettings
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
> Elasticsearch
> 
> 
> def area_cnts():
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> s_env.set_parallelism(1)
> 
> # use blink table planner
> st_env = StreamTableEnvironment \
> .create(s_env, environment_settings=EnvironmentSettings
> .new_instance()
> .in_streaming_mode()
> .use_blink_planner().build())
> 
> # register source and sink
> register_rides_source(st_env)
> register_cnt_sink(st_env)
> 
> # query
> st_env.from_path("source")\
> .group_by("taxiId")\
> .select("taxiId, count(1) as cnt")\
> .insert_into("sink")
> 
> # execute
> st_env.execute("6-write_with_elasticsearch")
> 
> 
> def register_rides_source(st_env):
> st_env \
> .connect(  # declare the external system to connect to
> Kafka()
> .version("universal")
> .topic("Rides")
> .start_from_earliest()
> .property("zookeeper.connect", "zookeeper:2181")
> .property("bootstrap.servers", "kafka:9092")) \
> .with_format(  # declare a format for this system
> Json()
> .fail_on_missing_field(True)
> .schema(DataTypes.ROW([
> DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> DataTypes.FIELD("lon", DataTypes.FLOAT()),
> DataTypes.FIELD("lat", DataTypes.FLOAT()),
> DataTypes.FIELD("psgCnt", DataTypes.INT()),
> DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> .with_schema(  # declare the schema of the table
> Schema()
> .field("rideId", DataTypes.BIGINT())
> .field("taxiId", DataTypes.BIGINT())
> .field("isStart", DataTypes.BOOLEAN())
> .field("lon", DataTypes.FLOAT())
> .field("lat", DataTypes.FLOAT())
> .field("psgCnt", DataTypes.INT())
> .field("rideTime", DataTypes.TIMESTAMP())
> .rowtime(
> Rowtime()
> .timestamps_from_field("eventTime")
> .watermarks_periodic_bounded(6))) \
> .in_append_mode() \
> .register_table_source("source")
> 
> 
> def register_cnt_sink(st_env):
> st_env.connect(
> Elasticsearch()
> .version("6")
> .host("elasticsearch", 9200, "http")
> .index("taxiid-cnts")
> .document_type('taxiidcnt')
> .key_delimiter("$")) \
> .with_schema(
> Schema()
> .field("taxiId", DataTypes.BIGINT())
> .field("cnt", DataTypes.BIGINT())) \
> .with_format(
>Json()
>.derive_schema()) \
> .in_upsert_mode() \
> .register_table_sink("sink")
> 
> 
> if __name__ == '__main__':
> area_cnts()
> 



Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Yang Wang
你这个看着hadoop兼容导致的问题,ContentSummary这个类是从hadoop 2.8以后发生了
变化。所以你需要确认你的lib下带的flink-shaded-hadoop与hdfs集群的版本是兼容的


Best,
Yang

Zhou Zach  于2020年6月16日周二 下午2:53写道:

> flink/lib/下的jar:
> flink-connector-hive_2.11-1.10.0.jar
> flink-dist_2.11-1.10.0.jar
> flink-jdbc_2.11-1.10.0.jar
> flink-json-1.10.0.jar
> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
> flink-sql-connector-kafka_2.11-1.10.0.jar
> flink-table_2.11-1.10.0.jar
> flink-table-blink_2.11-1.10.0.jar
> hbase-client-2.1.0.jar
> hbase-common-2.1.0.jar
> hive-exec-2.1.1.jar
> mysql-connector-java-5.1.49.jar
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-16 14:48:43,"Zhou Zach"  写道:
> >
> >
> >
> >
> >high-availability.storageDir: hdfs:///flink/ha/
> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> >state.backend: filesystem
> >state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
> >high-availability.zookeeper.path.root: /flink
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-06-16 14:44:02,"王松"  写道:
> >>你的配置文件中ha配置可以贴下吗
> >>
> >>Zhou Zach  于2020年6月16日周二 下午1:49写道:
> >>
> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
> to
> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
> >>>
> >>> at
> >>>
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
> >>>
> >>> Caused by: java.io.IOException: Could not create FileSystem for highly
> >>> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> >>>
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>>
> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>
> >>> at
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> >>>
> >>> ... 2 more
> >>>
> >>> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Could not find a file system implementation for scheme 'hdfs'. The
> scheme
> >>> is not directly supported by Flink and no Hadoop file system to support
> >>> this scheme could be loaded.
> >>>
> >>> at
> >>>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
> >>>
> >>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
> >>>
> >>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
> >>>
> >>> ... 13 more
> >>>
> >>> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is
> not in
> >>> the classpath, or some classes are missing from the classpath.
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
> >>>
> >>> at
> >>>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
> >>>
> >>> ... 16 more
> >>>
> >>> Caused by: java.lang.VerifyError: Bad return type
> >>>
> >>> Exception Details:
> >>>
> >>>   Location:
> >>>
> >>>
> >>>
> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
> >>> @160: areturn
> >>>
> >>>   Reason:
> >>>
> >>> Type 'org/apache/hadoop/fs/ContentSummary' (current frame,
> stack[0])
> >>> is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method
> >>> signature)
> >>>
> >>>   Current Frame:
> >>>
> >>> bci: @160
> >>>
> >>> flags: { }
> >>>
> >>> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
> >>> 

Re: Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 王松
那你在命令行执行:hadoop classpath,有hadoop的classpath输出吗?

Zhou Zach  于2020年6月16日周二 下午3:22写道:

>
>
>
>
>
>
> 在/etc/profile下,目前只加了
> export HADOOP_CLASSPATH=`hadoop classpath`
> 我是安装的CDH,没找到sbin这个文件。。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-16 15:05:12,"王松"  写道:
> >你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗?
> >
> >export HADOOP_HOME=/usr/local/hadoop-2.7.2
> >export HADOOP_CLASSPATH=`hadoop classpath`
> >export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
> >
> >Zhou Zach  于2020年6月16日周二 下午2:53写道:
> >
> >> flink/lib/下的jar:
> >> flink-connector-hive_2.11-1.10.0.jar
> >> flink-dist_2.11-1.10.0.jar
> >> flink-jdbc_2.11-1.10.0.jar
> >> flink-json-1.10.0.jar
> >> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
> >> flink-sql-connector-kafka_2.11-1.10.0.jar
> >> flink-table_2.11-1.10.0.jar
> >> flink-table-blink_2.11-1.10.0.jar
> >> hbase-client-2.1.0.jar
> >> hbase-common-2.1.0.jar
> >> hive-exec-2.1.1.jar
> >> mysql-connector-java-5.1.49.jar
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-06-16 14:48:43,"Zhou Zach"  写道:
> >> >
> >> >
> >> >
> >> >
> >> >high-availability.storageDir: hdfs:///flink/ha/
> >> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> >> >state.backend: filesystem
> >> >state.checkpoints.dir:
> hdfs://nameservice1:8020//user/flink10/checkpoints
> >> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
> >> >high-availability.zookeeper.path.root: /flink
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >在 2020-06-16 14:44:02,"王松"  写道:
> >> >>你的配置文件中ha配置可以贴下吗
> >> >>
> >> >>Zhou Zach  于2020年6月16日周二 下午1:49写道:
> >> >>
> >> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException:
> Failed
> >> to
> >> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
> >> >>>
> >> >>> Caused by: java.io.IOException: Could not create FileSystem for
> highly
> >> >>> available storage path
> (hdfs:/flink/ha/application_1592215995564_0027)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> >> >>>
> >> >>> at java.security.AccessController.doPrivileged(Native Method)
> >> >>>
> >> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> >> >>>
> >> >>> ... 2 more
> >> >>>
> >> >>> Caused by:
> >> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >> >>> Could not find a file system implementation for scheme 'hdfs'. The
> >> scheme
> >> >>> is not directly supported by Flink and no Hadoop file system to
> support
> >> >>> this scheme could be loaded.
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
> >> >>>
> >> >>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
> >> >>>
> >> >>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> >> >>>
> >> >>> at
> >> >>>
> >>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
> >> >>>
> >> >>> ... 13 more
> >> >>>
> >> >>> Caused by:
> >> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >> >>> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is
> >> not in
> >> >>> the classpath, or some classes are missing from the classpath.
> >> >>>
> >> >>> at
> >> 

Re:Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach






在/etc/profile下,目前只加了
export HADOOP_CLASSPATH=`hadoop classpath`
我是安装的CDH,没找到sbin这个文件。。











在 2020-06-16 15:05:12,"王松"  写道:
>你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗?
>
>export HADOOP_HOME=/usr/local/hadoop-2.7.2
>export HADOOP_CLASSPATH=`hadoop classpath`
>export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
>
>Zhou Zach  于2020年6月16日周二 下午2:53写道:
>
>> flink/lib/下的jar:
>> flink-connector-hive_2.11-1.10.0.jar
>> flink-dist_2.11-1.10.0.jar
>> flink-jdbc_2.11-1.10.0.jar
>> flink-json-1.10.0.jar
>> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
>> flink-sql-connector-kafka_2.11-1.10.0.jar
>> flink-table_2.11-1.10.0.jar
>> flink-table-blink_2.11-1.10.0.jar
>> hbase-client-2.1.0.jar
>> hbase-common-2.1.0.jar
>> hive-exec-2.1.1.jar
>> mysql-connector-java-5.1.49.jar
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-16 14:48:43,"Zhou Zach"  写道:
>> >
>> >
>> >
>> >
>> >high-availability.storageDir: hdfs:///flink/ha/
>> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
>> >state.backend: filesystem
>> >state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
>> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
>> >high-availability.zookeeper.path.root: /flink
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >在 2020-06-16 14:44:02,"王松"  写道:
>> >>你的配置文件中ha配置可以贴下吗
>> >>
>> >>Zhou Zach  于2020年6月16日周二 下午1:49写道:
>> >>
>> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
>> to
>> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>> >>>
>> >>> Caused by: java.io.IOException: Could not create FileSystem for highly
>> >>> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>> >>>
>> >>> at java.security.AccessController.doPrivileged(Native Method)
>> >>>
>> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> >>>
>> >>> at
>> >>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>> >>>
>> >>> ... 2 more
>> >>>
>> >>> Caused by:
>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> >>> Could not find a file system implementation for scheme 'hdfs'. The
>> scheme
>> >>> is not directly supported by Flink and no Hadoop file system to support
>> >>> this scheme could be loaded.
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>> >>>
>> >>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>> >>>
>> >>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>> >>>
>> >>> ... 13 more
>> >>>
>> >>> Caused by:
>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> >>> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is
>> not in
>> >>> the classpath, or some classes are missing from the classpath.
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
>> >>>
>> >>> at
>> >>>
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>> >>>
>> >>> ... 16 more
>> >>>
>> >>> Caused by: java.lang.VerifyError: Bad return type
>> >>>
>> >>> Exception Details:
>> >>>
>> >>>   Location:
>> >>>
>> >>>
>> >>>
>> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
>> >>> 

Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))

2020-06-16 文章 王松
6.14号的meetup中讲的动态 Table 属性很清楚,附个链接:
https://developer.aliyun.com/live/2894?accounttraceid=07deb589f50c4c1abbcbed103e534316qnxq
,大概在04:17:00左右。

Kurt Young  于2020年6月16日周二 下午12:12写道:

> table hint的语法是紧跟在你query中访问某张表的时候,所以我理解并不会有 ”这个动态参数作用在哪张表“ 上的疑问吧?
>
> Best,
> Kurt
>
>
> On Tue, Jun 16, 2020 at 10:02 AM Yichao Yang <1048262...@qq.com> wrote:
>
> > Hi
> >
> >
> > 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。
> >
> >
> 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候,涉及到同key属性时,你在query内指定的属性到底是赋予给哪张表的?这个其实是比较模糊的。
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:"Kurt Young" > 发送时间:2020年6月16日(星期二) 上午9:53
> > 收件人:"user-zh" >
> > 主题:Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))
> >
> >
> >
> > 就是你DDL定义表的时候的WITH参数,有时候有个别参数写的不对或者需要调整,可以在query里直接修改,而不用重新定义一张新表。
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com wrote:
> >
> >  动态 Table 属性是指什么?可以举一个列子吗。
>


Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 王松
你配置HADOOP_HOME和HADOOP_CLASSPATH这两个环境变量了吗?

export HADOOP_HOME=/usr/local/hadoop-2.7.2
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

Zhou Zach  于2020年6月16日周二 下午2:53写道:

> flink/lib/下的jar:
> flink-connector-hive_2.11-1.10.0.jar
> flink-dist_2.11-1.10.0.jar
> flink-jdbc_2.11-1.10.0.jar
> flink-json-1.10.0.jar
> flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
> flink-sql-connector-kafka_2.11-1.10.0.jar
> flink-table_2.11-1.10.0.jar
> flink-table-blink_2.11-1.10.0.jar
> hbase-client-2.1.0.jar
> hbase-common-2.1.0.jar
> hive-exec-2.1.1.jar
> mysql-connector-java-5.1.49.jar
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-16 14:48:43,"Zhou Zach"  写道:
> >
> >
> >
> >
> >high-availability.storageDir: hdfs:///flink/ha/
> >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> >state.backend: filesystem
> >state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
> >state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
> >high-availability.zookeeper.path.root: /flink
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-06-16 14:44:02,"王松"  写道:
> >>你的配置文件中ha配置可以贴下吗
> >>
> >>Zhou Zach  于2020年6月16日周二 下午1:49写道:
> >>
> >>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
> to
> >>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
> >>>
> >>> at
> >>>
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
> >>>
> >>> Caused by: java.io.IOException: Could not create FileSystem for highly
> >>> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> >>>
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>>
> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>
> >>> at
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> >>>
> >>> ... 2 more
> >>>
> >>> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Could not find a file system implementation for scheme 'hdfs'. The
> scheme
> >>> is not directly supported by Flink and no Hadoop file system to support
> >>> this scheme could be loaded.
> >>>
> >>> at
> >>>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
> >>>
> >>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
> >>>
> >>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
> >>>
> >>> ... 13 more
> >>>
> >>> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> >>> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is
> not in
> >>> the classpath, or some classes are missing from the classpath.
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
> >>>
> >>> at
> >>>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
> >>>
> >>> ... 16 more
> >>>
> >>> Caused by: java.lang.VerifyError: Bad return type
> >>>
> >>> Exception Details:
> >>>
> >>>   Location:
> >>>
> >>>
> >>>
> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
> >>> @160: areturn
> >>>
> >>>   Reason:
> >>>
> >>> Type 'org/apache/hadoop/fs/ContentSummary' (current frame,
> stack[0])
> >>> is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method
> >>> signature)
> >>>
> >>>   Current Frame:
> >>>
> >>> bci: @160
> >>>
> >>> flags: { }
> >>>
> >>> locals: { 

Re:Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
flink/lib/下的jar:
flink-connector-hive_2.11-1.10.0.jar
flink-dist_2.11-1.10.0.jar
flink-jdbc_2.11-1.10.0.jar
flink-json-1.10.0.jar
flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
hbase-client-2.1.0.jar
hbase-common-2.1.0.jar
hive-exec-2.1.1.jar
mysql-connector-java-5.1.49.jar

















在 2020-06-16 14:48:43,"Zhou Zach"  写道:
>
>
>
>
>high-availability.storageDir: hdfs:///flink/ha/
>high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
>state.backend: filesystem
>state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
>state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
>high-availability.zookeeper.path.root: /flink
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-06-16 14:44:02,"王松"  写道:
>>你的配置文件中ha配置可以贴下吗
>>
>>Zhou Zach  于2020年6月16日周二 下午1:49写道:
>>
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
>>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>>>
>>> at
>>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>>>
>>> Caused by: java.io.IOException: Could not create FileSystem for highly
>>> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
>>>
>>> at
>>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>>>
>>> at
>>> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>>>
>>> at
>>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>>>
>>> at
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>>>
>>> ... 2 more
>>>
>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>>> Could not find a file system implementation for scheme 'hdfs'. The scheme
>>> is not directly supported by Flink and no Hadoop file system to support
>>> this scheme could be loaded.
>>>
>>> at
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>>>
>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>>>
>>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>>>
>>> at
>>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>>>
>>> ... 13 more
>>>
>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>>> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
>>> the classpath, or some classes are missing from the classpath.
>>>
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
>>>
>>> at
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>>>
>>> ... 16 more
>>>
>>> Caused by: java.lang.VerifyError: Bad return type
>>>
>>> Exception Details:
>>>
>>>   Location:
>>>
>>>
>>> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
>>> @160: areturn
>>>
>>>   Reason:
>>>
>>> Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0])
>>> is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method
>>> signature)
>>>
>>>   Current Frame:
>>>
>>> bci: @160
>>>
>>> flags: { }
>>>
>>> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
>>> 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
>>> stack: { 'org/apache/hadoop/fs/ContentSummary' }
>>>
>>>
>>>
>>> 在本地intellij idea中可以正常运行,flink job上订阅kafka,sink到mysql和hbase,集群flink lib目录下,
>>>
>>>
>>>
>>>


Re:Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
high-availability: zookeeper

















在 2020-06-16 14:48:43,"Zhou Zach"  写道:
>
>
>
>
>high-availability.storageDir: hdfs:///flink/ha/
>high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
>state.backend: filesystem
>state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
>state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
>high-availability.zookeeper.path.root: /flink
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-06-16 14:44:02,"王松"  写道:
>>你的配置文件中ha配置可以贴下吗
>>
>>Zhou Zach  于2020年6月16日周二 下午1:49写道:
>>
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
>>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>>>
>>> at
>>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>>>
>>> Caused by: java.io.IOException: Could not create FileSystem for highly
>>> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
>>>
>>> at
>>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>>>
>>> at
>>> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>>>
>>> at
>>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>>>
>>> at
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>>>
>>> ... 2 more
>>>
>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>>> Could not find a file system implementation for scheme 'hdfs'. The scheme
>>> is not directly supported by Flink and no Hadoop file system to support
>>> this scheme could be loaded.
>>>
>>> at
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>>>
>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>>>
>>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>>>
>>> at
>>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>>>
>>> ... 13 more
>>>
>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>>> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
>>> the classpath, or some classes are missing from the classpath.
>>>
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
>>>
>>> at
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>>>
>>> ... 16 more
>>>
>>> Caused by: java.lang.VerifyError: Bad return type
>>>
>>> Exception Details:
>>>
>>>   Location:
>>>
>>>
>>> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
>>> @160: areturn
>>>
>>>   Reason:
>>>
>>> Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0])
>>> is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method
>>> signature)
>>>
>>>   Current Frame:
>>>
>>> bci: @160
>>>
>>> flags: { }
>>>
>>> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
>>> 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
>>> stack: { 'org/apache/hadoop/fs/ContentSummary' }
>>>
>>>
>>>
>>> 在本地intellij idea中可以正常运行,flink job上订阅kafka,sink到mysql和hbase,集群flink lib目录下,
>>>
>>>
>>>
>>>


Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach




high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs://nameservice1:8020//user/flink10/checkpoints
state.savepoints.dir: hdfs://nameservice1:8020//user/flink10/savepoints
high-availability.zookeeper.path.root: /flink

















在 2020-06-16 14:44:02,"王松"  写道:
>你的配置文件中ha配置可以贴下吗
>
>Zhou Zach  于2020年6月16日周二 下午1:49写道:
>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>>
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>>
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>>
>> at
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>>
>> Caused by: java.io.IOException: Could not create FileSystem for highly
>> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
>>
>> at
>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>>
>> at
>> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>>
>> at
>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>>
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>>
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>>
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>>
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>>
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>>
>> ... 2 more
>>
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Could not find a file system implementation for scheme 'hdfs'. The scheme
>> is not directly supported by Flink and no Hadoop file system to support
>> this scheme could be loaded.
>>
>> at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>>
>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>>
>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>>
>> at
>> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>>
>> ... 13 more
>>
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
>> the classpath, or some classes are missing from the classpath.
>>
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
>>
>> at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>>
>> ... 16 more
>>
>> Caused by: java.lang.VerifyError: Bad return type
>>
>> Exception Details:
>>
>>   Location:
>>
>>
>> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
>> @160: areturn
>>
>>   Reason:
>>
>> Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0])
>> is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method
>> signature)
>>
>>   Current Frame:
>>
>> bci: @160
>>
>> flags: { }
>>
>> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
>> 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
>> stack: { 'org/apache/hadoop/fs/ContentSummary' }
>>
>>
>>
>> 在本地intellij idea中可以正常运行,flink job上订阅kafka,sink到mysql和hbase,集群flink lib目录下,
>>
>>
>>
>>


Re: flink sql job 提交到yarn上报错

2020-06-16 文章 王松
你的配置文件中ha配置可以贴下吗

Zhou Zach  于2020年6月16日周二 下午1:49写道:

> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>
> Caused by: java.io.IOException: Could not create FileSystem for highly
> available storage path (hdfs:/flink/ha/application_1592215995564_0027)
>
> at
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>
> at
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>
> ... 2 more
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded.
>
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>
> at
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>
> ... 13 more
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
> the classpath, or some classes are missing from the classpath.
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
>
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>
> ... 16 more
>
> Caused by: java.lang.VerifyError: Bad return type
>
> Exception Details:
>
>   Location:
>
>
> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
> @160: areturn
>
>   Reason:
>
> Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0])
> is not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method
> signature)
>
>   Current Frame:
>
> bci: @160
>
> flags: { }
>
> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
> 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
> stack: { 'org/apache/hadoop/fs/ContentSummary' }
>
>
>
> 在本地intellij idea中可以正常运行,flink job上订阅kafka,sink到mysql和hbase,集群flink lib目录下,
>
>
>
>


Re:flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
将flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar放在flink/lib目录下,或者打入fat jar都不起作用。。。
















At 2020-06-16 13:49:27, "Zhou Zach"  wrote:

org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)

at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)

Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path (hdfs:/flink/ha/application_1592215995564_0027)

at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)

at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)

at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)

... 2 more

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'hdfs'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded.

at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)

at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)

at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)

at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)

... 13 more

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in the 
classpath, or some classes are missing from the classpath.

at 
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)

at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)

... 16 more

Caused by: java.lang.VerifyError: Bad return type

Exception Details:

  Location:


org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
 @160: areturn

  Reason:

Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0]) is not 
assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method signature)

  Current Frame:

bci: @160

flags: { }

locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String', 
'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }

stack: { 'org/apache/hadoop/fs/ContentSummary' }






在本地intellij idea中可以正常运行,flink job上订阅kafka,sink到mysql和hbase,集群flink lib目录下,