Re: Help: Real-time cumulative UV statistics with Flink SQL 1.10

2020-06-16 Thread Jark Wu
In Flink 1.11, you can try something like this:

CREATE TABLE mysql (
   time_str STRING,
   uv BIGINT,
   PRIMARY KEY (time_str) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'myuv'
);

INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
FROM user_behavior;
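For context, a minimal sketch (not part of the original reply) of how the two statements above could be submitted from a Java program on Flink 1.11; it assumes a user_behavior source table is already registered, and everything else simply mirrors the SQL above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CumulativeUvJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // JDBC upsert sink keyed on time_str, as in the DDL above
        tEnv.executeSql(
                "CREATE TABLE mysql ("
              + "  time_str STRING,"
              + "  uv BIGINT,"
              + "  PRIMARY KEY (time_str) NOT ENFORCED"
              + ") WITH ("
              + "  'connector' = 'jdbc',"
              + "  'url' = 'jdbc:mysql://localhost:3306/mydatabase',"
              + "  'table-name' = 'myuv')");

        // on 1.11, executeSql submits the INSERT as a continuous job
        tEnv.executeSql(
                "INSERT INTO mysql "
              + "SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id) "
              + "FROM user_behavior");
    }
}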

On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:

> Thanks for your reply. I'm still not quite clear about the "approach #1" you mentioned. I need to output a cumulative UV every minute,
> with the sink table looking like this:
> tm uv
> 2020/06/17 13:46:00 1
> 2020/06/17 13:47:00 2
> 2020/06/17 13:48:00 3
>
>
> If I group by the date, how do I get the minute?
>
>
> -- Original message --
> From: "Benchao Li"    Sent: Wednesday, June 17, 2020, 11:46 AM
> To: "user-zh"
> Subject: Re: Help: Real-time cumulative UV statistics with Flink SQL 1.10
>
>
>
> Hi,
> I think there are two ways to handle this kind of scenario:
> 1. plain GROUP BY + mini-batch
> 2. window aggregation + fast emit
>
> For #1, the GROUP BY key can include a date field, e.g. the DATE_FORMAT(rowtm, 'yyyy-MM-dd') you mentioned above.
> In that case state cleanup requires configuring a state retention time, see [1] for how. Mini-batch also has to be
> enabled via the parameters in [2].
>
> For #2, simply use a day-sized tumble window. State cleanup then needs no special configuration; it is cleaned up by default.
> Fast emit is still an experimental feature and therefore not listed in the documentation; here are the settings for reference:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907...@qq.com wrote on Wed, Jun 17, 2020 at 11:14 AM:
>
>  The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now. With the approach below, the cumulative UV is never reset when crossing midnight; it just keeps accumulating.
>  CREATE VIEW uv_per_10min AS
>  SELECT
>    MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
>    COUNT(DISTINCT user_id) OVER w AS uv
>  FROM user_behavior
>  WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
>  CURRENT ROW);
> 
> 
>  How should this be handled?
>  Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work, and how should the state be cleaned up?
>  PS: 1.10 DDL does not seem to support CREATE VIEW.
>  Thanks


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Jingsong Li
Congratulations Yu, well deserved!

Best,
Jingsong

On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei  wrote:

> Congrats, Yu!
>
> GXGX & well deserved!!
>
> Best Regards,
>
> Yuan
>
> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
> wrote:
>
>> Hi all,
>>
>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>> part of the Apache Flink Project Management Committee (PMC).
>>
>> Yu Li has been very active on Flink's Statebackend component, working on
>> various improvements, for example the RocksDB memory management for 1.10.
>> and keeps checking and voting for our releases, and also has successfully
>> produced two releases(1.10.0&1.10.1) as RM.
>>
>> Congratulations & Welcome Yu Li!
>>
>> Best,
>> Jincheng (on behalf of the Flink PMC)
>>
>

-- 
Best, Jingsong Lee


Re: Re: flink sql DDL Unsupported update-mode hbase

2020-06-16 Thread Jark Wu
Yes.

On Wed, 17 Jun 2020 at 13:50, Zhou Zach  wrote:

> So reading, writing, updating, and deleting HBase are all supported through the Flink SQL DDL approach, right?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-06-17 13:45:15, "Jark Wu"  wrote:
> >Hi,
> >
> >The HBase connector does not need the update-mode property, and in fact it must not be declared.
> >
> >Best,
> >Jark
> >
> >On Wed, 17 Jun 2020 at 13:08, Zhou Zach  wrote:
> >
> >> The program finished with the following exception:
> >>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: Could not find a suitable table factory for
> >> 'org.apache.flink.table.factories.TableSinkFactory' in
> >> the classpath.
> >>
> >>
> >> Reason: No factory supports all properties.
> >>
> >>
> >> The matching candidates:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> Unsupported property keys:
> >> update-mode
> >>
> >>
> >> The following properties are requested:
> >> connector.table-name=user_hbase10
> >> connector.type=hbase
> >> connector.version=2.1.0
> >> connector.write.buffer-flush.interval=2s
> >> connector.write.buffer-flush.max-rows=1000
> >> connector.write.buffer-flush.max-size=10mb
> >> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> >> connector.zookeeper.znode.parent=/hbase
> >> schema.0.data-type=VARCHAR(2147483647)
> >> schema.0.name=rowkey
> >> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> >> `created_time` TIMESTAMP(3)
> >> schema.1.name=cf
> >> update-mode=upsert
> >>
> >>
> >> The following factories have been considered:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> >> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> >> at
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >> Could not find a suitable table factory for
> >> 'org.apache.flink.table.factories.TableSinkFactory' in
> >> the classpath.
> >>
> >>
> >> Reason: No factory supports all properties.
> >>
> >>
> >> The matching candidates:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> Unsupported property keys:
> >> update-mode
> >>
> >>
> >> The following properties are requested:
> >> connector.table-name=user_hbase10
> >> connector.type=hbase
> >> connector.version=2.1.0
> >> connector.write.buffer-flush.interval=2s
> >> connector.write.buffer-flush.max-rows=1000
> >> connector.write.buffer-flush.max-size=10mb
> >> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> >> connector.zookeeper.znode.parent=/hbase
> >> schema.0.data-type=VARCHAR(2147483647)
> >> schema.0.name=rowkey
> >> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> >> `created_time` TIMESTAMP(3)
> >> schema.1.name=cf
> >> update-mode=upsert
> >>
> >>
> >> The following factories have been considered:
> >> org.apache.flink.addons.hbase.HBaseTableFactory
> >> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> >> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> >> at
> >>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> >> at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
> >> at
> >>
> 

Re:Re: flink sql DDL Unsupported update-mode hbase

2020-06-16 Thread Zhou Zach
So reading, writing, updating, and deleting HBase are all supported through the Flink SQL DDL approach, right?

















At 2020-06-17 13:45:15, "Jark Wu"  wrote:
>Hi,
>
>The HBase connector does not need the update-mode property, and in fact it must not be declared.
>
>Best,
>Jark
>
>On Wed, 17 Jun 2020 at 13:08, Zhou Zach  wrote:
>
>> The program finished with the following exception:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSinkFactory' in
>> the classpath.
>>
>>
>> Reason: No factory supports all properties.
>>
>>
>> The matching candidates:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> Unsupported property keys:
>> update-mode
>>
>>
>> The following properties are requested:
>> connector.table-name=user_hbase10
>> connector.type=hbase
>> connector.version=2.1.0
>> connector.write.buffer-flush.interval=2s
>> connector.write.buffer-flush.max-rows=1000
>> connector.write.buffer-flush.max-size=10mb
>> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
>> connector.zookeeper.znode.parent=/hbase
>> schema.0.data-type=VARCHAR(2147483647)
>> schema.0.name=rowkey
>> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
>> `created_time` TIMESTAMP(3)
>> schema.1.name=cf
>> update-mode=upsert
>>
>>
>> The following factories have been considered:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSinkFactory' in
>> the classpath.
>>
>>
>> Reason: No factory supports all properties.
>>
>>
>> The matching candidates:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> Unsupported property keys:
>> update-mode
>>
>>
>> The following properties are requested:
>> connector.table-name=user_hbase10
>> connector.type=hbase
>> connector.version=2.1.0
>> connector.write.buffer-flush.interval=2s
>> connector.write.buffer-flush.max-rows=1000
>> connector.write.buffer-flush.max-size=10mb
>> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
>> connector.zookeeper.znode.parent=/hbase
>> schema.0.data-type=VARCHAR(2147483647)
>> schema.0.name=rowkey
>> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
>> `created_time` TIMESTAMP(3)
>> schema.1.name=cf
>> update-mode=upsert
>>
>>
>> The following factories have been considered:
>> org.apache.flink.addons.hbase.HBaseTableFactory
>> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>> at
>> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>> at
>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>> at
>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>> at
>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>> at
>> 

Re: Help: Real-time cumulative UV statistics with Flink SQL 1.10

2020-06-16 Thread x
Thanks for your reply. I'm still not quite clear about the "approach #1" you mentioned. I need to output a cumulative UV every minute,
with the sink table looking like this:
tm uv
2020/06/17 13:46:00 1
2020/06/17 13:47:00 2
2020/06/17 13:48:00 3


If I group by the date, how do I get the minute?


-- Original message --
From: "Benchao Li"
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

x <35907...@qq.com wrote on Wed, Jun 17, 2020 at 11:14 AM:

 The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now. With the approach below, the cumulative UV is never reset when crossing midnight; it just keeps accumulating.
 CREATE VIEW uv_per_10min AS
 SELECT
   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
   COUNT(DISTINCT user_id) OVER w AS uv
 FROM user_behavior
 WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
 CURRENT ROW);


 How should this be handled?
 Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work, and how should the state be cleaned up?
 PS: 1.10 DDL does not seem to support CREATE VIEW.
 Thanks

Re: flink sql DDL Unsupported update-mode hbase

2020-06-16 Thread Jark Wu
Hi,

The HBase connector does not need the update-mode property, and in fact it must not be declared.

Best,
Jark
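For reference, a sketch (not part of the original reply) of the same sink declared without the 'update-mode' property, reusing the property keys and schema from the error message quoted below; registering it through the 1.10 Table API is just one way to do it:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class HBaseSinkDdl {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // same properties as in the error message, minus 'update-mode'
        tEnv.sqlUpdate(
                "CREATE TABLE user_hbase ("
              + "  rowkey STRING,"
              + "  cf ROW<sex STRING, age INT, created_time TIMESTAMP(3)>"
              + ") WITH ("
              + "  'connector.type' = 'hbase',"
              + "  'connector.version' = '2.1.0',"
              + "  'connector.table-name' = 'user_hbase10',"
              + "  'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',"
              + "  'connector.zookeeper.znode.parent' = '/hbase',"
              + "  'connector.write.buffer-flush.max-size' = '10mb',"
              + "  'connector.write.buffer-flush.max-rows' = '1000',"
              + "  'connector.write.buffer-flush.interval' = '2s')");
    }
}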

On Wed, 17 Jun 2020 at 13:08, Zhou Zach  wrote:

> The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in
> the classpath.
>
>
> Reason: No factory supports all properties.
>
>
> The matching candidates:
> org.apache.flink.addons.hbase.HBaseTableFactory
> Unsupported property keys:
> update-mode
>
>
> The following properties are requested:
> connector.table-name=user_hbase10
> connector.type=hbase
> connector.version=2.1.0
> connector.write.buffer-flush.interval=2s
> connector.write.buffer-flush.max-rows=1000
> connector.write.buffer-flush.max-size=10mb
> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> connector.zookeeper.znode.parent=/hbase
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=rowkey
> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> `created_time` TIMESTAMP(3)
> schema.1.name=cf
> update-mode=upsert
>
>
> The following factories have been considered:
> org.apache.flink.addons.hbase.HBaseTableFactory
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in
> the classpath.
>
>
> Reason: No factory supports all properties.
>
>
> The matching candidates:
> org.apache.flink.addons.hbase.HBaseTableFactory
> Unsupported property keys:
> update-mode
>
>
> The following properties are requested:
> connector.table-name=user_hbase10
> connector.type=hbase
> connector.version=2.1.0
> connector.write.buffer-flush.interval=2s
> connector.write.buffer-flush.max-rows=1000
> connector.write.buffer-flush.max-size=10mb
> connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
> connector.zookeeper.znode.parent=/hbase
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=rowkey
> schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT,
> `created_time` TIMESTAMP(3)
> schema.1.name=cf
> update-mode=upsert
>
>
> The following factories have been considered:
> org.apache.flink.addons.hbase.HBaseTableFactory
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> at
> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> at
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 

Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Yuan Mei
Congrats, Yu!

GXGX & well deserved!!

Best Regards,

Yuan

On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
wrote:

> Hi all,
>
> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> part of the Apache Flink Project Management Committee (PMC).
>
> Yu Li has been very active on Flink's Statebackend component, working on
> various improvements, for example the RocksDB memory management for 1.10.
> and keeps checking and voting for our releases, and also has successfully
> produced two releases(1.10.0&1.10.1) as RM.
>
> Congratulations & Welcome Yu Li!
>
> Best,
> Jincheng (on behalf of the Flink PMC)
>


Re: Problem querying a Kafka table via hiveCatalog in the SQL Client

2020-06-16 Thread Rui Li
Do you mean using the 1.10.1 hive connector together with Flink 1.11? That combination will definitely cause problems. Try unifying everything on 1.11.

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:

> The error on the SQL CLI is just what I posted above; there is no more information. Or is there some log where I can see more details?
>
>
>
>
> On June 17, 2020, at 10:27, Benchao Li wrote:
> The missing dependency used to be in the flink-table-runtime-blink module; these classes have now been refactored into the flink-table-common
> module.
> If only the connectors and formats use the old version, that should be fine.
> Could you post the full error message? That would show which module still depends on the old flink-table-runtime-blink.
>
> Sun.Zhu <17626017...@163.com> wrote on Wed, Jun 17, 2020 at 12:49 AM:
>
> > Yes. Apart from the jars built from 1.11, the dependency jars, e.g. for connectors and the hivecatalog, are the 1.10.1 versions, because 1.11
> > has not been released yet. Neither of the two problems above occurs with 1.10.1; after upgrading to 1.11 they appear for some unknown reason. Is a dependency missing?
> >
> >
> >
> >
> > On June 16, 2020, at 18:38, Benchao Li wrote:
> > Some underlying data structures were refactored in 1.11, so you cannot take 1.10 jars and use them inside 1.11.
> > Running directly with the jars built from 1.11 should work fine.
> >
> > Sun.Zhu <17626017...@163.com> wrote on Tue, Jun 16, 2020 at 6:11 PM:
> >
> > > I built the 1.11 packages.
> > > Querying a Hive table under sql-cli reports the following error:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.NoClassDefFoundError:
> org/apache/flink/table/dataformat/BaseRow
> > >
> > >
> > > Querying the registered Kafka table reports:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.ClassNotFoundException:
> > org.apache.flink.table.dataformat.BaseRow
> > >
> > >
> > > The dependency jars were copied from 1.10.1.
> > > Sun.Zhu
> > > 17626017...@163.com
> > >
> > >
> > > On June 13, 2020, at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> > > Got it!
> > > Thx,junbao
> > >
> > >
> > > Sun.Zhu
> > > 17626017...@163.com
> > >
> > >
> > > On June 13, 2020, at 09:32, zhangjunbao wrote:
> > > On 1.10.x, creating a table with proctime as PROCTIME() under the hivecatalog appears to hit a bug:
> > > https://issues.apache.org/jira/browse/FLINK-17189 <
> > > https://issues.apache.org/jira/browse/FLINK-17189>
> > >
> > > Best,
> > > Junbao Zhang
> > >
> > > On June 13, 2020, at 12:31 AM, Sun.Zhu <17626017...@163.com> wrote:
> > >
> > > Hi all,
> > > While integrating the hiveCatalog with the SQL Client, I registered a Kafka table in the hiveCatalog.
> > > The DDL is as follows:
> > > |
> > > CREATE TABLE user_behavior (
> > > user_id BIGINT,
> > > item_id BIGINT,
> > > category_id BIGINT,
> > > behavior STRING,
> > > ts TIMESTAMP(3),
> > > proctime as PROCTIME(),   -- generate a processing-time column via a computed column
> > > WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define a watermark on ts; ts becomes the event-time column
> > > ) WITH (
> > > 'connector.type' = 'kafka',  -- use the kafka connector
> > > 'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
> > > 'connector.topic' = 'user_behavior',  -- kafka topic
> > > 'connector.startup-mode' = 'earliest-offset',  -- start reading from the earliest offset
> > > 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
> > > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
> > > 'format.type' = 'json'  -- the source data format is json
> > > );
> > > |
> > > Querying with select * from user_behavior; fails with the following error:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.AssertionError: Conversion to relational algebra failed to
> > > preserve datatypes:
> > > validated type:
> > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > > converted type:
> > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> > NULL
> > > rel:
> > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > > behavior=[$3], ts=[$4], proctime=[$5])
> > > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > > SECOND)])
> > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> > >
> > >
> > > Flink version: 1.10.1
> > > blink planner, streaming mode
> > >
> > >
> > > Thx
> > > Sun.Zhu
> > > 17626017...@163.com
> > >
> > >
> > >
> >
>


-- 
Best regards!
Rui Li


Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
Thanks Arvid,

now it makes sense.
Unfortunately, the problematic schema comes from a 3rd party we cannot
control, we have to ingest and do some work with it before being able to
map out of it.
But at least now the boundary of the problem is clear

Thanks to the whole community
Lorenzo


On Tue, 16 Jun 2020 at 22:54, Arvid Heise  wrote:

> Hi Lorenzo,
>
> I didn't mean to dismiss the issue, but it's not a matter of
> incompatibility, it's a matter of unsound generated code. It will break
> independently of Flink, since it apparently is a bug in the Avro compiler
> 1.8.2, so our options to fix it are limited.
> What we should do is to bump the Avro version to fix the underlying issue.
> You can vote for it on the respective ticket, which also explains why it's
> not that easy [1] (also you can vote on that respective hive ticket).
>
> I remember that I also encountered an issue with nullable logical types
> back in my user days, but didn't dare to fix it, since the Avro project was
> really inactive at that time (e.g., it looked dead). Possible workarounds:
> * Stick with non-logical types (what I ended up with). You need to convert
> manually in your first step, which sounds like a PITA, but that's what you
> would do on non-JVM projects anyways (logical types are not really
> established after 5+ years).
> * Use default values instead of union with null. So instead of using null
> to tag missing values, use 0 = 01.01.1970 to identify missing values.
>
> Deep copies are used whenever the same record has to be used multiple
> times (state, broadcast). That's why I thought your idea of switching to
> POJOs asap should help. Where do you see issues?
>
> [1] https://issues.apache.org/jira/browse/FLINK-12532
>
>
>
> On Tue, Jun 16, 2020 at 9:59 PM Lorenzo Nicora 
> wrote:
>
>> Hi Arvid,
>>
>> Sorry but saying the AVRO compiler setup is "broken" sounds like an easy
>> way for dismissing a problem ;)
>> I am using the official AVRO 1.8.2 Maven plugin with no customisation  to
>> generate the code.
>> There might be some legit AVRO configurations that are incompatible with
>> Flink or something in the schema not fully supported.
>>
>> In particular, I noticed the user.avsc schema in Flink testing has *no
>> optional logical type fields* while my schema has multiple optional
>> timestamps.
>> Can AVRO-1891  (fixed
>> in AVRO 1.9.1) be related?
>>
>> I tried changing user.avsc making one of the timestamp fields a union
>> with null, and flink-avro tests start failing with a lot of "Unknown datum
>> type org.joda.time.DateTime"
>>
>> This would explain why using records generated with AVRO 1.9.2 and
>> dateTimeLogicalType=Joda and enableObjectReuse() behaves better.
>> The workaround only partially solves my problem.
>> It looks like deepCopies happen in many places not controlled by
>> enableObjectReuse, like when adding to some Collectors. Am I right?
>>
>> Cheers
>> Lorenzo
>>
>>
>> On Mon, 15 Jun 2020 at 19:30, Arvid Heise  wrote:
>>
>>> Hi Lorenzo,
>>>
>>> Thank you for confirming my suspicion. It really means something is
>>> broken in your Avro compiler setup and there is not much that we can do on
>>> our end.
>>>
>>> Just for reference, we are having a user.avsc [1] being compiled [2]
>>> with 1.8.2 into this snippet [3] for our
>>> tests. Look especially on how the conversions look like; they have a
>>> different template style than yours.
>>>
>>> The expectation is that you have 1 conversion for each logical type that
>>> is compiled to joda type. If you have conversions on other places, you can
>>> trace back to which field they belong by using the IndexedRecord methods.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/avro/user.avsc
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/pom.xml
>>> [3] https://gist.github.com/AHeise/041ff5ccf76087975ed157c3d3276875
>>>
>>> On Thu, Jun 11, 2020 at 3:22 PM Lorenzo Nicora 
>>> wrote:
>>>

 Hi Arvit,

 I followed your instructions for the breakpoint in
 SpecificDatumReader.readField *with  AVRO 1.8.2*,

 For all timestamp-millis fields (I have many):

 Conversion conversion = ((SpecificRecordBase)
 r).getConversion(f.pos());


 returns null for all timestamp-millis fields (I have many), so...

 datum = readWithoutConversion(oldDatum, f.schema(), in);


 is used instead and returns a *Long*



 Not sure it's relevant, but in this version I have the explicit
 dependency org.apache.avro:avro:1.8.2 and I am using the
 avro-maven-plugin (1.8.2) to generate the record from .avsc with this
 configuration:

 
 String
 true
 private
 true
 


 Cheers
 Lorenzo


 On Thu, 11 Jun 2020 at 13:11, Arvid Heise  wrote:

> Sorry forget my last mail, that was 

Re: Blink Planner Retracting Streams

2020-06-16 Thread godfrey he
hi John,

You can use Tuple2[Boolean, Row] to replace CRow; the
StreamTableEnvironment#toRetractStream method returns DataStream[(Boolean,
T)].

the code looks like:

tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
  override def map(value: (Boolean, Row)): R = ...
})
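A Java equivalent might look like the sketch below (an illustration only; keeping just the accumulate messages is one possible design choice, otherwise keep the Tuple2 as in the Scala snippet above):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamExample {
    static SingleOutputStreamOperator<Row> accumulateOnly(StreamTableEnvironment tEnv, Table table) {
        // toRetractStream yields (flag, row): flag == true is an accumulate message,
        // flag == false is a retraction, which is the information CRow used to carry.
        // Any map() on the DataStream returns a SingleOutputStreamOperator.
        return tEnv.toRetractStream(table, Row.class)
                   .filter(value -> value.f0)               // drop retraction messages (a design choice)
                   .map(value -> value.f1)                  // unwrap the Row
                   .returns(table.getSchema().toRowType()); // Row type info for the lambda
    }
}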

Bests,
Godfrey

John Mathews  于2020年6月17日周三 下午12:13写道:

> Hello,
>
> I am working on migrating from the flink table-planner to the new blink
> one, and one problem I am running into is that it doesn't seem like Blink
> has a concept of a CRow, unlike the original table-planner.
>
> I am therefore struggling to figure out how to properly convert a
> retracting stream to a SingleOutputStreamOperator when using just the Blink
> planner libraries.
>
> E.g. in the old planner I could do something like this:
> SingleOutputStreamOperator stream =
> tableEnvironment.toRetractStream(table, typeInfo)
> .map(value -> new CRow(value.f1, value.f0);
>
> but without the CRow, I'm not sure how to accomplish this.
>
> Any suggestions?
>
> Thanks!
> John
>
>
>


flink sql DDL Unsupported update-mode hbase

2020-06-16 Thread Zhou Zach
The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.


Reason: No factory supports all properties.


The matching candidates:
org.apache.flink.addons.hbase.HBaseTableFactory
Unsupported property keys:
update-mode


The following properties are requested:
connector.table-name=user_hbase10
connector.type=hbase
connector.version=2.1.0
connector.write.buffer-flush.interval=2s
connector.write.buffer-flush.max-rows=1000
connector.write.buffer-flush.max-size=10mb
connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
connector.zookeeper.znode.parent=/hbase
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=rowkey
schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT, `created_time` 
TIMESTAMP(3)
schema.1.name=cf
update-mode=upsert


The following factories have been considered:
org.apache.flink.addons.hbase.HBaseTableFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.


Reason: No factory supports all properties.


The matching candidates:
org.apache.flink.addons.hbase.HBaseTableFactory
Unsupported property keys:
update-mode


The following properties are requested:
connector.table-name=user_hbase10
connector.type=hbase
connector.version=2.1.0
connector.write.buffer-flush.interval=2s
connector.write.buffer-flush.max-rows=1000
connector.write.buffer-flush.max-size=10mb
connector.zookeeper.quorum=cdh1:2181,cdh2:2181,cdh3:2181
connector.zookeeper.znode.parent=/hbase
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=rowkey
schema.1.data-type=ROW<`sex` VARCHAR(2147483647), `age` INT, `created_time` 
TIMESTAMP(3)
schema.1.name=cf
update-mode=upsert


The following factories have been considered:
org.apache.flink.addons.hbase.HBaseTableFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:310)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  

?????? flink sql ????????ROW??????????INT

2020-06-16 Thread Zhou Zach
??hbase??hbase




----
??:"Leonard Xu"

Re: Any python example with json data from Kafka using flink-statefun

2020-06-16 Thread Sunil
Thanks Gordon.
Really appreciate your detailed response and this definitely helps.


On 2020/06/17 04:45:11, "Tzu-Li (Gordon) Tai"  wrote: 
> (forwarding this to user@ as it is more suited to be located there)
> 
> Hi Sunil,
> 
> With remote functions (using the Python SDK), messages sent to / from them
> must be Protobuf messages.
> This is a requirement since remote functions can be written in any
> language, and we use Protobuf as a means for cross-language messaging.
> If you are defining Kafka ingresses in a remote module (via textual YAML
> module configs), then records in the Kafka ingress will be directly routed
> to the remote functions, and therefore they are required to be Protobuf
> messages as well.
> 
> With embedded functions (using the current Java SDK), then what you are
> trying to do is possible.
> When using the Java SDK, the Kafka ingress allows providing a
> `KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka
> into any type you intend for messaging within the StateFun application. So
> there, you can convert your JSON records.
> 
> If you want to still write your main application logic in Python, but the
> input and output messages in Kafka are required to be JSON,
> what you can currently do is have a mix of remote module [2] containing the
> application logic as Python functions,
> and a separate embedded module [3] containing the Java Kafka ingress and
> egresses.
> So, concretely, your 2 modules will contain:
> 
> Remote module:
> - Your Python functions implementing the main business logic.
> 
> Embedded module:
> - Java Kafka ingress with deserializer that converts JSON to Protobuf
> messages. Here you have the freedom to extract only the fields that you
> need.
> - A Java router [4] that routes those converted messages to the remote
> functions, by their logical address
> - A Java Kafka egress with serializer that converts Protobuf messages from
> remote functions into JSON Kafka records.
> - A Java function that simply forwards input messages to the Kafka
> egress. If the remote functions need to write JSON messages to Kafka, they
> send a Protobuf message to this function.
> 
> 
> Hope this helps.
> Note that the egress side of things can definitely be easier (without the
> extra forwarding through a Java function) if the Python SDK's
> `kafka_egress_record` method allows supplying arbitrary bytes.
> Then you would be able to already write to Kafka JSON messages in the
> Python functions.
> This however isn't supported yet, but technically it is quite easy to
> achieve. I've just filed an issue for this [5], in case you'd like to follow
> that.
> 
> Cheers,
> Gordon
> 
> [1]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
> [2]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module
> 
> [3]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
> [4]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
> [5] https://issues.apache.org/jira/browse/FLINK-18340
> 
> On Wed, Jun 17, 2020 at 9:25 AM Sunil  wrote:
> 
> > checking to see if this is possible currently.
> > Read json data from kafka topic => process using statefun => write out to
> > kafka in json format.
> >
> > I could have a separate process to read the source json data convert to
> > protobuf into another kafka topic but it sounds in-efficient.
> > e.g.
> > Read json data from kafka topic =>convert json to protobuf =>  process
> > using statefun => write out to kafka in protobuf format.=> convert protobuf
> > to json message
> >
> > Appreciate any advice on how to process json messages using statefun ,
> > also if this is not possible in the current python sdk, can i do that using
> > the java/scala sdk?
> >
> > Thanks.
> >
> > On 2020/06/15 15:34:39, Sunil Sattiraju  wrote:
> > > Thanks Igal,
> > > I dont have control over the data source inside kafka ( current kafka
> > topic contains either json or avro formats only, i am trying to reproduce
> > this scenario using my test data generator ).
> > >
> > > is it possible to convert the json to proto at the receiving end of
> > statefun applicaiton?
> > >
> > > On 2020/06/15 14:51:01, Igal Shilman  wrote:
> > > > Hi,
> > > >
> > > > The values must be valid encoded Protobuf messages [1], while in your
> > > > attached code snippet you are sending utf-8 encoded JSON strings.
> > > > You can take a look at this example with a generator that produces
> > Protobuf
> > > > messages [2][3]
> > > >
> > > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > > [2]
> > > >
> > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > > [3]
> > > >
> > 

Re: Any python example with json data from Kafka using flink-statefun

2020-06-16 Thread Tzu-Li (Gordon) Tai
(forwarding this to user@ as it is more suited to be located there)

Hi Sunil,

With remote functions (using the Python SDK), messages sent to / from them
must be Protobuf messages.
This is a requirement since remote functions can be written in any
language, and we use Protobuf as a means for cross-language messaging.
If you are defining Kafka ingresses in a remote module (via textual YAML
module configs), then records in the Kafka ingress will be directly routed
to the remote functions, and therefore they are required to be Protobuf
messages as well.

With embedded functions (using the current Java SDK), then what you are
trying to do is possible.
When using the Java SDK, the Kafka ingress allows providing a
`KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka
into any type you intend for messaging within the StateFun application. So
there, you can convert your JSON records.

If you want to still write your main application logic in Python, but the
input and output messages in Kafka are required to be JSON,
what you can currently do is have a mix of remote module [2] containing the
application logic as Python functions,
and a separate embedded module [3] containing the Java Kafka ingress and
egresses.
So, concretely, your 2 modules will contain:

Remote module:
- Your Python functions implementing the main business logic.

Embedded module:
- Java Kafka ingress with deserializer that converts JSON to Protobuf
messages. Here you have the freedom to extract only the fields that you
need.
- A Java router [4] that routes those converted messages to the remote
functions, by their logical address
- A Java Kafka egress with serializer that converts Protobuf messages from
remote functions into JSON Kafka records.
- A Java function that simply forwards input messages to the Kafka
egress. If the remote functions need to write JSON messages to Kafka, they
send a Protobuf message to this function.
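As a rough illustration of the deserializer mentioned in the first embedded-module bullet above, something along the following lines might work. This is only a sketch: the exact KafkaIngressDeserializer interface should be checked against the Kafka I/O docs in [1], and MyEvent plus the JSON field names are made-up placeholders.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Converts the JSON bytes from Kafka into a (hypothetical) Protobuf message MyEvent,
// keeping only the fields the remote functions actually need.
public class JsonToProtobufDeserializer implements KafkaIngressDeserializer<MyEvent> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> input) {
        try {
            JsonNode json = MAPPER.readTree(input.value());
            return MyEvent.newBuilder()                        // MyEvent: generated Protobuf class (placeholder)
                    .setUserId(json.get("user_id").asText())
                    .setAmount(json.get("amount").asDouble())
                    .build();
        } catch (Exception e) {
            throw new RuntimeException("Malformed JSON record from Kafka", e);
        }
    }
}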


Hope this helps.
Note that the egress side of things can definitely be easier (without the
extra forwarding through a Java function) if the Python SDK's
`kafka_egress_record` method allows supplying arbitrary bytes.
Then you would be able to already write to Kafka JSON messages in the
Python functions.
This however isn't supported yet, but technically it is quite easy to
achieve. I've just filed an issue for this [5], in case you'd like to follow
that.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module

[3]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
[4]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
[5] https://issues.apache.org/jira/browse/FLINK-18340

On Wed, Jun 17, 2020 at 9:25 AM Sunil  wrote:

> checking to see if this is possible currently.
> Read json data from kafka topic => process using statefun => write out to
> kafka in json format.
>
> I could have a separate process to read the source json data convert to
> protobuf into another kafka topic but it sounds in-efficient.
> e.g.
> Read json data from kafka topic =>convert json to protobuf =>  process
> using statefun => write out to kafka in protobuf format.=> convert protobuf
> to json message
>
> Appreciate any advice on how to process json messages using statefun ,
> also if this is not possible in the current python sdk, can i do that using
> the java/scala sdk?
>
> Thanks.
>
> On 2020/06/15 15:34:39, Sunil Sattiraju  wrote:
> > Thanks Igal,
> > I dont have control over the data source inside kafka ( current kafka
> topic contains either json or avro formats only, i am trying to reproduce
> this scenario using my test data generator ).
> >
> > is it possible to convert the json to proto at the receiving end of
> statefun applicaiton?
> >
> > On 2020/06/15 14:51:01, Igal Shilman  wrote:
> > > Hi,
> > >
> > > The values must be valid encoded Protobuf messages [1], while in your
> > > attached code snippet you are sending utf-8 encoded JSON strings.
> > > You can take a look at this example with a generator that produces
> Protobuf
> > > messages [2][3]
> > >
> > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > [2]
> > >
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > [3]
> > >
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> > >
> > > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <
> sunilsattir...@gmail.com>
> > > wrote:
> > >
> > > > Hi, Based on the example from
> > > >
> 

Re: Kinesis ProvisionedThroughputExceededException

2020-06-16 Thread M Singh
 
Thanks Roman for your response and advice.
From my understanding, increasing shards will increase throughput, but still, if
more than 5 requests are made per shard per second, and since we have 20 apps
(and increasing), the exception might occur.
Please let me know if I have missed anything.
Mans

On Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov wrote:

Hi,

usually this exception is thrown by aws-java-sdk and means that your kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple flink apps 
reading from it.

Each Kinesis partition has a limit of 5 poll operations per second. If you have 
a stream with 4 partitions and 30 jobs reading from it, I guess that each job 
is constantly hitting op limit for kinesis with default kinesis consumer 
settings and it does an exponential back-off (by just sleeping for a small 
period of time and then retrying).

You have two options here:
1. scale up the kinesis stream, so there will be more partitions and higher 
overall throughput limits
2. tune kinesis consumer backoff parameters:

Our current ones, for example, look like this:

    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") // we poll every 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // in case of throughput error, initial timeout is 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000") // we can go up to 10s pause
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5") // multiplying pause by 1.5 on each next step
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and make up to 100 retries
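For completeness, a small sketch (not from the original thread) of where such a Properties object is applied; the stream name, region, and schema here are placeholder assumptions:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties conf = new Properties();
        conf.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");                  // placeholder region
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000"); // settings as above
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000");
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000");
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100");

        // the consumer is an ordinary source; every Flink app reading the stream polls it independently
        DataStream<String> events = env.addSource(
                new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), conf));
        events.print();
        env.execute("kinesis-source-sketch");
    }
}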

with best regards,
Roman Grebennikov | g...@dfdx.me


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:

Hi:

I am using multiple (almost 30 and growing) Flink streaming applications that 
read from the same kinesis stream and get 
ProvisionedThroughputExceededException exception which fails the job.
I have seen a reference 
http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
 - which indicates there might be some solution perhaps in Flink 1.8/1.9.  

I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due to 
ProvisionedThroughputExceededException - ASF JIRA is still open.


So i wanted to find out 

1. If this issue has been resolved and if so in which version ?
2. Is there any kinesis consumer with kinesis fanout available that can help 
address this issue ?
3. Is there any specific parameter in kinesis consumer config that can address 
this issue ?

If there is any other pointer/documentation/reference, please let me know.

Thanks



  

回复:sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 Thread Sun.Zhu
The error on the SQL CLI is just what I posted above; there is no more information. Or is there some log where I can see more details?




On June 17, 2020, at 10:27, Benchao Li wrote:
The missing dependency used to be in the flink-table-runtime-blink module; these classes have now been refactored into the flink-table-common
module.
If only the connectors and formats use the old version, that should be fine.
Could you post the full error message? That would show which module still depends on the old flink-table-runtime-blink.

Sun.Zhu <17626017...@163.com> wrote on Wed, Jun 17, 2020 at 12:49 AM:

> Yes. Apart from the jars built from 1.11, the dependency jars, e.g. for connectors and the hivecatalog, are the 1.10.1 versions, because 1.11
> has not been released yet. Neither of the two problems above occurs with 1.10.1; after upgrading to 1.11 they appear for some unknown reason. Is a dependency missing?
>
>
>
>
> On June 16, 2020, at 18:38, Benchao Li wrote:
> Some underlying data structures were refactored in 1.11, so you cannot take 1.10 jars and use them inside 1.11.
> Running directly with the jars built from 1.11 should work fine.
>
> Sun.Zhu <17626017...@163.com> wrote on Tue, Jun 16, 2020 at 6:11 PM:
>
> > I built the 1.11 packages.
> > Querying a Hive table under sql-cli reports the following error:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
> >
> >
> > Querying the registered Kafka table reports:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
> >
> >
> > The dependency jars were copied from 1.10.1.
> > Sun.Zhu
> > 17626017...@163.com
> >
> >
> > On June 13, 2020, at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> > Got it!
> > Thx,junbao
> >
> >
> > Sun.Zhu
> > 17626017...@163.com
> >
> >
> > On June 13, 2020, at 09:32, zhangjunbao wrote:
> > On 1.10.x, creating a table with proctime as PROCTIME() under the hivecatalog appears to hit a bug:
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > On June 13, 2020, at 12:31 AM, Sun.Zhu <17626017...@163.com> wrote:
> >
> > Hi all,
> > While integrating the hiveCatalog with the SQL Client, I registered a Kafka table in the hiveCatalog.
> > The DDL is as follows:
> > |
> > CREATE TABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- generate a processing-time column via a computed column
> > WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define a watermark on ts; ts becomes the event-time column
> > ) WITH (
> > 'connector.type' = 'kafka',  -- use the kafka connector
> > 'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- start reading from the earliest offset
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
> > 'format.type' = 'json'  -- the source data format is json
> > );
> > |
> > Querying with select * from user_behavior; fails with the following error:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > Flink version: 1.10.1
> > blink planner, streaming mode
> >
> >
> > Thx
> > Sun.Zhu
> > 17626017...@163.com
> >
> >
> >
>


Blink Planner Retracting Streams

2020-06-16 Thread John Mathews
Hello,

I am working on migrating from the flink table-planner to the new blink
one, and one problem I am running into is that it doesn't seem like Blink
has a concept of a CRow, unlike the original table-planner.

I am therefore struggling to figure out how to properly convert a
retracting stream to a SingleOutputStreamOperator when using just the Blink
planner libraries.

E.g. in the old planner I could do something like this:
SingleOutputStreamOperator stream =
tableEnvironment.toRetractStream(table, typeInfo)
.map(value -> new CRow(value.f1, value.f0);

but without the CRow, I'm not sure how to accomplish this.

Any suggestions?

Thanks!
John


Re: Help: Real-time cumulative UV statistics with Flink SQL 1.10

2020-06-16 Thread Jark Wu
The two approaches Benchao suggested are also the most common ways this problem is solved internally at Alibaba, but 1.10 has a primary-key limitation, so this has to wait for 1.11.
Also, when catching up on backlog data, both approaches can show gaps (a few minutes with no value, because the data catches up too fast and those minutes are skipped).



On Wed, 17 Jun 2020 at 11:46, Benchao Li  wrote:

> Hi,
> I think there are two ways to handle this kind of scenario:
> 1. plain GROUP BY + mini-batch
> 2. window aggregation + fast emit
>
> For #1, the GROUP BY key can include a date field, e.g. the DATE_FORMAT(rowtm, 'yyyy-MM-dd') you mentioned above.
> In that case state cleanup requires configuring a state retention time, see [1] for how. Mini-batch also has to be
> enabled via the parameters in [2].
>
> For #2, simply use a day-sized tumble window. State cleanup then needs no special configuration; it is cleaned up by default.
> Fast emit is still an experimental feature and therefore not listed in the documentation; here are the settings for reference:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907...@qq.com> wrote on Wed, Jun 17, 2020 at 11:14 AM:
>
> > The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now. With the approach below, the cumulative UV is never reset when crossing midnight; it just keeps accumulating.
> > CREATE VIEW uv_per_10min AS
> > SELECT
> >  MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
> >  COUNT(DISTINCT user_id) OVER w AS uv
> > FROM user_behavior
> > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> > CURRENT ROW);
> >
> >
> > How should this be handled?
> > Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work, and how should the state be cleaned up?
> > PS: 1.10 DDL does not seem to support CREATE VIEW.
> > Thanks
>


Re: Help: Real-time cumulative UV statistics with Flink SQL 1.10

2020-06-16 Thread Benchao Li
Hi,
I think there are two ways to handle this kind of scenario:
1. plain GROUP BY + mini-batch
2. window aggregation + fast emit

For #1, the GROUP BY key can include a date field, e.g. the DATE_FORMAT(rowtm, 'yyyy-MM-dd') you mentioned above.
In that case state cleanup requires configuring a state retention time, see [1] for how. Mini-batch also has to be
enabled via the parameters in [2].
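A minimal sketch of this first approach (not part of the original reply): table and column names follow the user_behavior example in this thread, uv_sink is a placeholder upsert sink, and the exact state-retention API may differ slightly between versions (see [1] for the documented way).

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class CumulativeUvGroupBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // enable mini-batch, see the parameters in [2]
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        conf.setString("table.exec.mini-batch.size", "5000");

        // state retention so the per-day accumulator is eventually cleaned up, see [1]
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(25), Time.hours(49));

        // group by the day so the count restarts at midnight; emit the latest minute
        // together with the cumulative distinct count
        tEnv.sqlUpdate(
                "INSERT INTO uv_sink "
              + "SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) AS time_str, "
              + "       COUNT(DISTINCT user_id) AS uv "
              + "FROM user_behavior "
              + "GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')");

        tEnv.execute("cumulative-uv-group-by");
    }
}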

For #2, simply use a day-sized tumble window. State cleanup then needs no special configuration; it is cleaned up by default.
Fast emit is still an experimental feature and therefore not listed in the documentation; here are the settings for reference:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 60 s
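And a corresponding sketch for this second approach (again an illustration, not part of the original reply; it assumes ts is an event-time attribute and uv_sink is a placeholder sink that accepts the early-fired updates):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class CumulativeUvDayWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // the experimental early-fire settings quoted above: emit partial window results every 60 s
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.emit.early-fire.enabled", "true");
        conf.setString("table.exec.emit.early-fire.delay", "60 s");

        // one tumbling window per day; its state is dropped automatically when the window closes
        tEnv.sqlUpdate(
                "INSERT INTO uv_sink "
              + "SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) AS time_str, "
              + "       COUNT(DISTINCT user_id) AS uv "
              + "FROM user_behavior "
              + "GROUP BY TUMBLE(ts, INTERVAL '1' DAY)");

        tEnv.execute("cumulative-uv-day-window");
    }
}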

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

x <35907...@qq.com> wrote on Wed, Jun 17, 2020 at 11:14 AM:

> The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now. With the approach below, the cumulative UV is never reset when crossing midnight; it just keeps accumulating.
> CREATE VIEW uv_per_10min AS
> SELECT
>  MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
>  COUNT(DISTINCT user_id) OVER w AS uv
> FROM user_behavior
> WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW);
>
>
> How should this be handled?
> Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work, and how should the state be cleaned up?
> PS: 1.10 DDL does not seem to support CREATE VIEW.
> Thanks


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Zhijiang
Congratulations Yu! Well deserved!

Best,
Zhijiang


--
From:Dian Fu 
Send Time: Wednesday, June 17, 2020, 10:48
To:dev 
Cc:Haibo Sun ; user ; user-zh 

Subject:Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

Congrats Yu!

Regards,
Dian

> On June 17, 2020, at 10:35 AM, Jark Wu wrote:
> 
> Congratulations Yu! Well deserved!
> 
> Best,
> Jark
> 
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
> 
>> Congratulations Yu!
>> 
>> Best,
>> Haibo
>> 
>> 
>> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's Statebackend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> and keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.10.0&1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>> 
>> 



Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-06-16 Thread x
The requirement is to emit, every minute, the cumulative UV count from 00:00 of the current day up
to now. With the approach below, the cumulative UV is never reset when crossing midnight and just
keeps accumulating:
CREATE VIEW uv_per_10min AS
SELECT
  MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);


How should this be handled?
Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work, and how should the state be cleaned up?
PS: the DDL in 1.10 does not seem to support CREATE VIEW, does it?


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Dian Fu
Congrats Yu!

Regards,
Dian

> On Jun 17, 2020, at 10:35, Jark Wu wrote:
> 
> Congratulations Yu! Well deserved!
> 
> Best,
> Jark
> 
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
> 
>> Congratulations Yu!
>> 
>> Best,
>> Haibo
>> 
>> 
>> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's Statebackend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> and keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.10.0&1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>> 
>> 



Fwd: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

2020-06-16 Thread 杜斌
-- Forwarded message -
From: 杜斌 
Date: 2020年6月17日周三 上午10:31
Subject: Re: Flink Table program cannot be compiled when enable checkpoint
of StreamExecutionEnvironment
To: 


add the full stack trace here:


Caused by:
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.flink.api.common.InvalidProgramException: Table program cannot
be compiled. This is a bug. Please file an issue.
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
... 14 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table
program cannot be compiled. This is a bug. Please file an issue.
at
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
at
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
... 17 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column
46: Cannot determine simple type name "org"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
at
org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
... 23 more

On Wed, 17 Jun 2020 at 10:29, 杜斌 wrote:

> Hi,
> Need help on this issue, here is what Flink reported when I enable the
> checkpoint setting of the StreamExecutionEnvironment:
>
> /* 1 */
> /* 2 */  public class SourceConversion$1 extends
> org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> 

Re: How to convert a ROW type to INT in Flink SQL

2020-06-16 Thread Leonard Xu

Hi,
> On Jun 17, 2020, at 09:33, Zhou Zach wrote:
> 
> How do I convert int to Integer, or Integer to int?

As I understand it, Integer and int are the same type here: INT is the SQL type and Integer is the
concrete implementation class in Java. The error looks like a wrong index when reading the data
from HBase.
Could you post the SQL and the full exception stack?

Best,
Leonard Xu

Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Benchao Li
Congratulations Yu!

On Wed, 17 Jun 2020 at 10:36, Jark Wu wrote:

> Congratulations Yu! Well deserved!
>
> Best,
> Jark
>
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
>
> > Congratulations Yu!
> >
> > Best,
> > Haibo
> >
> >
> > At 2020-06-17 09:15:02, "jincheng sun"  wrote:
> > >Hi all,
> > >
> > >On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> > >part of the Apache Flink Project Management Committee (PMC).
> > >
> > >Yu Li has been very active on Flink's Statebackend component, working on
> > >various improvements, for example the RocksDB memory management for
> 1.10.
> > >and keeps checking and voting for our releases, and also has
> successfully
> > >produced two releases(1.10.0&1.10.1) as RM.
> > >
> > >Congratulations & Welcome Yu Li!
> > >
> > >Best,
> > >Jincheng (on behalf of the Flink PMC)
> >
> >
>


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Jark Wu
Congratulations Yu! Well deserved!

Best,
Jark

On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:

> Congratulations Yu!
>
> Best,
> Haibo
>
>
> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
> >Hi all,
> >
> >On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> >part of the Apache Flink Project Management Committee (PMC).
> >
> >Yu Li has been very active on Flink's Statebackend component, working on
> >various improvements, for example the RocksDB memory management for 1.10.
> >and keeps checking and voting for our releases, and also has successfully
> >produced two releases(1.10.0&1.10.1) as RM.
> >
> >Congratulations & Welcome Yu Li!
> >
> >Best,
> >Jincheng (on behalf of the Flink PMC)
>
>


Re: Updating rules via Kafka

2020-06-16 Thread Ruibin Xing
If there is a logical change, does that make the checkpoint unusable? I have no experience with
restoring state from a checkpoint and had not considered recovering the rules that way. I will
read up on it, thanks!

On Wed, 17 Jun 2020 at 00:57, Sun.Zhu <17626017...@163.com> wrote:

> Why would the rules be consumed from the beginning? Is checkpointing not enabled? After a
> restart the job can continue consuming the rules in Kafka from the offsets stored in the
> checkpoint.
>
>
>
>
> On Jun 16, 2020 at 11:57, Ruibin Xing wrote:
> We have a Flink job that needs some custom rules, which should be dynamically added, updated and
> deleted. There are a few hundred to a few thousand rules. The current design is RDB + Kafka +
> Flink.
>
> The RDB stores the full snapshot of the rules, so a web application can do CRUD on them. Changes
> are sent as Kafka messages to Flink, and the rules are propagated via broadcast state.
>
> One problem is still open: how to use Kafka to carry this state. I can think of a few options:
>
> 1. Tag each message as Add/Update/Delete and write logic in Flink to keep the state consistent
>    with the RDB. The problem is that every job restart has to read Kafka from the beginning to
>    replay the state updates, and the state messages in Kafka must be kept forever, so I worry
>    they will pile up over time.
> 2. Use a Kafka compacted topic to solve the problem in 1. I just have not used log compaction
>    before and am not sure about the pitfalls.
> 3. Use option 1, but have Flink pull the full rule set from the RDB at startup.
> 4. Send the full rule set in every Kafka message after an update, and have Flink read only the
>    latest message at startup.
>
> Has anyone experience to share on what a sound way to handle this is? Much appreciated!
>
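For reference, a minimal Java sketch of the broadcast-state pattern discussed in this thread. Rule
and Event are hypothetical stand-ins, and the deleted flag is an assumption about the message
format; the relevant point is that broadcast state is included in checkpoints, so a restore does
not need to replay the whole rule topic.

import java.util.Map;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleBroadcastSketch {

    // hypothetical rule/event POJOs
    public static class Rule { public String id; public boolean deleted; }
    public static class Event { public String payload; }

    public static DataStream<Event> apply(DataStream<Event> events, DataStream<Rule> ruleChanges) {
        final MapStateDescriptor<String, Rule> rulesDesc =
                new MapStateDescriptor<>("rules", Types.STRING, TypeInformation.of(Rule.class));

        BroadcastStream<Rule> broadcastRules = ruleChanges.broadcast(rulesDesc);

        return events
                .connect(broadcastRules)
                .process(new BroadcastProcessFunction<Event, Rule, Event>() {

                    @Override
                    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
                            throws Exception {
                        // read-only view of the current rule set
                        for (Map.Entry<String, Rule> e : ctx.getBroadcastState(rulesDesc).immutableEntries()) {
                            // evaluate rule e.getValue() against the event ...
                        }
                        out.collect(event);
                    }

                    @Override
                    public void processBroadcastElement(Rule rule, Context ctx, Collector<Event> out)
                            throws Exception {
                        // Add/Update -> put, Delete -> remove; this keeps Flink in sync with the RDB snapshot
                        if (rule.deleted) {
                            ctx.getBroadcastState(rulesDesc).remove(rule.id);
                        } else {
                            ctx.getBroadcastState(rulesDesc).put(rule.id, rule);
                        }
                    }
                });
    }
}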


Re: Problem querying a Kafka table with sql-client and HiveCatalog

2020-06-16 Thread Benchao Li
The missing class used to live in the flink-table-runtime-blink module; these classes have now
been refactored into the flink-table-common module.
Using the old versions only for connectors and formats should be fine.
Could you post the more detailed error message, so we can see which module still depends on the
old flink-table-runtime-blink?

On Wed, 17 Jun 2020 at 00:49, Sun.Zhu <17626017...@163.com> wrote:

> Yes. Apart from the jars built from 1.11, the dependency jars, e.g. the connector jars and the
> jars the HiveCatalog needs, are the 1.10.1 versions, since 1.11 has not been released yet. The
> two errors above do not occur on 1.10.1; after upgrading to 1.11 they show up and I don't know
> why. Is some dependency missing?
>
>
>
>
> On Jun 16, 2020 at 18:38, Benchao Li wrote:
> 1.11 refactored some of the underlying data structures, so you cannot take the 1.10 jars and use
> them directly in 1.11.
> Running with the jars built from 1.11 itself should work without problems.
>
> On Tue, 16 Jun 2020 at 18:11, Sun.Zhu <17626017...@163.com> wrote:
>
> > I built the 1.11 package.
> > Querying a Hive table in the sql-cli reports the following error:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
> >
> >
> > Querying the registered Kafka table reports:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
> >
> >
> > The dependency jars were copied from 1.10.1.
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > Signature customized by NetEase Mail Master
> >
> >
> > On Jun 13, 2020 at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> > Got it!
> > Thx, junbao
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > Signature customized by NetEase Mail Master
> >
> >
> > On Jun 13, 2020 at 09:32, zhangjunbao wrote:
> > In 1.10.x, creating a table under the HiveCatalog with proctime as PROCTIME() should hit a bug:
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > On Jun 13, 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:
> >
> > hi, all
> > When using the sql client with the HiveCatalog, I registered a Kafka table in the HiveCatalog.
> > The DDL is as follows:
> > |
> > CREATE TABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- computed column providing a processing-time attribute
> > WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts; ts becomes the event-time attribute
> > ) WITH (
> > 'connector.type' = 'kafka',  -- use the kafka connector
> > 'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
> > 'format.type' = 'json'  -- the source data format is json
> > );
> > |
> > Querying with select * from user_behavior; fails with:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > Flink version: 1.10.1
> > blink planner, streaming mode
> >
> >
> > Thx
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > Signature customized by NetEase Mail Master
> >
> >
> >
>


Re:[ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Haibo Sun
Congratulations Yu!


Best,
Haibo




At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>Hi all,
>
>On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>part of the Apache Flink Project Management Committee (PMC).
>
>Yu Li has been very active on Flink's Statebackend component, working on
>various improvements, for example the RocksDB memory management for 1.10.
>and keeps checking and voting for our releases, and also has successfully
>produced two releases(1.10.0&1.10.1) as RM.
>
>Congratulations & Welcome Yu Li!
>
>Best,
>Jincheng (on behalf of the Flink PMC)


what is the "Flink" recommended way of assigning a backfill to an average on an event time keyed windowed stream?

2020-06-16 Thread Marco Villalobos
I need to compute averages on time series data upon a 15 minute tumbling event 
time window that is backfilled.

The time series data is a Tuple3 of name: String, value: double, event_time: 
Timestamp (Instant).

I need to compute the average value of the name time series on a tumbling 
window of 15 minutes  with backfills such that

given the input:

a,10,2020-06-23T00:01:30.000Z
a,15,2020-06-23T00:02:30.000Z
a,20,2020-06-23T00:03:30.000Z
b,25,2020-06-23T00:03:30.000Z
b,30,2020-06-23T00:02:30.000Z
b,35,2020-06-23T00:01:30.000Z
b,35,2020-06-23T00:16:30.000Z

it yields the following averages with backfill:

a,15,2020-06-23 00:00:00.0
b,30,2020-06-23 00:00:00.0
a,15,2020-06-23 00:15:00.0
b,35,2020-06-23 00:15:00.0

Notice that although no value arrived "a" in the second quarter, the previous 
average was upserted.

I only got as far as computing the average; I have not yet figured out a recommended strategy for 
upserting the backfill.

I made a GitHub project to share my approach:

https://github.com/minmay/flink-patterns

And the following code demonstrates my approach thus far.

Can somebody please provide me guidance on what is the "Flink" recommended way 
of assigning a backfill to an average on keyed windowed stream?

package mvillalobos.flink.patterns.timeseries.average;

import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;

public class TimeSeriesAverageApp {

private final static Logger logger = 
LoggerFactory.getLogger(TimeSeriesAverageApp.class);

public void stream(String inputFilePath) throws Exception {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


// GIVEN a SOURCE with a CSV input file
// in which each line has a: String, double, Instant
// THEN the MAP operator
// transforms the line into a Tuple7
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count double
// f6: is_backfill: boolean
// WHEN the map operation finishes
// THEN the event time assigned using field f3
final DataStream> timeSeriesStream = env.readTextFile(inputFilePath)
.map(line -> {
final String[] split = line.split(",");
final String name = split[0];
final double value = Double.parseDouble(split[1]);
final Instant timestamp = Instant.parse(split[2]);
return Tuple7.of(name, 1, value, timestamp, value, 1, 
false);
}).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, 
TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, Types.BOOLEAN))
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<>() {
@Override
public long 
extractAscendingTimestamp(Tuple7 element) {
return element.f3.toEpochMilli();
}
}
);

final JDBCUpsertTableSink jdbcUpsertTableSink = 
buildJdbcUpsertTableSink();

upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream);

// GIVEN a data stream with Tuple7
// f0: name: String
// f1: window_size: int
// 

Re: How do I backfill time series data?

2020-06-16 Thread Marco Villalobos
Hi Robert, 

I believe that I cannot use a "ProcessFunction" because I key the stream, and I 
use TumblingEventTimeWindows, which does not allow for the use of 
"ProcessFunction" in that scenario.

I compute the averages with a ProcessWindowFunction.

I am going to follow up this question in a new thread with more information.

Thank you.

Sincerely,

Marco Villalobos



> On Jun 15, 2020, at 11:13 AM, Robert Metzger  wrote:
> 
> Hi Marco,
> 
> I'm not 100% if I understood the problem. Let me repeat: You want a stream of 
> 15 minute averages for each unique "name". If there's no data available for a 
> 15m average, use the data from the previous 15m time window?
> 
> If that's the problem, you can probably build this using ProcessFunction and 
> a timer. For each key, you are just storing the average in Flink state. You 
> set a timer which outputs the last stored average and sets a new timer. 
> 
> Hope that is some useful inspiration!
> 
> Best,
> Robert
> 
> On Mon, Jun 15, 2020 at 4:59 AM Marco Villalobos  > wrote:
> Hello Flink community. I need help. Thus far, Flink has proven very useful to 
> me.
> 
> I am using it for stream processing of time-series data.
> 
> For the scope of this mailing list, let's say the time-series has the fields: 
> name: String, value: double, and timestamp: Instant.
> 
> I named the time series: timeSeriesDataStream.
> 
> My first task was to average the time series by name within a 15 minute 
> tumbling event time window.
> 
> \
> I was able to solve this with a ProcessWindowFunction (had to use this 
> approach because the watermark is not keyed), and named resultant  stream: 
> aggregateTimeSeriesDataStream, and then "sinking" the values. 
> 
> My next task is to backfill the name averages on the subsequent. This means 
> that if a time-series does not appear in a subsequent window then the 
> previous average value will be used in that window.
> 
> How do I do this?
> 
> I started by performing a Map function on the aggregateTimeSeriesDataStream 
> to change the timestamp back 15 minutes, and naming the resultant stream:
> backfilledDataStream.
> 
> Now, I am stuck. I suspect that I either
> 
> 1) timeSeriesDataStream.coGroup(backfilledDataStream) and add 
> CoGroupWindowFunction to process the backfill.
> 2) Use "iterate" to somehow jury rig a backfill.
> 
> I really don't know.  That's why I am asking this group for advice.
> 
> What's the common solution for this problem? I am quite sure that this is a 
> very common use-case.
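
One way to realize Robert's timer suggestion downstream of the 15-minute window (so it composes
with the TumblingEventTimeWindows aggregation instead of replacing it). The (name, average,
windowStart) Tuple3 layout and the "upsert the previous average" behaviour are assumptions for
illustration, not the thread's agreed answer.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// keyed by the series name; input and output records are (name, average, windowStart)
public class BackfillFunction
        extends KeyedProcessFunction<String, Tuple3<String, Double, Long>, Tuple3<String, Double, Long>> {

    private static final long WINDOW_MS = 15 * 60 * 1000L;

    private transient ValueState<Double> lastAvg;
    private transient ValueState<Long> lastEmittedWindow;

    @Override
    public void open(Configuration parameters) {
        lastAvg = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAvg", Double.class));
        lastEmittedWindow = getRuntimeContext().getState(new ValueStateDescriptor<>("lastWin", Long.class));
    }

    @Override
    public void processElement(Tuple3<String, Double, Long> value, Context ctx,
                               Collector<Tuple3<String, Double, Long>> out) throws Exception {
        lastAvg.update(value.f1);
        lastEmittedWindow.update(value.f2);
        out.collect(value);
        // wake up at the end of the *next* window; if no real average arrived by then,
        // onTimer below upserts the previous average as the backfill
        ctx.timerService().registerEventTimeTimer(value.f2 + 2 * WINDOW_MS - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple3<String, Double, Long>> out) throws Exception {
        long windowStart = timestamp + 1 - WINDOW_MS;
        Double avg = lastAvg.value();
        Long last = lastEmittedWindow.value();
        if (avg != null && (last == null || last < windowStart)) {
            out.collect(Tuple3.of(ctx.getCurrentKey(), avg, windowStart));
            lastEmittedWindow.update(windowStart);
            // keep backfilling until fresh data shows up for this key again
            ctx.timerService().registerEventTimeTimer(timestamp + WINDOW_MS);
        }
    }
}

Applied as windowedAverages.keyBy(t -> t.f0).process(new BackfillFunction()), it emits one row per
empty 15-minute slot carrying the previous average, until new data arrives for that key.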



Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Leonard Xu
Congratulations Yu !

Best,
Leonard Xu

> On Jun 17, 2020, at 09:50, Yangze Guo wrote:
> 
> Congrats, Yu!
> Best,
> Yangze Guo
> 
> On Wed, Jun 17, 2020 at 9:35 AM Xintong Song  wrote:
>> 
>> Congratulations Yu, well deserved~!
>> 
>> Thank you~
>> 
>> Xintong Song
>> 
>> 
>> 
>> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun  
>> wrote:
>>> 
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's Statebackend component, working on 
>>> various improvements, for example the RocksDB memory management for 1.10. 
>>> and keeps checking and voting for our releases, and also has successfully 
>>> produced two releases(1.10.0&1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)



Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Yangze Guo
Congrats, Yu!
Best,
Yangze Guo

On Wed, Jun 17, 2020 at 9:35 AM Xintong Song  wrote:
>
> Congratulations Yu, well deserved~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun  wrote:
>>
>> Hi all,
>>
>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>> part of the Apache Flink Project Management Committee (PMC).
>>
>> Yu Li has been very active on Flink's Statebackend component, working on 
>> various improvements, for example the RocksDB memory management for 1.10. 
>> and keeps checking and voting for our releases, and also has successfully 
>> produced two releases(1.10.0&1.10.1) as RM.
>>
>> Congratulations & Welcome Yu Li!
>>
>> Best,
>> Jincheng (on behalf of the Flink PMC)


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Xintong Song
Congratulations Yu, well deserved~!

Thank you~

Xintong Song



On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
wrote:

> Hi all,
>
> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> part of the Apache Flink Project Management Committee (PMC).
>
> Yu Li has been very active on Flink's Statebackend component, working on
> various improvements, for example the RocksDB memory management for 1.10.
> and keeps checking and voting for our releases, and also has successfully
> produced two releases(1.10.0&1.10.1) as RM.
>
> Congratulations & Welcome Yu Li!
>
> Best,
> Jincheng (on behalf of the Flink PMC)
>


Re: How to convert a ROW type to INT in Flink SQL

2020-06-16 Thread Zhou Zach
Thanks for the reply. The exception is:
offset (0) + length (4) exceed the capacity of the array: 2
The age field in HBase is an int, and I registered the column with
users.addColumn("cf", "age", classOf[Integer]).
I don't know how to convert int to Integer, or Integer to int, here.




------------------ Original Message ------------------
From: "Leonard Xu"
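
One possible reading of that exception, offered as an assumption rather than a confirmed
diagnosis: the bytes stored in HBase for cf:age are only 2 bytes wide, while declaring the column
as Integer makes the connector read 4, so the declared type has to match the width the writer
actually used. A hypothetical Java sketch of the same registration on Flink 1.10:

import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class UsersHBaseSource {
    public static HBaseTableSource build() {
        Configuration hbaseConf = HBaseConfiguration.create();
        HBaseTableSource users = new HBaseTableSource(hbaseConf, "users");  // table name assumed
        users.setRowKey("rowkey", String.class);                            // row key layout assumed
        // declare the type the bytes were actually written with, e.g. Short.class for a
        // 2-byte value, instead of Integer.class (which reads 4 bytes)
        users.addColumn("cf", "age", Short.class);
        return users;
    }
}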

[ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread jincheng sun
Hi all,

On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
part of the Apache Flink Project Management Committee (PMC).

Yu Li has been very active on Flink's Statebackend component, working on
various improvements, for example the RocksDB memory management for 1.10.
and keeps checking and voting for our releases, and also has successfully
produced two releases(1.10.0&1.10.1) as RM.

Congratulations & Welcome Yu Li!

Best,
Jincheng (on behalf of the Flink PMC)


Re: Reading from AVRO files

2020-06-16 Thread Arvid Heise
Hi Lorenzo,

I didn't mean to dismiss the issue, but it's not a matter of
incompatibility, it's a matter of unsound generated code. It will break
independently of Flink, since it apparently is a bug in the Avro compiler
1.8.2, so our options to fix it are limited.
What we should do is to bump the Avro version to fix the underlying issue.
You can vote for it on the respective ticket, which also explains why it's
not that easy [1] (also you can vote on that respective hive ticket).

I remember that I also encountered an issue with nullable logical types
back in my user days, but didn't dare to fix it, since the Avro project was
really inactive at that time (e.g., it looked dead). Possible workarounds:
* Stick with non-logical types (what I ended up with). You need to convert
manually in your first step, which sounds like a PITA, but that's what you
would do on non-JVM projects anyways (logical types are not really
established after 5+ years).
* Use default values instead of union with null. So instead of using null
to tag missing values, use 0 = 01.01.1970 to identify missing values.

Deep copies are used whenever the same record has to be used multiple times
(state, broadcast). That's why I thought your idea of switching to POJOs
asap should help. Where do you see issues?

[1] https://issues.apache.org/jira/browse/FLINK-12532
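
For completeness, the object-reuse switch mentioned above is set on the execution config; a
minimal sketch, only safe as long as user functions neither mutate nor buffer the incoming Avro
records:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // skip defensive deep copies between chained operators; deep copies for state,
        // broadcast, etc. still happen, which is the limitation discussed in this thread
        env.getConfig().enableObjectReuse();
    }
}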



On Tue, Jun 16, 2020 at 9:59 PM Lorenzo Nicora 
wrote:

> Hi Arvid,
>
> Sorry but saying the AVRO compiler setup is "broken" sounds like an easy
> way for dismissing a problem ;)
> I am using the official AVRO 1.8.2 Maven plugin with no customisation  to
> generate the code.
> There might be some legit AVRO configurations that are incompatible with
> Flink or something in the schema not fully supported.
>
> In particular, I noticed the user.avsc schema in Flink testing has *no
> optional logical type fields* while my schema has multiple optional
> timestamps.
> Can AVRO-1891  (fixed in
> AVRO 1.9.1) be related?
>
> I tried changing user.avsc making one of the timestamp fields a union with
> null, and flink-avro tests start failing with a lot of "Unknown datum type
> org.joda.time.DateTime"
>
> This would explain why using records generated with AVRO 1.9.2 and
> dateTimeLogicalType=Joda and enableObjectReuse() behaves better.
> The workaround only partially solves my problem.
> It looks like deepCopies happen in many places not controlled by
> enableObjectReuse, like when adding to some Collectors. Am I right?
>
> Cheers
> Lorenzo
>
>
> On Mon, 15 Jun 2020 at 19:30, Arvid Heise  wrote:
>
>> Hi Lorenzo,
>>
>> Thank you for confirming my suspicion. It really means something is
>> broken in your Avro compiler setup and there is not much that we can do on
>> our end.
>>
>> Just for reference, we are having a user.avsc [1] being compiled [2] with
>> 1.8.2 into this snippet [3] for our tests.
>> Look especially on how the conversions look like; they have a different
>> template style than yours.
>>
>> The expectation is that you have 1 conversion for each logical type that
>> is compiled to joda type. If you have conversions on other places, you can
>> trace back to which field they belong by using the IndexedRecord methods.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/avro/user.avsc
>> [2]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/pom.xml
>> [3] https://gist.github.com/AHeise/041ff5ccf76087975ed157c3d3276875
>>
>> On Thu, Jun 11, 2020 at 3:22 PM Lorenzo Nicora 
>> wrote:
>>
>>>
>>> Hi Arvit,
>>>
>>> I followed your instructions for the breakpoint in
>>> SpecificDatumReader.readField *with  AVRO 1.8.2*,
>>>
>>> For all timestamp-millis fields (I have many):
>>>
>>> Conversion conversion = ((SpecificRecordBase)
>>> r).getConversion(f.pos());
>>>
>>>
>>> returns null for all timestamp-millis fields (I have many), so...
>>>
>>> datum = readWithoutConversion(oldDatum, f.schema(), in);
>>>
>>>
>>> is used instead and returns a *Long*
>>>
>>>
>>>
>>> Not sure it's relevant, but in this version I have the explicit
>>> dependency org.apache.avro:avro:1.8.2 and I am using the
>>> avro-maven-plugin (1.8.2) to generate the record from .avsc with this
>>> configuration:
>>>
>>> 
>>> String
>>> true
>>> private
>>> true
>>> 
>>>
>>>
>>> Cheers
>>> Lorenzo
>>>
>>>
>>> On Thu, 11 Jun 2020 at 13:11, Arvid Heise  wrote:
>>>
 Sorry forget my last mail, that was half-finished.

 Here is the real one:

 Hi Lorenzo,

 if you still have time to investigate.

 Your stack trace shows that all expected code paths have been taken.
 Conversions are there; although they look different than here, but that can
 be attributed to the avro upgrade.

 Could you put a breakpoint on SpecificDatumReader.readField, so that
 you can inspect the conversion for the timestamp field? You probably want
 to make it a 

Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
Hi Arvid,

Sorry but saying the AVRO compiler setup is "broken" sounds like an easy
way for dismissing a problem ;)
I am using the official AVRO 1.8.2 Maven plugin with no customisation  to
generate the code.
There might be some legit AVRO configurations that are incompatible with
Flink or something in the schema not fully supported.

In particular, I noticed the user.avsc schema in Flink testing has *no
optional logical type fields* while my schema has multiple optional
timestamps.
Can AVRO-1891  (fixed in
AVRO 1.9.1) be related?

I tried changing user.avsc making one of the timestamp fields a union with
null, and flink-avro tests start failing with a lot of "Unknown datum type
org.joda.time.DateTime"

This would explain why using records generated with AVRO 1.9.2 and
dateTimeLogicalType=Joda and enableObjectReuse() behaves better.
The workaround only partially solves my problem.
It looks like deepCopies happen in many places not controlled by
enableObjectReuse, like when adding to some Collectors. Am I right?

Cheers
Lorenzo


On Mon, 15 Jun 2020 at 19:30, Arvid Heise  wrote:

> Hi Lorenzo,
>
> Thank you for confirming my suspicion. It really means something is broken
> in your Avro compiler setup and there is not much that we can do on our end.
>
> Just for reference, we are having a user.avsc [1] being compiled [2] with
> 1.8.2 into this snippet [3] for our tests.
> Look especially on how the conversions look like; they have a different
> template style than yours.
>
> The expectation is that you have 1 conversion for each logical type that
> is compiled to joda type. If you have conversions on other places, you can
> trace back to which field they belong by using the IndexedRecord methods.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/avro/user.avsc
> [2]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/pom.xml
> [3] https://gist.github.com/AHeise/041ff5ccf76087975ed157c3d3276875
>
> On Thu, Jun 11, 2020 at 3:22 PM Lorenzo Nicora 
> wrote:
>
>>
>> Hi Arvit,
>>
>> I followed your instructions for the breakpoint in
>> SpecificDatumReader.readField *with  AVRO 1.8.2*,
>>
>> For all timestamp-millis fields (I have many):
>>
>> Conversion conversion = ((SpecificRecordBase)
>> r).getConversion(f.pos());
>>
>>
>> returns null for all timestamp-millis fields (I have many), so...
>>
>> datum = readWithoutConversion(oldDatum, f.schema(), in);
>>
>>
>> is used instead and returns a *Long*
>>
>>
>>
>> Not sure it's relevant, but in this version I have the explicit
>> dependency org.apache.avro:avro:1.8.2 and I am using the
>> avro-maven-plugin (1.8.2) to generate the record from .avsc with this
>> configuration:
>>
>> 
>> String
>> true
>> private
>> true
>> 
>>
>>
>> Cheers
>> Lorenzo
>>
>>
>> On Thu, 11 Jun 2020 at 13:11, Arvid Heise  wrote:
>>
>>> Sorry forget my last mail, that was half-finished.
>>>
>>> Here is the real one:
>>>
>>> Hi Lorenzo,
>>>
>>> if you still have time to investigate.
>>>
>>> Your stack trace shows that all expected code paths have been taken.
>>> Conversions are there; although they look different than here, but that can
>>> be attributed to the avro upgrade.
>>>
>>> Could you put a breakpoint on SpecificDatumReader.readField, so that
>>> you can inspect the conversion for the timestamp field? You probably want
>>> to make it a conditional for f.name() == .
>>> The expected flow is that it should have a conversion that returns the
>>> joda time instead of the long. Then datum should be the converted joda time.
>>>
>>> @Override
>>> protected void readField(Object r, Schema.Field f, Object oldDatum,
>>>  ResolvingDecoder in, Object state)
>>> throws IOException {
>>>   if (r instanceof SpecificRecordBase) {
>>> Conversion conversion = ((SpecificRecordBase) 
>>> r).getConversion(f.pos());
>>>
>>> Object datum;
>>> if (conversion != null) {
>>>   datum = readWithConversion(
>>>   oldDatum, f.schema(), f.schema().getLogicalType(), conversion, 
>>> in);
>>> } else {
>>>   datum = readWithoutConversion(oldDatum, f.schema(), in);
>>> }
>>>
>>> getData().setField(r, f.name(), f.pos(), datum);
>>>
>>>   } else {
>>> super.readField(r, f, oldDatum, in, state);
>>>   }
>>> }
>>>
>>>
>>> On Thu, Jun 11, 2020 at 2:06 PM Arvid Heise  wrote:
>>>
 Hi Lorenzo,

 if you still have time to investigate.

 Your stack trace shows that all expected code paths have been taken.
 Conversions are there although they look different than here, but that can
 be attributed to the avro upgrade.

 @Override
 protected void readField(Object r, Schema.Field f, Object oldDatum,
  ResolvingDecoder in, Object state)
 throws IOException {
   if (r instanceof SpecificRecordBase) {
 Conversion conversion = ((SpecificRecordBase) 

Re: Kinesis ProvisionedThroughputExceededException

2020-06-16 Thread Roman Grebennikov
Hi, 

usually this exception is thrown by aws-java-sdk and means that your kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple flink apps 
reading from it.

Each Kinesis partition has a limit of 5 poll operations per second. If you have 
a stream with 4 partitions and 30 jobs reading from it, I guess that each job 
is constantly hitting op limit for kinesis with default kinesis consumer 
settings and it does an exponential back-off (by just sleeping for a small 
period of time and then retrying).

You have two options here:
1. scale up the kinesis stream, so there will be more partitions and higher 
overall throughput limits
2. tune kinesis consumer backoff parameters:

Our current ones, for example, look like this:

 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") // 
we poll every 2s
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // in 
case of throughput error, initial timeout is 2s
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000") // we 
can go up to a 10s pause
 
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, 
"1.5") // multiplying pause to 1.5 on each next step
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and make 
up to 100 retries

with best regards,
Roman Grebennikov | g...@dfdx.me
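
Put together, a minimal Java sketch of wiring these options into the consumer; the stream name,
region and deserialization schema are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisBackoffSketch {
    public static void main(String[] args) throws Exception {
        Properties conf = new Properties();
        conf.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");   // placeholder region
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000");
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000");
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
        conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), conf))
           .print();
        env.execute("kinesis-backoff-sketch");
    }
}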


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:
> Hi:
> 
> I am using multiple (almost 30 and growing) Flink streaming applications that 
> read from the same kinesis stream and get 
> ProvisionedThroughputExceededException exception which fails the job.
> I have seen a reference 
> http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
>  - which indicates there might be some solution perhaps in Flink 1.8/1.9. 
> 
> I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due to 
> ProvisionedThroughputExceededException - ASF JIRA 
>  is still open.
> 
> 
> So i wanted to find out 
> 
> 1. If this issue has been resolved and if so in which version ?
> 2. Is there any kinesis consumer with kinesis fanout available that can help 
> address this issue ?
> 3. Is there any specific parameter in kinesis consumer config that can 
> address this issue ?
> 
> If there is any other pointer/documentation/reference, please let me know.
> 
> Thanks
> 


Re: [EXTERNAL] Flink Count of Events using metric

2020-06-16 Thread Slotterback, Chris
As the answer on SO suggests, Prometheus comes with lots of functionality to do 
what you’re requesting using just a simple count metric:

https://prometheus.io/docs/prometheus/latest/querying/functions/

If you want to implement the function on your own inside flink, you can make 
your own metrics, and add them to the metric group, and use the metrics inside 
rich functions. For example:

getRuntimeContext().getMetricGroup().addGroup(YOUR_METRIC_GROUP_NAME)
This will return you a org.apache.flink.metrics.MetricGroup. You can then use 
this MetricGroup to attach Metrics, such as counters, gauges (in the SO 
suggestion), and histograms. You can then interact with these Metrics, and they 
will expose your new metrics to your Prometheus exporter, for example.
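
As a small illustration of the above, a RichMapFunction that registers a per-event-type counter;
the Event type and the "login" literal are assumptions about the JSON payload:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class EventCountingMap extends RichMapFunction<EventCountingMap.Event, EventCountingMap.Event> {

    // hypothetical event wrapper; in the real job this would be the parsed JSON record
    public static class Event {
        public String type;
    }

    private transient Counter loginEvents;

    @Override
    public void open(Configuration parameters) {
        loginEvents = getRuntimeContext()
                .getMetricGroup()
                .addGroup("eventCounts")   // YOUR_METRIC_GROUP_NAME from the snippet above
                .counter("login");         // one counter per event type of interest
    }

    @Override
    public Event map(Event event) {
        if ("login".equals(event.type)) {
            loginEvents.inc();
        }
        return event;
    }
}

On the Prometheus side, the hourly view is then a query over the exported counter (for example an
increase() over a one-hour range), per the functions page linked above.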

Chris

From: aj 
Date: Tuesday, June 16, 2020 at 1:31 PM
To: user 
Subject: [EXTERNAL] Flink Count of Events using metric

Please help me with this:

https://stackoverflow.com/questions/62297467/flink-count-of-events-using-metric

I have a topic in Kafka where I am getting multiple types of events in JSON 
format. I have created a file stream sink to write these events to S3 with 
bucketing.

Now I want to publish an hourly count of each event as metrics to Prometheus 
and publish a grafana dashboard over that.

So please help me how can I achieve hourly count for each event using Flink 
metrics and publish to Prometheus.


--
Thanks & Regards,
Anuj Jain




Flink Count of Events using metric

2020-06-16 Thread aj
Please help me with this:

https://stackoverflow.com/questions/62297467/flink-count-of-events-using-metric

I have a topic in Kafka where I am getting multiple types of events in JSON
format. I have created a file stream sink to write these events to S3 with
bucketing.

Now I want to publish an hourly count of each event as metrics to
Prometheus and publish a grafana dashboard over that.

So please help me how can I achieve hourly count for each event using Flink
metrics and publish to Prometheus.


-- 
Thanks & Regards,
Anuj Jain






Re: Flink on yarn : yarn-session understanding

2020-06-16 Thread Vikash Dat
yarn will assign a random port when flink is deployed. To get the port you
need to do a yarn application -list and see the tracking url assigned to
your flink cluster. The port in that url will be the port you need to use
for the rest api.

On Tue, Jun 16, 2020 at 08:49 aj  wrote:

> Ok, thanks for the clarification on yarn session.
>
>  I am trying to connect to job manager on 8081 but it's not connecting.
>
> [image: image.png]
>
>
> So this is the address shown on my Flink job UI and i am trying to connect
> rest address on 8081 but its refusing connection.
>
> On Tue, Jun 9, 2020 at 1:03 PM Andrey Zagrebin 
> wrote:
>
>> Hi Anuj,
>>
>> Afaik, the REST API should work for both modes. What is the issue? Maybe,
>> some network problem to connect to YARN application master?
>>
>> Best,
>> Andrey
>>
>> On Mon, Jun 8, 2020 at 4:39 PM aj  wrote:
>>
>>> I am running some stream jobs that are long-running always. I am
>>> currently submitting each job as a standalone job on yarn.
>>>
>>> 1. I need to understand what is the advantage of using yarn-session and
>>> when should I use that.
>>> 2. Also, I am not able to access rest API services is it because I am
>>> running as standalone job over yarn. Is REST API works only in yarn-session?
>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>>
>>>
>>> 
>>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> 
>
>
> 
>


Re: Updating rules via Kafka

2020-06-16 Thread Sun.Zhu
Why would the rules be consumed from the beginning? Is checkpointing not enabled? After a restart
the job can continue consuming the rules in Kafka from the offsets stored in the checkpoint.




On Jun 16, 2020 at 11:57, Ruibin Xing wrote:
We have a Flink job that needs some custom rules, which should be dynamically added, updated and
deleted. There are a few hundred to a few thousand rules. The current design is RDB + Kafka +
Flink.

The RDB stores the full snapshot of the rules, so a web application can do CRUD on them. Changes
are sent as Kafka messages to Flink, and the rules are propagated via broadcast state.

One problem is still open: how to use Kafka to carry this state. I can think of a few options:

1. Tag each message as Add/Update/Delete and write logic in Flink to keep the state consistent
   with the RDB. The problem is that every job restart has to read Kafka from the beginning to
   replay the state updates, and the state messages in Kafka must be kept forever, so I worry
   they will pile up over time.
2. Use a Kafka compacted topic to solve the problem in 1. I just have not used log compaction
   before and am not sure about the pitfalls.
3. Use option 1, but have Flink pull the full rule set from the RDB at startup.
4. Send the full rule set in every Kafka message after an update, and have Flink read only the
   latest message at startup.

Has anyone experience to share on what a sound way to handle this is? Much appreciated!


Re: Does Flink support reading files or CSV files from java.io.InputStream instead of file paths?

2020-06-16 Thread Marco Villalobos
Okay, it is not supported.  

I thought about this more and I disagree that this would break 
"distributability".

Currently, the API accepts a String which is a path, whether it be a path to a 
remote URL or a local file.
However, after the URL is parsed, ultimately what ends up happening is that an 
InputStream will serve as the abstraction that reads input from some source.

An InputStream can be remote, it can be a local file, it can be a connection to 
a server, or another client, and that situation, the system remains distributed.

Also, such an enhancement promotes interoperability, because the user can decide the source of 
that data rather than being forced to provide a URL or a physical file path.

I think this feature would make testing and demos more portable. I was writing 
a demo, and I wanted it to run without command-line arguments, which would have 
been very handy. I want the user to simply checkout the code and run it without 
having to supply a command line parameter declaring where the input file 
resides.

Thank you.
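
For the demo use case, one workaround that keeps the current path-based API is to copy the
classpath resource to a temporary file on the client and hand Flink that path. The /input.csv
resource name and the local-execution assumption are mine, not part of the thread:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClasspathCsvDemo {
    public static void main(String[] args) throws Exception {
        // materialize the bundled resource to a temp file so no command-line argument is needed
        Path tmp = Files.createTempFile("demo-input", ".csv");
        try (InputStream in = ClasspathCsvDemo.class.getResourceAsStream("/input.csv")) {
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // only valid where the path is visible to the cluster, i.e. local / mini-cluster demos
        env.readTextFile(tmp.toUri().toString()).print();
        env.execute("classpath-csv-demo");
    }
}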

> On Jun 16, 2020, at 4:57 AM, Aljoscha Krettek  wrote:
> 
> Hi Marco,
> 
> this is not possible since Flink is designed mostly to read files from a 
> distributed filesystem, where paths are used to refer to those files. If you 
> read from files on the classpath you could just use plain old Java code and 
> won't need a distributed processing system such as Flink.
> 
> Best,
> Aljoscha
> 
> On 16.06.20 06:46, Marco Villalobos wrote:
>> Does Flink support reading files or CSV files from java.io.InputStream 
>> instead of file paths?
>> I'd rather just store my file on the class path and load it with 
>> java.lang.ClassLoader#getResourceAsStream(String).
>> If there is a way, I'd appreciate an example.
> 



Re: How to performance-test a Flink streaming job

2020-06-16 Thread Sun.Zhu
Hi
Flink 1.11 ships built-in DataGen, print and Blackhole connectors to help with functional testing,
performance testing and observing a job in production. Feel free to try them out.




On Jun 16, 2020 at 09:26, aven.wu wrote:
Hi all,
  I recently want to measure how well my job performs. What tools or methods should I use to get
a reasonably accurate result?
  My pipeline reads from Kafka, processes in Flink (including an Elasticsearch lookup for a
dimension-table join), and writes the results to ES and Kafka.
Best
Aven
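
A minimal Java sketch of those 1.11 connectors; the schema and the rate are placeholders. Swapping
the datagen source or blackhole sink in for Kafka/ES one at a time helps isolate where the
bottleneck is:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DataGenPerfSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // synthetic source: generates rows at a configurable rate
        tEnv.executeSql(
                "CREATE TABLE src (id BIGINT, name STRING) WITH ("
                        + " 'connector' = 'datagen', 'rows-per-second' = '100000')");

        // sink that discards everything, so only the pipeline itself is measured
        tEnv.executeSql("CREATE TABLE snk (id BIGINT, name STRING) WITH ('connector' = 'blackhole')");

        // submits the job; throughput can then be read from the web UI / metrics
        tEnv.executeSql("INSERT INTO snk SELECT id, name FROM src");
    }
}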


Re: Problem querying a Kafka table with sql-client and HiveCatalog

2020-06-16 Thread Sun.Zhu
Yes. Apart from the jars built from 1.11, the dependency jars, e.g. the connector jars and the jars
the HiveCatalog needs, are the 1.10.1 versions, since 1.11 has not been released yet. The two
errors above do not occur on 1.10.1; after upgrading to 1.11 they show up and I don't know why. Is
some dependency missing?




On Jun 16, 2020 at 18:38, Benchao Li wrote:
1.11 refactored some of the underlying data structures, so you cannot take the 1.10 jars and use
them directly in 1.11.
Running with the jars built from 1.11 itself should work without problems.

On Tue, 16 Jun 2020 at 18:11, Sun.Zhu <17626017...@163.com> wrote:

> I built the 1.11 package.
> Querying a Hive table in the sql-cli reports the following error:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
>
> Querying the registered Kafka table reports:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
>
> The dependency jars were copied from 1.10.1.
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> Signature customized by NetEase Mail Master
>
>
> On Jun 13, 2020 at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> Got it!
> Thx, junbao
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> Signature customized by NetEase Mail Master
>
>
> On Jun 13, 2020 at 09:32, zhangjunbao wrote:
> In 1.10.x, creating a table under the HiveCatalog with proctime as PROCTIME() should hit a bug:
> https://issues.apache.org/jira/browse/FLINK-17189 <
> https://issues.apache.org/jira/browse/FLINK-17189>
>
> Best,
> Junbao Zhang
>
> On Jun 13, 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:
>
> hi, all
> When using the sql client with the HiveCatalog, I registered a Kafka table in the HiveCatalog.
> The DDL is as follows:
> |
> CREATE TABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- computed column providing a processing-time attribute
> WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts; ts becomes the event-time attribute
> ) WITH (
> 'connector.type' = 'kafka',  -- use the kafka connector
> 'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
> 'format.type' = 'json'  -- the source data format is json
> );
> |
> Querying with select * from user_behavior; fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> Flink version: 1.10.1
> blink planner, streaming mode
>
>
> Thx
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> Signature customized by NetEase Mail Master
>
>
>


Re: How to ensure data correctness when the dimension table is updated frequently and state keeps growing

2020-06-16 Thread Jark Wu
If the updates are very frequent and you need both accurate joins and good throughput, I think the
only real solution is to join against the changelog. Flink just does not yet natively support a
dimension-table join against a changelog; that will be supported in Flink SQL 1.12.

On the current version, you can try keyBy + local cache + async I/O.

Best,
Jark

On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:

> Or use Redis as the storage for the dimension table.
>
> > On Jun 16, 2020, at 22:10, wangxiangyan wrote:
> >
> > Hi all,
> > The dimension table is updated frequently, is about 1 GB, and needs frequent syncing. What is
> > a good approach for the join?
>
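
A rough sketch of the keyBy + local cache + async I/O idea. Event, Dim values and DimClient are
hypothetical stand-ins for the record type, the dimension value and an async client for the
dimension store; cache TTL and size limits are omitted:

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncDimLookup extends RichAsyncFunction<AsyncDimLookup.Event, AsyncDimLookup.Event> {

    // hypothetical fact record: a join key plus a slot for the joined dimension value
    public static class Event { public String key; public String dimValue; }

    // hypothetical async client for whatever store holds the dimension data
    public interface DimClient extends java.io.Serializable {
        CompletableFuture<String> lookup(String key);
    }

    private final DimClient client;
    private transient Map<String, String> cache;  // per-subtask local cache

    public AsyncDimLookup(DimClient client) {
        this.client = client;
    }

    @Override
    public void open(Configuration parameters) {
        cache = new ConcurrentHashMap<>();
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<Event> resultFuture) {
        String cached = cache.get(event.key);
        if (cached != null) {
            event.dimValue = cached;
            resultFuture.complete(Collections.singleton(event));
            return;
        }
        client.lookup(event.key).thenAccept(dim -> {
            cache.put(event.key, dim);
            event.dimValue = dim;
            resultFuture.complete(Collections.singleton(event));
        });
    }

    // usage sketch: keyBy first so a given key always hits the same subtask's cache
    public static DataStream<Event> enrich(DataStream<Event> events, DimClient client) {
        return AsyncDataStream.unorderedWait(
                events.keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event e) {
                        return e.key;
                    }
                }),
                new AsyncDimLookup(client), 50, TimeUnit.MILLISECONDS, 100);
    }
}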


Re: pyflink with Elasticsearch 5.4

2020-06-16 Thread Jark Wu
Hi,

As far as I know, Flink 1.10 does not officially provide a SQL connector for Elasticsearch 5.x.

Best,
Jark

On Tue, 16 Jun 2020 at 16:08, Dian Fu  wrote:

> Could you post the complete exception?
>
> On Jun 16, 2020, at 15:45, jack wrote:
>
> I have already changed the version in the connect descriptor to 5 locally; the error below still occurs:
>
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("5")
> >> .host("localhost", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
>
>
>
>
>
>
> At 2020-06-16 15:38:28, "Dian Fu" wrote:
> >I guess it's because the ES version specified in the job is `6`, however, 
> >the jar used is `5`.
> >
> >> On Jun 16, 2020, at 13:47, jack wrote:
> >>
> >> I am working from a pyflink-to-Elasticsearch example. My ES version is 5.4.1 and pyflink is
> >> 1.10.1. The connector jar I use is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar; the
> >> kafka and json connector jars are also downloaded, and the Kafka connection tests fine.
> >> Connecting to ES fails with findAndCreateTableSink failed.
> >> Is this caused by the ES connector jar? If anyone has hit a similar problem, please advise. Thanks.
> >>
> >> Caused by Could not find a suitable  factory for   
> >> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
> >> Reason: Required context properties mismatch
> >>
> >>
> >>
> >> from pyflink.datastream import StreamExecutionEnvironment, 
> >> TimeCharacteristic
> >> from pyflink.table import StreamTableEnvironment, DataTypes, 
> >> EnvironmentSettings
> >> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
> >> Elasticsearch
> >>
> >>
> >> def area_cnts():
> >> s_env = StreamExecutionEnvironment.get_execution_environment()
> >> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> >> s_env.set_parallelism(1)
> >>
> >> # use blink table planner
> >> st_env = StreamTableEnvironment \
> >> .create(s_env, environment_settings=EnvironmentSettings
> >> .new_instance()
> >> .in_streaming_mode()
> >> .use_blink_planner().build())
> >>
> >> # register source and sink
> >> register_rides_source(st_env)
> >> register_cnt_sink(st_env)
> >>
> >> # query
> >> st_env.from_path("source")\
> >> .group_by("taxiId")\
> >> .select("taxiId, count(1) as cnt")\
> >> .insert_into("sink")
> >>
> >> # execute
> >> st_env.execute("6-write_with_elasticsearch")
> >>
> >>
> >> def register_rides_source(st_env):
> >> st_env \
> >> .connect(  # declare the external system to connect to
> >> Kafka()
> >> .version("universal")
> >> .topic("Rides")
> >> .start_from_earliest()
> >> .property("zookeeper.connect", "zookeeper:2181")
> >> .property("bootstrap.servers", "kafka:9092")) \
> >> .with_format(  # declare a format for this system
> >> Json()
> >> .fail_on_missing_field(True)
> >> .schema(DataTypes.ROW([
> >> DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> >> DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> >> DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> >> DataTypes.FIELD("lon", DataTypes.FLOAT()),
> >> DataTypes.FIELD("lat", DataTypes.FLOAT()),
> >> DataTypes.FIELD("psgCnt", DataTypes.INT()),
> >> DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> >> .with_schema(  # declare the schema of the table
> >> Schema()
> >> .field("rideId", DataTypes.BIGINT())
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("isStart", DataTypes.BOOLEAN())
> >> .field("lon", DataTypes.FLOAT())
> >> .field("lat", DataTypes.FLOAT())
> >> .field("psgCnt", DataTypes.INT())
> >> .field("rideTime", DataTypes.TIMESTAMP())
> >> .rowtime(
> >> Rowtime()
> >> .timestamps_from_field("eventTime")
> >> .watermarks_periodic_bounded(6))) \
> >> .in_append_mode() \
> >> .register_table_source("source")
> >>
> >>
> >> def register_cnt_sink(st_env):
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("6")
> >> .host("elasticsearch", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
> >> .with_schema(
> >> Schema()
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("cnt", DataTypes.BIGINT())) \
> >> .with_format(
> >>Json()
> >>.derive_schema()) \
> >> .in_upsert_mode() \
> >> .register_table_sink("sink")
> >>
> >>
> >> if __name__ == '__main__':
> >> area_cnts()
> >>
> >
>
>
>


Re: pyflink连接elasticsearch5.4问题

2020-06-16 Thread Jark Wu
Hi,

据我所知,Flink 1.10 官方没有支持Elasticsearch 5.x 版本的 sql connector。

Best,
Jark

On Tue, 16 Jun 2020 at 16:08, Dian Fu  wrote:

> 可以发一下完整的异常吗?
>
> 在 2020年6月16日,下午3:45,jack  写道:
>
> 连接的版本部分我本地已经修改为 5了,发生了下面的报错;
>
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("5")
> >> .host("localhost", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
>
>
>
>
>
>
> 在 2020-06-16 15:38:28,"Dian Fu"  写道:
> >I guess it's because the ES version specified in the job is `6`, however, 
> >the jar used is `5`.
> >
> >> 在 2020年6月16日,下午1:47,jack  写道:
> >>
> >> 我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 
> >> flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
> >> 连接es的时候报错,findAndCreateTableSink   failed。
> >> 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
> >>
> >> Caused by Could not find a suitable  factory for   
> >> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
> >> Reason: Required context properties mismatch
> >>
> >>
> >>
> >> from pyflink.datastream import StreamExecutionEnvironment, 
> >> TimeCharacteristic
> >> from pyflink.table import StreamTableEnvironment, DataTypes, 
> >> EnvironmentSettings
> >> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
> >> Elasticsearch
> >>
> >>
> >> def area_cnts():
> >> s_env = StreamExecutionEnvironment.get_execution_environment()
> >> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> >> s_env.set_parallelism(1)
> >>
> >> # use blink table planner
> >> st_env = StreamTableEnvironment \
> >> .create(s_env, environment_settings=EnvironmentSettings
> >> .new_instance()
> >> .in_streaming_mode()
> >> .use_blink_planner().build())
> >>
> >> # register source and sink
> >> register_rides_source(st_env)
> >> register_cnt_sink(st_env)
> >>
> >> # query
> >> st_env.from_path("source")\
> >> .group_by("taxiId")\
> >> .select("taxiId, count(1) as cnt")\
> >> .insert_into("sink")
> >>
> >> # execute
> >> st_env.execute("6-write_with_elasticsearch")
> >>
> >>
> >> def register_rides_source(st_env):
> >> st_env \
> >> .connect(  # declare the external system to connect to
> >> Kafka()
> >> .version("universal")
> >> .topic("Rides")
> >> .start_from_earliest()
> >> .property("zookeeper.connect", "zookeeper:2181")
> >> .property("bootstrap.servers", "kafka:9092")) \
> >> .with_format(  # declare a format for this system
> >> Json()
> >> .fail_on_missing_field(True)
> >> .schema(DataTypes.ROW([
> >> DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> >> DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> >> DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> >> DataTypes.FIELD("lon", DataTypes.FLOAT()),
> >> DataTypes.FIELD("lat", DataTypes.FLOAT()),
> >> DataTypes.FIELD("psgCnt", DataTypes.INT()),
> >> DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> >> .with_schema(  # declare the schema of the table
> >> Schema()
> >> .field("rideId", DataTypes.BIGINT())
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("isStart", DataTypes.BOOLEAN())
> >> .field("lon", DataTypes.FLOAT())
> >> .field("lat", DataTypes.FLOAT())
> >> .field("psgCnt", DataTypes.INT())
> >> .field("rideTime", DataTypes.TIMESTAMP())
> >> .rowtime(
> >> Rowtime()
> >> .timestamps_from_field("eventTime")
> >> .watermarks_periodic_bounded(6))) \
> >> .in_append_mode() \
> >> .register_table_source("source")
> >>
> >>
> >> def register_cnt_sink(st_env):
> >> st_env.connect(
> >> Elasticsearch()
> >> .version("6")
> >> .host("elasticsearch", 9200, "http")
> >> .index("taxiid-cnts")
> >> .document_type('taxiidcnt')
> >> .key_delimiter("$")) \
> >> .with_schema(
> >> Schema()
> >> .field("taxiId", DataTypes.BIGINT())
> >> .field("cnt", DataTypes.BIGINT())) \
> >> .with_format(
> >>Json()
> >>.derive_schema()) \
> >> .in_upsert_mode() \
> >> .register_table_sink("sink")
> >>
> >>
> >> if __name__ == '__main__':
> >> area_cnts()
> >>
> >
>
>
>


Writing to S3 parquet files in Blink batch mode. Flink 1.10

2020-06-16 Thread Dmytro Dragan
Hi guys,

In our use case we consider to write data to AWS S3 in parquet format using 
Blink Batch mode.
As far as I can see, the valid approach for writing Parquet files is to use
StreamingFileSink with the Parquet bulk-encoded format, but
based on the documentation and tests it works only with OnCheckpointRollingPolicy.

Blink batch mode, however, requires checkpointing to be disabled.

Has anyone faced a similar issue?
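
For reference, a minimal sketch of the StreamingFileSink + bulk Parquet approach described above (Flink 1.10 Java, flink-parquet assumed on the classpath; MyRecord, the S3 path and the checkpoint interval are placeholders). Because bulk formats only roll on checkpoints, this only works with checkpointing enabled, which is exactly the conflict with Blink batch mode raised here:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetS3SinkSketch {

    // Placeholder POJO for the records to be written.
    public static class MyRecord {
        public String id;
        public long ts;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats roll only on checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(60_000);

        DataStream<MyRecord> records = env.fromElements(new MyRecord()); // stand-in source

        StreamingFileSink<MyRecord> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("s3a://my-bucket/output"),             // assumed bucket/path
                        ParquetAvroWriters.forReflectRecord(MyRecord.class))
                .build();                                               // default policy: roll on checkpoint

        records.addSink(sink);
        env.execute("parquet-s3-sketch");
    }
}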



Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-16 Thread 李奇
频繁是什么级别的?可以加缓存。然后再定期更新。

> 在 2020年6月16日,下午10:10,wangxiangyan  写道:
> 
> hi,大家
> 维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?
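
A minimal sketch of the cache-plus-periodic-refresh idea suggested above: a Guava LoadingCache whose entries expire, so changed dimension rows are re-read from the external store after a bounded delay. lookupDimension() is a placeholder for the real HBase/JDBC query, and keying the stream by the join key first improves the hit rate:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.concurrent.TimeUnit;

// Enrich a stream against a frequently-updated dimension table through a local
// cache that expires entries so they are re-read periodically.
public class CachedDimensionLookup extends RichMapFunction<String, String> {

    private transient LoadingCache<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        cache = CacheBuilder.newBuilder()
                .maximumSize(100_000)                      // bound memory usage
                .expireAfterWrite(5, TimeUnit.MINUTES)     // forces periodic reload of changed rows
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) {
                        return lookupDimension(key);       // cache miss -> query the external store
                    }
                });
    }

    @Override
    public String map(String key) throws Exception {
        return key + "," + cache.get(key);
    }

    private String lookupDimension(String key) {
        // placeholder: replace with the actual dimension-table query
        return "dim-value-for-" + key;
    }
}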


对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-16 Thread wangxiangyan
hi,大家
维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?

Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Ori Popowski
Hi @aljoscha

The watermark metrics look fine. (attached screenshot)
[image: image.png]

This is the extractor:
class TimestampExtractor[A, B <: AbstractEvent] extends
BoundedOutOfOrdernessTimestampExtractor[(A, B)](Time.minutes(5)) {
  override def extractTimestamp(element: (A, B)): Long =
Instant.now.toEpochMilli.min(element._2.sequence / 1000)
}

I'll try to output the watermark and report my findings
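
A minimal sketch of what emitting the current watermark could look like, following Aljoscha's suggestion; the key and output types are placeholders (the actual job is Scala with its own element types), not the job's real classes:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Debug helper: for every fired session window, emit the window bounds,
// the element count and the operator's current watermark.
public class WatermarkDebugFunction<IN>
        extends ProcessWindowFunction<IN, String, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context ctx,
                        Iterable<IN> elements,
                        Collector<String> out) {
        long count = 0;
        for (IN ignored : elements) {
            count++;
        }
        out.collect(String.format(
                "key=%s windowStart=%d windowEnd=%d count=%d currentWatermark=%d",
                key, ctx.window().getStart(), ctx.window().getEnd(),
                count, ctx.currentWatermark()));
    }
}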

On Tue, Jun 16, 2020 at 3:21 PM Aljoscha Krettek 
wrote:

> Did you look at the watermark metrics? Do you know what the current
> watermark is when the windows are firing. You could also get the current
> watemark when using a ProcessWindowFunction and also emit that in the
> records that you're printing, for debugging.
>
> What is that TimestampAssigner you're using for your timestamp
> assigner/watermark extractor?
>
> Best,
> Aljoscha
>
> On 16.06.20 14:10, Ori Popowski wrote:
> > Okay, so I created a simple stream (similar to the original stream),
> where
> > I just write the timestamps of each evaluated window to S3.
> > The session gap is 30 minutes, and this is one of the sessions:
> > (first-event, last-event, num-events)
> >
> > 11:23-11:23 11 events
> > 11:25-11:26 51 events
> > 11:28-11:29 74 events
> > 11:31-11:31 13 events
> >
> > Again, this is one session. How can we explain this? Why does Flink
> create
> > 4 distinct windows within 8 minutes? I'm really lost here, I'd appreciate
> > some help.
> >
> > On Tue, Jun 16, 2020 at 2:17 PM Ori Popowski  wrote:
> >
> >> Hi, thanks for answering.
> >>
> >>> I guess you consume from Kafka from the earliest offset, so you consume
> >> historical data and Flink is catching-up.
> >> Yes, it's what's happening. But Kafka is partitioned on sessionId, so
> skew
> >> between partitions cannot explain it.
> >> I think the only way it can happen is when when suddenly there's one
> event
> >> with very late timestamp
> >>
> >>> Just to verify, if you do keyBy sessionId, do you check the gaps of
> >> events from the same session?
> >> Good point. sessionId is unique in this case, and even if it's not -
> every
> >> single session suffers from this problem of early triggering so it's
> very
> >> unlikely that all millions sessions within that hour had duplicates.
> >>
> >> I'm suspecting that the fact I have two ProcessWindowFunctions one after
> >> the other somehow causes this.
> >> I deployed a version with one window function which just prints the
> >> timestamps to S3 (to find out if I have event-time jumps) and suddenly
> it
> >> doesn't trigger early (I'm running for 10 minutes and not a single event
> >> has arrived to the sink)
> >>
> >> On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch 
> wrote:
> >>
> >>> Hi Ori,
> >>>
> >>> I guess you consume from Kafka from the earliest offset, so you consume
> >>> historical data and Flink is catching-up.
> >>>
> >>> Regarding: *My event-time timestamps also do not have big gaps*
> >>>
> >>> Just to verify, if you do keyBy sessionId, do you check the gaps of
> >>> events from the same session?
> >>>
> >>> Rafi
> >>>
> >>>
> >>> On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski 
> wrote:
> >>>
>  So why is it happening? I have no clue at the moment.
>  My event-time timestamps also do not have big gaps between them that
>  would explain the window triggering.
> 
> 
>  On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
>  wrote:
> 
> > If you are using event time in Flink, it is disconnected from the
> real
> > world wall clock time.
> > You can process historical data in a streaming program as if it was
> > real-time data (potentially reading through (event time) years of
> data in a
> > few (wall clock) minutes)
> >
> > On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com>
> wrote:
> >
> >> Hi
> >>
> >> I think it maybe you use the event time, and the timestamp between
> >> your event data is bigger than 30minutes, maybe you can check the
> source
> >> data timestamp.
> >>
> >> Best,
> >> Yichao Yang
> >>
> >> --
> >> 发自我的iPhone
> >>
> >>
> >> -- Original --
> >> *From:* Ori Popowski 
> >> *Date:* Mon,Jun 15,2020 10:50 PM
> >> *To:* user 
> >> *Subject:* Re: EventTimeSessionWindow firing too soon
> >>
> >>
> >
>
>


回复: Re: flink sql read hbase sink mysql data type not match

2020-06-16 Thread Zhou Zach
2020-06-16 21:01:09,756 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka version: unknown
2020-06-16 21:01:09,757 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
 - [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): 
user_behavior-0
2020-06-16 21:01:09,765 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - Cluster 
ID: cAT_xBISQNWghT9kR5UuIw
2020-06-16 21:01:09,766 WARN 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig
 - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2020-06-16 21:01:09,766 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka version: unknown
2020-06-16 21:01:09,767 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser 
- Kafka commitId: unknown
2020-06-16 21:01:09,768 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher
 - [Consumer clientId=consumer-7, groupId=null] Resetting offset for partition 
user_behavior-0 to offset 43545.
2020-06-16 21:01:35,904 INFO 
org.apache.flink.addons.hbase.HBaseLookupFunction
  - start close ...
2020-06-16 21:01:35,906 INFO 
org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient
  - Close zookeeper connection 0x72d39885 to 
cdh1:2181,cdh2:2181,cdh3:2181
2020-06-16 21:01:35,908 INFO 
org.apache.flink.addons.hbase.HBaseLookupFunction
  - end close.
2020-06-16 21:01:35,908 INFO org.apache.zookeeper.ZooKeeper  
   
   - Session: 0x172b776fac80ae4 closed
2020-06-16 21:01:35,909 INFO org.apache.zookeeper.ClientCnxn  
   
  - EventThread shut down
2020-06-16 21:01:35,911 INFO 
org.apache.flink.runtime.taskmanager.Task 
 - Source: KafkaTableSource(uid, 
phoneType, clickCount, time) - 
SourceConversion(table=[default_catalog.default_database.user_behavior, source: 
[KafkaTableSource(uid, phoneType, clickCount, time)]], fields=[uid, phoneType, 
clickCount, time]) - Calc(select=[uid, time]) - 
LookupJoin(table=[HBaseTableSource[schema=[rowkey, cf], projectFields=null]], 
joinType=[InnerJoin], async=[false], lookup=[rowkey=uid], select=[uid, time, 
rowkey, cf]) - Calc(select=[CAST(time) AS time, cf.age AS age]) - 
SinkConversionToTuple2 - Sink: JDBCUpsertTableSink(time, age) (1/2) 
(e45989f173dc35aefc52413349db7f30) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity 
of the array: 2
at 
org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
at 
org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:55)
at 
org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
at 
org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
at LookupFunction$12.flatMap(Unknown Source)
at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at StreamExecCalc$7.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SourceConversion$6.processElement(Unknown Source)
  

Flink ML

2020-06-16 Thread Dimitris Vogiatzidakis
 Hello,
I'm a cs student currently working on my Bachelor's thesis. I've used Flink
to extract features out of some datasets, and I would like to use them
together with another dataset of (1,0) (Node exists or doesn't) to perform
a logistic regression. I have found that FLIP-39 has been accepted and it
is running in version 1.10.0, which I also currently use, but I'm having
trouble implementing it. Are there any Java examples currently up and
running? Or if you can propose a different way to perform the task?
Thank you.

-Dimitris Vogiatzidakis


Re: flink sql 中怎么把ROW类型转换成INT

2020-06-16 Thread Leonard Xu
Hi
 Hbase connector中,除 rowkey 字段外,所有列簇 在FLINK中对应的类型都是 符合类型ROW(),这是因为ROW中可以包括多个 
field 能够和 hbase 中的一个列簇可以包含多个列很好地对应。贴个文档,你一看就懂:

CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 family2 ROW<q2 STRING, q3 BIGINT>,
 family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (...);
 
-- scan data from the HBase table
SELECT rowkey, family1.q1, family3.q4, family3.q6 FROM hTable;
 

这个文档正在编写当中,很快就可以在官网上看到。

祝好,
Leonard Xu

> 在 2020年6月16日,19:16,Yichao Yang <1048262...@qq.com> 写道:
> 
> Hi
> 
> 
> row类型是不能强转int的,可以找一下阿里云flink sql的文档,其中有介绍哪些数据类型可以互转。
> 
> 
> Best,
> Yichao Yang
> 
> 
> 
> 发自我的iPhone
> 
> 
> -- 原始邮件 --
> 发件人: Zhou Zach  发送时间: 2020年6月16日 19:10
> 收件人: user-zh  主题: 回复:flink sql 中怎么把ROW类型转换成INT
> 
> 
> 
> flink sql从HBase中读取的类型为ROW,怎么把ROW类型转换成INT
> select cast(cf as Int) cf from hbase_table
> 直接这样转换不成功



Re: Flink on yarn : yarn-session understanding

2020-06-16 Thread aj
Ok, thanks for the clarification on yarn session.

I am trying to connect to the job manager on 8081, but it's not connecting.

[image: image.png]


So this is the address shown on my Flink job UI, and I am trying to connect to
the REST address on port 8081, but it's refusing the connection.

On Tue, Jun 9, 2020 at 1:03 PM Andrey Zagrebin  wrote:

> Hi Anuj,
>
> Afaik, the REST API should work for both modes. What is the issue? Maybe,
> some network problem to connect to YARN application master?
>
> Best,
> Andrey
>
> On Mon, Jun 8, 2020 at 4:39 PM aj  wrote:
>
>> I am running some stream jobs that are long-running always. I am
>> currently submitting each job as a standalone job on yarn.
>>
>> 1. I need to understand what is the advantage of using yarn-session and
>> when should I use that.
>> 2. Also, I am not able to access rest API services is it because I am
>> running as standalone job over yarn. Is REST API works only in yarn-session?
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>> 
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek
Did you look at the watermark metrics? Do you know what the current 
watermark is when the windows are firing? You could also get the current 
watermark when using a ProcessWindowFunction and also emit that in the 
records that you're printing, for debugging.


Which TimestampAssigner are you using for your timestamp 
assigner/watermark extractor?


Best,
Aljoscha

On 16.06.20 14:10, Ori Popowski wrote:

Okay, so I created a simple stream (similar to the original stream), where
I just write the timestamps of each evaluated window to S3.
The session gap is 30 minutes, and this is one of the sessions:
(first-event, last-event, num-events)

11:23-11:23 11 events
11:25-11:26 51 events
11:28-11:29 74 events
11:31-11:31 13 events

Again, this is one session. How can we explain this? Why does Flink create
4 distinct windows within 8 minutes? I'm really lost here, I'd appreciate
some help.

On Tue, Jun 16, 2020 at 2:17 PM Ori Popowski  wrote:


Hi, thanks for answering.


I guess you consume from Kafka from the earliest offset, so you consume

historical data and Flink is catching-up.
Yes, it's what's happening. But Kafka is partitioned on sessionId, so skew
between partitions cannot explain it.
I think the only way it can happen is when when suddenly there's one event
with very late timestamp


Just to verify, if you do keyBy sessionId, do you check the gaps of

events from the same session?
Good point. sessionId is unique in this case, and even if it's not - every
single session suffers from this problem of early triggering so it's very
unlikely that all millions sessions within that hour had duplicates.

I'm suspecting that the fact I have two ProcessWindowFunctions one after
the other somehow causes this.
I deployed a version with one window function which just prints the
timestamps to S3 (to find out if I have event-time jumps) and suddenly it
doesn't trigger early (I'm running for 10 minutes and not a single event
has arrived to the sink)

On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch  wrote:


Hi Ori,

I guess you consume from Kafka from the earliest offset, so you consume
historical data and Flink is catching-up.

Regarding: *My event-time timestamps also do not have big gaps*

Just to verify, if you do keyBy sessionId, do you check the gaps of
events from the same session?

Rafi


On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski  wrote:


So why is it happening? I have no clue at the moment.
My event-time timestamps also do not have big gaps between them that
would explain the window triggering.


On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
wrote:


If you are using event time in Flink, it is disconnected from the real
world wall clock time.
You can process historical data in a streaming program as if it was
real-time data (potentially reading through (event time) years of data in a
few (wall clock) minutes)

On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:


Hi

I think it maybe you use the event time, and the timestamp between
your event data is bigger than 30minutes, maybe you can check the source
data timestamp.

Best,
Yichao Yang

--
发自我的iPhone


-- Original --
*From:* Ori Popowski 
*Date:* Mon,Jun 15,2020 10:50 PM
*To:* user 
*Subject:* Re: EventTimeSessionWindow firing too soon








Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek
Sorry, I now saw that this thread diverged. My mail client didn't pick 
it up because someone messed up the subject of the thread.


On 16.06.20 14:06, Aljoscha Krettek wrote:

Hi,

what is the timescale of your data in Kafka. If you have data in there 
that spans more than ~30 minutes I would expect your windows to fire 
very soon after the job is started. Event time does not depend on a wall 
clock but instead advances with the time in the stream. As Flink 
advances through the data in Kafka so does event-time advance in step.


Does that explain your situation?

Best,
Aljoscha

On 15.06.20 16:49, Ori Popowski wrote:

I'm using Flink 1.10 on YARN, and I have a EventTimeSessionWindow with a
gap of 30 minutes.

But as soon as I start the job, events are written to the sink (I can see
them in S3) even though 30 minutes have not passed.

This is my job:

val stream = senv
   .addSource(new FlinkKafkaConsumer("…", 
compressedEventDeserializer,

properties))
   .filter(_.sessionId.nonEmpty)
   .flatMap(_ match { case (_, events) => events })
   .assignTimestampsAndWatermarks(new
TimestampExtractor[Event](Time.minutes(10)) {
 override def extractTimestamp(element: Event): Long =
event.sequence / 1000 // microseconds
   })
   .keyBy(_.sessionId)
   .window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES)))
   .process(myProcessWindowFunction)

AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100)

Any idea why it's happening?







Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Ori Popowski
Okay, so I created a simple stream (similar to the original stream), where
I just write the timestamps of each evaluated window to S3.
The session gap is 30 minutes, and this is one of the sessions:
(first-event, last-event, num-events)

11:23-11:23 11 events
11:25-11:26 51 events
11:28-11:29 74 events
11:31-11:31 13 events

Again, this is one session. How can we explain this? Why does Flink create
4 distinct windows within 8 minutes? I'm really lost here, I'd appreciate
some help.

On Tue, Jun 16, 2020 at 2:17 PM Ori Popowski  wrote:

> Hi, thanks for answering.
>
> > I guess you consume from Kafka from the earliest offset, so you consume
> historical data and Flink is catching-up.
> Yes, it's what's happening. But Kafka is partitioned on sessionId, so skew
> between partitions cannot explain it.
> I think the only way it can happen is when when suddenly there's one event
> with very late timestamp
>
> > Just to verify, if you do keyBy sessionId, do you check the gaps of
> events from the same session?
> Good point. sessionId is unique in this case, and even if it's not - every
> single session suffers from this problem of early triggering so it's very
> unlikely that all millions sessions within that hour had duplicates.
>
> I'm suspecting that the fact I have two ProcessWindowFunctions one after
> the other somehow causes this.
> I deployed a version with one window function which just prints the
> timestamps to S3 (to find out if I have event-time jumps) and suddenly it
> doesn't trigger early (I'm running for 10 minutes and not a single event
> has arrived to the sink)
>
> On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch  wrote:
>
>> Hi Ori,
>>
>> I guess you consume from Kafka from the earliest offset, so you consume
>> historical data and Flink is catching-up.
>>
>> Regarding: *My event-time timestamps also do not have big gaps*
>>
>> Just to verify, if you do keyBy sessionId, do you check the gaps of
>> events from the same session?
>>
>> Rafi
>>
>>
>> On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski  wrote:
>>
>>> So why is it happening? I have no clue at the moment.
>>> My event-time timestamps also do not have big gaps between them that
>>> would explain the window triggering.
>>>
>>>
>>> On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
>>> wrote:
>>>
 If you are using event time in Flink, it is disconnected from the real
 world wall clock time.
 You can process historical data in a streaming program as if it was
 real-time data (potentially reading through (event time) years of data in a
 few (wall clock) minutes)

 On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:

> Hi
>
> I think it maybe you use the event time, and the timestamp between
> your event data is bigger than 30minutes, maybe you can check the source
> data timestamp.
>
> Best,
> Yichao Yang
>
> --
> 发自我的iPhone
>
>
> -- Original --
> *From:* Ori Popowski 
> *Date:* Mon,Jun 15,2020 10:50 PM
> *To:* user 
> *Subject:* Re: EventTimeSessionWindow firing too soon
>
>


Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek

Hi,

what is the timescale of your data in Kafka? If you have data in there 
that spans more than ~30 minutes, I would expect your windows to fire 
very soon after the job is started. Event time does not depend on a wall 
clock but instead advances with the time in the stream. As Flink 
advances through the data in Kafka, event time advances in step.


Does that explain your situation?

Best,
Aljoscha

On 15.06.20 16:49, Ori Popowski wrote:

I'm using Flink 1.10 on YARN, and I have a EventTimeSessionWindow with a
gap of 30 minutes.

But as soon as I start the job, events are written to the sink (I can see
them in S3) even though 30 minutes have not passed.

This is my job:

val stream = senv
   .addSource(new FlinkKafkaConsumer("…", compressedEventDeserializer,
properties))
   .filter(_.sessionId.nonEmpty)
   .flatMap(_ match { case (_, events) => events })
   .assignTimestampsAndWatermarks(new
TimestampExtractor[Event](Time.minutes(10)) {
 override def extractTimestamp(element: Event): Long =
event.sequence / 1000 // microseconds
   })
   .keyBy(_.sessionId)
   .window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES)))
   .process(myProcessWindowFunction)

AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100)

Any idea why it's happening?





Re: Does Flink support reading files or CSV files from java.io.InputStream instead of file paths?

2020-06-16 Thread Aljoscha Krettek

Hi Marco,

this is not possible since Flink is designed mostly to read files from a 
distributed filesystem, where paths are used to refer to those files. If 
you read from files on the classpath you could just use plain old Java 
code and won't need a distributed processing system such as Flink.


Best,
Aljoscha

On 16.06.20 06:46, Marco Villalobos wrote:


Does Flink support reading files or CSV files from java.io.InputStream instead 
of file paths?

I'd rather just store my file on the class path and load it with 
java.lang.ClassLoader#getResourceAsStream(String).

If there is a way, I'd appreciate an example.
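
If the file is small enough to ship on the classpath, one possible pattern (an illustration only, not something prescribed in the reply above) is to read it with plain Java first and then hand the materialized lines to Flink via fromCollection. The resource name /data.csv is a placeholder:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ClasspathCsvExample {
    public static void main(String[] args) throws Exception {
        // Read the CSV from the classpath with plain Java ...
        List<String> lines = new ArrayList<>();
        try (InputStream in = ClasspathCsvExample.class.getResourceAsStream("/data.csv"); // assumed resource
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }

        // ... and only then hand the already-materialized records to Flink.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> csv = env.fromCollection(lines);
        csv.print();
    }
}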





Re: Re: flink sql read hbase sink mysql data type not match

2020-06-16 Thread Benchao Li
不能直接cast,ROW类型是一个复合类型,要获取其中的某个字段,可以用`.`来获取。
比如你现在这个场景,就是 SELECT rowkey, cf.age FROM users
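
Put together with the source and sink registered in the original job, the failing statement could be rewritten roughly like this (a sketch in the Java Table API, while the original job uses Scala; tEnv stands in for the job's StreamTableEnvironment with "users" and "user_age" already registered):

import org.apache.flink.table.api.java.StreamTableEnvironment;

public class UserAgeInsert {
    // tEnv is assumed to be the job's StreamTableEnvironment, with the "users" HBase
    // source and the "user_age" JDBC sink registered as in the original code.
    public static void insertUserAge(StreamTableEnvironment tEnv) {
        tEnv.sqlUpdate(
            "INSERT INTO user_age "
            + "SELECT rowkey, cf.age AS age "  // read the field inside the ROW with a dot
            + "FROM users");
    }
}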

Zhou Zach  于2020年6月16日周二 下午6:59写道:

> flink sql 怎么将ROW<`age` INT转换成INT啊
>
>
> streamTableEnv.sqlUpdate(
>   """
> |
> |insert into  user_age
> |SELECT rowkey, cast(cf as int) as age
> |FROM
> |  users
> |
> |""".stripMargin)这样尝试报错了


Re: Improved performance when using incremental checkpoints

2020-06-16 Thread Aljoscha Krettek

Hi,

it might be that the operations that Flink performs on RocksDB during 
checkpointing will "poke" RocksDB somehow and make it clean up its 
internal hierarchies of storage more. Other than that, I'm also a bit 
surprised by this.


Maybe Yun Tang will come up with another idea.

Best,
Aljoscha

On 16.06.20 12:42, nick toker wrote:

Hi,

We used both flink versions 1.9.1 and 1.10.1
We used rocksDB default configuration.
The streaming pipeline is very simple.

1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:

private transient MapState testMapState;

@Override
 public void processElement(Map value, Context ctx,
Collector> out) throws Exception {

 if (testMapState.isEmpty()) {

 testMapState.putAll(value);

 out.collect(value);

 testMapState.clear();
 }
 }

We used the same code with ValueState and observed the same results.


BR,

Nick


‫בתאריך יום ג׳, 16 ביוני 2020 ב-11:56 מאת ‪Yun Tang‬‏ <‪myas...@live.com
‬‏>:‬


Hi Nick

It's really strange that performance could improve when checkpoint is
enabled.
In general, enable checkpoint might bring a bit performance downside to
the whole job.

Could you give more details e.g. Flink version, configurations of RocksDB
and simple code which could reproduce this problem.

Best
Yun Tang
--
*From:* nick toker 
*Sent:* Tuesday, June 16, 2020 15:44
*To:* user@flink.apache.org 
*Subject:* Improved performance when using incremental checkpoints

Hello,

We are using RocksDB as the backend state.
At first we didn't enable the checkpoints mechanism.

We observed the following behaviour and we are wondering why ?

When using the rocksDB *without* checkpoint the performance was very
extremely bad.
And when we enabled the checkpoint the performance was improved by a*
factor of 10*.

Could you please explain if this behaviour is expected ?
Could you please explain why enabling the checkpoint significantly
improves the performance ?

BR,
Nick







回复:异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 Thread Yichao Yang
Hi


使用到缓存的话大多数情况下都要用到keyby,并且在keyby之后的算子使用缓存,保证相同key只会访问一个缓存,否则缓存命中率一般情况下都会很低。


Best,
Yichao Yang



发自我的iPhone


-- 原始邮件 --
发件人: Benchao Li 

Re: MapState bad performance

2020-06-16 Thread Yun Tang
Hi Nick

From my experience, it's not easy to tune this without code to reproduce. Could 
you please provide code with a fake source that reproduces the problem so that we can help you?

If CPU usage is 100% in RocksDB-related methods, it might be because RocksDB is 
accessed too often. If the CPU usage is not 100% while disk util is 100%, we have
likely hit the performance limit of the disk.

BTW, if you have a 16GB-memory TM with 32 slots, it would only give about 150MB of
managed memory [1][2] to RocksDB, which looks a bit small.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_setup.html#managed-memory
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_tuning.html#rocksdb-state-backend

Best
Yun Tang



From: nick toker 
Sent: Tuesday, June 16, 2020 18:36
To: Yun Tang 
Cc: user@flink.apache.org 
Subject: Re: MapState bad performance

Hi,

We are using flink version 1.10.1
The task manager memory 16GB
The number of slots is 32 but the job parallelism is 1.
We used the default configuration for rocksdb.
We checked the disk speed on the machine running the task manager: Write 300MB 
and read 1GB

BR,
Nick

‫בתאריך יום ג׳, 16 ביוני 2020 ב-12:12 מאת ‪Yun Tang‬‏ 
<‪myas...@live.com‬‏>:‬
Hi Nick

As you might know, RocksDB suffers not so good performance for iterator-like 
operations due to it needs to merge sort for multi levels. [1]

Unfortunately, rocksDBMapState.isEmpty() needs to call iterator and seek 
operations over rocksDB [2], and rocksDBMapState.clear() needs to iterator over 
state and remove entry [3].
However, even these operations behaves not so good, I don't think they would 
behave extremely bad in general case. From our experience on SSD, the latency 
of seek should be less than 100us
and could go up to hundreds of us, did you use SSD disk?

  1.  What is the Flink version, taskmanager memory, number of slots and 
RocksDB related configurations?
  2.  Have you checked the IOPS, disk util for those machines which containing 
task manager running RocksDB?

[1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
[2] 
https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
[3] 
https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254

Best
Yun Tang


From: nick toker mailto:nick.toker@gmail.com>>
Sent: Tuesday, June 16, 2020 15:35
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: MapState bad performance

Hello,

We wrote a very simple streaming pipeline containing:
1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:


private transient MapState testMapState;

@Override
public void processElement(Map value, Context ctx, 
Collector> out) throws Exception {

if (testMapState.isEmpty()) {

testMapState.putAll(value);

out.collect(value);

testMapState.clear();
}
}

We faced very bad performance and then we made some tests using jprofiler.
Using jprofiler, we saw that the hot spots are 2 functions of the MapState:
1. isEmpty() - around 7 ms
2. clear() - around 4 ms

We had to change and use ValueState instead.

Are we using the MapState in the correct way or are we doing something wrong ?
Is this behaviour expected because flink  recommendations are to use MapState 
and NOT ValueState ?

BR,
Nick


回复:Re:关于keyby算子的疑问,如果本身数据比较分散,还有keyby的必要吗

2020-06-16 Thread Yichao Yang
Hi


个人理解一般情况下都是业务需要才会做keyby操作,比如想统计一个用户一分钟pv按照userid 
keyby。如果你的任务没有这样的业务需求完全不用考虑使用这些算子的。
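
As an illustration of the kind of business-driven keyBy described above (per-user, per-minute PV counts), a minimal sketch; the source and field layout are assumptions:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PerUserPvCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source: each element is the userId of one page view.
        DataStream<String> pageViews = env.fromElements("user-1", "user-2", "user-1");

        pageViews
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String userId) {
                    return Tuple2.of(userId, 1L);
                }
            })
            .keyBy(0)                      // group by userId
            .timeWindow(Time.minutes(1))   // one-minute windows per user
            .sum(1)                        // pv count per user per minute
            .print();

        env.execute("per-user-pv");
    }
}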


Best,
Yichao Yang






-- 原始邮件 --
发件人: Michael Ran 

Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Ori Popowski
 Hi, thanks for answering.

> I guess you consume from Kafka from the earliest offset, so you consume
historical data and Flink is catching-up.
Yes, it's what's happening. But Kafka is partitioned on sessionId, so skew
between partitions cannot explain it.
I think the only way it can happen is when suddenly there's one event
with a very late timestamp.

> Just to verify, if you do keyBy sessionId, do you check the gaps of
events from the same session?
Good point. sessionId is unique in this case, and even if it's not - every
single session suffers from this problem of early triggering, so it's very
unlikely that all of the millions of sessions within that hour had duplicates.

I'm suspecting that the fact I have two ProcessWindowFunctions one after
the other somehow causes this.
I deployed a version with one window function which just prints the
timestamps to S3 (to find out if I have event-time jumps) and suddenly it
doesn't trigger early (I'm running for 10 minutes and not a single event
has arrived to the sink)

On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch  wrote:

> Hi Ori,
>
> I guess you consume from Kafka from the earliest offset, so you consume
> historical data and Flink is catching-up.
>
> Regarding: *My event-time timestamps also do not have big gaps*
>
> Just to verify, if you do keyBy sessionId, do you check the gaps of
> events from the same session?
>
> Rafi
>
>
> On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski  wrote:
>
>> So why is it happening? I have no clue at the moment.
>> My event-time timestamps also do not have big gaps between them that
>> would explain the window triggering.
>>
>>
>> On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
>> wrote:
>>
>>> If you are using event time in Flink, it is disconnected from the real
>>> world wall clock time.
>>> You can process historical data in a streaming program as if it was
>>> real-time data (potentially reading through (event time) years of data in a
>>> few (wall clock) minutes)
>>>
>>> On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:
>>>
 Hi

 I think it maybe you use the event time, and the timestamp between your
 event data is bigger than 30minutes, maybe you can check the source data
 timestamp.

 Best,
 Yichao Yang

 --
 发自我的iPhone


 -- Original --
 *From:* Ori Popowski 
 *Date:* Mon,Jun 15,2020 10:50 PM
 *To:* user 
 *Subject:* Re: EventTimeSessionWindow firing too soon




回复:flink sql 中怎么把ROW类型转换成INT

2020-06-16 Thread Yichao Yang
Hi


row类型是不能强转int的,可以找一下阿里云flink sql的文档,其中有介绍哪些数据类型可以互转。


Best,
Yichao Yang



发自我的iPhone


-- 原始邮件 --
发件人: Zhou Zach 

flink sql 中怎么把ROW类型转换成INT

2020-06-16 Thread Zhou Zach
flink sql从HBase中读取的类型为ROW,怎么把ROW类型转换成INT
select cast(cf as Int) cf from hbase_table
直接这样转换不成功

Re:关于keyby算子的疑问,如果本身数据比较分散,还有keyby的必要吗

2020-06-16 Thread Michael Ran
默认会有中hash 吧,看做啥操作
在 2020-06-16 18:28:51,"hdxg1101300...@163.com"  写道:
>
>您好:
>如果我的数据本身比较分散,重复的ID很少,还有必要进行keyby操作吗
>谢谢!
>
>
>hdxg1101300...@163.com


Re:Re: flink sql read hbase sink mysql data type not match

2020-06-16 Thread Zhou Zach
flink sql 怎么将ROW<`age` INT>转换成INT啊


streamTableEnv.sqlUpdate(
  """
|
|insert into  user_age
|SELECT rowkey, cast(cf as int) as age
|FROM
|  users
|
|""".stripMargin)??

Re: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 Thread Benchao Li
是否有观察过缓存命中率呢,如果缓存命中率不高,性能也不会太好的。

hdxg1101300...@163.com  于2020年6月16日周二 下午6:29写道:

> 目前使用guava的cache做了缓存但是效果不是很好
>
>
>
> hdxg1101300...@163.com
>
> 发件人: Benchao Li
> 发送时间: 2020-06-16 17:40
> 收件人: user-zh
> 主题: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题
> 感觉像这种IO Bound的场景,一种是可以尝试攒个小batch去请求;一种是加一个cache来降低请求的数量。
> 要不然就是优化提升外部系统的吞吐。
>
> hdxg1101300...@163.com  于2020年6月16日周二 下午5:35写道:
>
> > 您好:
> > 采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
> > flink1.10.0
> >
> >
> >
> > hdxg1101300...@163.com
> >
>


Re: Improved performance when using incremental checkpoints

2020-06-16 Thread nick toker
Hi,

We used both flink versions 1.9.1 and 1.10.1
We used rocksDB default configuration.
The streaming pipeline is very simple.

1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:

private transient MapState testMapState;

@Override
public void processElement(Map value, Context ctx,
Collector> out) throws Exception {

if (testMapState.isEmpty()) {

testMapState.putAll(value);

out.collect(value);

testMapState.clear();
}
}

We used the same code with ValueState and observed the same results.


BR,

Nick


‫בתאריך יום ג׳, 16 ביוני 2020 ב-11:56 מאת ‪Yun Tang‬‏ <‪myas...@live.com
‬‏>:‬

> Hi Nick
>
> It's really strange that performance could improve when checkpoint is
> enabled.
> In general, enable checkpoint might bring a bit performance downside to
> the whole job.
>
> Could you give more details e.g. Flink version, configurations of RocksDB
> and simple code which could reproduce this problem.
>
> Best
> Yun Tang
> --
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 15:44
> *To:* user@flink.apache.org 
> *Subject:* Improved performance when using incremental checkpoints
>
> Hello,
>
> We are using RocksDB as the backend state.
> At first we didn't enable the checkpoints mechanism.
>
> We observed the following behaviour and we are wondering why ?
>
> When using the rocksDB *without* checkpoint the performance was very
> extremely bad.
> And when we enabled the checkpoint the performance was improved by a*
> factor of 10*.
>
> Could you please explain if this behaviour is expected ?
> Could you please explain why enabling the checkpoint significantly
> improves the performance ?
>
> BR,
> Nick
>


Re: sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 Thread Benchao Li
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:

> 我编译了1.11包
> 在sql-cli下查询hive的表报如下错误:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
>
> 查注册的kafka表报:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
>
> 依赖包是从1.10.1下面拷贝的
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
> Got it!
> Thx,junbao
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 09:32,zhangjunbao 写道:
> 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> https://issues.apache.org/jira/browse/FLINK-17189 <
> https://issues.apache.org/jira/browse/FLINK-17189>
>
> Best,
> Junbao Zhang
>
> 2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:
>
> hi,all
> 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> ddl如下:
> |
> CREATETABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> ) WITH (
> 'connector.type' = 'kafka',  -- 使用 kafka connector
> 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper
> 地址
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> broker 地址
> 'format.type' = 'json'-- 数据源格式为 json
> );
> |
> 在查询时select * from user_behavior;报错如下:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> flink版本:1.10.1
> blink planner,streaming model
>
>
> Thx
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
>


Re: MapState bad performance

2020-06-16 Thread nick toker
Hi,

We are using flink version 1.10.1
The task manager memory 16GB
The number of slots is 32 but the job parallelism is 1.
We used the default configuration for rocksdb.
We checked the disk speed on the machine running the task manager: Write
300MB and read 1GB

BR,
Nick

‫בתאריך יום ג׳, 16 ביוני 2020 ב-12:12 מאת ‪Yun Tang‬‏ <‪myas...@live.com
‬‏>:‬

> Hi Nick
>
> As you might know, RocksDB suffers not so good performance for
> iterator-like operations due to it needs to merge sort for multi levels. [1]
>
> Unfortunately, rocksDBMapState.isEmpty() needs to call iterator and seek
> operations over rocksDB [2], and rocksDBMapState.clear() needs to iterator
> over state and remove entry [3].
> However, even these operations behaves not so good, I don't think they
> would behave extremely bad in general case. From our experience on SSD, the
> latency of seek should be less than 100us
> and could go up to hundreds of us, did you use SSD disk?
>
>1. What is the Flink version, taskmanager memory, number of slots and
>RocksDB related configurations?
>2. Have you checked the IOPS, disk util for those machines which
>containing task manager running RocksDB?
>
>
> [1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
> [2]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
> [3]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254
>
> Best
> Yun Tang
>
> --
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 15:35
> *To:* user@flink.apache.org 
> *Subject:* MapState bad performance
>
> Hello,
>
> We wrote a very simple streaming pipeline containing:
> 1. Kafka consumer
> 2. Process function
> 3. Kafka producer
>
> The code of the process function is listed below:
>
> private transient MapState testMapState;
>
> @Override
> public void processElement(Map value, Context ctx, 
> Collector> out) throws Exception {
>
> if (testMapState.isEmpty()) {
>
> testMapState.putAll(value);
>
> out.collect(value);
>
> testMapState.clear();
> }
> }
>
> We faced very bad performance and then we made some tests using jprofiler.
> Using jprofiler, we saw that the hot spots are 2 functions of the MapState:
> 1. isEmpty() - around 7 ms
> 2. clear() - around 4 ms
>
> We had to change and use ValueState instead.
>
> Are we using the MapState in the correct way or are we doing something
> wrong ?
> Is this behaviour expected because flink  recommendations are to use
> MapState and NOT ValueState ?
>
> BR,
> Nick
>


Re: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 Thread hdxg1101300...@163.com
目前使用guava的cache做了缓存但是效果不是很好



hdxg1101300...@163.com
 
发件人: Benchao Li
发送时间: 2020-06-16 17:40
收件人: user-zh
主题: Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题
感觉像这种IO Bound的场景,一种是可以尝试攒个小batch去请求;一种是加一个cache来降低请求的数量。
要不然就是优化提升外部系统的吞吐。
 
hdxg1101300...@163.com  于2020年6月16日周二 下午5:35写道:
 
> 您好:
> 采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
> flink1.10.0
>
>
>
> hdxg1101300...@163.com
>


关于keyby算子的疑问,如果本身数据比较分散,还有keyby的必要吗

2020-06-16 Thread hdxg1101300...@163.com

您好:
如果我的数据本身比较分散,重复的ID很少,还有必要进行keyby操作吗
谢谢!


hdxg1101300...@163.com


回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-16 Thread Sun.Zhu
我编译了1.11包
在sql-cli下查询hive的表报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow


查注册的kafka表报:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 


Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制




Re: flink sql read hbase sink mysql data type not match

2020-06-16 Thread Benchao Li
Hi,

上面的错误提示已经比较明确了,说的是你的query的schema跟sink table的schema对不上。
query的schema是:[rowkey: STRING, cf: ROW<`age` INT>]
而sink的schema是:[rowkey: STRING, age: INT]

你可以调整一下你的sink的schema;或者调整一下你的query语句。

Zhou Zach  于2020年6月16日周二 下午5:51写道:

>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Field types of query result and registered
> TableSink default_catalog.default_database.user_age do not match.
> Query schema: [rowkey: STRING, cf: ROW<`age` INT>]
> Sink schema: [rowkey: STRING, age: INT]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink
> default_catalog.default_database.user_age do not match.
>
>
>
>
>
>
>
>
> query:
>
>
>
>
> val users = new HBaseTableSource(hConf, "user_hbase5")
> users.setRowKey("rowkey", classOf[String]) // currency as the primary key
> users.addColumn("cf", "age", classOf[Integer])
>
> streamTableEnv.registerTableSource("users", users)
>
>
> streamTableEnv.sqlUpdate(
> """
> |
> |CREATE TABLE user_age (
> |`rowkey` VARCHAR,
> |age INT
> |) WITH (
> |'connector.type' = 'jdbc',
> |'connector.write.flush.max-rows' = '1'
> |)
> |""".stripMargin)
>
> streamTableEnv.sqlUpdate(
> """
> |
> |insert into  user_age
> |SELECT *
> |FROM
> |  users
> |
> |""".stripMargin)


flink sql read hbase sink mysql data type not match

2020-06-16 Thread Zhou Zach


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Field types of query result and registered TableSink 
default_catalog.default_database.user_age do not match.
Query schema: [rowkey: STRING, cf: ROW<`age` INT>]
Sink schema: [rowkey: STRING, age: INT]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.ValidationException: Field types of query 
result and registered TableSink default_catalog.default_database.user_age do 
not match.








query:




val users = new HBaseTableSource(hConf, "user_hbase5")
users.setRowKey("rowkey", classOf[String]) // currency as the primary key
users.addColumn("cf", "age", classOf[Integer])

streamTableEnv.registerTableSource("users", users)


streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_age (
|`rowkey` VARCHAR,
|age INT
|) WITH (
|'connector.type' = 'jdbc',
|'connector.write.flush.max-rows' = '1'
|)
|""".stripMargin)

streamTableEnv.sqlUpdate(
"""
|
|insert into  user_age
|SELECT *
|FROM
|  users
|
|""".stripMargin)

Re: 异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 Thread Benchao Li
感觉像这种IO Bound的场景,一种是可以尝试攒个小batch去请求;一种是加一个cache来降低请求的数量。
要不然就是优化提升外部系统的吞吐。
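
A rough sketch of the cache idea for the async HBase lookup: a local Guava cache is consulted first and only misses trigger the asynchronous HBase call. queryHBaseAsync() is a placeholder for the real asynchronous client call, and (as noted elsewhere in this thread) keying the stream by the lookup key first helps the cache hit rate:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Async HBase enrichment with a local cache in front of it: a cache hit is
// completed immediately, only misses go to HBase.
public class CachedAsyncHBaseLookup extends RichAsyncFunction<String, String> {

    private transient Cache<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        cache = CacheBuilder.newBuilder()
                .maximumSize(100_000)
                .expireAfterWrite(10, TimeUnit.MINUTES)
                .build();
    }

    @Override
    public void asyncInvoke(String rowKey, ResultFuture<String> resultFuture) {
        String cached = cache.getIfPresent(rowKey);
        if (cached != null) {
            resultFuture.complete(Collections.singleton(cached)); // no HBase round trip
            return;
        }
        queryHBaseAsync(rowKey).thenAccept(value -> {
            cache.put(rowKey, value);
            resultFuture.complete(Collections.singleton(value));
        });
    }

    private CompletableFuture<String> queryHBaseAsync(String rowKey) {
        // placeholder: replace with the real asynchronous HBase client call
        return CompletableFuture.completedFuture("value-for-" + rowKey);
    }
}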

hdxg1101300...@163.com  于2020年6月16日周二 下午5:35写道:

> 您好:
> 采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
> flink1.10.0
>
>
>
> hdxg1101300...@163.com
>


异步io请求hbase,hbase获取数据环节背压高成为瓶颈问题

2020-06-16 Thread hdxg1101300...@163.com
您好:
采用异步io的方式从hbase获取信息,发现hbase上游背压很高。有没有什么建议或者好的方式!谢谢!
flink1.10.0



hdxg1101300...@163.com


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 Thread zilong xiao
看了下issue,跟我描述的问题很相似,我尝试切到1.10.1试试看,谢谢您的解惑

Kurt Young  于2020年6月16日周二 下午5:15写道:

> 应该是这个: https://issues.apache.org/jira/browse/FLINK-16068
>
> Best,
> Kurt
>
>
> On Tue, Jun 16, 2020 at 5:09 PM zilong xiao  wrote:
>
> > 我看了下1.10.1的release note,您说的应该就是这个issue:
> > https://issues.apache.org/jira/browse/FLINK-16345
> > ,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
> > DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?
> >
> > Benchao Li  于2020年6月16日周二 下午5:00写道:
> >
> > > 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
> > >
> > > zilong xiao  于2020年6月16日周二 下午4:56写道:
> > >
> > >> 如题,在SQL
> > >>
> >
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> > >> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> > >> 代码如下图:
> > >> [image: image.png]
> > >> 异常堆栈:
> > >>
> > >
> >
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 Thread Kurt Young
应该是这个: https://issues.apache.org/jira/browse/FLINK-16068

Best,
Kurt


On Tue, Jun 16, 2020 at 5:09 PM zilong xiao  wrote:

> 我看了下1.10.1的release note,您说的应该就是这个issue:
> https://issues.apache.org/jira/browse/FLINK-16345
> ,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
> DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?
>
> Benchao Li  于2020年6月16日周二 下午5:00写道:
>
> > 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
> >
> > zilong xiao  于2020年6月16日周二 下午4:56写道:
> >
> >> 如题,在SQL
> >>
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> >> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> >> 代码如下图:
> >> [image: image.png]
> >> 异常堆栈:
> >>
> >
>


Re: MapState bad performance

2020-06-16 Thread Yun Tang
Hi Nick

As you might know, RocksDB suffers from poor performance for iterator-like 
operations because it needs to do a merge sort across multiple levels. [1]

Unfortunately, rocksDBMapState.isEmpty() needs to perform iterator and seek 
operations over RocksDB [2], and rocksDBMapState.clear() needs to iterate over 
the state and remove entries [3].
However, even though these operations do not perform well, I don't think they would 
behave extremely badly in the general case. From our experience on SSD, the latency 
of a seek should be less than 100us
and could go up to hundreds of us. Did you use an SSD disk?

  1.  What is the Flink version, taskmanager memory, number of slots and 
RocksDB related configurations?
  2.  Have you checked the IOPS, disk util for those machines which containing 
task manager running RocksDB?

[1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
[2] 
https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
[3] 
https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254

Best
Yun Tang


From: nick toker 
Sent: Tuesday, June 16, 2020 15:35
To: user@flink.apache.org 
Subject: MapState bad performance

Hello,

We wrote a very simple streaming pipeline containing:
1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:


private transient MapState testMapState;

@Override
public void processElement(Map value, Context ctx, 
Collector> out) throws Exception {

if (testMapState.isEmpty()) {

testMapState.putAll(value);

out.collect(value);

testMapState.clear();
}
}

We faced very bad performance and then we made some tests using jprofiler.
Using jprofiler, we saw that the hot spots are 2 functions of the MapState:
1. isEmpty() - around 7 ms
2. clear() - around 4 ms

We had to change and use ValueState instead.

Are we using the MapState in the correct way or are we doing something wrong ?
Is this behaviour expected because flink  recommendations are to use MapState 
and NOT ValueState ?

BR,
Nick
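
Given the cost of isEmpty()/clear() on RocksDB MapState described by Yun Tang above, one workaround in line with Nick's report that ValueState performed better is to keep the whole map in a single ValueState cell, so the emptiness check becomes a plain null check and clearing deletes a single key. A sketch, assuming String keys and Map<String, String> values (the generic parameters were stripped in the archived code, so these are assumptions):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

// Same logic as the MapState variant, but backed by a single ValueState cell:
// emptiness is a null check instead of a RocksDB seek, and clearing is one delete.
public class ValueStateVariant
        extends KeyedProcessFunction<String, Map<String, String>, Map<String, String>> {

    private transient ValueState<Map<String, String>> mapValueState;

    @Override
    public void open(Configuration parameters) {
        mapValueState = getRuntimeContext().getState(
                new ValueStateDescriptor<>(
                        "testMapState",
                        TypeInformation.of(new TypeHint<Map<String, String>>() {})));
    }

    @Override
    public void processElement(Map<String, String> value,
                               Context ctx,
                               Collector<Map<String, String>> out) throws Exception {
        if (mapValueState.value() == null) {   // cheap "isEmpty" check
            mapValueState.update(value);
            out.collect(value);
            mapValueState.clear();             // single key delete in RocksDB
        }
    }
}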


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 Thread zilong xiao
我看了下1.10.1的release note,您说的应该就是这个issue:
https://issues.apache.org/jira/browse/FLINK-16345
,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?

Benchao Li  于2020年6月16日周二 下午5:00写道:

> 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
>
> zilong xiao  于2020年6月16日周二 下午4:56写道:
>
>> 如题,在SQL
>> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
>> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
>> 代码如下图:
>> [image: image.png]
>> 异常堆栈:
>>
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 Thread zilong xiao
是用的1.10.0版本,我尝试切到1.10.1试试看,请问这个有对应的issue吗?想深入了解下这个问题

Benchao Li  于2020年6月16日周二 下午5:00写道:

> 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
>
> zilong xiao  于2020年6月16日周二 下午4:56写道:
>
>> 如题,在SQL
>> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
>> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
>> 代码如下图:
>> [image: image.png]
>> 异常堆栈:
>>
>


Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Rafi Aroch
Hi Ori,

I guess you consume from Kafka from the earliest offset, so you consume
historical data and Flink is catching-up.

Regarding: *My event-time timestamps also do not have big gaps*

Just to verify, if you do keyBy sessionId, do you check the gaps of
events from the same session?

Rafi


On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski  wrote:

> So why is it happening? I have no clue at the moment.
> My event-time timestamps also do not have big gaps between them that would
> explain the window triggering.
>
>
> On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
> wrote:
>
>> If you are using event time in Flink, it is disconnected from the real
>> world wall clock time.
>> You can process historical data in a streaming program as if it was
>> real-time data (potentially reading through (event time) years of data in a
>> few (wall clock) minutes)
>>
>> On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:
>>
>>> Hi
>>>
>>> I think it maybe you use the event time, and the timestamp between your
>>> event data is bigger than 30minutes, maybe you can check the source data
>>> timestamp.
>>>
>>> Best,
>>> Yichao Yang
>>>
>>> --
>>> 发自我的iPhone
>>>
>>>
>>> -- Original --
>>> *From:* Ori Popowski 
>>> *Date:* Mon,Jun 15,2020 10:50 PM
>>> *To:* user 
>>> *Subject:* Re: EventTimeSessionWindow firing too soon
>>>
>>>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 Thread Benchao Li
你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。

zilong xiao  于2020年6月16日周二 下午4:56写道:

> 如题,在SQL
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> 代码如下图:
> [image: image.png]
> 异常堆栈:
>

