Re: Flink SQL fails to write to a Hive partitioned table

2020-05-27 Thread Leonard Xu
Hi,
>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 
> 5

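`select *` probably brings the partition columns along as well, so your fields no longer match the sink. List only the columns you need in the select.

Best regards,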
Leonard Xu
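
A minimal sketch of the suggestion above, assuming the only non-partition columns are `id` and `name` as in the quoted table definitions. The class and method names are made up for illustration; in the original job the resulting string would be passed to tableEnv.sqlUpdate(...).

```java
public class PartitionInsertSql {
    // Builds the INSERT with an explicit column list instead of `select *`,
    // so the static partition columns (p_year, p_month) are not selected again.
    static String buildInsert() {
        return String.join("\n",
            "INSERT INTO dwdCatalog.dwd.t1_copy PARTITION (`p_year` = 2020, `p_month` = 5)",
            "SELECT `id`, `name` FROM dwdCatalog.dwd.t1",
            "WHERE `p_year` = 2020 AND `p_month` = 5");
    }

    public static void main(String[] args) {
        // Print the statement; the job in this thread would hand it to tableEnv.sqlUpdate(...).
        System.out.println(buildInsert());
    }
}
```

With only `id` and `name` selected, the query schema no longer carries the extra partition and EXPR$ columns that the error message complains about.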

> On May 28, 2020, at 12:57, Zhou Zach wrote:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Field types of query result and registered TableSink 
> dwdCatalog.dwd.t1_copy do not match.
> 
> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: 
> INT NOT NULL, EXPR$5: INT NOT NULL]
> 
> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
> 
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> 
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> 
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> 
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> 
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> 
> at java.security.AccessController.doPrivileged(Native Method)
> 
> at javax.security.auth.Subject.doAs(Subject.java:422)
> 
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> 
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> 
> 
> 
> 
> Hive partitioned tables:
> CREATE TABLE `dwd.t1`(
>  `id` bigint, 
>  `name` string)
> PARTITIONED BY ( 
>  `p_year` int, 
>  `p_month` int)
> 
> 
> CREATE TABLE `dwd.t1_copy`(
>  `id` bigint, 
>  `name` string)
> PARTITIONED BY ( 
>  `p_year` int, 
>  `p_month` int)
> 
> 
> Flink sql:
> tableEnv.sqlUpdate(
>  """
>|
>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, 
> `p_month` = 5)
>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 
> 5
>|
>|""".stripMargin)
> 
> 
> thanks for your help



Flink SQL fails to write to a Hive partitioned table

2020-05-27 Thread Zhou Zach
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Field types of query result and registered TableSink 
dwdCatalog.dwd.t1_copy do not match.

Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: INT 
NOT NULL, EXPR$5: INT NOT NULL]

Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)




Hive partitioned tables:
CREATE TABLE `dwd.t1`(
  `id` bigint, 
  `name` string)
PARTITIONED BY ( 
  `p_year` int, 
  `p_month` int)
  
  
CREATE TABLE `dwd.t1_copy`(
  `id` bigint, 
  `name` string)
PARTITIONED BY ( 
  `p_year` int, 
  `p_month` int)


Flink sql:
tableEnv.sqlUpdate(
  """
    |INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, `p_month` = 5)
    |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 5
    |""".stripMargin)


thanks for your help

Re: Apache Flink - Question about application restart

2020-05-27 Thread Zhu Zhu
Hi M,

Sorry I missed your message.
The JobID will not change for a generated JobGraph. However, a new JobGraph
is generated each time a job is submitted, so multiple submissions will
have multiple JobGraphs. This is because different submissions are
considered different jobs, as Till mentioned.
For example, you can submit an application to a cluster multiple times at
the same time, and different JobIDs are needed to differentiate them.

Thanks,
Zhu Zhu
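
A toy model of the behaviour described above, not Flink's own classes: `ToyJobGraph`, `submit` and `restart` are hypothetical names, and a random UUID stands in for the JobID. It only illustrates that the id is fixed once a graph is generated and that each submission generates a new graph.

```java
import java.util.UUID;

public class JobIdSketch {
    // Toy stand-in for a generated job graph: the id is fixed at creation time.
    static final class ToyJobGraph {
        final UUID jobId = UUID.randomUUID();
    }

    // A submission generates a fresh graph, and with it a fresh id.
    static ToyJobGraph submit() {
        return new ToyJobGraph();
    }

    // A restart reuses the already generated graph, so the id is unchanged.
    static UUID restart(ToyJobGraph graph) {
        return graph.jobId;
    }

    public static void main(String[] args) {
        ToyJobGraph first = submit();
        ToyJobGraph second = submit();
        System.out.println("same id after restart: " + first.jobId.equals(restart(first)));
        System.out.println("same id across submissions: " + first.jobId.equals(second.jobId));
    }
}
```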

On Wed, May 27, 2020 at 10:05 PM, Till Rohrmann wrote:

> Hi,
>
> if you submit the same job multiple times, then it will get a different
> JobID assigned every time. For Flink, different job submissions are
> considered to be different jobs. Once a job has been submitted, it will
> keep the same JobID which is important in order to retrieve the checkpoints
> associated with this job.
>
> Cheers,
> Till
>
> On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:
>
>> Hi Zhu Zhu:
>>
>> I have another clarification - it looks like if I run the same app multiple
>> times - its job id changes.  So it looks like even though the graph is the
>> same the job id is not dependent on the job graph only since with different
>> runs of the same app it is not the same.
>>
>> Please let me know if I've missed anything.
>>
>> Thanks
>>
>> On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh 
>> wrote:
>>
>>
>> Hi Zhu Zhu:
>>
>> Just to clarify - from what I understand, EMR also has by default restart
>> times (I think it is 3). So if the EMR restarts the job - the job id is the
>> same since the job graph is the same.
>>
>> Thanks for the clarification.
>>
>> On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang <
>> danrtsey...@gmail.com> wrote:
>>
>>
>> Just share some additional information.
>>
>> When a Flink application deployed on Yarn exhausts its restart policy,
>> the whole application will fail. If you start another instance (Yarn
>> application), even if high availability is configured, we cannot recover
>> from the latest checkpoint because the clusterId (i.e. applicationId) has
>> changed.
>>
>>
>> Best,
>> Yang
>>
>> On Mon, May 25, 2020 at 11:17 AM, Zhu Zhu wrote:
>>
>> Hi M,
>>
>> Regarding your questions:
>> 1. yes. The id is fixed once the job graph is generated.
>> 2. yes
>>
>> Regarding yarn mode:
>> 1. the job id keeps the same because the job graph will be generated once
>> at client side and persist in DFS for reuse
>> 2. yes if high availability is enabled
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Sat, May 23, 2020 at 4:06 AM, M Singh wrote:
>>
>> Hi Flink Folks:
>>
>> If I have a Flink Application with 10 restarts, if it fails and restarts,
>> then:
>>
>> 1. Does the job have the same id ?
>> 2. Does the automatically restarting application, pickup from the last
>> checkpoint ? I am assuming it does but just want to confirm.
>>
>> Also, if it is running on AWS EMR I believe EMR/Yarn is configured to
>> restart the job 3 times (after it has exhausted its restart policy).  If
>> that is the case:
>> 1. Does the job get a new id ? I believe it does, but just want to
>> confirm.
>> 2. Does the Yarn restart honor the last checkpoint ?  I believe, it does
>> not, but is there a way to make it restart from the last checkpoint of the
>> failed job (after it has exhausted its restart policy) ?
>>
>> Thanks
>>
>>
>>


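Re: How to define a custom connector in Flink

2020-05-27 Thread 111
Hi,
In sql-gateway, class loading is child-first by default, and every job submission gets its own TableEnvironment and its own classloader, so this problem does not occur.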
Best,
Xinghalo

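Re: How to define a custom connector in Flink

2020-05-27 Thread Lijie Wang
This problem may indeed occur. When adding dependencies, just make sure not to bundle packages that Flink already provides. You can also solve it by configuring parent-first class loading.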




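On 2020-05-28 at 11:03, forideal wrote:
Hi 111,

Regarding the second point:
`2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib`
Doesn't this carry the risk of the following `X can't be cast to X` problem?
Putting the connector in lib can cause classloader problems, and the direct symptom is the X can't be cast to X
problem [1]. Of course, this only means it may happen. For example, if we put user code into Flink's lib, we find that
when we run a job by uploading a jar that also contains the code in lib, this problem is triggered.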
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions


Best
forideal








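On 2020-05-28 10:16:45, "111" wrote:
Hi,
To use it in sqlgateway, check the following conditions:
1 Meet the SPI requirements so that Flink can discover the implementation class automatically
2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib
3 If you integrate with Hive and use HiveCatalog, register the table first
Then it can be used.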
Best,
Xinghalo


Re: Re: How to define a custom connector in Flink

2020-05-27 Thread forideal
Hi 111,
  
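Regarding the second point:
`2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib`
Doesn't this carry the risk of the following `X can't be cast to X` problem?
Putting the connector in lib can cause classloader problems, and the direct symptom is the X can't be cast to X
problem [1]. Of course, this only means it may happen. For example, if we put user code into Flink's lib, we find that
when we run a job by uploading a jar that also contains the code in lib, this problem is triggered.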
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions


Best
 forideal








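On 2020-05-28 10:16:45, "111" wrote:
>Hi,
>To use it in sqlgateway, check the following conditions:
>1 Meet the SPI requirements so that Flink can discover the implementation class automatically
>2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib
>3 If you integrate with Hive and use HiveCatalog, register the table first
>Then it can be used.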
>Best,
>Xinghalo


Re: Pushing code to Flink

2020-05-27 Thread tison
Being fast is what Flink is all about (x)

Best,
tison.


On Thu, May 28, 2020 at 10:56 AM, 宇张 wrote:

> Thanks, everyone. I see that Leonard Xu is already watching FLINK-17991.
> That was a quick response.
>
> On Thu, May 28, 2020 at 10:25 AM Leonard Xu  wrote:
>
> > Hi,
> > Yangze posted the official guide. You can also look at Jark's blog [1]; it is in Chinese, so it may be quicker to read.
> >
> > Best,
> > Leonard Xu
> > [1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ <
> > https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/>
> >
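> > > On May 28, 2020, at 10:18, Yangze Guo wrote:
> > >
> > > Hello, here is the community's guide to contributing code [1].
> > >
> > > Tip: unless it is a hotfix, before submitting please make sure consensus has been reached on the ticket and a committer has assigned it to you.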
> > >
> > > [1] https://flink.apache.org/zh/contributing/contribute-code.html
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
> > >>
> > >> Found the guide.
> > >>
> > >>
> > >> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
> > >>
> > >>> hi,
> > >>> What is the process for pushing code to the Flink community? Is there a beginner's guide?
> > >>>
> >
> >
>


Re: Pushing code to Flink

2020-05-27 Thread 宇张
Thanks, everyone. I see that Leonard Xu is already watching FLINK-17991.
That was a quick response.

On Thu, May 28, 2020 at 10:25 AM Leonard Xu  wrote:

> Hi,
> Yangze posted the official guide. You can also look at Jark's blog [1]; it is in Chinese, so it may be quicker to read.
>
> Best,
> Leonard Xu
> [1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ <
> https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/>
>
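> > On May 28, 2020, at 10:18, Yangze Guo wrote:
> >
> > Hello, here is the community's guide to contributing code [1].
> >
> > Tip: unless it is a hotfix, before submitting please make sure consensus has been reached on the ticket and a committer has assigned it to you.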
> >
> > [1] https://flink.apache.org/zh/contributing/contribute-code.html
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
> >>
> >> Found the guide.
> >>
> >>
> >> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
> >>
> >>> hi,
> >>> What is the process for pushing code to the Flink community? Is there a beginner's guide?
> >>>
>
>


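Re: Does the Table API have a function like the SQL API's TO_TIMESTAMP?

2020-05-27 Thread macia kk
Got it, thanks

On Thu, May 28, 2020 at 10:34 AM, Leonard Xu wrote:

>
> > I had assumed the Table API would offer more freedom than the SQL API, since it can be customized at the code level
>
>
> The Table API is indeed more flexible. It is just that in the last two releases the community's main effort in the
> SQL module went into DDL, which lowers the entry barrier and improves usability, so the Table API has iterated a bit more slowly.
> My understanding is that the Descriptor API should be improved in 1.12, as it is also an important user entry point. For now I recommend using DDL first.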
>
> Best,
> Leonard Xu
>
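> > On May 28, 2020, at 10:23, macia kk wrote:
> >
> > OK, thanks.
> >
> > I give up on this approach. I will try DDL first and get the job running. I had assumed the Table API would offer more freedom than the SQL API, since it can be customized at the code level.
> >
> > On Thu, May 28, 2020 at 10:17 AM, Leonard Xu wrote: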
> >
> >> Hi,
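> >> I looked at the Descriptor code. If the source is Kafka there should be a workaround, but it is very roundabout. You can try it, but I recommend using DDL.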
> >>
> >> Best
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >> <
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >>>
> >> [2]
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >> <
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >>>
> >>
> >>
> >>
> >>> On May 28, 2020, at 00:45, macia kk wrote:
> >>>
> >>> Hi all,
> >>>
> >>>  .field("event_time", TIMESTAMP()).rowtime(
> >>> new Rowtime()
> >>> .timestampsFromField("maxwell_ts")
> >>> .watermarksPeriodicBounded(6)
> >>>   )
> >>>
> >>>
> >>> My maxwell_ts is in milliseconds, and using it directly like this raises an error:
> >>>
> >>> Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> >>> physical type
> >>>
> >>>
> >>> Is there something like
> >>>
> >>> event_time as to_timestamp(maxwell_ts)
> >>>
> >>>
> >>> that I can do?
> >>
> >>
>
>


Re: Does the Table API have a function like the SQL API's TO_TIMESTAMP?

2020-05-27 Thread Leonard Xu


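> I had assumed the Table API would offer more freedom than the SQL API, since it can be customized at the code level


The Table API is indeed more flexible. It is just that in the last two releases the community's main effort in the
SQL module went into DDL, which lowers the entry barrier and improves usability, so the Table API has iterated a bit more slowly.
My understanding is that the Descriptor API should be improved in 1.12, as it is also an important user entry point. For now I recommend using DDL first.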

Best,
Leonard Xu
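
A rough sketch of what the DDL route could look like for the millisecond `maxwell_ts` field from the original question, written as a Java string so it can be handed to the table environment. The table name `maxwell_events`, the 5-second watermark bound and the empty WITH clause are assumptions for illustration only; the connector properties have to be filled in for the actual Kafka source. FROM_UNIXTIME works in seconds, so this variant drops the millisecond part.

```java
public class RowtimeDdlSketch {
    // Template DDL: a computed column turns epoch milliseconds into a TIMESTAMP,
    // and a WATERMARK clause declares it as the event-time attribute.
    static String ddl() {
        return String.join("\n",
            "CREATE TABLE maxwell_events (",
            "  maxwell_ts BIGINT,",
            "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts / 1000)),",
            "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND",
            ") WITH (",
            "  -- connector properties for the Kafka source go here (left out on purpose)",
            ")");
    }

    public static void main(String[] args) {
        // Print the template; it is not executable until the WITH clause is filled in.
        System.out.println(ddl());
    }
}
```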

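> On May 28, 2020, at 10:23, macia kk wrote:
> 
> OK, thanks.
> 
> I give up on this approach. I will try DDL first and get the job running. I had assumed the Table API would offer more freedom than the SQL API, since it can be customized at the code level.
> 
> On Thu, May 28, 2020 at 10:17 AM, Leonard Xu wrote: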
> 
>> Hi,
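>> I looked at the Descriptor code. If the source is Kafka there should be a workaround, but it is very roundabout. You can try it, but I recommend using DDL.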
>> 
>> Best
>> Leonard Xu
>> 
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
>> <
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
>>> 
>> [2]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
>> <
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
>>> 
>> 
>> 
>> 
>>> On May 28, 2020, at 00:45, macia kk wrote:
>>> 
>>> Hi all,
>>> 
>>>  .field("event_time", TIMESTAMP()).rowtime(
>>> new Rowtime()
>>> .timestampsFromField("maxwell_ts")
>>> .watermarksPeriodicBounded(6)
>>>   )
>>> 
>>> 
>>> My maxwell_ts is in milliseconds, and using it directly like this raises an error:
>>> 
>>> Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
>>> physical type
>>> 
>>> 
>>> Is there something like
>>> 
>>> event_time as to_timestamp(maxwell_ts)
>>> 
>>> 
>>> that I can do?
>> 
>> 



Re: Pushing code to Flink

2020-05-27 Thread Leonard Xu
Hi,
Yangze posted the official guide. You can also look at Jark's blog [1]; it is in Chinese, so it may be quicker to read.

Best,
Leonard Xu
[1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ 


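> On May 28, 2020, at 10:18, Yangze Guo wrote:
> 
> Hello, here is the community's guide to contributing code [1].
> 
> Tip: unless it is a hotfix, before submitting please make sure consensus has been reached on the ticket and a committer has assigned it to you.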
> 
> [1] https://flink.apache.org/zh/contributing/contribute-code.html
> 
> Best,
> Yangze Guo
> 
> On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
>> 
>> Found the guide.
>> 
>> 
>> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
>> 
>>> hi,
>>> What is the process for pushing code to the Flink community? Is there a beginner's guide?
>>> 



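Re: Does the Table API have a function like the SQL API's TO_TIMESTAMP?

2020-05-27 Thread macia kk
OK, thanks.

I give up on this approach. I will try DDL first and get the job running. I had assumed the Table API would offer more freedom than the SQL API, since it can be customized at the code level.

On Thu, May 28, 2020 at 10:17 AM, Leonard Xu wrote: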

> Hi,
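> I looked at the Descriptor code. If the source is Kafka there should be a workaround, but it is very roundabout. You can try it, but I recommend using DDL.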
>
> Best
> Leonard Xu
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> <
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> <
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >
>
>
>
> > On May 28, 2020, at 00:45, macia kk wrote:
> >
> > Hi all,
> >
> >   .field("event_time", TIMESTAMP()).rowtime(
> >  new Rowtime()
> >  .timestampsFromField("maxwell_ts")
> >  .watermarksPeriodicBounded(6)
> >)
> >
> >
> > My maxwell_ts is in milliseconds, and using it directly like this raises an error:
> >
> > Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> > physical type
> >
> >
> > Is there something like
> >
> > event_time as to_timestamp(maxwell_ts)
> >
> >
> > that I can do?
>
>


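Re: Pushing code to Flink

2020-05-27 Thread Yangze Guo
Hello, here is the community's guide to contributing code [1].

Tip: unless it is a hotfix, before submitting please make sure consensus has been reached on the ticket and a committer has assigned it to you.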

[1] https://flink.apache.org/zh/contributing/contribute-code.html

Best,
Yangze Guo

On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
>
> Found the guide.
>
>
> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
>
> > hi,
> > What is the process for pushing code to the Flink community? Is there a beginner's guide?
> >


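Re: Does the Table API have a function like the SQL API's TO_TIMESTAMP?

2020-05-27 Thread Leonard Xu
Hi,
I looked at the Descriptor code. If the source is Kafka there should be a workaround, but it is very roundabout. You can try it, but I recommend using DDL.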

Best
Leonard Xu

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
 

[2] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
 




> On May 28, 2020, at 00:45, macia kk wrote:
> 
> Hi all,
> 
>   .field("event_time", TIMESTAMP()).rowtime(
>  new Rowtime()
>  .timestampsFromField("maxwell_ts")
>  .watermarksPeriodicBounded(6)
>)
> 
> 
> My maxwell_ts is in milliseconds, and using it directly like this raises an error:
> 
> Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> physical type
> 
> 
> Is there something like
> 
> event_time as to_timestamp(maxwell_ts)
> 
> 
> that I can do?



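Re: How to define a custom connector in Flink

2020-05-27 Thread 111
Hi,
To use it in sqlgateway, check the following conditions:
1 Meet the SPI requirements so that Flink can discover the implementation class automatically
2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib
3 If you integrate with Hive and use HiveCatalog, register the table first
Then it can be used.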
Best,
Xinghalo

Re: Flink Dashboard UI Tasks hard limit

2020-05-27 Thread Xintong Song
Ah, I guess I had misunderstood what you meant.

> Below 18000 tasks, the Flink Job is able to start up.
> Even though I increased the number of slots, it still works when 312 slots
> are being used.
>
When you said "it still works", I thought that you had increased the
parallelism but the job was still executed as if it had not been increased.
From your latest reply, it seems the job's parallelism was indeed increased,
but then it runs into failures.

The reason you run into the "Insufficient number of network buffers"
exception, is that with more tasks in your job, more inter-task data
transmission channels, thus memory for network buffers, are needed.

To increase the network memory size, the following configuration options,
as you already found, are related.

   - taskmanager.network.memory.fraction
   - taskmanager.network.memory.max
   - taskmanager.network.memory.min

Please be aware that `taskmanager.memory.task.off-heap.size` is not related
to network memory, and is only available in Flink 1.10 and above
while you're using 1.9.1 as suggested by the screenshots.

The network memory size is calculated as `min(max(some_total_value *
network_fraction, network_min), network_max)`. According to the error
message, your current network memory size is `85922 buffers * 32KB/buffer =
2685MB`, smaller than your "max" (4gb). That means increasing the "max"
does not help in your case. It is the "fraction" that you need to increase.
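
The formula and the arithmetic above can be checked with a small sketch. The 16 GB total and the class and method names are assumptions for illustration (the thread does not state what `some_total_value` is on this cluster); the fraction, min and max values are the ones quoted further down in this thread.

```java
public class NetworkMemoryEstimate {
    static final long MB = 1024L * 1024L;

    // min(max(total * fraction, min), max), as described above.
    static long networkMemoryBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (totalBytes * fraction);
        return Math.min(Math.max(byFraction, minBytes), maxBytes);
    }

    // Memory taken by a given number of network buffers of a fixed size.
    static long buffersToBytes(long buffers, long bufferSizeBytes) {
        return buffers * bufferSizeBytes;
    }

    public static void main(String[] args) {
        // From the error message: 85922 buffers of 32768 bytes each.
        System.out.println(buffersToBytes(85922, 32768) / MB + " MB"); // 2685 MB

        long total = 16L * 1024 * MB; // assumed total, not taken from the thread
        // While the fraction term stays below "max", raising "max" changes nothing.
        System.out.println(networkMemoryBytes(total, 0.15, 500 * MB, 4096 * MB) / MB + " MB");
        System.out.println(networkMemoryBytes(total, 0.15, 500 * MB, 24576 * MB) / MB + " MB");
        // Raising the fraction is what moves the result (until "max" caps it).
        System.out.println(networkMemoryBytes(total, 0.30, 500 * MB, 24576 * MB) / MB + " MB");
    }
}
```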

Thank you~

Xintong Song



On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan 
wrote:

> Hi Xintong,
> Looks like the issue is not fully resolved :( Attaching 2 screenshots of
> the memory consumption of 1 of the TaskManagers.
>
> To increase the used up Direct memory off heap,Do I change this:
>  taskmanager.memory.task.off-heap.size: 5gb
>
> I had increased the taskmanager.network.memory.max: 24gb
> which seems excessive.
>
> 1 of the errors I saw in the Flink logs:
>
> java.io.IOException: Insufficient number of network buffers: required 1,
> but only 0 available. The total number of network buffers is currently set
> to 85922 of 32768 bytes each. You can increase this number by setting the
> configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
> at
> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>
> TIA,
>
>
> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan 
> wrote:
>
>> Thanks so much, Xintong for guiding me through this. I looked at the
>> Flink logs to see the errors.
>> I had to change taskmanager.network.memory.max: 4gb and akka.ask.timeout:
>> 240s to increase the number of tasks.
>> Now, I am able to increase the number of Tasks/ aka Task vertices.
>>
>> taskmanager.network.memory.fraction: 0.15
>> taskmanager.network.memory.max: 4gb
>> taskmanager.network.memory.min: 500mb
>> akka.ask.timeout: 240s
>>
>> On Tue, May 26, 2020 at 8:42 PM Xintong Song 
>> wrote:
>>
>>> Could you also explain how do you set the parallelism when getting this
>>> execution plan?
>>> I'm asking because this json file itself only shows the resulted
>>> execution plan. It is not clear to me what is not working as expected in
>>> your case. E.g., you set the parallelism for an operator to 10 but the
>>> execution plan only shows 5.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan 
>>> wrote:
>>>
>>>> Hi Xintong,
>>>> Thanks for the excellent clarification for tasks.
>>>>
>>>> I attached a sample screenshot above and didn't reflect the slots used
>>>> and the tasks limit I was running into in that pic.
>>>>
>>>> I am attaching my Execution plan here. Please let me know how I can
>>>> increase the number of tasks aka parallelism. As I increase the parallelism,
>>>> I run into this bottleneck with the tasks.
>>>>
>>>> BTW - The https://flink.apache.org/visualizer/ is a great start to see
>>>> this.
>>>> TIA,
>>>>
>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song
>>>> wrote:
>>>>
>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>> increase tasks slightly.
>>>>>
>>>>> That's weird. I don't think the number of network memory buffers has
>>>>> anything to do with the task amount.
>>>>>
>>>>> Let me try to clarify a few things.
>>>>>
>>>>> Please be aware that how many tasks a Flink job has and how many
>>>>> slots a Flink cluster has are two different things.
>>>>> - The number of tasks is decided by your job's parallelism and
>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>> parallelism 2, 3, 4 respectively, then you would have 9 (2+3+4)
>>>>> tasks in total.
>>>>> - The number of slots is decided by the number of TMs and slots-per-TM.
>>>>> - For streaming jobs, you have to make sure the number of slots is
>>>>> enough

Re: Pushing code to Flink

2020-05-27 Thread 宇张
Found the guide.


On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:

> hi,
> What is the process for pushing code to the Flink community? Is there a beginner's guide?
>


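Re: How to define a custom connector in Flink

2020-05-27 Thread 111
Hi,
Just open any connector such as the JDBC connector and model yours on it:
1 You need a service loader descriptor file: under resources there must be META-INF/services, containing the declaration of your TableFactory implementation class
2 Create the corresponding TableFactory implementation class; implement different interfaces depending on source or sink, and return the corresponding TableSource or TableSink
3 For a TableSource, implement different interfaces depending on features such as pruning and lookup
4 For a TableSink, implement different interfaces depending on upsert, append, or retract
In short, open the source of an existing connector and copy from it.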
Best,
Xinghalo

Pushing code to Flink

2020-05-27 Thread 宇张
hi,
What is the process for pushing code to the Flink community? Is there a beginner's guide?


Re: How to define a custom connector in Flink

2020-05-27 Thread Peihui He
hello


I have already defined a TableSource and can register it with batchEnv.registerTableSource
and query data, but how do I configure it in sqlgateway?

On Thu, May 28, 2020 at 9:32 AM, Leonard Xu wrote:

> Hi,
> You can refer to existing connectors such as hbase and jdbc, together with [1], to implement a custom connector.
>
>
> Best regards,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
> >
>
>
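> > On May 28, 2020, at 09:16, Peihui He wrote:
> >
> > hello
> >
> > A question for everyone: in Flink 1.10, how do I define a custom connector and register it with the Flink SQL
> > gateway so that SQL can be executed against it?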
> >
> >
> > best wish
>
>


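Re: Re: How to set up a client for Flink 1.9/1.10 on YARN on CDH

2020-05-27 Thread wangweigu...@stevegame.cn

Indeed, you only need to set the HADOOP_CONF environment variable for CDH and download the open-source Hadoop build (same version as your CDH) into flink lib; then you can access CDH
YARN and submit jobs!

I am currently on CDH 5.16.1 and Flink 1.10. Submitting Flink on YARN works, jobs run fine, and Flink on Hive works too!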

flink-shaded-hadoop-2-uber-2.6.5-10.0.jar



 
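From: 111
Sent: 2020-05-28 09:13
To: user-zh@flink.apache.org
Subject: Re: How to set up a client for Flink 1.9/1.10 on YARN on CDH
Hi,
Generally, as long as you have a YARN environment, you can download the Flink distribution on any machine, set the HADOOP_CONF environment variable, and use it.
For session mode: start a YARN session cluster with Yarn-session.sh, then submit the program with flink run xxx.
For per-job mode: just use flink run directly.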
best,
Xinghalo


Re: How to define a custom connector in Flink

2020-05-27 Thread Leonard Xu
Hi,
You can refer to existing connectors such as hbase and jdbc, together with [1], to implement a custom connector.


Best regards,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
 



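> On May 28, 2020, at 09:16, Peihui He wrote:
> 
> hello
> 
> A question for everyone: in Flink 1.10, how do I define a custom connector and register it with the Flink SQL
> gateway so that SQL can be executed against it?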
> 
> 
> best wish



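How to define a custom connector in Flink

2020-05-27 Thread Peihui He
hello

A question for everyone: in Flink 1.10, how do I define a custom connector and register it with the Flink SQL
gateway so that SQL can be executed against it?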


best wish


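Re: How to set up a client for Flink 1.9/1.10 on YARN on CDH

2020-05-27 Thread 111
Hi,
Generally, as long as you have a YARN environment, you can download the Flink distribution on any machine, set the HADOOP_CONF environment variable, and use it.
For session mode: start a YARN session cluster with Yarn-session.sh, then submit the program with flink run xxx.
For per-job mode: just use flink run directly.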
best,
Xinghalo

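How to set up a client for Flink 1.9/1.10 on YARN on CDH

2020-05-27 Thread 王飞
How do I set up a client for Flink 1.9/1.10 on CDH? I need a client to start Flink on YARN. Version 1.7 works fine, but with 1.9
and 1.10 starting an on-YARN job does not work. My environment is CDH Hadoop. Thanks for any answers.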



Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-27 Thread Guowei Ma
Hi,
I think the StreamingFileSink does not currently support Azure.
You can find more details here [1].

[1] https://issues.apache.org/jira/browse/FLINK-17444
Best,
Guowei


Israel Ekpo  于2020年5月28日周四 上午6:04写道:

> You can assign the task to me and I would like to collaborate with someone
> to fix it.
>
> On Wed, May 27, 2020 at 5:52 PM Israel Ekpo  wrote:
>
>> Some users are running into issues when using Azure Blob Storage for the
>> StreamFileSink
>>
>> https://issues.apache.org/jira/browse/FLINK-17989
>>
>> The issue is because certain packages are relocated in the POM file and
>> some classes are dropped in the final shaded jar
>>
>> I have attempted to comment out the relocated and recompile the source
>> but I keep hitting roadblocks of other relocation and filtration each time
>> I update a specific pom file
>>
>> How can this be addressed so that these users can be unblocked? Why are
>> the classes filtered out? What is the workaround? I can work on the patch
>> if I have some guidance.
>>
>> This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the same
>> issue but I am yet to confirm
>>
>> Thanks.
>>
>>
>>
>


Executing a controllable benchmark in Flink

2020-05-27 Thread Felipe Gutierrez
Hi,

I am trying to benchmark a stream application in Flink. So, I am using
the source Function that reads events from the NYC Taxi Rides
(http://training.ververica.com/trainingData/nycTaxiRides.gz) and I
control the emission with System.nanoTime(). I am not using
Thread.sleep because Java does not guarantee the time that the thread
will be awakened.

public void busySleep() {
    final long startTime = System.nanoTime();
    // Spin until the configured delay has elapsed.
    while (System.nanoTime() - startTime < this.delayInNanoSeconds) ;
}

So, when I wait for 10000 nanoseconds I will get a workload of 100K
rec/sec. When I wait for 2000 nanoseconds I will get a workload of
500K rec/sec. For 1000 nanoseconds I will get a workload of 1M
rec/sec. And for 500 nanoseconds a workload of 2M rec/sec.

The problem that I am facing is that when I set the workload to 1M
rec/sec it does not seem to generate records at this rate. I guess it is
because it is spending more time reading the TaxiRide file or doing
IO operations, or maybe it is some Java limitation.
If I use a message broker it will end up adding one more piece of middleware
with read/write IO operations, and I guess it will be worse.
What do you recommend for a controllable benchmark for stream processing?
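
One possible variation on the busy wait above, offered as a sketch rather than a recommendation from this thread: compute each record's deadline from the start time instead of sleeping a fixed delay after each record, so the time spent reading or emitting a record is absorbed by the wait instead of being added to it. The class and method names are hypothetical, and the comment marks where a record would be emitted.

```java
public class PacedEmitter {
    // Records per second implied by a fixed per-record delay in nanoseconds.
    static long ratePerSecond(long delayNanos) {
        return 1_000_000_000L / delayNanos;
    }

    // Paces `count` emissions against absolute deadlines (start + i * delay)
    // and returns the elapsed time in nanoseconds.
    static long emit(int count, long delayNanos) {
        final long start = System.nanoTime();
        for (int i = 1; i <= count; i++) {
            final long deadline = start + i * delayNanos;
            // Spin until this record's deadline; subtraction keeps the comparison overflow-safe.
            while (System.nanoTime() - deadline < 0) {
                // busy wait
            }
            // a real source would emit the i-th record here
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        System.out.println(ratePerSecond(10_000) + " rec/sec for a 10000 ns delay");
        System.out.println(ratePerSecond(1_000) + " rec/sec for a 1000 ns delay");
        long elapsed = emit(100_000, 1_000);
        System.out.println("100000 records paced in " + elapsed + " ns");
    }
}
```

If the per-record work itself takes longer than the delay, the loop simply runs as fast as it can, which makes it visible that the source rather than the pacing is the limit.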

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-27 Thread Israel Ekpo
You can assign the task to me and I would like to collaborate with someone
to fix it.

On Wed, May 27, 2020 at 5:52 PM Israel Ekpo  wrote:

> Some users are running into issues when using Azure Blob Storage for the
> StreamFileSink
>
> https://issues.apache.org/jira/browse/FLINK-17989
>
> The issue is because certain packages are relocated in the POM file and
> some classes are dropped in the final shaded jar
>
> I have attempted to comment out the relocated and recompile the source but
> I keep hitting roadblocks of other relocation and filtration each time I
> update a specific pom file
>
> How can this be addressed so that these users can be unblocked? Why are
> the classes filtered out? What is the workaround? I can work on the patch
> if I have some guidance.
>
> This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the same
> issue but I am yet to confirm
>
> Thanks.
>
>
>


[DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-27 Thread Israel Ekpo
Some users are running into issues when using Azure Blob Storage for the
StreamingFileSink

https://issues.apache.org/jira/browse/FLINK-17989

The issue is because certain packages are relocated in the POM file and
some classes are dropped in the final shaded jar

I have attempted to comment out the relocations and recompile the source, but
I keep hitting roadblocks from other relocations and filters each time I
update a specific pom file

How can this be addressed so that these users can be unblocked? Why are the
classes filtered out? What is the workaround? I can work on the patch if I
have some guidance.

This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the same
issue but I am yet to confirm

Thanks.


Installing Ververica, unable to write to file system

2020-05-27 Thread Corrigan, Charlie
Hello, I’m trying to install Ververica (community edition for a simple poc 
deploy) via helm using these 
directions, but the 
pod is failing with the following error:

```
org.springframework.context.ApplicationContextException: Unable to start web 
server; nested exception is 
org.springframework.boot.web.server.WebServerException: Unable to create 
tempDir. java.io.tmpdir is set to /tmp
```

By default, our file system is immutable in k8s. Usually for this error, we’d 
mount an emptyDir volume. I’ve tried to do that in ververica’s values.yaml 
file, but I might be configuring it incorrectly. Here is the relevant portion 
of the values.yaml. I can include the entire file if it’s helpful. Any advice 
on how to alter these values or proceed with the ververica installation with a 
read only file system?

volumes:
  - name: tmp
    emptyDir: {}
##
## Container configuration for the appmanager component
##
appmanager:
  image:
    repository: registry.ververica.com/v2.1/vvp-appmanager
    tag: 2.1.0
    pullPolicy: Always
    volumeMounts:
      - mountPath: /tmp
        name: tmp
  resources:
    limits:
      cpu: 1000m
      memory: 1Gi
    requests:
      cpu: 250m
      memory: 1Gi

  artifactFetcherTag: 2.1.0



Re: Webinar: Unlocking the Power of Apache Beam with Apache Flink

2020-05-27 Thread Aizhamal Nurmamat kyzy
Thank you all for attending today's session! Here is the YT recording:
https://www.youtube.com/watch?v=ZCV9aRDd30U
And link to the slides:
https://github.com/aijamalnk/beam-learning-month/blob/master/Unlocking%20the%20Power%20of%20Apache%20Beam%20with%20Apache%20Flink.pdf

On Tue, May 26, 2020 at 8:32 AM Aizhamal Nurmamat kyzy 
wrote:

> Hi all,
>
> Please join our webinar this Wednesday at 10am PST/5:00pm GMT/1:00pm EST
> where Max Michels - PMC member for Apache Beam and Apache Flink, will
> deliver a talk about leveraging Apache Beam for large-scale stream and
> batch analytics with Apache Flink.
>
> You can register via this link:
> https://learn.xnextcon.com/event/eventdetails/W20052710
>
> Here is the short description of the talk:
> ---
> Apache Beam is a framework for writing stream and batch processing
> pipelines using multiple languages such as Java, Python, SQL, or Go. Apache
> Beam does not come with an execution engine of its own. Instead, it defers
> the execution to its Runners which translate Beam pipelines for any
> supported execution engine. Thus, users have complete control over the
> language and the execution engine they use, without having to rewrite their
> code.
> In this talk, we will look at running Apache Beam pipelines with Apache
> Flink. We will explain the concepts behind Apache Beam's portability
> framework for multi-language support, and then show how to get started
> running Java, Python, and SQL pipelines.
> 
> Links to the slides and recordings of this and previous webinars can be
> found here: https://github.com/aijamalnk/beam-learning-month
>
> Hope y'all are safe,
> Aizhamal
>


Re: Stateful functions Harness

2020-05-27 Thread Boris Lublinsky
Thanks Seth
Will take a look.


> On May 27, 2020, at 3:15 PM, Seth Wiesman  wrote:
> 
> Hi Boris, 
> 
> Example usage of flink sources and sink is available in the documentation[1]. 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/flink-connectors.html
>  
> 
> On Wed, May 27, 2020 at 1:08 PM Boris Lublinsky wrote:
> That's not exactly the usage question that I am asking.
> When I am writing an IO module, I have to write Ingress and Egress specs.
> You have an example for Kafka, which looks like
> 
> def getIngressSpec: IngressSpec[GreetRequest] =
>   KafkaIngressBuilder.forIdentifier(GREETING_INGRESS_ID)
>     .withKafkaAddress(kafkaAddress)
>     .withTopic("names")
>     .withDeserializer(classOf[GreetKafkaDeserializer])
>     .withProperty(ConsumerConfig.GROUP_ID_CONFIG, "greetings")
>     .build
> 
> def getEgressSpec: EgressSpec[GreetResponse] =
>   KafkaEgressBuilder.forIdentifier(GREETING_EGRESS_ID)
>     .withKafkaAddress(kafkaAddress)
>     .withSerializer(classOf[GreetKafkaSerializer])
>     .build
> 
> How is it going to look if I am using SourceSinkModule?
> Do I just specify stream names? Something else?
> 
> 
> 
> 
> 
>> On May 27, 2020, at 11:29 AM, Tzu-Li (Gordon) Tai wrote:
>> 
>> 
>> 
>> On Thu, May 28, 2020, 12:19 AM Boris Lublinsky wrote:
>> I think I figured this out.
>> The project seems to be missing the
>> resources/META-INF/services
>> directory, which should contain services
>> 
>> Yes, the functions / ingresses / egresses etc. are not discoverable if the 
>> service file isn't present in the classpath.
>> 
>> For the examples, if you are running them straight from the repo, they should 
>> all have that service file defined and therefore be readily runnable.
>> 
>> If you are creating your own application project, you'll have to add that 
>> yourself.
>> 
>> 
>> Another question:
>> I see the org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
>> class, which I think allows using existing data streams as ingress/egress.
>> 
>> Are there any examples of its usage?
>> 
>> On the Harness class, there is a withFlinkSourceFunction method in which you 
>> can directly add a Flink source function as the ingress.
>> 
>> If you want to use that directly in a normal application (not just execution 
>> in the IDE with the Harness), you can define your ingresses/egresses by binding 
>> SourceFunctionSpec / SinkFunctionSpec.
>> Please see how they are being used in the Harness class for examples.
>> 
>> Gordon
>> 
>> 
>> 
>>> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai wrote:
>>> 
>>> Hi,
>>> 
>>> The example is working fine on my side (also using IntelliJ).
>>> This is most likely a problem with your project setup in the IDE, 
>>> where the classpath isn't set up correctly.
>>> 
>>> What do you see when you right click on the statefun-flink-harness-example 
>>> directory (in the IDE) --> Open Module Settings, and then under the 
>>> "Sources" / "Dependencies" tab?
>>> Usually this should all be automatically setup correctly when importing the 
>>> project.
>>> 
>>> Gordon
>>> 
>>> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky wrote:
>>> The project 
>>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>>> does not work in IntelliJ.
>>> 
>>> The problem is that when running in IntelliJ, the method 
>>> public static Modules loadFromClassPath() 
>>> does not pick up classes that are local in IntelliJ.
>>> 
>>> Any workarounds?
>>> 
>>> 
>>> 
>>> 
>>>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Sorry, I need to correct my comment on using the Kafka ingress / egress 
>>>> with the Harness.
>>>> 
>>>> That is actually doable, by adding an extra dependency to 
>>>> `statefun-flink-distribution` in your Harness program.
>>>> That pulls in all the other dependencies required by the Kafka 
>>>> ingress / egress, such as the source / sink providers and Flink Kafka 
>>>> connectors.
>>>> 
>>>> Cheers,
>>>> Gordon
>>>> 
>>>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai wrote:
>>>> Are you getting an exception from running the Harness?
>>>> The Harness should already have the required configurations, such as the 
 

Re: Stateful functions Harness

2020-05-27 Thread Seth Wiesman
Hi Boris,

Example usage of flink sources and sink is available in the
documentation[1].

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/flink-connectors.html

On Wed, May 27, 2020 at 1:08 PM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> That's not exactly the usage question that I am asking.
> When I am writing an IO module, I have to write Ingress and Egress specs.
> You have an example for Kafka, which looks like
>
> def getIngressSpec: IngressSpec[GreetRequest] =
>   KafkaIngressBuilder.forIdentifier(GREETING_INGRESS_ID)
>     .withKafkaAddress(kafkaAddress)
>     .withTopic("names")
>     .withDeserializer(classOf[GreetKafkaDeserializer])
>     .withProperty(ConsumerConfig.GROUP_ID_CONFIG, "greetings")
>     .build
>
> def getEgressSpec: EgressSpec[GreetResponse] =
>   KafkaEgressBuilder.forIdentifier(GREETING_EGRESS_ID)
>     .withKafkaAddress(kafkaAddress)
>     .withSerializer(classOf[GreetKafkaSerializer])
>     .build
>
> How is it going to look if I am using SourceSinkModule?
> Do I just specify stream names? Something else?
>
>
>
>
>
> On May 27, 2020, at 11:29 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>
>
> On Thu, May 28, 2020, 12:19 AM Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> I think I figured this out.
>> The project seems to be missing the
>> resources/META-INF/services directory, which should contain services
>>
>
> Yes, the functions / ingresses / egresses etc. are not discoverable if
> the service file isn't present in the classpath.
>
> For the examples, if you are running them straight from the repo, they should
> all have that service file defined and therefore be readily runnable.
>
> If you are creating your own application project, you'll have to add that
> yourself.
>
>
>> Another question:
>> I see the org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
>> class, which I think allows using existing data streams as
>> ingress/egress.
>>
>> Are there any examples of its usage?
>>
>
> On the Harness class, there is a withFlinkSourceFunction method in which
> you can directly add a Flink source function as the ingress.
>
> If you want to use that directly in a normal application (not just
> execution in the IDE with the Harness), you can define your ingresses/egresses
> by binding SourceFunctionSpec / SinkFunctionSpec.
> Please see how they are being used in the Harness class for examples.
>
> Gordon
>
>
>>
>> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> Hi,
>>
>> The example is working fine on my side (also using IntelliJ).
>> This is most likely a problem with your project setup in the IDE,
>> where the classpath isn't set up correctly.
>>
>> What do you see when you right click on the
>> statefun-flink-harness-example directory (in the IDE) --> Open Module
>> Settings, and then under the "Sources" / "Dependencies" tab?
>> Usually this should all be automatically setup correctly when importing
>> the project.
>>
>> Gordon
>>
>> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> The project
>>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>>> does not work in IntelliJ.
>>>
>>> The problem is that when running in IntelliJ, the method
>>> public static Modules loadFromClassPath()
>>> does not pick up classes that are local in IntelliJ.
>>>
>>> Any workarounds?
>>>
>>>
>>>
>>>
>>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
>>> Hi,
>>>
>>> Sorry, I need to correct my comment on using the Kafka ingress / egress
>>> with the Harness.
>>>
>>> That is actually doable, by adding an extra dependency to
>>> `statefun-flink-distribution` in your Harness program.
>>> That pulls in all the other required dependencies required by the Kafka
>>> ingress / egress, such as the source / sink providers and Flink Kafka
>>> connectors.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Are you getting an exception from running the Harness?
>>>> The Harness should already have the required configurations, such as
>>>> the parent first classloading config.
>>>>
>>>> Otherwise, if you would like to add your own configuration, use the
>>>> `withConfiguration` method on the `Harness` class.
>>>>
>>>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>>>> boris.lublin...@lightbend.com> wrote:

>>>>> Also, where do I put flink-conf.yaml in IDEA to add the additional
>>>>> required config parameter:
>>>>>
>>>>> classloader.parent-first-patterns.additional: 
>>>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>>>
>>>>>
>>>>>
>>>>> On May 21, 2020, at 12:22 PM, 

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-27 Thread Hailu, Andreas
Hi Chesnay, apologies for not getting back to you sooner here. So I did what 
you suggested - I downloaded a few files from my jobmanager.archive.fs.dir HDFS 
directory to a locally available directory named 
/local/scratch/hailua_p2epdlsuat/historyserver/archived/. I then changed my 
historyserver.archive.fs.dir to 
file:///local/scratch/hailua_p2epdlsuat/historyserver/archived/ and that seemed 
to work. I'm able to see the history of the applications I downloaded. So this 
points to a problem with sourcing the history from HDFS.

Do you think this could be classpath related? This is what we use for our 
HADOOP_CLASSPATH var:
/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-hdfs/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-hdfs/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-mapreduce/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-mapreduce/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-yarn/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-yarn/lib/*:/gns/software/ep/da/dataproc/dataproc-prod/lakeRmProxy.jar:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/bin::/gns/mw/dbclient/postgres/jdbc/pg-jdbc-9.3.v01/postgresql-9.3-1100-jdbc4.jar

You can see we have references to Hadoop mapred/yarn/hdfs libs in there.

// ah

From: Chesnay Schepler 
Sent: Sunday, May 3, 2020 6:00 PM
To: Hailu, Andreas [Engineering]; user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

yes, exactly; I want to rule out that (somehow) HDFS is the problem.

I couldn't reproduce the issue locally myself so far.

On 01/05/2020 22:31, Hailu, Andreas wrote:
Hi Chesnay, yes - they were created using Flink 1.9.1 as we've only just 
started to archive them in the past couple weeks. Could you clarify on how you 
want to try local filesystem archives? As in changing jobmanager.archive.fs.dir 
and historyserver.web.tmpdir to the same local directory?

// ah

From: Chesnay Schepler 
Sent: Wednesday, April 29, 2020 8:26 AM
To: Hailu, Andreas [Engineering]; user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

hmm...let's see if I can reproduce the issue locally.

Are the archives from the same version the history server runs on? (Which I 
suppose would be 1.9.1?)

Just for the sake of narrowing things down, it would also be interesting to 
check if it works with the archives residing in the local filesystem.

On 27/04/2020 18:35, Hailu, Andreas wrote:
bash-4.1$ ls -l /local/scratch/flink_historyserver_tmpdir/
total 8
drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:43 flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9
drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:22 flink-web-history-95b3f928-c60f-4351-9926-766c6ad3ee76

There are just two directories in here. I don't see cache directories from my 
attempts today, which is interesting. Looking a little deeper into them:

bash-4.1$ ls -lr /local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9
total 1756
drwxrwxr-x 2 p2epdlsuat p2epdlsuat 1789952 Apr 21 10:44 jobs
bash-4.1$ ls -lr /local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9/jobs
total 0
-rw-rw-r-- 1 p2epdlsuat p2epdlsuat 0 Apr 21 10:43 overview.json

There are indeed archives already in HDFS - I've included some in my initial 
mail, but here they are again just for reference:
-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs
Found 44282 items
-rw-r-   3 delp datalake_admin_dev  50569 2020-03-21 23:17 /user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936
-rw-r-   3 delp datalake_admin_dev  49578 2020-03-03 08:45 /user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5
-rw-r-   3 delp datalake_admin_dev  50842 2020-03-24 15:19 /user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757
...


// ah

From: Chesnay Schepler 
Sent: Monday, April 27, 2020 10:28 AM
To: Hailu, Andreas [Engineering]; user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

If historyserver.web.tmpdir is not set then java.io.tmpdir is used, so that 
should be fine.

What are the contents of /local/scratch/flink_historyserver_tmpdir?
I assume there are already archives in HDFS?

On 27/04/2020 16:02, Hailu, Andreas wrote:
My machine's /tmp directory is not large enough to support the archived files, 
so I changed my java.io.tmpdir to be in some other location which is 
significantly larger. I hadn't set anything for historyserver.web.tmpdir, so I 
suspect it was still pointing at /tmp. I just tried setting 

Re: Tumbling windows - increasing checkpoint size over time

2020-05-27 Thread Wissman, Matt
Hello Till & Guowei,



Thanks for the replies! Here is a snippet of the window function:



  SingleOutputStreamOperator aggregatedStream = dataStream
      .keyBy(idKeySelector())
      .window(TumblingProcessingTimeWindows.of(seconds(15)))
      .apply(new Aggregator())
      .name("Aggregator")
      .setParallelism(3);



Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to 100MB 
(we’ve since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: nothing is set in terms of watermarks – do they apply to processing 
time?

The set of keys processed in the stream is stable over time

The checkpoint size actually looks pretty stable now that the interval was 
increased. Is it possible that the short checkpoint interval prevented 
compaction?

Thanks!

-Matt


From: Till Rohrmann 
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma 
Cc: "Wissman, Matt" , "user@flink.apache.org" 

Subject: Re: Tumbling windows - increasing checkpoint size over time


Hi Matt,

could you give us a bit more information about the windows you are using? They 
are tumbling windows. What's the size of the windows? Do you allow lateness of 
events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does 
this also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma wrote:
Hi, Matt
The total size of the state of the window operator is related to the
number of windows. For example, if you use keyBy + tumbling window, there
would be as many windows as there are keys.
Hope this helps.
Best,
Guowei

On Wed, May 27, 2020 at 3:35 AM, Wissman, Matt wrote:
>
> Hello Flink Community,
>
>
>
> I’m running a Flink pipeline that uses a tumbling window and incremental 
> checkpoint with RocksDB backed by s3. The number of objects in the window is 
> stable but over time the checkpoint size grows seemingly unbounded. Within the 
> first few hours after bringing the Flink pipeline up, the checkpoint size is 
> around 100K but after a week of operation it grows to around 100MB. The 
> pipeline isn’t using any other Flink state besides the state that the window 
> uses. I think this has something to do with RocksDB’s compaction but 
> shouldn’t the tumbling window state expire and be purged from the checkpoint?
>
>
>
> Flink Version 1.7.1
>
>
>
> Thanks!
>
>
>
> -Matt


Re: Stateful functions Harness

2020-05-27 Thread Boris Lublinsky
That's not exactly the usage question that I am asking.
When I am writing an IO module, I have to write Ingress and Egress specs.
You have an example for Kafka, which looks like

def getIngressSpec: IngressSpec[GreetRequest] =
  KafkaIngressBuilder.forIdentifier(GREETING_INGRESS_ID)
    .withKafkaAddress(kafkaAddress)
    .withTopic("names")
    .withDeserializer(classOf[GreetKafkaDeserializer])
    .withProperty(ConsumerConfig.GROUP_ID_CONFIG, "greetings")
    .build

def getEgressSpec: EgressSpec[GreetResponse] =
  KafkaEgressBuilder.forIdentifier(GREETING_EGRESS_ID)
    .withKafkaAddress(kafkaAddress)
    .withSerializer(classOf[GreetKafkaSerializer])
    .build

How is it going to look if I am using SourceSinkModule?
Do I just specify stream names? Something else?





> On May 27, 2020, at 11:29 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> 
> 
> On Thu, May 28, 2020, 12:19 AM Boris Lublinsky wrote:
> I think I figured this out.
> The project seems to be missing the
> resources/META-INF/services
> directory, which should contain services
> 
> Yes, the functions / ingresses / egresses etc. are not discoverable if the 
> service file isn't present in the classpath.
> 
> For the examples, if you are running them straight from the repo, they should 
> all have that service file defined and therefore be readily runnable.
> 
> If you are creating your own application project, you'll have to add that 
> yourself.
> 
> 
> Another question:
> I see the org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
> class, which I think allows using existing data streams as ingress/egress.
> 
> Are there any examples of its usage?
> 
> On the Harness class, there is a withFlinkSourceFunction method in which you 
> can directly add a Flink source function as the ingress.
> 
> If you want to use that directly in a normal application (not just execution 
> in the IDE with the Harness), you can define your ingresses/egresses by binding 
> SourceFunctionSpec / SinkFunctionSpec.
> Please see how they are being used in the Harness class for examples.
> 
> Gordon
> 
> 
> 
>> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai wrote:
>> 
>> Hi,
>> 
>> The example is working fine on my side (also using IntelliJ).
>> This is most likely a problem with your project setup in the IDE, 
>> where the classpath isn't set up correctly.
>> 
>> What do you see when you right click on the statefun-flink-harness-example 
>> directory (in the IDE) --> Open Module Settings, and then under the 
>> "Sources" / "Dependencies" tab?
>> Usually this should all be automatically setup correctly when importing the 
>> project.
>> 
>> Gordon
>> 
>> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky wrote:
>> The project 
>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>> does not work in IntelliJ.
>> 
>> The problem is that when running in IntelliJ, the method 
>> public static Modules loadFromClassPath() 
>> does not pick up classes that are local in IntelliJ.
>> 
>> Any workarounds?
>> 
>> 
>> 
>> 
>>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai wrote:
>>> 
>>> Hi,
>>> 
>>> Sorry, I need to correct my comment on using the Kafka ingress / egress 
>>> with the Harness.
>>> 
>>> That is actually doable, by adding an extra dependency to 
>>> `statefun-flink-distribution` in your Harness program.
>>> That pulls in all the other required dependencies required by the Kafka 
>>> ingress / egress, such as the source / sink providers and Flink Kafka 
>>> connectors.
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai wrote:
>>> Are you getting an exception from running the Harness?
>>> The Harness should already have the required configurations, such as the 
>>> parent first classloading config.
>>> 
>>> Otherwise, if you would like to add your own configuration, use the 
>>> `withConfiguration` method on the `Harness` class.
>>> 
>>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky wrote:
>>> Also, where do I put flink-conf.yaml in IDEA to add the additional required 
>>> config parameter:
>>> classloader.parent-first-patterns.additional: 
>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>> 
>>> 
>>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky wrote:
>>>> 
>>>> Hi, 
>>>> I am trying to run 
 

Re: Stateful functions Harness

2020-05-27 Thread Boris Lublinsky
OK, so the bug in the examples is the absence of resources. Having the classes on 
the classpath is not sufficient.
Modules.java uses ServiceLoader, which defines private static final 
String PREFIX = "META-INF/services/".
So all the modules have to be listed in the resource files.
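
To illustrate the point about ServiceLoader, here is a minimal, standard-library-only sketch. The interface and class names (`GreeterModule`, `UnregisteredModule`) are hypothetical stand-ins and not StateFun types; the only thing shown is that a compiled implementation is not discovered unless a provider file under META-INF/services/ names it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ServiceLoaderDemo {

    /** Hypothetical stand-in for a module interface; not the StateFun type. */
    public interface GreeterModule {
        String name();
    }

    /** An implementation that is on the classpath but registered nowhere. */
    public static class UnregisteredModule implements GreeterModule {
        @Override
        public String name() {
            return "unregistered";
        }
    }

    /** Returns the names of all providers that ServiceLoader can discover. */
    public static List<String> discover() {
        List<String> names = new ArrayList<>();
        for (GreeterModule module : ServiceLoader.load(GreeterModule.class)) {
            names.add(module.name());
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumption: no META-INF/services/ServiceLoaderDemo$GreeterModule
        // file is on the classpath, so nothing is discovered even though
        // UnregisteredModule is compiled in.
        System.out.println("discovered modules: " + discover());
    }
}
```

Registering an implementation means adding a resource file named after the fully qualified interface under META-INF/services/, containing the implementation class name on its own line. For StateFun this is presumably the module interface that Modules.loadFromClassPath() looks up.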



> On May 27, 2020, at 11:29 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> 
> 
> On Thu, May 28, 2020, 12:19 AM Boris Lublinsky wrote:
> I think I figured this out.
> The project seems to be missing the
> resources/META-INF/services
> directory, which should contain services
> 
> Yes, the functions / ingresses / egresses etc. are not discoverable if the 
> service file isn't present in the classpath.
> 
> For the examples, if you are running them straight from the repo, they should 
> all have that service file defined and therefore be readily runnable.
> 
> If you are creating your own application project, you'll have to add that 
> yourself.
> 
> 
> Another question:
> I see the org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
> class, which I think allows using existing data streams as ingress/egress.
> 
> Are there any examples of its usage?
> 
> On the Harness class, there is a withFlinkSourceFunction method in which you 
> can directly add a Flink source function as the ingress.
> 
> If you want to use that directly in a normal application (not just execution 
> in the IDE with the Harness), you can define your ingresses/egresses by binding 
> SourceFunctionSpec / SinkFunctionSpec.
> Please see how they are being used in the Harness class for examples.
> 
> Gordon
> 
> 
> 
>> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai wrote:
>> 
>> Hi,
>> 
>> The example is working fine on my side (also using IntelliJ).
>> This is most likely a problem with your project setup in the IDE, 
>> where the classpath isn't set up correctly.
>> 
>> What do you see when you right click on the statefun-flink-harness-example 
>> directory (in the IDE) --> Open Module Settings, and then under the 
>> "Sources" / "Dependencies" tab?
>> Usually this should all be automatically setup correctly when importing the 
>> project.
>> 
>> Gordon
>> 
>> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky wrote:
>> The project 
>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>> does not work in IntelliJ.
>> 
>> The problem is that when running in IntelliJ, the method 
>> public static Modules loadFromClassPath() 
>> does not pick up classes that are local in IntelliJ.
>> 
>> Any workarounds?
>> 
>> 
>> 
>> 
>>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai wrote:
>>> 
>>> Hi,
>>> 
>>> Sorry, I need to correct my comment on using the Kafka ingress / egress 
>>> with the Harness.
>>> 
>>> That is actually doable, by adding an extra dependency to 
>>> `statefun-flink-distribution` in your Harness program.
>>> That pulls in all the other required dependencies required by the Kafka 
>>> ingress / egress, such as the source / sink providers and Flink Kafka 
>>> connectors.
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai wrote:
>>> Are you getting an exception from running the Harness?
>>> The Harness should already have the required configurations, such as the 
>>> parent first classloading config.
>>> 
>>> Otherwise, if you would like to add your own configuration, use the 
>>> `withConfiguration` method on the `Harness` class.
>>> 
>>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky wrote:
>>> Also, where do I put flink-conf.yaml in IDEA to add the additional required 
>>> config parameter:
>>> classloader.parent-first-patterns.additional: 
>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>> 
>>> 
>>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky wrote:
>>>> 
>>>> Hi, 
>>>> I am trying to run 
>>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>> locally using 
>>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>> and have several questions.
>>>> 1. It seems fairly 

Age old stop vs cancel debate

2020-05-27 Thread Senthil Kumar
We are on flink 1.9.0

I have a custom SourceFunction, where I rely on isRunning set to false via the 
cancel() function to exit out of the run loop.
My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.

When a job gets cancelled, SourceFunction.cancel() is called, which sets 
isRunning to false.
In addition, the Thread.sleep gets interrupted, a check is made on the 
isRunning variable (set to false now) and the run loop is exited.
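
The cancel path described above can be sketched in plain Java, without any Flink dependency. This is a minimal sketch under stated assumptions: `PollingLoop` is a hypothetical class, the counter stands in for the S3 fetch, and `cancel()` interrupts the runner thread itself, whereas in Flink the interrupt comes from the framework. It does not answer what happens on "stop".

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PollingLoop implements Runnable {

    // volatile so that the write in cancel() is visible to the run loop
    private volatile boolean isRunning = true;
    private volatile Thread runner;
    private final long sleepMillis;
    private final AtomicInteger polls = new AtomicInteger();

    public PollingLoop(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        while (isRunning) {
            polls.incrementAndGet(); // placeholder for "get the data from S3"
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Woken up early; the loop condition re-checks isRunning.
            }
        }
    }

    /** Sets the flag and interrupts the sleeping thread, if there is one. */
    public void cancel() {
        isRunning = false;
        Thread t = runner;
        if (t != null) {
            t.interrupt();
        }
    }

    public int polls() {
        return polls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        PollingLoop loop = new PollingLoop(60_000);
        Thread thread = new Thread(loop);
        thread.start();
        Thread.sleep(100);
        loop.cancel();     // flag + interrupt, as in the cancel case
        thread.join(2_000);
        System.out.println("loop exited: " + !thread.isAlive());
    }
}
```

Without the interrupt, the loop would only notice the flag after the current sleep finishes, which matches the behavior reported for "stop" below.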

We noticed that when we “stop” the flink job, the Thread.sleep does not get 
interrupted.
It also appears that SourceFunction.cancel() is not getting called (which seems 
like the correct behavior for “stop”)

My question: what’s the “right” way to exit the run() loop when the flink job 
receives a stop command?

My understanding was that there was a Stoppable interface (which got removed in 
1.9.0)

Would appreciate any insights.

Cheers
Kumar


Re: [EXTERNAL] Re: Memory growth from TimeWindows

2020-05-27 Thread Slotterback, Chris
Aljoscha,
Maybe “lazy” isn’t the right term, haha. It’s my interpretation that during mark 
and sweep of the default GC, memory from older windows wasn’t being fully 
marked for collection. Since switching to G1, collection seems to be much more 
aggressive, and whenever the young generation memory exceeds the configured %, 
it reclaims all unreferenced state.

Mitch,
I am running 1.9 at the moment, with plans to upgrade to 1.10 at some point.

It looks like that taskmanager.sh script won’t append the UseG1GC flag if any 
other JVM options are set? We had more config, so it wasn’t G1 by default. A quick 
way to verify the taskmanagers are actually using G1 is checking the flink 
taskmanager garbage collection metrics for G1_Young_Generation and 
G1_Old_Generation. If it isn’t being set, you can always append the G1 flag 
manually to the Java opts in the flink-conf file each host will use to start.

Chris

From: Mitch Lloyd 
Date: Wednesday, May 27, 2020 at 12:00 PM
To: Aljoscha Krettek 
Cc: "user@flink.apache.org" 
Subject: Re: [EXTERNAL] Re: Memory growth from TimeWindows

Chris,

What version of Flink are you using? I also have an issue with slow but 
continual memory growth in a windowing function but it seems like the 
taskmanager.sh script I'm using already has the -XX:+UseG1GC flag set: 
https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/taskmanager.sh#L43

On Mon, May 25, 2020 at 3:31 AM Aljoscha Krettek wrote:
Just to double check: the issue was resolved by using a different GC?
Because the default GC was too "lazy". ;-)

Best,
Aljoscha

On 21.05.20 18:09, Slotterback, Chris wrote:
> For those who are interested or googling the mail archives in 8 months, the 
> issue was garbage collection related.
>
> The default 1.8 jvm garbage collector (parallel gc) was being lazy in its 
> marking and collection phases and letting the heap build to a level that was 
> causing memory exceptions and stalled tms. This app has a lot of state, and 
> memory usage well above 10GB at times. The solution was moving to the G1 
> collector which is very aggressive in its young generation collection by 
> default, at the cost of some cpu usage and requires some tuning, but keeps 
> the memory levels much more stable.
>
> On 5/20/20, 9:05 AM, "Slotterback, Chris" wrote:
>
>  What I've noticed is that heap memory ends up growing linearly with time 
> indefinitely (past 24 hours) until it hits the roof of the allocated heap for 
> the task manager, which leads me to believe I am leaking somewhere. All of my 
> windows have an allowed lateness of 5 minutes, and my watermarks are pulled 
> from time embedded in the records using 
> BoundedOutOfOrdernessTimestampExtractors. My TumblingEventTimeWindows and 
> SlidingEventTimeWindow all use AggregateFunctions, and my intervalJoins use 
> ProcessJoinFunctions.
>
>  I expect this app to use a significant amount of memory at scale due to 
> the 288 5-minute intervals in 24 hours, and records being put in all 288 
> window states, and as the application runs for 24 hours memory would increase 
> as all 288(*unique key) windows build with incoming records, but then after 
> 24 hours the memory should stop growing, or at least grow at a different rate?
>
>  Also of note, we are using a FsStateBackend configuration, and plan to 
> move to RocksDBStateBackend, but from what I can tell, this would only reduce 
> memory and delay hitting the heap memory capacity, not stall it forever?
>
>  Thanks
>  Chris
>
>
>  On 5/18/20, 7:29 AM, "Aljoscha Krettek" wrote:
>
>  On 15.05.20 15:17, Slotterback, Chris wrote:
>  > My understanding is that while all these windows build their 
> memory state, I can expect heap memory to grow for the 24 hour length of the 
> SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames 
> expire and release back to the JVM. What is actually happening is when a 
> constant data source feeds the stream, the heap memory profile grows linearly 
> past the 24 hour mark. Could this be a result of a misunderstanding of how 
> the window’s memory states are kept, or is my assumption correct, and it is 
> more likely I have a leak somewhere?
>
>  Will memory keep growing indefinitely? That would indicate a bug? 
> What
>  sort of lateness/watermark settings do you have? What window 
> function do
>  you use? ProcessWindowFunction, or sth that aggregates?
>
>  Side note: with sliding windows of 24h/5min you will have a "write
>  amplification" of 24*60/5=288, each record will be in 288 windows, 
> which
>  will each be kept in separate 

Re: Memory issue in Flink 1.10

2020-05-27 Thread Andrey Zagrebin
Hi Steve,

RocksDB does not contribute to the JVM direct memory. RocksDB off-heap
memory consumption is part of managed memory [1].

You got `OutOfMemoryError: Direct buffer memory`, which relates to the JVM
direct memory. It is also off-heap, but managed by the JVM.
The JVM direct memory limit is set by the corresponding JVM argument of
the Flink process: *-XX:MaxDirectMemorySize* [2].
You can increase the direct memory limit by changing this
configuration option: taskmanager.memory.task.off-heap.size [3].

Judging by the stack trace you provided, the HDFS dependency of the RocksDB
state backend requires more JVM direct memory for remote communication to
restore state.
This can also mean that there are other consumers of direct memory in your
application, but it was HDFS that hit the limit.
See also the troubleshooting guide for `OutOfMemoryError: Direct buffer
memory` [4].

Notice that Flink network buffers also use the JVM direct memory but Flink
makes sure that they do not exceed their limit [5].
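
To make the heap / direct distinction concrete, here is a minimal plain-JDK
sketch (no Flink or Hadoop involved). The class and method names are made up
for illustration. It only shows that `ByteBuffer.allocateDirect`, the call at
the bottom of the stack trace, allocates outside the Java heap and is
therefore the kind of allocation that -XX:MaxDirectMemorySize bounds.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class; not part of Flink or Hadoop.
final class DirectBufferDemo {

    // Allocates outside the Java heap. Such allocations count against
    // -XX:MaxDirectMemorySize and fail with
    // "OutOfMemoryError: Direct buffer memory" once the limit is exceeded.
    static ByteBuffer allocateOffHeap(int bytes) {
        return ByteBuffer.allocateDirect(bytes);
    }

    // Allocates a regular byte[]-backed buffer on the Java heap.
    static ByteBuffer allocateOnHeap(int bytes) {
        return ByteBuffer.allocate(bytes);
    }

    public static void main(String[] args) {
        ByteBuffer direct = allocateOffHeap(1024);
        ByteBuffer heap = allocateOnHeap(1024);
        System.out.println("allocateDirect -> isDirect = " + direct.isDirect());
        System.out.println("allocate       -> isDirect = " + heap.isDirect());
    }
}
```

Raising taskmanager.memory.task.off-heap.size gives allocations like the
first one more room. It does not change how much memory the code asks for.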

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#managed-memory
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#jvm-parameters
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#configure-off-heap-memory-direct-or-native
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_trouble.html#outofmemoryerror-direct-buffer-memory
[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

On Wed, May 27, 2020 at 6:30 PM Steven Nelson 
wrote:

> We recently migrated to Flink 1.10, but are experiencing some issues with
> memory.
>
> Our cluster is:
> 1) Running inside of Kubernetes
> 2) Running in HA mode
> 3) Checkpointing/Savepointing to an HDFS cluster inside of Kubernetes
> 4) Using RocksDB for checkpointing
> 5) Running on m5d.4xlarge EC2 instances with 64gb of ram
> 6) The taskmanager pods do not have a memory limit set on them
> 7) We set taskmanager.memory.process.size to 48g
>
> We get the following error:
> 2020-05-27 10:12:34
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:191)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:255)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .initializeStateAndOpen(StreamTask.java:989)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$0(StreamTask.java:453)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(
> StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:448)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_376501a366f04bbaab99945c23a40da5_(28/32)
> from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.keyedStatedBackend(
> StreamTaskStateInitializerImpl.java:304)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:131)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.
> at org.apache.flink.contrib.streaming.state.
> RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder
> .java:336)
> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend
> .createKeyedStateBackend(RocksDBStateBackend.java:548)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
> StreamTaskStateInitializerImpl.java:288)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:694)
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at 

Does the Table API have a function similar to the SQL API's TO_TIMESTAMP?

2020-05-27 Thread macia kk
Hi all,

    .field("event_time", TIMESTAMP()).rowtime(
      new Rowtime()
        .timestampsFromField("maxwell_ts")
        .watermarksPeriodicBounded(6)
    )


My maxwell_ts field is in milliseconds, and using it directly like this
fails with:

Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
physical type


Is there an operation similar to

event_time as to_timestamp(maxwell_ts)

for this?
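
For reference, the conversion being asked for (epoch milliseconds to a
timestamp value) looks like this in plain Java. This is only a sketch of the
arithmetic using java.time, not the Table API call itself. The class name,
the sample value and the choice of UTC are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper; not a Flink API.
final class EpochMillisToTimestamp {

    // Interprets the value as milliseconds since 1970-01-01T00:00:00Z
    // and returns the corresponding wall-clock time in UTC.
    static LocalDateTime toTimestamp(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Sample value standing in for a maxwell_ts field:
        // 1590537600000 ms is 2020-05-27 00:00:00 UTC.
        long maxwellTs = 1590537600000L;
        System.out.println(toTimestamp(maxwellTs));
    }
}
```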


Re: Stateful functions Harness

2020-05-27 Thread Tzu-Li (Gordon) Tai
On Thu, May 28, 2020, 12:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> I think I figured this out.
> The project seems to be missing the resources/META-INF/services directory,
> which should contain services.
>

Yes, the functions / ingresses / egresses etc. are not discoverable if the
service file isn't present in the classpath.

The examples, if you are running them straight from the repo, should all
have that service file defined and are therefore readily runnable.

If you are creating your own application project, you'll have to add that
yourself.
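
As a rough illustration of why the file matters, here is a self-contained
sketch using the JDK's java.util.ServiceLoader, which is the standard
mechanism that META-INF/services files feed. That the module lookup works
exactly this way is an assumption on my part, and `DemoModule` is a made-up
interface, not a Stateful Functions type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical demo; DemoModule is not part of Stateful Functions.
final class ServiceDiscoveryDemo {

    interface DemoModule {
        String name();
    }

    // Returns every implementation that is registered for the given service
    // interface in a META-INF/services/<interface name> file on the classpath.
    static <T> List<T> discover(Class<T> service) {
        List<T> found = new ArrayList<>();
        for (T provider : ServiceLoader.load(service)) {
            found.add(provider);
        }
        return found;
    }

    public static void main(String[] args) {
        // No service file names an implementation of DemoModule here,
        // so the loader has nothing to return.
        List<DemoModule> modules = discover(DemoModule.class);
        System.out.println("modules found: " + modules.size());
    }
}
```

Implementations that exist on the classpath but are not listed in a service
file are simply never seen by the loader, which matches the symptom described
above.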


> Another question:
> I see org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
>
> Class, which I think allows to use existing data streams as ingress/egress.
>
> Are there any examples of its usage
>

On the Harness class, there is a withFlinkSourceFunction method in which
you can directly add a Flink source function as the ingress.

If you want to use that directly in a normal application (not just
execution in the IDE with the Harness), you can define your
ingresses/egresses by binding SourceFunctionSpec / SinkFunctionSpec.
Please see how they are being used in the Harness class for examples.

Gordon


>
> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi,
>
> The example is working fine on my side (also using IntelliJ).
> This could most likely be a problem with your project setup in the IDE,
> where the classpath isn't setup correctly.
>
> What do you see when you right click on the statefun-flink-harness-example
> directory (in the IDE) --> Open Module Settings, and then under the
> "Sources" / "Dependencies" tab?
> Usually this should all be automatically setup correctly when importing
> the project.
>
> Gordon
>
> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> The project
>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>> Does not work in Intellij.
>>
>> The problem is that when running in Intellij, method public static Modules
>> loadFromClassPath() {
>> Does not pick up classes, which are local in Intellij
>>
>> Any work arounds?
>>
>>
>>
>>
>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> Hi,
>>
>> Sorry, I need to correct my comment on using the Kafka ingress / egress
>> with the Harness.
>>
>> That is actually doable, by adding an extra dependency to
>> `statefun-flink-distribution` in your Harness program.
>> That pulls in all the other required dependencies required by the Kafka
>> ingress / egress, such as the source / sink providers and Flink Kafka
>> connectors.
>>
>> Cheers,
>> Gordon
>>
>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Are you getting an exception from running the Harness?
>>> The Harness should already have the required configurations, such as the
>>> parent first classloading config.
>>>
>>> Otherwise, if you would like to add your own configuration, use the
>>> `withConfiguration` method on the `Harness` class.
>>>
>>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>>> boris.lublin...@lightbend.com> wrote:
>>>
>>>> Also, where do I put flint-conf.yaml in Idea to add additional required
>>>> config parameter:
>>>>
>>>> classloader.parent-first-patterns.additional: 
>>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>>
>>>>
>>>>
>>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>>>> boris.lublin...@lightbend.com> wrote:
>>>>
>>>> Hi,
>>>> I am trying to run
>>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>>  locally
>>>> using
>>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>>
>>>> And have several questions.
>>>> 1. It seems fairly straightforward to use it with in memory message
>>>> generators, but I can’t figure out how to add Kafka ingress/Egress so that
>>>> I can use it with Kafk
>>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>>>> Harness. Is there a way to short circuit it and have Harness get
>>>> StatefulFunctionUniverse directly
>>>> 3. Is there an example on how to write Flink main for stageful function?
>>>> 4. Is there an example anywhere on how to run such examples in the IDE
>>>> with Kafka?
>>>> 5 There is a great stateful functions example
>>>> https://github.com/ververica/flink-statefun-workshop, but its readme
>>>> does not really describe implementation and neither does this article,
>>>> referencing it
>>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>>>> Is there anything that describes this implementation?



>>
>


Re: Stateful functions Harness

2020-05-27 Thread Boris Lublinsky
I think I figured this out.
The project seems to be missing the resources/META-INF/services directory,
which should contain services.

Another question:
I see the org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
class, which I think allows using existing data streams as ingress/egress.

Are there any examples of its usage?


> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi,
> 
> The example is working fine on my side (also using IntelliJ).
> This could most likely be a problem with your project setup in the IDE, where 
> the classpath isn't setup correctly.
> 
> What do you see when you right click on the statefun-flink-harness-example 
> directory (in the IDE) --> Open Module Settings, and then under the "Sources" 
> / "Dependencies" tab?
> Usually this should all be automatically setup correctly when importing the 
> project.
> 
> Gordon
> 
> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> The project 
> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>  
> 
> Does not work in Intellij.
> 
> The problem is that when running in Intellij, method public static Modules 
> loadFromClassPath() {
> Does not pick up classes, which are local in Intellij
> 
> Any work arounds?
> 
> 
> 
> 
>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai wrote:
>> 
>> Hi,
>> 
>> Sorry, I need to correct my comment on using the Kafka ingress / egress with 
>> the Harness.
>> 
>> That is actually doable, by adding an extra dependency to 
>> `statefun-flink-distribution` in your Harness program.
>> That pulls in all the other required dependencies required by the Kafka 
>> ingress / egress, such as the source / sink providers and Flink Kafka 
>> connectors.
>> 
>> Cheers,
>> Gordon
>> 
>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai wrote:
>> Are you getting an exception from running the Harness?
>> The Harness should already have the required configurations, such as the 
>> parent first classloading config.
>> 
>> Otherwise, if you would like to add your own configuration, use the 
>> `withConfiguration` method on the `Harness` class.
>> 
>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>> Also, where do I put flint-conf.yaml in Idea to add additional required 
>> config parameter:
>> classloader.parent-first-patterns.additional: 
>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>> 
>> 
>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>> 
>>> Hi, 
>>> I am trying to run 
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>  
>>> 
>>>  locally
>>> using 
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>  
>>> 
>>>  
>>> And have several questions.
>>> 1. It seems fairly straightforward to use it with in memory message 
>>> generators, but I can’t figure out how to add Kafka ingress/Egress so that 
>>> I can use it with Kafk
>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does 
>>> Harness. Is there a way to short circuit it and have Harness get 
>>> StatefulFunctionUniverse directly
>>> 3. Is there an example on how to write Flink main for stageful function?
>>> 4. Is there an example anywhere on how to run such examples in the IDE with 
>>> Kafka?
>>> 5 There is a great stateful functions example 
>>> https://github.com/ververica/flink-statefun-workshop 
>>> , but its readme does 
>>> not really describe implementation and neither does this article, 
>>> referencing it 
>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39 
>>> . Is 
>>> there anything that describes this implementation?
>>> 
>> 
> 



Re: Stateful functions Harness

2020-05-27 Thread Tzu-Li (Gordon) Tai
Hi,

The example is working fine on my side (also using IntelliJ).
This could most likely be a problem with your project setup in the IDE,
where the classpath isn't setup correctly.

What do you see when you right click on the statefun-flink-harness-example
directory (in the IDE) --> Open Module Settings, and then under the
"Sources" / "Dependencies" tab?
Usually this should all be automatically setup correctly when importing the
project.

Gordon

On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> The project
> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
> Does not work in Intellij.
>
> The problem is that when running in Intellij, method public static Modules
> loadFromClassPath() {
> Does not pick up classes, which are local in Intellij
>
> Any work arounds?
>
>
>
>
> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi,
>
> Sorry, I need to correct my comment on using the Kafka ingress / egress
> with the Harness.
>
> That is actually doable, by adding an extra dependency to
> `statefun-flink-distribution` in your Harness program.
> That pulls in all the other required dependencies required by the Kafka
> ingress / egress, such as the source / sink providers and Flink Kafka
> connectors.
>
> Cheers,
> Gordon
>
> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Are you getting an exception from running the Harness?
>> The Harness should already have the required configurations, such as the
>> parent first classloading config.
>>
>> Otherwise, if you would like to add your own configuration, use the
>> `withConfiguration` method on the `Harness` class.
>>
>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> Also, where do I put flint-conf.yaml in Idea to add additional required
>>> config parameter:
>>>
>>> classloader.parent-first-patterns.additional: 
>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>
>>>
>>>
>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>>> boris.lublin...@lightbend.com> wrote:
>>>
>>> Hi,
>>> I am trying to run
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>  locally
>>> using
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>
>>> And have several questions.
>>> 1. It seems fairly straightforward to use it with in memory message
>>> generators, but I can’t figure out how to add Kafka ingress/Egress so that
>>> I can use it with Kafk
>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>>> Harness. Is there a way to short circuit it and have Harness get
>>> StatefulFunctionUniverse directly
>>> 3. Is there an example on how to write Flink main for stageful function?
>>> 4. Is there an example anywhere on how to run such examples in the IDE
>>> with Kafka?
>>> 5 There is a great stateful functions example
>>> https://github.com/ververica/flink-statefun-workshop, but its readme
>>> does not really describe implementation and neither does this article,
>>> referencing it
>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>>> Is there anything that describes this implementation?
>>>
>>>
>>>
>


Re: Flink Dashboard UI Tasks hard limit

2020-05-27 Thread Vijay Balakrishnan
Thanks so much, Xintong, for guiding me through this. I looked at the Flink
logs to see the errors.
I had to set taskmanager.network.memory.max to 4gb and akka.ask.timeout to
240s to increase the number of tasks.
Now I am able to increase the number of tasks (aka task vertices).

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
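
For anyone reading this later, the task/slot arithmetic from Xintong's
quoted explanation below can be written out as a small plain-Java sketch.
The class and method names are made up for illustration. The slot count
assumes the default behaviour he describes, where the slots needed equal the
largest vertex parallelism.

```java
import java.util.Arrays;

// Hypothetical helper; not a Flink API.
final class TaskSlotMath {

    // Total tasks = sum of the parallelism of every job graph vertex.
    static int totalTasks(int... vertexParallelisms) {
        return Arrays.stream(vertexParallelisms).sum();
    }

    // Slots needed (by default) = the maximum parallelism among the vertices.
    static int slotsNeeded(int... vertexParallelisms) {
        return Arrays.stream(vertexParallelisms).max().orElse(0);
    }

    public static void main(String[] args) {
        // Vertices A, B and C with parallelism 2, 3 and 4, as in the example.
        System.out.println("tasks: " + totalTasks(2, 3, 4));
        System.out.println("slots: " + slotsNeeded(2, 3, 4));
    }
}
```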

On Tue, May 26, 2020 at 8:42 PM Xintong Song  wrote:

> Could you also explain how do you set the parallelism when getting this
> execution plan?
> I'm asking because this json file itself only shows the resulted execution
> plan. It is not clear to me what is not working as expected in your case.
> E.g., you set the parallelism for an operator to 10 but the execution plan
> only shows 5.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan 
> wrote:
>
>> Hi Xintong,
>> Thanks for the excellent clarification for tasks.
>>
>> I attached a sample screenshot above and din't reflect the slots used and
>> the tasks limit I was running into in that pic.
>>
>> I am attaching my Execution plan here. Please let me know how I can
>> increase the nmber of tasks aka parallelism. As  increase the parallelism,
>> i run into this bottleneck with the tasks.
>>
>> BTW - The https://flink.apache.org/visualizer/ is a great start to see
>> this.
>> TIA,
>>
>> On Sun, May 24, 2020 at 7:52 PM Xintong Song 
>> wrote:
>>
>>> Increasing network memory buffers (fraction, min, max) seems to increase
 tasks slightly.
>>>
>>> That's wired. I don't think the number of network memory buffers have
>>> anything to do with the task amount.
>>>
>>> Let me try to clarify a few things.
>>>
>>> Please be aware that, how many tasks a Flink job has, and how many slots
>>> a Flink cluster has, are two different things.
>>> - The number of tasks are decided by your job's parallelism and
>>> topology. E.g., if your job graph have 3 vertices A, B and C, with
>>> parallelism 2, 3, 4 respectively. Then you would have totally 9 (2+3+4)
>>> tasks.
>>> - The number of slots are decided by number of TMs and slots-per-TM.
>>> - For streaming jobs, you have to make sure the number of slots is
>>> enough for executing all your tasks. The number of slots needed for
>>> executing your job is by default the max parallelism of your job graph
>>> vertices. Take the above example, you would need 4 slots, because it's the
>>> max among all the vertices' parallelisms (2, 3, 4).
>>>
>>> In your case, the screenshot shows that you job has 9621 tasks in total
>>> (not around 18000, the dark box shows total tasks while the green box shows
>>> running tasks), and 600 slots are in use (658 - 58) suggesting that the max
>>> parallelism of your job graph vertices is 600.
>>>
>>> If you want to increase the number of tasks, you should increase your
>>> job parallelism. There are several ways to do that.
>>>
>>>- In your job codes (assuming you are using DataStream API)
>>>   - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>   parallelism for all operators.
>>>   - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>   parallelism for a specific operator. (Only supported for subclasses of
>>>   `SingleOutputStreamOperator`.)
>>>- When submitting your job, use `-p ` as an argument
>>>for the `flink run` command, to set parallelism for all operators.
>>>- Set `parallelism.default` in your `flink-conf.yaml`, to set a
>>>default parallelism for your jobs. This will be used for jobs that have 
>>> not
>>>set parallelism with neither of the above methods.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan 
>>> wrote:
>>>
>>>> Hi Xintong,
>>>> Thx for your reply.  Increasing network memory buffers (fraction, min,
>>>> max) seems to increase tasks slightly.
>>>>
>>>> Streaming job
>>>> Standalone
>>>>
>>>> Vijay
>>>>
>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song
>>>> wrote:

> Hi Vijay,
>
> I don't think your problem is related to number of opening files. The
> parallelism of your job is decided before actually tries to open the 
> files.
> And if the OS limit for opening files is reached, you should see a job
> execution failure, instead of a success execution with a lower 
> parallelism.
>
> Could you share some more information about your use case?
>
>- What kind of job are your executing? Is it a streaming or batch
>processing job?
>- Which Flink deployment do you use? Standalone? Yarn?
>- It would be helpful if you can share the Flink logs.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <
> bvija...@gmail.com> wrote:
>
>> Hi,
>> I have increased the number of slots available but the Job is not
>> using all the slots 

Re: [EXTERNAL] Re: Memory growth from TimeWindows

2020-05-27 Thread Mitch Lloyd
Chris,

What version of Flink are you using? I also have an issue with slow but
continual memory growth in a windowing function, but it seems like the
taskmanager.sh script I'm using already has the -XX:+UseG1GC flag set:
https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/taskmanager.sh#L43
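
One way to check which collector a JVM is actually running with, rather than
reading the start script, is to ask the JVM itself. Below is a minimal sketch
using the standard java.lang.management API. The class name is made up, and
it reports on whatever JVM runs it, so it would have to be started with the
same JVM options as the task manager to be meaningful.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of Flink.
final class ActiveCollectors {

    // Names of the garbage collectors active in the current JVM.
    // On HotSpot, G1 typically reports names starting with "G1" and the
    // parallel collector names starting with "PS".
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : collectorNames()) {
            System.out.println(name);
        }
    }
}
```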

On Mon, May 25, 2020 at 3:31 AM Aljoscha Krettek 
wrote:

> Just to double check: the issue was resolved by using a different GC?
> Because the default GC was too "lazy". ;-)
>
> Best,
> Aljoscha
>
> On 21.05.20 18:09, Slotterback, Chris wrote:
> > For those who are interested or googling the mail archives in 8 months,
> the issue was garbage collection related.
> >
> > The default 1.8 jvm garbage collector (parallel gc) was being lazy in
> its marking and collection phases and letting the heap build to a level
> that was causing memory exceptions and stalled tms. This app has a lot of
> state, and memory usage well above 10GB at times. The solution was moving
> to the G1 collector which is very aggressive in its young generation
> collection by default, at the cost of some cpu usage and requires some
> tuning, but keeps the memory levels much more stable.
> >
> > On 5/20/20, 9:05 AM, "Slotterback, Chris" <
> chris_slotterb...@comcast.com> wrote:
> >
> >  What I've noticed is that heap memory ends up growing linearly with
> time indefinitely (past 24 hours) until it hits the roof of the allocated
> heap for the task manager, which leads me to believe I am leaking
> somewhere. All of my windows have an allowed lateness of 5 minutes, and my
> watermarks are pulled from time embedded in the records using
> BoundedOutOfOrdernessTimestampExtractors. My TumblingEventTimeWindows and
> SlidingEventTimeWindow all use AggregateFunctions, and my intervalJoins use
> ProcessJoinFunctions.
> >
> >  I expect this app to use a significant amount of memory at scale
> due to the 288 5-minute intervals in 24 hours, and records being put in all
> 288 window states, and as the application runs for 24 hours memory would
> increase as all 288(*unique key) windows build with incoming records, but
> then after 24 hours the memory should stop growing, or at least grow at a
> different rate?
> >
> >  Also of note, we are using a FsStateBackend configuration, and plan
> to move to RocksDBStateBackend, but from what I can tell, this would only
> reduce memory and delay hitting the heap memory capacity, not stall it
> forever?
> >
> >  Thanks
> >  Chris
> >
> >
> >  On 5/18/20, 7:29 AM, "Aljoscha Krettek" 
> wrote:
> >
> >  On 15.05.20 15:17, Slotterback, Chris wrote:
> >  > My understanding is that while all these windows build their
> memory state, I can expect heap memory to grow for the 24 hour length of
> the SlidingEventTimeWindow, and then start to flatten as the t-24hr window
> frames expire and release back to the JVM. What is actually happening is
> when a constant data source feeds the stream, the heap memory profile grows
> linearly past the 24 hour mark. Could this be a result of a
> misunderstanding of how the window’s memory states are kept, or is my
> assumption correct, and it is more likely I have a leak somewhere?
> >
> >  Will memory keep growing indefinitely? That would indicate a
> bug? What
> >  sort of lateness/watermark settings do you have? What window
> function do
> >  you use? ProcessWindowFunction, or sth that aggregates?
> >
> >  Side note: with sliding windows of 24h/5min you will have a
> "write
> >  amplification" of 24*60/5=288, each record will be in 288
> windows, which
> >  will each be kept in separate state?
> >
> >  Best,
> >  Aljoscha
> >
> >
> >
>
>
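
The "write amplification" figure from Aljoscha's quoted side note above
(24*60/5=288) can be reproduced with a short plain-Java sketch. The class and
method names are made up for illustration, and the helper assumes the window
size is an exact multiple of the slide.

```java
import java.time.Duration;

// Hypothetical helper; not a Flink API.
final class SlidingWindowMath {

    // Number of sliding windows each record falls into: size / slide.
    static long windowsPerRecord(Duration size, Duration slide) {
        return size.toMillis() / slide.toMillis();
    }

    public static void main(String[] args) {
        // 24-hour windows sliding every 5 minutes, as in this thread.
        long windows = windowsPerRecord(Duration.ofHours(24), Duration.ofMinutes(5));
        System.out.println("windows per record: " + windows);
    }
}
```

Multiplying that figure by the number of unique keys gives the number of
window states that can be live at once, which is the "288(*unique key)"
estimate in Chris's message.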


Re: Stateful functions Harness

2020-05-27 Thread Boris Lublinsky
The project
https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
does not work in IntelliJ.

The problem is that when running in IntelliJ, the method
public static Modules loadFromClassPath()
does not pick up classes that are local in IntelliJ.

Any workarounds?




> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi,
> 
> Sorry, I need to correct my comment on using the Kafka ingress / egress with 
> the Harness.
> 
> That is actually doable, by adding an extra dependency to 
> `statefun-flink-distribution` in your Harness program.
> That pulls in all the other required dependencies required by the Kafka 
> ingress / egress, such as the source / sink providers and Flink Kafka 
> connectors.
> 
> Cheers,
> Gordon
> 
> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai wrote:
> Are you getting an exception from running the Harness?
> The Harness should already have the required configurations, such as the 
> parent first classloading config.
> 
> Otherwise, if you would like to add your own configuration, use the 
> `withConfiguration` method on the `Harness` class.
> 
> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> Also, where do I put flint-conf.yaml in Idea to add additional required 
> config parameter:
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> 
> 
>> On May 21, 2020, at 12:22 PM, Boris Lublinsky wrote:
>> 
>> Hi, 
>> I am trying to run 
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>  
>> 
>>  locally
>> using 
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>  
>> 
>>  
>> And have several questions.
>> 1. It seems fairly straightforward to use it with in memory message 
>> generators, but I can’t figure out how to add Kafka ingress/Egress so that I 
>> can use it with Kafk
>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does 
>> Harness. Is there a way to short circuit it and have Harness get 
>> StatefulFunctionUniverse directly
>> 3. Is there an example on how to write Flink main for stageful function?
>> 4. Is there an example anywhere on how to run such examples in the IDE with 
>> Kafka?
>> 5 There is a great stateful functions example 
>> https://github.com/ververica/flink-statefun-workshop 
>> , but its readme does 
>> not really describe implementation and neither does this article, 
>> referencing it 
>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39 
>> . Is 
>> there anything that describes this implementation?
>> 
> 



Memory issue in Flink 1.10

2020-05-27 Thread Steven Nelson
We recently migrated to Flink 1.10, but are experiencing some issues with
memory.

Our cluster is:
1) Running inside of Kubernetes
2) Running in HA mode
3) Checkpointing/Savepointing to an HDFS cluster inside of Kubernetes
4) Using RocksDB for checkpointing
5) Running on m5d.4xlarge EC2 instances with 64 GB of RAM
6) The taskmanager pods do not have a memory limit set on them
7) We set taskmanager.memory.process.size to 48g

We get the following error:
2020-05-27 10:12:34
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:191)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.initializeState(AbstractStreamOperator.java:255)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.initializeStateAndOpen(StreamTask.java:989)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.lambda$beforeInvoke$0(StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.
StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(
StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
StreamTask.java:448)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for WindowOperator_376501a366f04bbaab99945c23a40da5_(28/32)
from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.keyedStatedBackend(
StreamTaskStateInitializerImpl.java:304)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
unexpected exception.
at org.apache.flink.contrib.streaming.state.
RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:
336)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend
.createKeyedStateBackend(RocksDBStateBackend.java:548)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
StreamTaskStateInitializerImpl.java:288)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool
.java:72)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver
.reallocPacketBuf(PacketReceiver.java:272)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(
PacketReceiver.java:165)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver
.receiveNextPacket(PacketReceiver.java:102)
at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(
RemoteBlockReader2.java:201)
at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2
.java:152)
at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(
DFSInputStream.java:767)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:
823)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream
.java:883)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:727)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(
HadoopDataInputStream.java:84)
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(
FSDataInputStreamWrapper.java:51)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.flink.core.io.VersionedIOReadableWritable.read(
VersionedIOReadableWritable.java:45)
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(
KeyedBackendSerializationProxy.java:133)
at org.apache.flink.contrib.streaming.state.restore.
AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation
.java:187)
at org.apache.flink.contrib.streaming.state.restore.
RocksDBFullRestoreOperation.restoreKVStateMetaData(

Re: Collecting operators real output cardinalities as json files

2020-05-27 Thread Francesco Ventura
Thank you very much for your explanation.
I will keep it in mind.

Best,

Francesco

> On 27 May 2020, at 15:43, Piotr Nowojski wrote:
> 
> Hi Francesco,
> 
> As long as you do not set the update interval of the metric reporter to a
> very low value, there should be no visible performance degradation.
> 
> One thing worth keeping in mind: if your jobs are bounded (they work on
> bounded input and finish at some point), the last metric value reported
> before the job completes might not reflect the end state of the job. This
> limitation may not apply if you use the REST API, as the Job Manager might
> still remember the values you are looking for.
> 
> Piotrek
> 
>> On 27 May 2020, at 11:41, Francesco Ventura wrote:
>> 
>> Hi Piotrek,
>> 
>> Thank you for your reply and for your suggestions. Just one more question.
>> Will using a metrics reporter and custom metrics affect the performance
>> of the running jobs in terms of execution time? Since I need the exact
>> netRunTime of each job, maybe using the REST API to get the other
>> information would be more reliable?
>> 
>> Thank you. Best,
>> 
>> Francesco
>> 
>>> On 25 May 2020, at 19:54, Piotr Nowojski wrote:
>>> 
>>> Hi Francesco,
>>> 
>>> Have you taken a look at the metrics? [1] And IO metrics [2] in particular? 
>>> You can use some of the pre-existing metric reporter [3] or implement a 
>>> custom one. You could export metrics to some 3rd party system, and get 
>>> JSONs from there, or export them to JSON directly via a custom metric 
>>> reporter.
>>> 
>>> Piotrek
>>> 
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
>>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>>> 
>>>> On 23 May 2020, at 11:31, Francesco Ventura wrote:
>>>> 
>>>> Hi everybody,
>>>> 
>>>> I would like to collect the statistics and the real output cardinalities
>>>> of many job executions as JSON files. I know that there is a REST
>>>> interface that can be used, but I was looking for something simpler. In
>>>> practice, I would like to take the information shown in the WebUI at
>>>> runtime about a job and store it as a file. I am using
>>>> env.getExecutionPlan() to get the execution plan of a job with the
>>>> estimated cardinalities for each operator. However, it includes only
>>>> estimated cardinalities and it can be used only before calling
>>>> env.execute().
>>>> 
>>>> Is there a similar way to extract the real output cardinalities of each
>>>> pipeline after the execution?
>>>> Is there a place where the Flink cluster stores the history of the
>>>> information about executed jobs?
>>>> Is developing a REST client to extract such information the only
>>>> possible way?
>>>> 
>>>> I would also like to avoid adding counters to the job source code, since I
>>>> am monitoring the runtime execution and should avoid anything that
>>>> can interfere.
>>>> 
>>>> Maybe it is a trivial problem, but I had a quick look around and could not
>>>> find the solution.
>>>> 
>>>> Thank you very much,
>>>> 
>>>> Francesco
>>> 
>> 
> 



Re: Apache Flink - Question about application restart

2020-05-27 Thread Till Rohrmann
Hi,

if you submit the same job multiple times, it will be assigned a different
JobID every time. For Flink, different job submissions are considered to be
different jobs. Once a job has been submitted, it keeps the same JobID,
which is important in order to retrieve the checkpoints associated with
this job.

Cheers,
Till

On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:

> Hi Zhu Zhu:
>
> I have another clarification - it looks like if I run the same app multiple
> times, its job id changes.  So even though the graph is the same, the job
> id does not depend on the job graph alone, since it differs across runs of
> the same app.
>
> Please let me know if I've missed anything.
>
> Thanks
>
> On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh 
> wrote:
>
>
> Hi Zhu Zhu:
>
> Just to clarify - from what I understand, EMR also has a default number of
> restarts (I think it is 3). So if EMR restarts the job, the job id is the
> same since the job graph is the same.
>
> Thanks for the clarification.
>
> On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang 
> wrote:
>
>
> Just sharing some additional information.
>
> When a Flink application deployed on Yarn exhausts its restart policy, the
> whole application fails. If you then start another instance (a new Yarn
> application), we cannot recover from the latest checkpoint even if high
> availability is configured, because the clusterId (i.e. applicationId) has
> changed.
>
>
> Best,
> Yang
>
> On Mon, May 25, 2020 at 11:17 AM, Zhu Zhu wrote:
>
> Hi M,
>
> Regarding your questions:
> 1. yes. The id is fixed once the job graph is generated.
> 2. yes
>
> Regarding yarn mode:
> 1. the job id stays the same because the job graph is generated once
> on the client side and persisted in DFS for reuse
> 2. yes if high availability is enabled
>
> Thanks,
> Zhu Zhu
>
> On Sat, May 23, 2020 at 4:06 AM, M Singh wrote:
>
> Hi Flink Folks:
>
> If I have a Flink Application with 10 restarts, if it fails and restarts,
> then:
>
> 1. Does the job have the same id?
> 2. Does the automatically restarting application pick up from the last
> checkpoint? I am assuming it does but just want to confirm.
>
> Also, if it is running on AWS EMR, I believe EMR/Yarn is configured to
> restart the job 3 times (after it has exhausted its restart policy). If
> that is the case:
> 1. Does the job get a new id? I believe it does, but just want to confirm.
> 2. Does the Yarn restart honor the last checkpoint? I believe it does
> not, but is there a way to make it restart from the last checkpoint of the
> failed job (after it has exhausted its restart policy)?
>
> Thanks
>
>
>


Re: Tumbling windows - increasing checkpoint size over time

2020-05-27 Thread Till Rohrmann
Hi Matt,

could you give us a bit more information about the windows you are using?
They are tumbling windows. What's the size of the windows? Do you allow
lateness of events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant.
Does this also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma  wrote:

> Hi, Matt
> The total size of the state of the window operator is related to the
> number of windows. For example, if you use keyBy + tumbling window, there
> would be as many windows as there are keys.
> Hope this helps.
> Best,
> Guowei
>
> On Wed, May 27, 2020 at 3:35 AM, Wissman, Matt wrote:
> >
> > Hello Flink Community,
> >
> >
> >
> > I’m running a Flink pipeline that uses a tumbling window and incremental
> > checkpoints with RocksDB backed by S3. The number of objects in the window
> > is stable, but over time the checkpoint size grows seemingly unbounded.
> > Within the first few hours after bringing the Flink pipeline up, the
> > checkpoint size is around 100K, but after a week of operation it grows to
> > around 100MB. The pipeline isn’t using any other Flink state besides the
> > state that the window uses. I think this has something to do with RocksDB’s
> > compaction, but shouldn’t the tumbling window state expire and be purged
> > from the checkpoint?
> >
> >
> >
> > Flink Version 1.7.1
> >
> >
> >
> > Thanks!
> >
> >
> >
> > -Matt
>
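
Guowei's point in the quoted reply, that window state scales with the number of keys, can be illustrated with a small sketch. This is plain Python rather than Flink code; the window-start arithmetic follows the formula used by Flink's `TimeWindow.getWindowStartWithOffset` for non-negative timestamps, and the helper names `window_start` and `live_windows` are hypothetical.

```python
from collections import defaultdict


def window_start(timestamp_ms, size_ms, offset_ms=0):
    """Start of the tumbling window containing timestamp_ms (non-negative timestamps)."""
    return timestamp_ms - (timestamp_ms - offset_ms + size_ms) % size_ms


def live_windows(events, size_ms):
    """Group (key, timestamp_ms) events into per-key tumbling windows.

    Returns a dict mapping (key, window_start) -> event count; each entry
    stands for one window's worth of state.
    """
    windows = defaultdict(int)
    for key, ts in events:
        windows[(key, window_start(ts, size_ms))] += 1
    return dict(windows)


events = [("a", 1_000), ("b", 1_500), ("a", 59_000), ("c", 61_000)]
state = live_windows(events, size_ms=60_000)
print(len(state))  # number of distinct (key, window) pairs
```

If the key space keeps growing over time, the number of (key, window) pairs may grow with it even when the number of events per window stays roughly constant, which is one thing worth checking in a case like this.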


Re: multiple sources

2020-05-27 Thread Till Rohrmann
Hi Aissa,

Flink supports reading from multiple sources in one job. You have to call
`StreamExecutionEnvironment.addSource()` multiple times, once with each
respective `SourceFunction`. Flink does not come with a ready-made MongoDB
connector. However, there is a project which tried to implement a MongoDB
connector [1]. You might be able to take this work and convert it into a
`SourceFunction`.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#access-mongodb

Cheers,
Till

On Wed, May 27, 2020 at 11:17 AM Aissa Elaffani 
wrote:

> Hello everyone,
> I hope you are all doing well. I am reading real-time messages produced by
> some sensors from a Kafka topic, and in order to do some aggregations, I
> need to enrich the stream with other data that is stored in MongoDB.
> So, I want to know if it is possible to work with two sources in one job.
> If yes, how do I do so?
> Best,
> Aissa
>


Re: Collecting operators real output cardinalities as json files

2020-05-27 Thread Piotr Nowojski
Hi Francesco,

As long as you do not set the update interval of the metric reporter to a
very low value, there should be no visible performance degradation.

One thing worth keeping in mind: if your jobs are bounded (they work on
bounded input and finish at some point), the last metric value reported
before the job completes might not reflect the end state of the job. This
limitation may not apply if you use the REST API, as the Job Manager might
still remember the values you are looking for.

Piotrek

> On 27 May 2020, at 11:41, Francesco Ventura wrote:
> 
> Hi Piotrek,
> 
> Thank you for your reply and for your suggestions. Just one more question.
> Will using a metrics reporter and custom metrics affect the performance
> of the running jobs in terms of execution time? Since I need the exact
> netRunTime of each job, maybe using the REST API to get the other
> information would be more reliable?
> 
> Thank you. Best,
> 
> Francesco
> 
>> On 25 May 2020, at 19:54, Piotr Nowojski wrote:
>> 
>> Hi Francesco,
>> 
>> Have you taken a look at the metrics? [1] And IO metrics [2] in particular? 
>> You can use some of the pre-existing metric reporter [3] or implement a 
>> custom one. You could export metrics to some 3rd party system, and get JSONs 
>> from there, or export them to JSON directly via a custom metric reporter.
>> 
>> Piotrek
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>> 
>>> On 23 May 2020, at 11:31, Francesco Ventura wrote:
>>> 
>>> Hi everybody, 
>>> 
>>> I would like to collect the statistics and the real output cardinalities
>>> of many job executions as JSON files. I know that there is a REST
>>> interface that can be used, but I was looking for something simpler. In
>>> practice, I would like to take the information shown in the WebUI at
>>> runtime about a job and store it as a file. I am using
>>> env.getExecutionPlan() to get the execution plan of a job with the
>>> estimated cardinalities for each operator. However, it includes only
>>> estimated cardinalities and it can be used only before calling
>>> env.execute().
>>> 
>>> Is there a similar way to extract the real output cardinalities of each
>>> pipeline after the execution?
>>> Is there a place where the Flink cluster stores the history of the
>>> information about executed jobs?
>>> Is developing a REST client to extract such information the only
>>> possible way?
>>> 
>>> I would also like to avoid adding counters to the job source code, since I
>>> am monitoring the runtime execution and should avoid anything that can
>>> interfere.
>>> 
>>> Maybe it is a trivial problem, but I had a quick look around and could not
>>> find the solution.
>>> 
>>> Thank you very much,
>>> 
>>> Francesco
>> 
> 
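
A minimal sketch of the REST-based option mentioned in this thread, for readers who want per-operator record counts as a JSON file. It assumes the job details endpoint `/jobs/<jobid>` returns a `vertices` list whose entries carry a `metrics` object with a `write-records` field; check this against the REST API documentation for your Flink version. The helper names, the sample payload, and the job id are hypothetical.

```python
import json
import urllib.request


def fetch_job(base_url, job_id):
    """GET <base_url>/jobs/<job_id> from the Flink REST API and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/jobs/{job_id}") as resp:
        return json.load(resp)


def output_cardinalities(job_details):
    """Map each vertex name to its 'write-records' metric, if present."""
    return {
        v["name"]: v.get("metrics", {}).get("write-records")
        for v in job_details.get("vertices", [])
    }


def dump_cardinalities(job_details, path):
    """Write the per-operator record counts to a JSON file."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(output_cardinalities(job_details), f, indent=2)


# Offline example with a hand-written payload shaped like the assumed response.
sample = {
    "jid": "hypothetical-job-id",
    "vertices": [
        {"name": "Source", "metrics": {"write-records": 1000}},
        {"name": "Map", "metrics": {"write-records": 950}},
    ],
}
print(output_cardinalities(sample))
```

Against a running cluster, `fetch_job("http://localhost:8081", job_id)` would supply the payload, where the address is whatever your JobManager's REST endpoint is. Polling after the job finishes avoids touching the job's source code, in line with the concern raised in the original question.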



Re: Singal task backpressure problem with Credit-based Flow Control

2020-05-27 Thread Piotr Nowojski
Hi Weihua,

Good to hear that you have found the problem. Let us know if you find some 
other problems after all.

Piotrek

> On 27 May 2020, at 14:18, Weihua Hu  wrote:
> 
> Hi Piotrek,
> 
> Thanks for your suggestions, I found some network issues which seems to be 
> the cause of back pressure.
> 
> Best
> Weihua Hu
> 
>> On 26 May 2020, at 02:54, Piotr Nowojski wrote:
>> 
>> Hi Weihua,
>> 
>> > After dumping the memory and analyzing it, I found:
>> > Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
>> > Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
>> > This is not consistent with my understanding of the Flink network 
>> > transmission mechanism.
>> 
>> It probably is consistent. The downstream receiver has announced all of
>> its credits and is simply waiting for the data to arrive, while the
>> upstream sender is waiting for the data to be sent down the stream.
>> 
>> The stack trace you posted confirms that the sink has an empty input
>> buffer - it’s waiting for input data. Assuming rescale partitioning works
>> as expected and node 242 is indeed connected to node 121, it implies the
>> bottleneck is the data exchange between those two tasks. It could be
>> 
>> - network bottleneck (slow network? Packet losses?)
>> - machine swapping/long GC pauses (If upstream node is experiencing long 
>> pauses it might show up like this)
>> - cpu bottleneck in the network stack (frequent flushing? SSL?)
>> - some resource competition (too high parallelism for given number of 
>> machines)
>> - netty threads are not keeping up
>> 
>> It’s hard to say what’s the problem without looking at the resource usage 
>> (CPU/Network/Memory/Disk IO), GC logs, code profiling results.
>> 
>> Piotrek
>> 
>> PS Zhijiang:
>> 
>> RescalePartitioner in this case should connect just two upstream subtasks
>> to one downstream sink. Upstream subtasks N and N+1 should be connected to
>> the sink with id N/2.
>> 
>>> On 25 May 2020, at 04:39, Weihua Hu wrote:
>>> 
>>> Hi, Zhijiang
>>> 
>>> I understand the normal credit-based backpressure mechanism. Usually the
>>> Sink inPoolUsage would be full, and the task stack would also show some
>>> information.
>>> But this time it is different. The Sink inPoolUsage is 0.
>>> I also checked the stack. The Map is waiting in
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment
>>> The Sink is waiting for data to process, which is not in line with
>>> expectations.
>>> 
>>> 
>>> <粘贴的图形-2.tiff>
>>> 
>>> <粘贴的图形-1.tiff>
>>> 
>>> 
>>> 
>>> Best
>>> Weihua Hu
>>> 
>>>> On 24 May 2020, at 21:57, Zhijiang wrote:
>>>> 
>>>> Hi Weihua,
>>>> 
>>>> From the info below, this matches what is expected in credit-based flow
>>>> control.
>>>> 
>>>> I guess one of the sink subtasks causes the backpressure, so you will
>>>> see that there are no available credits on the Sink side and
>>>> the outPoolUsage of Map is almost 100%. It really reflects the
>>>> credit-based states in the case of backpressure.
>>>> 
>>>> If you want to analyze the root cause of the backpressure, you can trace the
>>>> task stack of the respective Sink subtask to find which operation is
>>>> expensive,
>>>> then you can try increasing the parallelism or improving the UDF (if it is
>>>> the bottleneck). In addition, I am not sure why you chose
>>>> rescale to shuffle data among operators. The default
>>>> forward mode can give really good performance if you use
>>>> the same parallelism among them.
>>>> 
>>>> Best,
>>>> Zhijiang
>>>> --
>>>> From: Weihua Hu <huweihua@gmail.com>
>>>> Send Time: 24 May 2020 (Sunday) 18:32
>>>> To: user <user@flink.apache.org>
>>>> Subject: Singal task backpressure problem with Credit-based Flow Control
>>>> 
>>>> Hi, all
>>>> 
>>>> I ran into a weird single Task BackPressure problem.
>>>> 
>>>> JobInfo:
>>>> DAG: Source (1000)-> Map (2000)-> Sink (1000), which is linked via
>>>> rescale.
>>>> Flink version: 1.9.0
>>>> 
>>>> There is no related info in the jobmanager/taskmanager log.
>>>> 
>>>> Through Metrics, I see that Map (242)'s outPoolUsage is full, but its
>>>> downstream Sink (121)'s inPoolUsage is 0.
>>>> 
>>>> After dumping the memory and analyzing it, I found:
>>>> Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
>>>> Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
>>>> This is not consistent with my understanding of the Flink network
>>>> transmission mechanism.
>>>> 
>>>> Can someone help me? Thanks a lot.
>>>> 
>>>> 
>>>> Best
>>>> Weihua Hu
>>>> 
>>>> 
>>> 
>> 
> 



Re: Flink SQL cross-database access

2020-05-27 Thread Leonard Xu

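Glad the problem is solved.

> Found the cause: Flink treats year as a keyword.

YEAR is used in time-related types and is a keyword in Flink SQL, as it is in
some databases, so it has to be escaped when used as an identifier. The same
goes for DAY, MONTH, and so on [1].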

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/types.html#interval-year-to-month


Best,
Leonard Xu
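
The escaping rule above can be sketched as a small helper that backtick-quotes column names colliding with keywords before they are spliced into a SQL string. This is an illustrative sketch only: `RESERVED` is a tiny hand-picked subset taken from the keywords named in this thread, not the full reserved-word list, and `quote_identifier` and `partition_filter` are hypothetical names.

```python
# Illustrative subset only; the real reserved-word list is much longer.
RESERVED = {"YEAR", "MONTH", "DAY"}


def quote_identifier(name):
    """Wrap a column name in backticks if it collides with a reserved keyword."""
    if name.upper() in RESERVED:
        return f"`{name}`"
    return name


def partition_filter(columns_to_values):
    """Build a filter such as: `year` = 2020 AND `month` = 5."""
    return " AND ".join(
        f"{quote_identifier(col)} = {value}"
        for col, value in columns_to_values.items()
    )


print(partition_filter({"year": 2020, "month": 5, "id": 7}))
```

Quoting every identifier unconditionally would also work and avoids maintaining a keyword list; the selective version is shown only to make the rule visible.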

> At 2020-05-27 19:09:43, "Zhou Zach"  wrote:
>> The program finished with the following exception:
>> 
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error: SQL parse failed. Encountered "year =" at line 4, column 51.
>> Was expecting one of:
>>   "ARRAY" ...
>>   "CASE" ...
>>   "CURRENT" ...
>>   "CURRENT_CATALOG" ...
>>   "CURRENT_DATE" ...
>>   "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>   "CURRENT_PATH" ...
>>   "CURRENT_ROLE" ...
>>   "CURRENT_SCHEMA" ...
>>   "CURRENT_TIME" ...
>>   "CURRENT_TIMESTAMP" ...
>>   "CURRENT_USER" ...
>>   "DATE" ...
>>   "EXISTS" ...
>>   "FALSE" ...
>>   "INTERVAL" ...
>>   "LOCALTIME" ...
>>   "LOCALTIMESTAMP" ...
>>   "MULTISET" ...
>>   "NEW" ...
>>   "NEXT" ...
>>   "NOT" ...
>>   "NULL" ...
>>   "PERIOD" ...
>>   "SESSION_USER" ...
>>   "SYSTEM_USER" ...
>>   "TIME" ...
>>   "TIMESTAMP" ...
>>   "TRUE" ...
>>   "UNKNOWN" ...
>>   "USER" ...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>   "?" ...
>>   "+" ...
>>   "-" ...
>>...
>>...
>>...
>>...
>>...
>>   "CAST" ...
>>   "EXTRACT" ...
>>   "POSITION" ...
>>   "CONVERT" ...
>>   "TRANSLATE" ...
>>   "OVERLAY" ...
>>   "FLOOR" ...
>>   "CEIL" ...
>>   "CEILING" ...
>>   "SUBSTRING" ...
>>   "TRIM" ...
>>   "CLASSIFIER" ...
>>   "MATCH_NUMBER" ...
>>   "RUNNING" ...
>>   "PREV" ...
>>   "JSON_EXISTS" ...
>>   "JSON_VALUE" ...
>>   "JSON_QUERY" ...
>>   "JSON_OBJECT" ...
>>   "JSON_OBJECTAGG" ...
>>   "JSON_ARRAY" ...
>>   "JSON_ARRAYAGG" ...
>>   "MAP" ...
>>   "SPECIFIC" ...
>>   "ABS" ...
>>   "AVG" ...
>>   "CARDINALITY" ...
>>   "CHAR_LENGTH" ...
>>   "CHARACTER_LENGTH" ...
>>   "COALESCE" ...
>>   "COLLECT" ...
>>   "COVAR_POP" ...
>>   "COVAR_SAMP" ...
>>   "CUME_DIST" ...
>>   "COUNT" ...
>>   "DENSE_RANK" ...
>>   "ELEMENT" ...
>>   "EXP" ...
>>   "FIRST_VALUE" ...
>>   "FUSION" ...
>>   "GROUPING" ...
>>   "HOUR" ...
>>   "LAG" ...
>>   "LEAD" ...
>>   "LEFT" ...
>>   "LAST_VALUE" ...
>>   "LN" ...
>>   "LOWER" ...
>>   "MAX" ...
>>   "MIN" ...
>>   "MINUTE" ...
>>   "MOD" ...
>>   "MONTH" ...
>>   "NTH_VALUE" ...
>>   "NTILE" ...
>>   "NULLIF" ...
>>   "OCTET_LENGTH" ...
>>   "PERCENT_RANK" ...
>>   "POWER" ...
>>   "RANK" ...
>>   "REGR_COUNT" ...
>>   "REGR_SXX" ...
>>   "REGR_SYY" ...
>>   "RIGHT" ...
>>   "ROW_NUMBER" ...
>>   "SECOND" ...
>>   "SQRT" ...
>>   "STDDEV_POP" ...
>>   "STDDEV_SAMP" ...
>>   "SUM" ...
>>   "UPPER" ...
>>   "TRUNCATE" ...
>>   "VAR_POP" ...
>>   "VAR_SAMP" ...
>>   "YEAR" ...
>>   "YEAR" "(" ...
>> 
>> 
>> at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>> Encountered "year =" at line 4, column 51.
>> Was expecting one of:
>>   "ARRAY" ...
>>   "CASE" ...
>>   "CURRENT" ...
>>   "CURRENT_CATALOG" ...
>>   "CURRENT_DATE" ...
>>   "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>   "CURRENT_PATH" ...
>>   "CURRENT_ROLE" ...
>>   "CURRENT_SCHEMA" ...
>>   "CURRENT_TIME" ...
>>   "CURRENT_TIMESTAMP" ...
>>   "CURRENT_USER" ...
>>   "DATE" ...
>>   "EXISTS" ...
>>   "FALSE" ...
>>   "INTERVAL" ...
>>   "LOCALTIME" ...
>>   "LOCALTIMESTAMP" ...
>>   "MULTISET" ...
>>   "NEW" ...
>>   "NEXT" ...
>>   "NOT" ...
>>   "NULL" ...
>>   "PERIOD" ...
>>   "SESSION_USER" ...
>>   "SYSTEM_USER" ...
>>   "TIME" ...
>>   "TIMESTAMP" ...
>>   "TRUE" ...
>>   "UNKNOWN" ...
>>   "USER" ...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>   "?" ...
>>   "+" ...
>>   "-" ...
>>...
>>...
>>

Re: Singal task backpressure problem with Credit-based Flow Control

2020-05-27 Thread Weihua Hu
Hi Piotrek,

Thanks for your suggestions. I found some network issues which seem to be the
cause of the back pressure.

Best
Weihua Hu

> On 26 May 2020, at 02:54, Piotr Nowojski wrote:
> 
> Hi Weihua,
> 
> > After dumping the memory and analyzing it, I found:
> > Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
> > Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
> > This is not consistent with my understanding of the Flink network 
> > transmission mechanism.
> 
> It probably is consistent. The downstream receiver has announced all of
> its credits and is simply waiting for the data to arrive, while the
> upstream sender is waiting for the data to be sent down the stream.
> 
> The stack trace you posted confirms that the sink has an empty input
> buffer - it’s waiting for input data. Assuming rescale partitioning works
> as expected and node 242 is indeed connected to node 121, it implies the
> bottleneck is the data exchange between those two tasks. It could be
> 
> - network bottleneck (slow network? Packet losses?)
> - machine swapping/long GC pauses (If upstream node is experiencing long 
> pauses it might show up like this)
> - cpu bottleneck in the network stack (frequent flushing? SSL?)
> - some resource competition (too high parallelism for given number of 
> machines)
> - netty threads are not keeping up
> 
> It’s hard to say what’s the problem without looking at the resource usage 
> (CPU/Network/Memory/Disk IO), GC logs, code profiling results.
> 
> Piotrek
> 
> PS Zhijiang:
> 
> RescalePartitioner in this case should connect just two upstream subtasks
> to one downstream sink. Upstream subtasks N and N+1 should be connected to
> the sink with id N/2.
> 
>> On 25 May 2020, at 04:39, Weihua Hu wrote:
>> 
>> Hi, Zhijiang
>> 
>> I understand the normal credit-based backpressure mechanism. Usually the
>> Sink inPoolUsage would be full, and the task stack would also show some
>> information.
>> But this time it is different. The Sink inPoolUsage is 0.
>> I also checked the stack. The Map is waiting in
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment
>> The Sink is waiting for data to process, which is not in line with expectations.
>> 
>> 
>> <粘贴的图形-2.tiff>
>> 
>> <粘贴的图形-1.tiff>
>> 
>> 
>> 
>> Best
>> Weihua Hu
>> 
>>> On 24 May 2020, at 21:57, Zhijiang wrote:
>>> 
>>> Hi Weihua,
>>> 
>>> From the info below, this matches what is expected in credit-based flow
>>> control.
>>> 
>>> I guess one of the sink subtasks causes the backpressure, so you will
>>> see that there are no available credits on the Sink side and
>>> the outPoolUsage of Map is almost 100%. It really reflects the credit-based
>>> states in the case of backpressure.
>>> 
>>> If you want to analyze the root cause of the backpressure, you can trace the
>>> task stack of the respective Sink subtask to find which operation is
>>> expensive,
>>> then you can try increasing the parallelism or improving the UDF (if it is
>>> the bottleneck). In addition, I am not sure why you chose
>>> rescale to shuffle data among operators. The default
>>> forward mode can give really good performance if you use
>>> the same parallelism among them.
>>> 
>>> Best,
>>> Zhijiang
>>> --
>>> From: Weihua Hu <huweihua@gmail.com>
>>> Send Time: 24 May 2020 (Sunday) 18:32
>>> To: user <user@flink.apache.org>
>>> Subject: Singal task backpressure problem with Credit-based Flow Control
>>> 
>>> Hi, all
>>> 
>>> I ran into a weird single Task BackPressure problem.
>>> 
>>> JobInfo:
>>> DAG: Source (1000)-> Map (2000)-> Sink (1000), which is linked via 
>>> rescale. 
>>> Flink version: 1.9.0
>>> 
>>> There is no related info in the jobmanager/taskmanager log.
>>> 
>>> Through Metrics, I see that Map (242)'s outPoolUsage is full, but its
>>> downstream Sink (121)'s inPoolUsage is 0.
>>> 
>>> After dumping the memory and analyzing it, I found:
>>> Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
>>> Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
>>> This is not consistent with my understanding of the Flink network 
>>> transmission mechanism.
>>> 
>>> Can someone help me? Thanks a lot.
>>> 
>>> 
>>> Best
>>> Weihua Hu
>>> 
>>> 
>> 
> 
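
The subtask mapping described in Piotrek's PS (upstream subtasks N and N+1 feeding sink N/2) can be sketched as integer division by the fan-in. This is a plain-Python illustration of that description, not Flink's `RescalePartitioner` code; it assumes the upstream parallelism is an exact multiple of the downstream parallelism, and `rescale_target` is a hypothetical name.

```python
def rescale_target(upstream_index, upstream_parallelism, downstream_parallelism):
    """Downstream subtask index fed by an upstream subtask under rescale.

    Assumes upstream_parallelism is an integer multiple of
    downstream_parallelism, as in Map (2000) -> Sink (1000).
    """
    if upstream_parallelism % downstream_parallelism != 0:
        raise ValueError("sketch only covers evenly divisible parallelism")
    fan_in = upstream_parallelism // downstream_parallelism
    return upstream_index // fan_in


print(rescale_target(242, 2000, 1000))
print(rescale_target(243, 2000, 1000))
```

With the parallelism from this thread, Map subtasks 242 and 243 both resolve to Sink 121, which matches the pair of subtasks reported in the original message.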



Re: In consistent Check point API response

2020-05-27 Thread Vijay Bhaskar
Created JIRA for it: https://issues.apache.org/jira/browse/FLINK-17966

Regards
Bhaskar
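
For a remote manager like the one described in the quoted messages below, the distinction between "restored" and "completed" suggests picking the newer of the two rather than reading "restored" alone. The following is a minimal sketch, assuming the `/jobs/:jobid/checkpoints` response has a `latest` object with `completed` and `restored` entries that each carry `id` and `external_path`; verify these field names against your Flink version. The function name and the sample paths are hypothetical.

```python
def pick_recovery_path(checkpoint_stats):
    """Choose a checkpoint path from a /jobs/:jobid/checkpoints response.

    Returns the external path of whichever of latest.completed and
    latest.restored has the higher id, preferring completed on a tie,
    or None if neither entry has a path.
    """
    latest = checkpoint_stats.get("latest") or {}
    completed = latest.get("completed") or {}
    restored = latest.get("restored") or {}
    candidates = [c for c in (completed, restored) if c.get("external_path")]
    if not candidates:
        return None
    newest = max(candidates, key=lambda c: c.get("id", -1))
    return newest["external_path"]


# Hand-written payload: the job restored from checkpoint 37, then completed 42.
sample = {
    "latest": {
        "completed": {"id": 42, "external_path": "hdfs:///checkpoints/chk-42"},
        "restored": {"id": 37, "external_path": "hdfs:///checkpoints/chk-37"},
    }
}
print(pick_recovery_path(sample))
```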



On Wed, May 27, 2020 at 1:28 PM Vijay Bhaskar 
wrote:

> Thanks Yun. In that case it would be good to reference that documentation
> in the Flink REST API docs:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
> where the checkpoint endpoints are explained. Anyone who wants to use the
> REST API in the future would then have an easy reference to the checkpoint
> monitoring document, which would give them the complete picture. So I will
> open a Jira with this requirement.
>
> Regards
> Bhaskar
>
> On Wed, May 27, 2020 at 11:59 AM Yun Tang  wrote:
>
>> To be honest, from my point of view the current description in the
>> "Overview Tab" section [1] already gives enough explanation.
>> *Latest Completed Checkpoint*: The latest successfully completed
>> checkpoints.
>> *Latest Restore*: There are two types of restore operations.
>>
>>- Restore from Checkpoint: We restored from a regular periodic
>>checkpoint.
>>- Restore from Savepoint: We restored from a savepoint.
>>
>>
>> You could still create a JIRA issue and give your ideas in that issue. If
>> there is agreement to work on that ticket, you can create a PR to edit
>> checkpoint_monitoring.md [2] and checkpoint_monitoring.zh.md [3] to
>> update the related documentation.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/checkpoint_monitoring.html#overview-tab
>> [2]
>> https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.md
>> [3]
>> https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.zh.md
>>
>> Best
>> Yun Tang
>> --
>> *From:* Vijay Bhaskar 
>> *Sent:* Tuesday, May 26, 2020 15:18
>> *To:* Yun Tang 
>> *Cc:* user 
>> *Subject:* Re: In consistent Check point API response
>>
>> Thanks Yun. How can I contribute better documentation for this? By
>> opening a Jira issue?
>>
>> Regards
>> Bhaskar
>>
>> On Tue, May 26, 2020 at 12:32 PM Yun Tang  wrote:
>>
>> Hi Bhaskar
>>
>> I think I have understood your scenario now. And I think this is what is
>> expected in Flink.
>> As you only allow your job to restore 5 times, the "restore" would
>> only record the checkpoint restored at the 5th recovery, and the
>> checkpoint id would always stay there.
>>
>> "Restored" is for the last restored checkpoint and "completed" is for the
>> last completed checkpoint; they are actually not the same thing.
>> The only scenario in which they have the same number is when Flink has
>> just restored successfully and no new checkpoint has completed yet.
>>
>> Best
>> Yun Tang
>>
>>
>> --
>> *From:* Vijay Bhaskar 
>> *Sent:* Tuesday, May 26, 2020 12:19
>> *To:* Yun Tang 
>> *Cc:* user 
>> *Subject:* Re: In consistent Check point API response
>>
>> Hi Yun
>> Understood the issue now:
>> "restored" always shows only the checkpoint that was used for restoring
>> the previous state.
>> In all the attempts < 6 (in my case max attempts are 5, 6 is the last
>> attempt):
>>   Flink HA is restoring the state, so restored and latest are the same value.
>> If the last attempt == 6:
>>   The Flink job already has a few checkpoints.
>>   After that the job failed and Flink HA gave up and marked the job state as
>>   "FAILED".
>>   At this point the "restored" value is the one from the 5th attempt,
>>   but latest is the latest checkpoint that was retained.
>>
>> Shall I file a documentation improvement Jira? I want to add more
>> documentation with the help of the above scenarios.
>>
>> Regards
>> Bhaskar
>>
>>
>>
>> On Tue, May 26, 2020 at 8:14 AM Yun Tang  wrote:
>>
>> Hi Bhaskar
>>
>> It seems I still do not totally understand your case-5. Your job failed 6
>> times, and recovered from the previous checkpoint to restart again. However,
>> you found the REST API gave the wrong answer.
>> How do you know that your "restored" field is giving the wrong checkpoint
>> file, one which is not the latest? Have you checked the JM log for
>> related content: "Restoring job xxx from latest valid checkpoint: x@"
>> [1] to know exactly which checkpoint was chosen for the restore?
>>
>> I think you could give a more concrete example, e.g. the expected vs. actual
>> checkpoint to restore, to tell your story.
>>
>> [1]
>> https://github.com/apache/flink/blob/8f992e8e868b846cf7fe8de23923358fc6b50721/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1250
>>
>> Best
>> Yun Tang
>> --
>> *From:* Vijay Bhaskar 
>> *Sent:* Monday, May 25, 2020 17:01
>> *To:* Yun Tang 
>> *Cc:* user 
>> *Subject:* Re: In consistent Check point API response
>>
>> Thanks Yun.
>> Here is the problem i am facing:
>>
>> I am using  jobs/:jobID/checkpoints  API to recover the failed job. We
>> have the remote manager which monitors the jobs.  We are using "restored"
>> field of the API response to get the latest check point file to use. Its
>> giving correct 

Re:Re: Re: Re: Re: Flink sql cross-database

2020-05-27 Thread Zhou Zach
OK, thanks for the pointers.

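On 2020-05-27 19:33:42, "Rui Li"  wrote:
>Do you want to debug the HiveCatalog code? You can refer to the test cases in Flink: some of our tests run in embedded mode (e.g. HiveCatalogHiveMetadataTest), and others start a separate HMS process (e.g. TableEnvHiveConnectorTest).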
>
>On Wed, May 27, 2020 at 7:27 PM Zhou Zach  wrote:
>
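>> Yes, I noticed that, thanks for the pointer. One more question: when you debug with IntelliJ
>> IDEA, do you debug locally? If so, do I need to set up a local Hadoop cluster, or at least a local Hive?
>> Or do you connect IntelliJ IDEA to a remote cluster? If the cluster is on Alibaba Cloud, do I need to open additional ports?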
>>
>> On 2020-05-27 19:19:58, "Rui Li"  wrote:
>> >year is a reserved keyword in Calcite; try using `year` instead.
>> >
>> >On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:
>> >
>> >> The program finished with the following exception:
>> >>
>> >>
>> >> org.apache.flink.client.program.ProgramInvocationException: The main
>> >> method caused an error: SQL parse failed. Encountered "year =" at line
>> 4,
>> >> column 51.
>> >> Was expecting one of:
>> >> "ARRAY" ...
>> >> "CASE" ...
>> >> "CURRENT" ...
>> >> "CURRENT_CATALOG" ...
>> >> "CURRENT_DATE" ...
>> >> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> >> "CURRENT_PATH" ...
>> >> "CURRENT_ROLE" ...
>> >> "CURRENT_SCHEMA" ...
>> >> "CURRENT_TIME" ...
>> >> "CURRENT_TIMESTAMP" ...
>> >> "CURRENT_USER" ...
>> >> "DATE" ...
>> >> "EXISTS" ...
>> >> "FALSE" ...
>> >> "INTERVAL" ...
>> >> "LOCALTIME" ...
>> >> "LOCALTIMESTAMP" ...
>> >> "MULTISET" ...
>> >> "NEW" ...
>> >> "NEXT" ...
>> >> "NOT" ...
>> >> "NULL" ...
>> >> "PERIOD" ...
>> >> "SESSION_USER" ...
>> >> "SYSTEM_USER" ...
>> >> "TIME" ...
>> >> "TIMESTAMP" ...
>> >> "TRUE" ...
>> >> "UNKNOWN" ...
>> >> "USER" ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >> "?" ...
>> >> "+" ...
>> >> "-" ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >> "CAST" ...
>> >> "EXTRACT" ...
>> >> "POSITION" ...
>> >> "CONVERT" ...
>> >> "TRANSLATE" ...
>> >> "OVERLAY" ...
>> >> "FLOOR" ...
>> >> "CEIL" ...
>> >> "CEILING" ...
>> >> "SUBSTRING" ...
>> >> "TRIM" ...
>> >> "CLASSIFIER" ...
>> >> "MATCH_NUMBER" ...
>> >> "RUNNING" ...
>> >> "PREV" ...
>> >> "JSON_EXISTS" ...
>> >> "JSON_VALUE" ...
>> >> "JSON_QUERY" ...
>> >> "JSON_OBJECT" ...
>> >> "JSON_OBJECTAGG" ...
>> >> "JSON_ARRAY" ...
>> >> "JSON_ARRAYAGG" ...
>> >> "MAP" ...
>> >> "SPECIFIC" ...
>> >> "ABS" ...
>> >> "AVG" ...
>> >> "CARDINALITY" ...
>> >> "CHAR_LENGTH" ...
>> >> "CHARACTER_LENGTH" ...
>> >> "COALESCE" ...
>> >> "COLLECT" ...
>> >> "COVAR_POP" ...
>> >> "COVAR_SAMP" ...
>> >> "CUME_DIST" ...
>> >> "COUNT" ...
>> >> "DENSE_RANK" ...
>> >> "ELEMENT" ...
>> >> "EXP" ...
>> >> "FIRST_VALUE" ...
>> >> "FUSION" ...
>> >> "GROUPING" ...
>> >> "HOUR" ...
>> >> "LAG" ...
>> >> "LEAD" ...
>> >> "LEFT" ...
>> >> "LAST_VALUE" ...
>> >> "LN" ...
>> >> "LOWER" ...
>> >> "MAX" ...
>> >> "MIN" ...
>> >> "MINUTE" ...
>> >> "MOD" ...
>> >> "MONTH" ...
>> >> "NTH_VALUE" ...
>> >> "NTILE" ...
>> >> "NULLIF" ...
>> >> "OCTET_LENGTH" ...
>> >> "PERCENT_RANK" ...
>> >> "POWER" ...
>> >> "RANK" ...
>> >> "REGR_COUNT" ...
>> >> "REGR_SXX" ...
>> >> "REGR_SYY" ...
>> >> "RIGHT" ...
>> >> "ROW_NUMBER" ...
>> >> "SECOND" ...
>> >> "SQRT" ...
>> >> "STDDEV_POP" ...
>> >> "STDDEV_SAMP" ...
>> >> "SUM" ...
>> >> "UPPER" ...
>> >> "TRUNCATE" ...
>> >> "VAR_POP" ...
>> >> "VAR_SAMP" ...
>> >> "YEAR" ...
>> >> "YEAR" "(" ...
>> >>
>> >>
>> >> at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> >> at
>> >>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> >> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> >> at
>> >>
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> >> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> >> at
>> >>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> >> at
>> >>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> >> at java.security.AccessController.doPrivileged(Native Method)
>> >> at javax.security.auth.Subject.doAs(Subject.java:422)
>> >> at
>> >>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> >> at
>> >>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> >> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
>> >> failed. 

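Re: Re: Re: Re: Flink sql cross-database

2020-05-27 Thread Rui Li
Do you want to debug the HiveCatalog code? You can refer to the test cases in Flink: some of our tests run in embedded mode (e.g. HiveCatalogHiveMetadataTest), and others start a separate HMS process (e.g. TableEnvHiveConnectorTest).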

On Wed, May 27, 2020 at 7:27 PM Zhou Zach  wrote:

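> Yes, I noticed that, thanks for the pointer. One more question: when you debug with IntelliJ
> IDEA, do you debug locally? If so, do I need to set up a local Hadoop cluster, or at least a local Hive?
> Or do you connect IntelliJ IDEA to a remote cluster? If the cluster is on Alibaba Cloud, do I need to open additional ports?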
>
> On 2020-05-27 19:19:58, "Rui Li"  wrote:
> >year is a reserved keyword in Calcite; try using `year` instead.
> >
> >On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:
> >
> >> The program finished with the following exception:
> >>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: SQL parse failed. Encountered "year =" at line
> 4,
> >> column 51.
> >> Was expecting one of:
> >> "ARRAY" ...
> >> "CASE" ...
> >> "CURRENT" ...
> >> "CURRENT_CATALOG" ...
> >> "CURRENT_DATE" ...
> >> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> >> "CURRENT_PATH" ...
> >> "CURRENT_ROLE" ...
> >> "CURRENT_SCHEMA" ...
> >> "CURRENT_TIME" ...
> >> "CURRENT_TIMESTAMP" ...
> >> "CURRENT_USER" ...
> >> "DATE" ...
> >> "EXISTS" ...
> >> "FALSE" ...
> >> "INTERVAL" ...
> >> "LOCALTIME" ...
> >> "LOCALTIMESTAMP" ...
> >> "MULTISET" ...
> >> "NEW" ...
> >> "NEXT" ...
> >> "NOT" ...
> >> "NULL" ...
> >> "PERIOD" ...
> >> "SESSION_USER" ...
> >> "SYSTEM_USER" ...
> >> "TIME" ...
> >> "TIMESTAMP" ...
> >> "TRUE" ...
> >> "UNKNOWN" ...
> >> "USER" ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >> "?" ...
> >> "+" ...
> >> "-" ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >> "CAST" ...
> >> "EXTRACT" ...
> >> "POSITION" ...
> >> "CONVERT" ...
> >> "TRANSLATE" ...
> >> "OVERLAY" ...
> >> "FLOOR" ...
> >> "CEIL" ...
> >> "CEILING" ...
> >> "SUBSTRING" ...
> >> "TRIM" ...
> >> "CLASSIFIER" ...
> >> "MATCH_NUMBER" ...
> >> "RUNNING" ...
> >> "PREV" ...
> >> "JSON_EXISTS" ...
> >> "JSON_VALUE" ...
> >> "JSON_QUERY" ...
> >> "JSON_OBJECT" ...
> >> "JSON_OBJECTAGG" ...
> >> "JSON_ARRAY" ...
> >> "JSON_ARRAYAGG" ...
> >> "MAP" ...
> >> "SPECIFIC" ...
> >> "ABS" ...
> >> "AVG" ...
> >> "CARDINALITY" ...
> >> "CHAR_LENGTH" ...
> >> "CHARACTER_LENGTH" ...
> >> "COALESCE" ...
> >> "COLLECT" ...
> >> "COVAR_POP" ...
> >> "COVAR_SAMP" ...
> >> "CUME_DIST" ...
> >> "COUNT" ...
> >> "DENSE_RANK" ...
> >> "ELEMENT" ...
> >> "EXP" ...
> >> "FIRST_VALUE" ...
> >> "FUSION" ...
> >> "GROUPING" ...
> >> "HOUR" ...
> >> "LAG" ...
> >> "LEAD" ...
> >> "LEFT" ...
> >> "LAST_VALUE" ...
> >> "LN" ...
> >> "LOWER" ...
> >> "MAX" ...
> >> "MIN" ...
> >> "MINUTE" ...
> >> "MOD" ...
> >> "MONTH" ...
> >> "NTH_VALUE" ...
> >> "NTILE" ...
> >> "NULLIF" ...
> >> "OCTET_LENGTH" ...
> >> "PERCENT_RANK" ...
> >> "POWER" ...
> >> "RANK" ...
> >> "REGR_COUNT" ...
> >> "REGR_SXX" ...
> >> "REGR_SYY" ...
> >> "RIGHT" ...
> >> "ROW_NUMBER" ...
> >> "SECOND" ...
> >> "SQRT" ...
> >> "STDDEV_POP" ...
> >> "STDDEV_SAMP" ...
> >> "SUM" ...
> >> "UPPER" ...
> >> "TRUNCATE" ...
> >> "VAR_POP" ...
> >> "VAR_SAMP" ...
> >> "YEAR" ...
> >> "YEAR" "(" ...
> >>
> >>
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> >> at
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
> >> failed. Encountered "year =" at line 4, column 51.
> >> Was expecting one of:
> >> "ARRAY" ...
> >> "CASE" ...
> >> "CURRENT" ...
> >> "CURRENT_CATALOG" ...
> >> "CURRENT_DATE" ...
> >> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> >> 

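Re:Re: Re: Re: Flink sql cross-database

2020-05-27 Thread Zhou Zach
Yes, I noticed that, thanks for the pointer. One more question: when you debug with IntelliJ
IDEA, do you debug locally? If so, do I need to set up a local Hadoop cluster, or at least a local Hive?
Or do you connect IntelliJ IDEA to a remote cluster? If the cluster is on Alibaba Cloud, do I need to open additional ports?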

On 2020-05-27 19:19:58, "Rui Li"  wrote:
>year is a reserved keyword in Calcite; try using `year` instead.
>
>On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:
>
>> The program finished with the following exception:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: SQL parse failed. Encountered "year =" at line 4,
>> column 51.
>> Was expecting one of:
>> "ARRAY" ...
>> "CASE" ...
>> "CURRENT" ...
>> "CURRENT_CATALOG" ...
>> "CURRENT_DATE" ...
>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> "CURRENT_PATH" ...
>> "CURRENT_ROLE" ...
>> "CURRENT_SCHEMA" ...
>> "CURRENT_TIME" ...
>> "CURRENT_TIMESTAMP" ...
>> "CURRENT_USER" ...
>> "DATE" ...
>> "EXISTS" ...
>> "FALSE" ...
>> "INTERVAL" ...
>> "LOCALTIME" ...
>> "LOCALTIMESTAMP" ...
>> "MULTISET" ...
>> "NEW" ...
>> "NEXT" ...
>> "NOT" ...
>> "NULL" ...
>> "PERIOD" ...
>> "SESSION_USER" ...
>> "SYSTEM_USER" ...
>> "TIME" ...
>> "TIMESTAMP" ...
>> "TRUE" ...
>> "UNKNOWN" ...
>> "USER" ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>> "?" ...
>> "+" ...
>> "-" ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>> "CAST" ...
>> "EXTRACT" ...
>> "POSITION" ...
>> "CONVERT" ...
>> "TRANSLATE" ...
>> "OVERLAY" ...
>> "FLOOR" ...
>> "CEIL" ...
>> "CEILING" ...
>> "SUBSTRING" ...
>> "TRIM" ...
>> "CLASSIFIER" ...
>> "MATCH_NUMBER" ...
>> "RUNNING" ...
>> "PREV" ...
>> "JSON_EXISTS" ...
>> "JSON_VALUE" ...
>> "JSON_QUERY" ...
>> "JSON_OBJECT" ...
>> "JSON_OBJECTAGG" ...
>> "JSON_ARRAY" ...
>> "JSON_ARRAYAGG" ...
>> "MAP" ...
>> "SPECIFIC" ...
>> "ABS" ...
>> "AVG" ...
>> "CARDINALITY" ...
>> "CHAR_LENGTH" ...
>> "CHARACTER_LENGTH" ...
>> "COALESCE" ...
>> "COLLECT" ...
>> "COVAR_POP" ...
>> "COVAR_SAMP" ...
>> "CUME_DIST" ...
>> "COUNT" ...
>> "DENSE_RANK" ...
>> "ELEMENT" ...
>> "EXP" ...
>> "FIRST_VALUE" ...
>> "FUSION" ...
>> "GROUPING" ...
>> "HOUR" ...
>> "LAG" ...
>> "LEAD" ...
>> "LEFT" ...
>> "LAST_VALUE" ...
>> "LN" ...
>> "LOWER" ...
>> "MAX" ...
>> "MIN" ...
>> "MINUTE" ...
>> "MOD" ...
>> "MONTH" ...
>> "NTH_VALUE" ...
>> "NTILE" ...
>> "NULLIF" ...
>> "OCTET_LENGTH" ...
>> "PERCENT_RANK" ...
>> "POWER" ...
>> "RANK" ...
>> "REGR_COUNT" ...
>> "REGR_SXX" ...
>> "REGR_SYY" ...
>> "RIGHT" ...
>> "ROW_NUMBER" ...
>> "SECOND" ...
>> "SQRT" ...
>> "STDDEV_POP" ...
>> "STDDEV_SAMP" ...
>> "SUM" ...
>> "UPPER" ...
>> "TRUNCATE" ...
>> "VAR_POP" ...
>> "VAR_SAMP" ...
>> "YEAR" ...
>> "YEAR" "(" ...
>>
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
>> failed. Encountered "year =" at line 4, column 51.
>> Was expecting one of:
>> "ARRAY" ...
>> "CASE" ...
>> "CURRENT" ...
>> "CURRENT_CATALOG" ...
>> "CURRENT_DATE" ...
>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> "CURRENT_PATH" ...
>> "CURRENT_ROLE" ...
>> "CURRENT_SCHEMA" ...
>> "CURRENT_TIME" ...
>> "CURRENT_TIMESTAMP" ...
>> "CURRENT_USER" ...
>> "DATE" ...
>> "EXISTS" ...
>> "FALSE" ...
>> "INTERVAL" ...
>> "LOCALTIME" ...
>> "LOCALTIMESTAMP" ...
>> "MULTISET" ...
>> "NEW" ...
>> "NEXT" ...
>> "NOT" ...
>> "NULL" ...
>> "PERIOD" ...
>> "SESSION_USER" ...
>> "SYSTEM_USER" ...
>> "TIME" ...
>> "TIMESTAMP" ...
>> "TRUE" ...
>> "UNKNOWN" ...
>> "USER" ...
>>  ...
>>  ...
>>  

Re:Re:Re: Re: Flink sql cross-database

2020-05-27 Thread Zhou Zach
Found the cause: Flink treats year as a keyword.

At 2020-05-27 19:09:43, "Zhou Zach"  wrote:
>The program finished with the following exception:
>
>
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: SQL parse failed. Encountered "year =" at line 4, column 51.
>Was expecting one of:
>"ARRAY" ...
>"CASE" ...
>"CURRENT" ...
>"CURRENT_CATALOG" ...
>"CURRENT_DATE" ...
>"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>"CURRENT_PATH" ...
>"CURRENT_ROLE" ...
>"CURRENT_SCHEMA" ...
>"CURRENT_TIME" ...
>"CURRENT_TIMESTAMP" ...
>"CURRENT_USER" ...
>"DATE" ...
>"EXISTS" ...
>"FALSE" ...
>"INTERVAL" ...
>"LOCALTIME" ...
>"LOCALTIMESTAMP" ...
>"MULTISET" ...
>"NEW" ...
>"NEXT" ...
>"NOT" ...
>"NULL" ...
>"PERIOD" ...
>"SESSION_USER" ...
>"SYSTEM_USER" ...
>"TIME" ...
>"TIMESTAMP" ...
>"TRUE" ...
>"UNKNOWN" ...
>"USER" ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
>"?" ...
>"+" ...
>"-" ...
> ...
> ...
> ...
> ...
> ...
>"CAST" ...
>"EXTRACT" ...
>"POSITION" ...
>"CONVERT" ...
>"TRANSLATE" ...
>"OVERLAY" ...
>"FLOOR" ...
>"CEIL" ...
>"CEILING" ...
>"SUBSTRING" ...
>"TRIM" ...
>"CLASSIFIER" ...
>"MATCH_NUMBER" ...
>"RUNNING" ...
>"PREV" ...
>"JSON_EXISTS" ...
>"JSON_VALUE" ...
>"JSON_QUERY" ...
>"JSON_OBJECT" ...
>"JSON_OBJECTAGG" ...
>"JSON_ARRAY" ...
>"JSON_ARRAYAGG" ...
>"MAP" ...
>"SPECIFIC" ...
>"ABS" ...
>"AVG" ...
>"CARDINALITY" ...
>"CHAR_LENGTH" ...
>"CHARACTER_LENGTH" ...
>"COALESCE" ...
>"COLLECT" ...
>"COVAR_POP" ...
>"COVAR_SAMP" ...
>"CUME_DIST" ...
>"COUNT" ...
>"DENSE_RANK" ...
>"ELEMENT" ...
>"EXP" ...
>"FIRST_VALUE" ...
>"FUSION" ...
>"GROUPING" ...
>"HOUR" ...
>"LAG" ...
>"LEAD" ...
>"LEFT" ...
>"LAST_VALUE" ...
>"LN" ...
>"LOWER" ...
>"MAX" ...
>"MIN" ...
>"MINUTE" ...
>"MOD" ...
>"MONTH" ...
>"NTH_VALUE" ...
>"NTILE" ...
>"NULLIF" ...
>"OCTET_LENGTH" ...
>"PERCENT_RANK" ...
>"POWER" ...
>"RANK" ...
>"REGR_COUNT" ...
>"REGR_SXX" ...
>"REGR_SYY" ...
>"RIGHT" ...
>"ROW_NUMBER" ...
>"SECOND" ...
>"SQRT" ...
>"STDDEV_POP" ...
>"STDDEV_SAMP" ...
>"SUM" ...
>"UPPER" ...
>"TRUNCATE" ...
>"VAR_POP" ...
>"VAR_SAMP" ...
>"YEAR" ...
>"YEAR" "(" ...
>
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>at 
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>Encountered "year =" at line 4, column 51.
>Was expecting one of:
>"ARRAY" ...
>"CASE" ...
>"CURRENT" ...
>"CURRENT_CATALOG" ...
>"CURRENT_DATE" ...
>"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>"CURRENT_PATH" ...
>"CURRENT_ROLE" ...
>"CURRENT_SCHEMA" ...
>"CURRENT_TIME" ...
>"CURRENT_TIMESTAMP" ...
>"CURRENT_USER" ...
>"DATE" ...
>"EXISTS" ...
>"FALSE" ...
>"INTERVAL" ...
>"LOCALTIME" ...
>"LOCALTIMESTAMP" ...
>"MULTISET" ...
>"NEW" ...
>"NEXT" ...
>"NOT" ...
>"NULL" ...
>"PERIOD" ...
>"SESSION_USER" ...
>"SYSTEM_USER" ...
>"TIME" ...
>"TIMESTAMP" ...
>"TRUE" ...
>"UNKNOWN" ...
>"USER" ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
>"?" ...
>"+" ...
>"-" ...
> ...
> ...
> ...
> ...
> ...
>"CAST" ...
>"EXTRACT" ...
>"POSITION" ...
>"CONVERT" ...
>"TRANSLATE" ...
>"OVERLAY" ...
>"FLOOR" ...
>"CEIL" ...
>"CEILING" ...
>"SUBSTRING" ...
>"TRIM" ...
>"CLASSIFIER" ...
>"MATCH_NUMBER" ...
>"RUNNING" ...
>"PREV" ...
>"JSON_EXISTS" ...
>"JSON_VALUE" ...
>"JSON_QUERY" ...
>"JSON_OBJECT" ...
>  

Re: Re: Re: Flink sql cross-database

2020-05-27 Thread Rui Li
year is a reserved keyword in Calcite; try using `year` instead.
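
A minimal sketch of the backtick suggestion, reusing the query posted earlier in this thread. The object and helper names (QuotedPartitionFilter, quote, billQuery) are hypothetical and only build the SQL text; the resulting string would still be passed to tableEnv.sqlUpdate(...) as in the original job.

```scala
object QuotedPartitionFilter {
  // Hypothetical helper: wrap an identifier in backticks so the SQL parser
  // reads it as a column name rather than as the reserved word YEAR.
  def quote(ident: String): String = "`" + ident + "`"

  // Rebuild the INSERT from the thread with the partition column quoted.
  def billQuery(partitionCol: String, value: Int): String =
    s"""INSERT INTO catalog2.dwd.orders
       |SELECT srcuid, price FROM catalog2.dwd.bill WHERE ${quote(partitionCol)} = $value
       |""".stripMargin

  def main(args: Array[String]): Unit =
    // Print the statement; in the real job, hand it to tableEnv.sqlUpdate(...).
    println(billQuery("year", 2020))
}
```

The only change relative to the failing statement is that year appears inside backticks in the WHERE clause.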

On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:

> The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: SQL parse failed. Encountered "year =" at line 4,
> column 51.
> Was expecting one of:
> "ARRAY" ...
> "CASE" ...
> "CURRENT" ...
> "CURRENT_CATALOG" ...
> "CURRENT_DATE" ...
> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> "CURRENT_PATH" ...
> "CURRENT_ROLE" ...
> "CURRENT_SCHEMA" ...
> "CURRENT_TIME" ...
> "CURRENT_TIMESTAMP" ...
> "CURRENT_USER" ...
> "DATE" ...
> "EXISTS" ...
> "FALSE" ...
> "INTERVAL" ...
> "LOCALTIME" ...
> "LOCALTIMESTAMP" ...
> "MULTISET" ...
> "NEW" ...
> "NEXT" ...
> "NOT" ...
> "NULL" ...
> "PERIOD" ...
> "SESSION_USER" ...
> "SYSTEM_USER" ...
> "TIME" ...
> "TIMESTAMP" ...
> "TRUE" ...
> "UNKNOWN" ...
> "USER" ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "?" ...
> "+" ...
> "-" ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "CAST" ...
> "EXTRACT" ...
> "POSITION" ...
> "CONVERT" ...
> "TRANSLATE" ...
> "OVERLAY" ...
> "FLOOR" ...
> "CEIL" ...
> "CEILING" ...
> "SUBSTRING" ...
> "TRIM" ...
> "CLASSIFIER" ...
> "MATCH_NUMBER" ...
> "RUNNING" ...
> "PREV" ...
> "JSON_EXISTS" ...
> "JSON_VALUE" ...
> "JSON_QUERY" ...
> "JSON_OBJECT" ...
> "JSON_OBJECTAGG" ...
> "JSON_ARRAY" ...
> "JSON_ARRAYAGG" ...
> "MAP" ...
> "SPECIFIC" ...
> "ABS" ...
> "AVG" ...
> "CARDINALITY" ...
> "CHAR_LENGTH" ...
> "CHARACTER_LENGTH" ...
> "COALESCE" ...
> "COLLECT" ...
> "COVAR_POP" ...
> "COVAR_SAMP" ...
> "CUME_DIST" ...
> "COUNT" ...
> "DENSE_RANK" ...
> "ELEMENT" ...
> "EXP" ...
> "FIRST_VALUE" ...
> "FUSION" ...
> "GROUPING" ...
> "HOUR" ...
> "LAG" ...
> "LEAD" ...
> "LEFT" ...
> "LAST_VALUE" ...
> "LN" ...
> "LOWER" ...
> "MAX" ...
> "MIN" ...
> "MINUTE" ...
> "MOD" ...
> "MONTH" ...
> "NTH_VALUE" ...
> "NTILE" ...
> "NULLIF" ...
> "OCTET_LENGTH" ...
> "PERCENT_RANK" ...
> "POWER" ...
> "RANK" ...
> "REGR_COUNT" ...
> "REGR_SXX" ...
> "REGR_SYY" ...
> "RIGHT" ...
> "ROW_NUMBER" ...
> "SECOND" ...
> "SQRT" ...
> "STDDEV_POP" ...
> "STDDEV_SAMP" ...
> "SUM" ...
> "UPPER" ...
> "TRUNCATE" ...
> "VAR_POP" ...
> "VAR_SAMP" ...
> "YEAR" ...
> "YEAR" "(" ...
>
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
> failed. Encountered "year =" at line 4, column 51.
> Was expecting one of:
> "ARRAY" ...
> "CASE" ...
> "CURRENT" ...
> "CURRENT_CATALOG" ...
> "CURRENT_DATE" ...
> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> "CURRENT_PATH" ...
> "CURRENT_ROLE" ...
> "CURRENT_SCHEMA" ...
> "CURRENT_TIME" ...
> "CURRENT_TIMESTAMP" ...
> "CURRENT_USER" ...
> "DATE" ...
> "EXISTS" ...
> "FALSE" ...
> "INTERVAL" ...
> "LOCALTIME" ...
> "LOCALTIMESTAMP" ...
> "MULTISET" ...
> "NEW" ...
> "NEXT" ...
> "NOT" ...
> "NULL" ...
> "PERIOD" ...
> "SESSION_USER" ...
> "SYSTEM_USER" ...
> "TIME" ...
> "TIMESTAMP" ...
> "TRUE" ...
> "UNKNOWN" ...
> "USER" ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "?" ...
> "+" ...
> "-" ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "CAST" ...
> "EXTRACT" ...
> "POSITION" ...
> "CONVERT" ...
> "TRANSLATE" ...
> "OVERLAY" ...
> "FLOOR" ...
> "CEIL" ...
> "CEILING" ...
> 

Re:Re: Re: Flink sql cross-database

2020-05-27 Thread Zhou Zach
The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: SQL parse failed. Encountered "year =" at line 4, column 51.
Was expecting one of:
"ARRAY" ...
"CASE" ...
"CURRENT" ...
"CURRENT_CATALOG" ...
"CURRENT_DATE" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"CURRENT_USER" ...
"DATE" ...
"EXISTS" ...
"FALSE" ...
"INTERVAL" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"MULTISET" ...
"NEW" ...
"NEXT" ...
"NOT" ...
"NULL" ...
"PERIOD" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"TIME" ...
"TIMESTAMP" ...
"TRUE" ...
"UNKNOWN" ...
"USER" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"?" ...
"+" ...
"-" ...
 ...
 ...
 ...
 ...
 ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
"MAP" ...
"SPECIFIC" ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LEFT" ...
"LAST_VALUE" ...
"LN" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_COUNT" ...
"REGR_SXX" ...
"REGR_SYY" ...
"RIGHT" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"YEAR" "(" ...


at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Encountered "year =" at line 4, column 51.
Was expecting one of:
"ARRAY" ...
"CASE" ...
"CURRENT" ...
"CURRENT_CATALOG" ...
"CURRENT_DATE" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"CURRENT_USER" ...
"DATE" ...
"EXISTS" ...
"FALSE" ...
"INTERVAL" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"MULTISET" ...
"NEW" ...
"NEXT" ...
"NOT" ...
"NULL" ...
"PERIOD" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"TIME" ...
"TIMESTAMP" ...
"TRUE" ...
"UNKNOWN" ...
"USER" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"?" ...
"+" ...
"-" ...
 ...
 ...
 ...
 ...
 ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
"MAP" ...
"SPECIFIC" ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
 

Re: Re: Flink sql cross-database

2020-05-27 Thread Rui Li
What error do you get when reading the Hive partitioned table? Could you paste the stack trace?

On Wed, May 27, 2020 at 6:08 PM Zhou Zach  wrote:

>
>
> hive partition table:
>
>
> CREATE TABLE `dwd.bill`(
>   `id` bigint,
>   `gid` bigint,
>   `count` bigint,
>   `price` bigint,
>   `srcuid` bigint,
>   `srcnickname` string,
>   `srcleftmoney` bigint,
>   `srcwealth` bigint,
>   `srccredit` decimal(10,0),
>   `dstnickname` string,
>   `dstuid` bigint,
>   `familyid` int,
>   `dstleftmoney` bigint,
>   `dstwealth` bigint,
>   `dstcredit` decimal(10,0),
>   `addtime` bigint,
>   `type` int,
>   `getmoney` decimal(10,0),
>   `os` int,
>   `bak` string,
>   `getbonus` decimal(10,0),
>   `unionbonus` decimal(10,0))
> PARTITIONED BY (
>   `year` int,
>   `month` int)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
>
>
>
>
> Query:
>
>
> tableEnv.sqlUpdate(
>   """
> |
> |INSERT INTO catalog2.dwd.orders
> |select srcuid, price from catalog2.dwd.bill where year = 2020
> |
> |
> |""".stripMargin)
>
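> On 2020-05-27 18:01:19, "Leonard Xu"  wrote:
> >Flink does support Hive partitioned tables. I saw you posted about this in another email; could you paste your Hive table and query in this thread?
> >
> >Best,
> >Leonard Xu
> >
> >> On May 27, 2020, at 17:40, Zhou Zach  wrote: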
> >>
> >>
> >>
> >>
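> >> Thanks for the reply. After adding the catalog and db prefix to the table name, access works.
> >> Now I have a new problem: when Flink reads a Hive partitioned table, filtering on a partition key
> >> such as year in the WHERE clause raises an error, while filtering on other columns works fine.
> >> Does Flink not support Hive partitioned tables, or is something misconfigured?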
> >>
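> >> On 2020-05-27 17:33:11, "Leonard Xu"  wrote:
> >>> Hi,
> >>>> because one HiveCatalog can only be associated with one database
> >>> A catalog can be associated with multiple databases; tables in different catalogs and different databases can all be accessed.
> >>>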
> >>> Flink SQL> show catalogs;
> >>> default_catalog
> >>> myhive
> >>> Flink SQL> use catalog myhive;
> >>> Flink SQL> show databases;
> >>> default
> >>> hive_test
> >>> hive_test1
> >>> Flink SQL> select * from hive_test.db2_table union select * from
> myhive.hive_test1.db1_table;
> >>> 2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
> >>>
> >>>
> >>>
> >>> Best,
> >>> Leonard Xu
> >>>
> >>>
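> >>>> On May 27, 2020, at 10:55, Zhou Zach  wrote:
> >>>>
> >>>> hi all,
> >>>> Can the Flink SQL HiveCatalog not operate across databases? That is, the two tables joined in one Flink
> >>>> SQL statement belong to two different databases, because one HiveCatalog can only be associated with one database.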
>


-- 
Best regards!
Rui Li


Re: Is Flink SQL FIRST_VALUE in append mode?

2020-05-27 Thread Benchao Li
Your understanding is correct. You could try a time windowed join [1]; regardless of the join type, its result is always append-only.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
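
As a rough sketch of what such a time windowed join could look like for the tables in this thread: the snippet below only builds the SQL text. It assumes that main_table.event_time is a time attribute and that merchant_table exposes one as well; the column name merchant_time and the one-hour bound are hypothetical and not taken from the original job.

```scala
object IntervalJoinSketch {
  // Left join bounded in time on both sides, so each left row can be
  // finalized once the interval has passed and no retract is needed.
  // `merchant_time` and the 1 HOUR bound are illustrative assumptions.
  val sql: String =
    """SELECT m.`database`, m.`table`, m.`transaction_type`, m.`transaction_id`,
      |       m.`merchant_id`, m.`event_time`, m.`status`, m.`reference_id`
      |FROM main_table AS m
      |LEFT JOIN merchant_table AS t
      |  ON m.reference_id = t.transaction_sn
      | AND t.merchant_time BETWEEN m.event_time - INTERVAL '1' HOUR
      |                         AND m.event_time + INTERVAL '1' HOUR
      |""".stripMargin

  def main(args: Array[String]): Unit =
    // Print the statement; in a job it would go to bsTableEnv.sqlQuery(...).
    println(sql)
}
```

The time bound in the ON clause is what distinguishes this from the regular left join in the original query.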

On Wed, May 27, 2020 at 6:56 PM macia kk  wrote:

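> Thanks, Benchao.
>
> I see. I was using Spark Structured Streaming before, and its model may differ from Flink's: it buffers
> rows that have not been joined yet and only emits them after the watermark has passed.
>
> In Flink, a new row is joined against the buffered rows from the right side. If nothing matches, a null is
> emitted first, but the row stays buffered. If a later row from the right side can be joined with the left
> row that was already emitted, a retract is produced. (That is my understanding.)
>
>
> Is there another solution for my case? My sink (Kafka) feeds Druid downstream, which indexes the data
> directly for querying and does not support retract scenarios.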
>
>
>
>
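> On Wed, May 27, 2020 at 6:32 PM Benchao Li  wrote:
>
> > Many scenarios produce retract messages, and there is no document covering them yet, so let me list a few typical ones:
> > 1. Regular group by: aggregate results are emitted in real time, so updating an aggregate retracts the old result.
> > 2. Non-inner/anti joins (excluding the time interval join): if the current row finds no match, a null is
> > emitted, but matching data may arrive later from the other side, so the emitted null has to be retracted.
> > 3. Deduplication that keeps the latest row.
> > 4. Top-N: ranking changes require updating the result.
> > 5. window + emit: results emitted early need a retract to be updated.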
> >
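> > On Wed, May 27, 2020 at 6:19 PM macia kk  wrote:
> >
> > > Thanks to Benchao and Leonard for the replies.
> > >
> > > My mistake was assuming that a left join is append-only: for left-side rows, each match would simply be
> > > emitted. But under what circumstances is a retract message produced?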
> > >
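> > > On Wed, May 27, 2020 at 3:50 PM Leonard Xu  wrote:
> > >
> > > > Hi
> > > > The Kafka sink is an appendStreamSink, so the query must be append-only, because the Kafka
> > > > sink cannot handle retract messages.
> > > > The left join in your SQL is a two-stream join that produces retract messages; at compile time the
> > > > query and the sink are found not to match, which raises the error you posted.
> > > >
> > > > In the usual business scenario, data written to Kafka does not need to be updated; Kafka mainly serves
> > > > as a message queue. Data whose results need to be updated is generally stored in a DB; the sinks of
> > > > the jdbc, hbase and es connectors are all upsertSink.
> > > >
> > > >
> > > > Best,
> > > > Leonard Xu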
> > > >
> > > >
> > > >
> > > > > On May 27, 2020, at 10:23, Benchao Li  wrote:
> > > > >
> > > > > Also, part of your SQL does produce retracts:
> > > > > it uses a regular left join, and this join type produces retract results.
> > > > >
> > > > >   | FROM (
> > > > >   |SELECT `database`, `table`,
> > > > > `transaction_type`, `transaction_id`,
> > > > >   |`merchant_id`, `event_time`,
> `status`,
> > > > > `reference_id`
> > > > >   |FROM main_table
> > > > >   |LEFT JOIN merchant_table
> > > > >   |ON main_table.reference_id =
> > > > > merchant_table.transaction_sn
> > > > >   | )
> > > > >
> > > > >
> > > > > On Wed, May 27, 2020 at 1:20 AM macia kk  wrote:
> > > > >
> > > > >> Hi all, could someone take a look at this problem when you have time?
> > > > >>
> > > > >> Source: Kafka
> > > > >> Sink: Kafka
> > > > >>
> > > > >> The main logic is: after *main_table* left join *merchant_table*, use the FIRST_VALUE
> > > > >> function to take the first transaction_id. I expected this to be append mode, but the
> > > > >> result does not seem to be.
> > > > >> Error
> > > > >>
> > > > >> org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > > >> method caused an error: AppendStreamTableSink requires that Table
> > has
> > > > >> only insert changes.
> > > > >>at
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > > > >>at
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > > > >>at
> > > > >>
> > >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> Code
> > > > >>
> > > > >>   val main_column = "`database`, `table`, `transaction_type`,
> > > > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > > > >> `status`"
> > > > >>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > > > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > > > >>bsTableEnv.createTemporaryView("main_table", main_table)
> > > > >>
> > > > >>val merchant_column = "transaction_sn, user_id"
> > > > >>val merchant_table = bsTableEnv.sqlQuery(s"SELECT
> > $merchant_column
> > > > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > > > >> 'wallet_id_merchant_db%' ")
> > > > >>bsTableEnv.createTemporaryView("merchant_table",
> merchant_table)
> > > > >>
> > > > >>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > > > >>   | SELECT `database`, `table`,
> > > > >> `transaction_type`,
> > > > >>   |   `merchant_id`, `event_time`,
> `status`,
> > > > >>   |FIRST_VALUE(`transaction_id`) OVER
> > > > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > > > >> PRECEDING)
> > > > >>   | FROM (
> > > > >>   |SELECT `database`, `table`,
> > > > >> `transaction_type`, `transaction_id`,
> > > > >>   |`merchant_id`, `event_time`,
> > `status`,
> > > > >> `reference_id`
> > > > >>   |FROM main_table
> > > > >>   |LEFT JOIN merchant_table
> > > > >>   |ON main_table.reference_id =
> > > > >> merchant_table.transaction_sn
> > > > >>   

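Re: Is Flink SQL FIRST_VALUE append mode?

2020-05-27 Thread macia kk
Thanks, Benchao.

I see. I was previously using Spark Structured Streaming, whose model is
probably a bit different from Flink's: it buffers rows that have not been
joined yet and only emits them after the watermark has passed.

In Flink, a new row on the left is joined against the buffered rows on the
right. If nothing matches, a null-padded row is emitted first, but the left
row stays buffered. If a matching row arrives on the right later, it joins
with the left row that was already emitted, and that produces a retract.
(This is my understanding.)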
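
A minimal sketch of the behaviour described above, written as plain Python
rather than Flink code. It only simulates the changelog of an unbounded
LEFT JOIN; the function name, the event tuples and the sample keys are all
made up for illustration, and Flink's real join operator is more involved.

```python
from collections import defaultdict


def left_join_changelog(events):
    """Simulate the changelog of a regular (non-windowed) streaming LEFT JOIN.

    events: iterable of ("L", key, value) or ("R", key, value) in arrival order.
    Returns a list of (op, left_value, right_value), where op is "+" for an
    insert and "-" for a retraction of a previously emitted row.
    """
    left_rows = defaultdict(list)   # buffered left rows per join key
    right_rows = defaultdict(list)  # buffered right rows per join key
    out = []
    for side, key, value in events:
        if side == "L":
            left_rows[key].append(value)
            matches = right_rows[key]
            if matches:
                out.extend(("+", value, r) for r in matches)
            else:
                # No match yet: emit the left row padded with None (SQL NULL).
                out.append(("+", value, None))
        else:
            first_match = not right_rows[key]
            right_rows[key].append(value)
            for left_value in left_rows[key]:
                if first_match:
                    # The null-padded row emitted earlier is now wrong.
                    out.append(("-", left_value, None))
                out.append(("+", left_value, value))
    return out


changelog = left_join_changelog([
    ("L", "sn-1", "txn-A"),   # left row arrives first, nothing to join with
    ("R", "sn-1", "user-7"),  # matching right row arrives later
])
for record in changelog:
    print(record)
```

With the two sample events the changelog is an insert of the null-padded
row, a retraction of that row, and an insert of the joined row. The middle
record is the one an append-only sink cannot accept.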


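Is there another solution for my case? The downstream of my sink (Kafka) is
Druid, which indexes the data directly and serves queries from it, so it
does not support the retract scenario.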




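On Wed, May 27, 2020 at 6:32 PM, Benchao Li wrote:

> Many scenarios produce retract messages, and there is no document that
> covers them yet, so let me list a few typical ones:
> 1. Regular GROUP BY: the aggregate result is emitted in real time, so each
>    update to the result retracts the previous result.
> 2. Joins other than inner/anti joins (time interval joins excluded): if the
>    current row finds no match, a null-padded row is emitted; data may still
>    arrive on the other side later, and the null-padded row then has to be
>    retracted.
> 3. Deduplication that keeps the latest row.
> 4. Top-N: a change in ranking requires the result to be updated.
> 5. Window + emit: results emitted early have to be updated via retract.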


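Re: Is Flink SQL FIRST_VALUE append mode?

2020-05-27 Thread Benchao Li
Many scenarios produce retract messages, and there is no document that
covers them yet, so let me list a few typical ones:
1. Regular GROUP BY: the aggregate result is emitted in real time, so each
   update to the result retracts the previous result.
2. Joins other than inner/anti joins (time interval joins excluded): if the
   current row finds no match, a null-padded row is emitted; data may still
   arrive on the other side later, and the null-padded row then has to be
   retracted.
3. Deduplication that keeps the latest row.
4. Top-N: a change in ranking requires the result to be updated.
5. Window + emit: results emitted early have to be updated via retract.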
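
To make scenario 1 concrete, here is a small plain-Python sketch of a
continuously updated COUNT(*) ... GROUP BY. It is only a simulation of the
emitted changelog, not Flink code, and the function name and sample keys are
hypothetical.

```python
def count_by_key_changelog(keys):
    """Simulate the changelog of `SELECT key, COUNT(*) ... GROUP BY key`.

    Every incoming key updates its count. Because the previous count was
    already emitted downstream, it is retracted ("-") before the new count
    is inserted ("+").
    """
    counts = {}
    out = []
    for key in keys:
        old = counts.get(key)
        if old is not None:
            out.append(("-", key, old))   # retract the stale aggregate
        counts[key] = (old or 0) + 1
        out.append(("+", key, counts[key]))
    return out


for record in count_by_key_changelog(["a", "b", "a"]):
    print(record)
```

The second "a" is what triggers the retraction: the row ("a", 1) was already
sent downstream, so it has to be withdrawn before ("a", 2) is emitted.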

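On Wed, May 27, 2020 at 6:19 PM, macia kk wrote:

> Thanks to Benchao and Leonard for the replies.
>
> My misunderstanding was that I thought a left join is append mode: for a
> row on the left side, it is emitted as soon as it joins one row. But in
> which cases are retract messages produced?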


-- 

Best,
Benchao Li


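Re: Is Flink SQL FIRST_VALUE append mode?

2020-05-27 Thread macia kk
Thanks to Benchao and Leonard for the replies.

My misunderstanding was that I thought a left join is append mode: for a row
on the left side, it is emitted as soon as it joins one row. But in which
cases are retract messages produced?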

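On Wed, May 27, 2020 at 3:50 PM, Leonard Xu wrote:

> Hi
> The Kafka sink is an AppendStreamTableSink, so the query must be
> append-only, because the Kafka sink cannot handle retract messages.
> The left join in your SQL is a two-stream join and produces retract
> messages. At compile time the planner finds that the query and the sink do
> not match and reports the error you posted.
>
> In the business scenarios we usually see, data written to Kafka does not
> need to be updated; Kafka mainly serves as a message queue. Results that
> need to be updated are usually stored in a database: the sinks of the
> jdbc, hbase and es connectors are all upsert sinks.
>
>
> Best,
> Leonard Xu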


Re:Re: Flink SQL across databases

2020-05-27 Thread Zhou Zach


Hive partition table:


CREATE TABLE `dwd.bill`(
  `id` bigint,
  `gid` bigint,
  `count` bigint,
  `price` bigint,
  `srcuid` bigint,
  `srcnickname` string,
  `srcleftmoney` bigint,
  `srcwealth` bigint,
  `srccredit` decimal(10,0),
  `dstnickname` string,
  `dstuid` bigint,
  `familyid` int,
  `dstleftmoney` bigint,
  `dstwealth` bigint,
  `dstcredit` decimal(10,0),
  `addtime` bigint,
  `type` int,
  `getmoney` decimal(10,0),
  `os` int,
  `bak` string,
  `getbonus` decimal(10,0),
  `unionbonus` decimal(10,0))
PARTITIONED BY (
  `year` int,
  `month` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'


Query:


tableEnv.sqlUpdate(
  """
    |
    |INSERT INTO catalog2.dwd.orders
    |select srcuid, price from catalog2.dwd.bill where year = 2020
    |
    |""".stripMargin)


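On 2020-05-27 18:01:19, "Leonard Xu" wrote:
>Flink does support Hive partitioned tables. I saw that you posted about this
>in another email; could you paste your Hive table and query in this thread?
>
>Best,
>Leonard Xu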


Re: unsubcribe

2020-05-27 Thread Leonard Xu
Hi,

Please send an email to user-unsubscr...@flink.apache.org to unsubscribe from
user@flink.apache.org; see [1].

Best,
Leonard
[1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 
 



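Re: Flink SQL across databases

2020-05-27 Thread Leonard Xu
Flink does support Hive partitioned tables. I saw that you posted about this
in another email; could you paste your Hive table and query in this thread?

Best,
Leonard Xu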

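> On May 27, 2020, at 17:40, Zhou Zach wrote:
> 
> Thanks for the reply. After adding the catalog and db prefixes to the table
> name, the access works.
> Now I have another problem: when Flink reads a Hive partitioned table, a
> WHERE clause that filters on a partition key such as year fails with an
> error, while filtering on other columns of the table works fine. Does Flink
> not support Hive partitioned tables, or is something misconfigured?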



Re: flink-python configuration file question

2020-05-27 Thread Xingbo Huang
Hi, Python files specified with -pyfs are added to PYTHONPATH at runtime;
they are not added to the classpath.
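
Since the files end up on PYTHONPATH, one way to find such a file from
Python code is to scan sys.path. The sketch below assumes that the directory
containing the shipped file is one of the sys.path entries at runtime; the
helper name and the file name job_conf.json are hypothetical and not part of
any Flink API.

```python
import os
import sys


def find_on_python_path(file_name, search_path=None):
    """Return the first existing path to file_name under the given entries.

    search_path defaults to sys.path, which is where PYTHONPATH entries show
    up inside a running Python process. Returns None if nothing is found.
    """
    entries = sys.path if search_path is None else search_path
    for entry in entries:
        # An empty sys.path entry stands for the current directory.
        candidate = os.path.join(entry or ".", file_name)
        if os.path.isfile(candidate):
            return candidate
    return None


conf_path = find_on_python_path("job_conf.json")
print(conf_path if conf_path else "job_conf.json not found on PYTHONPATH")
```

Looking on the Java classpath, as in the original question, cannot work for
files shipped this way, which is why the lookup goes through sys.path.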

On Wed, May 27, 2020 at 5:50 PM, guaishushu1...@163.com wrote:

> I used the -pyfs option to load a Python configuration file, but found that
> the file is not on the classpath.
>
> --
> guaishushu1...@163.com
>


unsubcribe

2020-05-27 Thread 王洪达




flink-python configuration file question

2020-05-27 Thread guaishushu1...@163.com
I used the -pyfs option to load a Python configuration file, but found that
the file is not on the classpath.



guaishushu1...@163.com


Re: Collecting operators real output cardinalities as json files

2020-05-27 Thread Francesco Ventura
Hi Piotrek,

Thank you for your reply and for your suggestions. Just one more question:
will using a metrics reporter and custom metrics affect the performance of
the running jobs in terms of execution time? Since I need the exact
netRunTime of each job, would it be more reliable to use the REST API to get
the other information?

Thank you. Best,

Francesco

> On 25 May 2020, at 19:54, Piotr Nowojski wrote:
> 
> Hi Francesco,
> 
> Have you taken a look at the metrics? [1] And IO metrics [2] in particular? 
> You can use some of the pre-existing metric reporter [3] or implement a 
> custom one. You could export metrics to some 3rd party system, and get JSONs 
> from there, or export them to JSON directly via a custom metric reporter.
> 
> Piotrek
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
>  
> 
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>  
> 
> 
>> On 23 May 2020, at 11:31, Francesco Ventura wrote:
>> 
>> Hi everybody, 
>> 
>> I would like to collect the statistics and the real output cardinalities 
>> about the execution of many jobs as json files. I know that exist a REST 
>> interface that can be used but I was looking for something simpler. In 
>> practice, I would like to get the information showed in the WebUI at runtime 
>> about a job and store it as a file. I am using the env.getExecutionPlan() to 
>> get the execution plan of a job with the estimated cardinalities for each 
>> operator. However, it includes only estimated cardinalities and it can be 
>> used only before calling env.execute(). 
>> 
>> There is a similar way to extract the real output cardinalities of each 
>> pipeline after the execution? 
>> Is there a place where the Flink cluster stores the history of the 
>> information about executed jobs?
>> Developing a REST client to extract such information is the only way 
>> possible? 
>> 
>> I also would like to avoid adding counters to the job source code since I am 
>> monitoring the run time execution and I should avoid everything that can 
>> interfere.
>> 
>> Maybe is a trivial problem but I have a quick look around and I can not find 
>> the solution.
>> 
>> Thank you very much,
>> 
>> Francesco
> 



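Re:Re: Flink SQL across databases

2020-05-27 Thread Zhou Zach



Thanks for the reply. After adding the catalog and db prefixes to the table
name, the access works.
Now I have another problem: when Flink reads a Hive partitioned table, a
WHERE clause that filters on a partition key such as year fails with an
error, while filtering on other columns of the table works fine. Does Flink
not support Hive partitioned tables, or is something misconfigured?


On 2020-05-27 17:33:11, "Leonard Xu" wrote:
>Hi,
>> because one HiveCatalog can only be associated with one database
>A catalog can be associated with multiple databases, and tables in different
>catalogs and different databases can all be accessed.
>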
>Flink SQL> show catalogs;
>default_catalog
>myhive
>Flink SQL> use catalog myhive;
>Flink SQL> show databases;
>default
>hive_test
>hive_test1
>Flink SQL> select * from hive_test.db2_table union select * from 
>myhive.hive_test1.db1_table;
>2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
>
>
>
>Best,
>Leonard Xu


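Re: Flink SQL across databases

2020-05-27 Thread Leonard Xu
Hi,
> because one HiveCatalog can only be associated with one database
A catalog can be associated with multiple databases, and tables in different
catalogs and different databases can all be accessed.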

Flink SQL> show catalogs;
default_catalog
myhive
Flink SQL> use catalog myhive;
Flink SQL> show databases;
default
hive_test
hive_test1
Flink SQL> select * from hive_test.db2_table union select * from 
myhive.hive_test1.db1_table;
2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
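
The session above mixes a two-part name (hive_test.db2_table) with a
three-part name (myhive.hive_test1.db1_table). A rough plain-Python sketch
of how such names can be expanded against the current catalog and database
follows. It is an illustration only: the function is hypothetical and it
ignores backtick-quoted identifiers that contain dots.

```python
def resolve_table_path(identifier, current_catalog, current_database):
    """Expand a 1-, 2- or 3-part table name to (catalog, database, table)."""
    parts = identifier.split(".")
    if len(parts) == 1:
        # table only: use the current catalog and current database
        return (current_catalog, current_database, parts[0])
    if len(parts) == 2:
        # database.table: use the current catalog
        return (current_catalog, parts[0], parts[1])
    if len(parts) == 3:
        # catalog.database.table: already fully qualified
        return (parts[0], parts[1], parts[2])
    raise ValueError("expected at most three name parts: %r" % identifier)


# After `use catalog myhive`, with `default` as the current database:
print(resolve_table_path("hive_test.db2_table", "myhive", "default"))
print(resolve_table_path("myhive.hive_test1.db1_table", "myhive", "default"))
```

Both names resolve into the same catalog but different databases, which is
why the UNION across hive_test and hive_test1 works in the session above.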



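Best,
Leonard Xu


> On May 27, 2020, at 10:55, Zhou Zach wrote:
> 
> hi all,
> Is it true that the Flink SQL HiveCatalog cannot work across databases?
> That is, the two tables joined in one Flink SQL statement belong to two
> different databases, because one HiveCatalog can only be associated with
> one database.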



multiple sources

2020-05-27 Thread Aissa Elaffani
Hello everyone,
I hope you are all doing well. I am reading real-time messages produced by
sensors from a Kafka topic, and in order to do some aggregations, I need to
enrich the stream with other data that is stored in MongoDB.
So, I want to know whether it is possible to work with two sources in one
job. If yes, how can I do so?
Best,
Aissa


Re: In consistent Check point API response

2020-05-27 Thread Vijay Bhaskar
Thanks Yun. In that case it would be good to reference that documentation
from the Flink REST API page:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
where it explains the checkpoint endpoints. Then anyone who wants to use the
REST API in the future will have an easy reference to the checkpoint
monitoring document, which would give them the complete picture. So I will
open a Jira with this requirement.

Regards
Bhaskar

On Wed, May 27, 2020 at 11:59 AM Yun Tang  wrote:

> To be honest, from my point of view current description should have
> already give enough explanations [1] in "Overview Tab".
> *Latest Completed Checkpoint*: The latest successfully completed
> checkpoints.
> *Latest Restore*: There are two types of restore operations.
>
>- Restore from Checkpoint: We restored from a regular periodic
>checkpoint.
>- Restore from Savepoint: We restored from a savepoint.
>
>
> You could still create a JIRA issue and give your ideas in that issue. If
> agreed to work on in that ticket, you can create a PR to edit
> checkpoint_monitoring.md [2] and checkpoint_monitoring.zh.md [3] to
> update related documentation.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/checkpoint_monitoring.html#overview-tab
> [2]
> https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.md
> [3]
> https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.zh.md
>
> Best
> Yun Tang
> --
> *From:* Vijay Bhaskar 
> *Sent:* Tuesday, May 26, 2020 15:18
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: In consistent Check point API response
>
> Thanks Yun. How can i contribute better documentation of the same by
> opening Jira on this?
>
> Regards
> Bhaskar
>
> On Tue, May 26, 2020 at 12:32 PM Yun Tang  wrote:
>
> Hi Bhaskar
>
> I think I have understood your scenario now. And I think this is what
> expected in Flink.
> As you only allow your job could restore 5 times, the "restore" would only
> record the checkpoint to restore at the 5th recovery, and the checkpoint id
> would always stay there.
>
> "Restored" is for last restored checkpoint and "completed" is for last
> completed checkpoint, they are actually not the same thing.
> The only scenario that they're the same in numbers is when Flink just
> restore successfully before a new checkpoint completes.
>
> Best
> Yun Tang
>
>
> --
> *From:* Vijay Bhaskar 
> *Sent:* Tuesday, May 26, 2020 12:19
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: In consistent Check point API response
>
> Hi Yun
> Understood the issue now:
> "restored" always shows only the check point that is used for restoring
> previous state
> In all the attempts < 6 ( in my case max attempts are 5, 6 is the last
> attempt)
>   Flink HA is  restoring the state, so restored and latest are same value
> if the last attempt  == 6
>  Flink job already has few check points
>  After that job failed and Flink HA gave up and marked the job state as
> "FAILED"
>At this point "restored". value is the one which is in 5'th attempt but
> latest is the one which is the latest checkpoint which is retained
>
> Shall i file any documentation improvement Jira? I want to add more
> documentation with the help of  the above scenarios.
>
> Regards
> Bhaskar
>
>
>
> On Tue, May 26, 2020 at 8:14 AM Yun Tang  wrote:
>
> Hi Bhaskar
>
> It seems I still not understand your case-5 totally. Your job failed 6
> times, and recover from previous checkpoint to restart again. However, you
> found the REST API told the wrong answer.
> How do you ensure your "restored" field is giving the wrong checkpoint
> file which is not latest? Have you ever checked the log in JM to view
> related contents: "Restoring job xxx from latest valid checkpoint: x@"
> [1] to know exactly which checkpoint choose to restore?
>
> I think you could give a more concrete example e.g. which expected/actual
> checkpoint to restore, to tell your story.
>
> [1]
> https://github.com/apache/flink/blob/8f992e8e868b846cf7fe8de23923358fc6b50721/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1250
>
> Best
> Yun Tang
> --
> *From:* Vijay Bhaskar 
> *Sent:* Monday, May 25, 2020 17:01
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: In consistent Check point API response
>
> Thanks Yun.
> Here is the problem i am facing:
>
> I am using  jobs/:jobID/checkpoints  API to recover the failed job. We
> have the remote manager which monitors the jobs.  We are using "restored"
> field of the API response to get the latest check point file to use. Its
> giving correct checkpoint file for all the 4 cases except the 5'th case.
> Where the "restored" field is giving the wrong check point file which is
> not latest.  When we compare the  check point file returned by  the
> "completed". field, both are giving identical checkpoints in all 4 cases,
> except 

Re: ClusterClientFactory selection

2020-05-27 Thread Kostas Kloudas
Hi Singh,

The only thing to add to what Yang said is that the "execution.target"
configuration option (in the config file) is also used for the same
purpose from the execution environments.

Cheers,
Kostas

On Wed, May 27, 2020 at 4:49 AM Yang Wang  wrote:
>
> Hi M Singh,
>
> The Flink CLI picks up the correct ClusterClientFactory via java SPI. You
> could check YarnClusterClientFactory#isCompatibleWith for how it is activated.
> The cli option / configuration is "-e/--executor" or execution.target (e.g. 
> yarn-per-job).
>
>
> Best,
> Yang
>
> On Tue, May 26, 2020 at 6:45 PM, M Singh wrote:
>>
>> Hi:
>>
>> I wanted to find out which parameter/configuration allows flink cli pick up 
>> the appropriate cluster client factory (especially in the yarn mode).
>>
>> Thanks


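Re: Is Flink SQL FIRST_VALUE append mode?

2020-05-27 Thread Leonard Xu
Hi
The Kafka sink is an AppendStreamTableSink, so the query must be append-only,
because the Kafka sink cannot handle retract messages.
The left join in your SQL is a two-stream join and produces retract messages.
At compile time the planner finds that the query and the sink do not match
and reports the error you posted.

In the business scenarios we usually see, data written to Kafka does not need
to be updated; Kafka mainly serves as a message queue. Results that need to
be updated are usually stored in a database: the sinks of the jdbc, hbase and
es connectors are all upsert sinks.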
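
A toy comparison of the two sink kinds in plain Python, assuming a changelog
of (op, key, value) records where "+" is an insert and "-" is a retraction.
The classes are hypothetical and are not Flink's sink interfaces. Note also
that this toy rejects the retraction when it is written, whereas Flink
rejects the whole query at planning time.

```python
class AppendOnlySink:
    """Toy append-only sink: accepts inserts only, like a plain message log."""

    def __init__(self):
        self.log = []

    def write(self, op, key, value):
        if op != "+":
            raise ValueError("append-only sink cannot apply change %r" % op)
        self.log.append((key, value))


class UpsertSink:
    """Toy keyed sink: keeps one current value per key, like a database row."""

    def __init__(self):
        self.rows = {}

    def write(self, op, key, value):
        if op == "+":
            self.rows[key] = value      # insert or overwrite by key
        elif key in self.rows and self.rows[key] == value:
            del self.rows[key]          # retract the value currently stored


def apply_changelog(sink, changelog):
    """Write every change to the sink and return the sink for inspection."""
    for op, key, value in changelog:
        sink.write(op, key, value)
    return sink


# Changelog of a left join whose right side arrived late (made-up sample).
sample = [("+", "txn-A", None), ("-", "txn-A", None), ("+", "txn-A", "user-7")]
print(apply_changelog(UpsertSink(), sample).rows)
```

The keyed sink ends up with a single row for txn-A, while feeding the same
changelog to AppendOnlySink raises on the second record.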


Best,
Leonard Xu



> On May 27, 2020, at 10:23, Benchao Li wrote:
> 
> Also, part of your SQL produces retractions:
> it uses a regular left join, and this join type produces retract results.
> 
>   | FROM (
>   |   SELECT `database`, `table`, `transaction_type`, `transaction_id`,
>   |          `merchant_id`, `event_time`, `status`, `reference_id`
>   |   FROM main_table
>   |   LEFT JOIN merchant_table
>   |   ON main_table.reference_id = merchant_table.transaction_sn
>   | )
> 
> 
> On Wed, May 27, 2020 at 1:20 AM, macia kk wrote:
> 
>> Hi all, could someone who has time help me take a look at this problem?
>> 
>> Source: Kafka
>> Sink: Kafka
>> 
>> The main logic is: after *main_table* left join *merchant_table*, use the
>> FIRST_VALUE function to take the first transaction_id. I think this should
>> be append mode, but the result does not seem to be.
>> 
>> Error
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: AppendStreamTableSink requires that Table has only insert changes.
>>    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> 
>> 
>> Code
>> 
>>    val main_column = "`database`, `table`, `transaction_type`, `transaction_id`, `reference_id`, `merchant_id`, `event_time`, `status`"
>>    val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
>>    bsTableEnv.createTemporaryView("main_table", main_table)
>> 
>>    val merchant_column = "transaction_sn, user_id"
>>    val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column FROM Keystats_airpay_consumer WHERE `table` LIKE 'wallet_id_merchant_db%' ")
>>    bsTableEnv.createTemporaryView("merchant_table", merchant_table)
>> 
>>    bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
>>      | SELECT `database`, `table`, `transaction_type`,
>>      |        `merchant_id`, `event_time`, `status`,
>>      |        FIRST_VALUE(`transaction_id`) OVER (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED PRECEDING)
>>      | FROM (
>>      |   SELECT `database`, `table`, `transaction_type`, `transaction_id`,
>>      |          `merchant_id`, `event_time`, `status`, `reference_id`
>>      |   FROM main_table
>>      |   LEFT JOIN merchant_table
>>      |   ON main_table.reference_id = merchant_table.transaction_sn
>>      | )
>>      |""".stripMargin)
>> 
> 
> 
> -- 
> 
> Best,
> Benchao Li



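Re: Re: Global state

2020-05-27 Thread a773807...@gmail.com
No, it won't. In the second keyBy, use id-name as the key and then apply a
reduce: among the duplicated records, compare by time and keep the value with
the smallest time. That removes the duplicates.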
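
The idea can be sketched in plain Python as follows. This is not Flink code:
the dictionary stands in for keyed state, and the field names id, name, time
and value are assumed from the description above.

```python
def earliest(a, b):
    """Reduce function: keep whichever record has the smaller time."""
    return a if a["time"] <= b["time"] else b


def dedupe_by_id_name(records):
    """Group records by (id, name) and reduce each group to its earliest record."""
    best = {}  # stands in for keyed state: one record per (id, name)
    for rec in records:
        key = (rec["id"], rec["name"])
        best[key] = earliest(best[key], rec) if key in best else rec
    return list(best.values())


sample = [
    {"id": 1, "name": "a", "time": 5, "value": "x"},
    {"id": 1, "name": "a", "time": 3, "value": "y"},  # duplicate key, earlier time
    {"id": 2, "name": "b", "time": 1, "value": "z"},
]
for rec in dedupe_by_id_name(sample):
    print(rec)
```

In a streaming job the reduce emits an updated result whenever an earlier
record shows up for a key, so downstream consumers need to tolerate updates
for the same key.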



a773807...@gmail.com
 
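From: star
Sent: 2020-05-27 15:45
To: user-zh
Subject: Re: Re: Global state
Thanks for your suggestion, but that way the data gets duplicated and
doubles. For now it looks like I can only rely on external storage.
 
 
 
Sent from my iPhone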
 
 

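Re: Re: Global state

2020-05-27 Thread star
Thanks for your suggestion, but that way the data gets duplicated and
doubles. For now it looks like I can only rely on external storage.



Sent from my iPhone


-- Original message --
From: a773807...@gmail.com https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions

Best,
tison.


On Tue, May 26, 2020 at 6:42 PM, star <3149768...@qq.com> wrote:

> Is there a global state component? I have a requirement to build a mapping
> over the id and name in the data: if two records share the same id or the
> same name, they should be mapped to one value. Right now I can only
> implement global state by using operator state with the parallelism set
> to 1.
>
>
> Thanks
>
> Sent from my iPhone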
Re: In consistent Check point API response

2020-05-27 Thread Yun Tang
To be honest, from my point of view the current description of the
"Overview Tab" [1] already gives enough explanation:
Latest Completed Checkpoint: The latest successfully completed checkpoints.
Latest Restore: There are two types of restore operations.

  *   Restore from Checkpoint: We restored from a regular periodic checkpoint.
  *   Restore from Savepoint: We restored from a savepoint.

You could still create a JIRA issue and describe your ideas there. If the
community agrees to work on that ticket, you can create a PR that edits
checkpoint_monitoring.md [2] and checkpoint_monitoring.zh.md [3] to update
the related documentation.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/checkpoint_monitoring.html#overview-tab
[2] 
https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.md
[3] 
https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.zh.md

Best
Yun Tang

From: Vijay Bhaskar 
Sent: Tuesday, May 26, 2020 15:18
To: Yun Tang 
Cc: user 
Subject: Re: In consistent Check point API response

Thanks Yun. How can I contribute better documentation on this? By opening a 
Jira issue?

Regards
Bhaskar

On Tue, May 26, 2020 at 12:32 PM Yun Tang <myas...@live.com> wrote:
Hi Bhaskar

I think I understand your scenario now, and I think this is the expected 
behavior in Flink.
As you only allow your job to restore 5 times, "restored" only records the 
checkpoint used at the 5th recovery, and that checkpoint id stays there from 
then on.

"Restored" refers to the last restored checkpoint and "completed" to the last 
completed checkpoint; they are not the same thing.
The only scenario in which the two ids are equal is when Flink has just 
restored successfully and no new checkpoint has completed yet.
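
For an external manager that wants the newest checkpoint to resume from, the distinction above suggests reading "completed" first and using "restored" only as a fallback. A minimal sketch, assuming the response of jobs/:jobID/checkpoints nests both entries under a "latest" object and that each carries an "external_path" field; these field names should be verified against the REST API documentation of the Flink version in use, and the sample payload and paths below are made up.

```python
from typing import Optional


def latest_checkpoint_path(response: dict) -> Optional[str]:
    """Prefer the last completed checkpoint over the last restored one."""
    latest = response.get("latest") or {}
    # Either entry may be missing or null, e.g. before the first checkpoint.
    completed = latest.get("completed") or {}
    restored = latest.get("restored") or {}
    return completed.get("external_path") or restored.get("external_path")


# Made-up payload mirroring the case discussed in this thread: the job was
# last restored from checkpoint 7 but has completed checkpoint 12 since then.
sample = {
    "latest": {
        "completed": {"id": 12, "external_path": "hdfs:///checkpoints/chk-12"},
        "restored": {"id": 7, "external_path": "hdfs:///checkpoints/chk-7"},
    }
}

chosen = latest_checkpoint_path(sample)
```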

Best
Yun Tang



From: Vijay Bhaskar <bhaskar.eba...@gmail.com>
Sent: Tuesday, May 26, 2020 12:19
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: In consistent Check point API response

Hi Yun
I understand the issue now:
"restored" always shows only the checkpoint that was used for restoring the 
previous state.
In all attempts < 6 (in my case the max attempts are 5, so 6 is the last attempt):
  Flink HA restores the state, so "restored" and "latest" have the same value.
If the last attempt == 6:
  The Flink job already has a few checkpoints.
  After that the job failed, Flink HA gave up and marked the job state as "FAILED".
  At this point the "restored" value is the one from the 5th attempt, but 
  "latest" is the latest retained checkpoint.

Shall I file a documentation improvement Jira? I want to add more 
documentation based on the above scenarios.

Regards
Bhaskar



On Tue, May 26, 2020 at 8:14 AM Yun Tang <myas...@live.com> wrote:
Hi Bhaskar

It seems I still do not fully understand your case 5. Your job failed 6 times 
and recovered from the previous checkpoint to restart. However, you found that 
the REST API gave the wrong answer.
How did you determine that the "restored" field gives a wrong checkpoint file 
that is not the latest? Have you checked the JM log for the message 
"Restoring job xxx from latest valid checkpoint: x@" [1] to know exactly which 
checkpoint was chosen for the restore?

I think you could give a more concrete example, e.g. the expected and the 
actual checkpoint to restore, to tell your story.
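
One way to run the log check suggested above is to scan the JobManager log for that message. A minimal sketch: the regular expression only relies on the prefix quoted in this message, everything after the colon is captured as-is, and the sample log lines (timestamps, job id, checkpoint text) are made up for illustration.

```python
import re

# Matches the message prefix quoted above; the checkpoint description after
# the colon is captured verbatim because its exact layout is not assumed.
RESTORE_LINE = re.compile(r"Restoring job (\S+) from latest valid checkpoint: (.*)")


def find_restores(log_lines):
    """Return (job_id, checkpoint_text) for every restore message found."""
    hits = []
    for line in log_lines:
        match = RESTORE_LINE.search(line)
        if match:
            hits.append((match.group(1), match.group(2).strip()))
    return hits


# Hypothetical log excerpt.
sample_log = [
    "2020-05-26 08:00:00,000 INFO  CheckpointCoordinator - Restoring job abc123 "
    "from latest valid checkpoint: Checkpoint 5 @ 1590480000000 for abc123.",
    "2020-05-26 08:00:01,000 INFO  CheckpointCoordinator - Triggering checkpoint 6.",
]

restores = find_restores(sample_log)
```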

[1] 
https://github.com/apache/flink/blob/8f992e8e868b846cf7fe8de23923358fc6b50721/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1250

Best
Yun Tang

From: Vijay Bhaskar <bhaskar.eba...@gmail.com>
Sent: Monday, May 25, 2020 17:01
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: In consistent Check point API response

Thanks Yun.
Here is the problem I am facing:

I am using the jobs/:jobID/checkpoints API to recover the failed job. We have a 
remote manager which monitors the jobs. We use the "restored" field of the API 
response to get the latest checkpoint file to use. It gives the correct 
checkpoint file in the first 4 cases but not in the 5th case, where the 
"restored" field gives a checkpoint file that is not the latest. When we 
compare it with the checkpoint file returned by the "completed" field, both 
give identical checkpoints in the first 4 cases, but not in the 5th.
We can't use the Flink UI for security reasons.

Regards
Bhaskar

On Mon, May 25, 2020 at 12:57 PM Yun Tang <myas...@live.com> wrote:
Hi Vijay

If I understand correctly, do you mean that your last "restored" checkpoint is 
null via the REST API when the job failed 6 times and then recovered 
successfully with several more successful checkpoints?

First of all, if your job has just recovered successfully, can you observe the 
"last restored" checkpoint in the web UI?

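Re: Native Kubernetes build failure on different Kubernetes versions

2020-05-27 Thread a511955993

Thanks for your patient answers~

a511955993
Email: a511955...@163.com

On 2020-05-27 at 14:17, Yang Wang wrote:
Yes.

That said, I think we could also consider backporting this fix to the 1.10 
branch later, so that it is fixed in the next release.

The master branch already contains this fix, but you would need to build the 
image yourself.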


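Re: Native Kubernetes build failure on different Kubernetes versions

2020-05-27 Thread Yang Wang
Yes.

That said, I think we could also consider backporting this fix to the 1.10 
branch later, so that it is fixed in the next release.

The master branch already contains this fix, but you would need to build the 
image yourself.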
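
Since the failure discussed in this thread is tied to the JDK update bundled in the image, it can help to check the version string before deploying. A minimal sketch that encodes only what the thread reports (Java 8u252 does not work with Flink 1.10 on Kubernetes); the function names are hypothetical, the input is assumed to be the first line printed by `java -version`, and other JDK versions are simply treated as "not reported" here rather than as known-good.

```python
import re
from typing import Optional

# Java 8 builds report themselves as 1.8.0_<update>.
_JAVA8_UPDATE = re.compile(r'version "1\.8\.0_(\d+)')


def java8_update(version_line: str) -> Optional[int]:
    """Return the Java 8 update number, or None for other Java versions."""
    match = _JAVA8_UPDATE.search(version_line)
    return int(match.group(1)) if match else None


def is_reported_incompatible(version_line: str) -> bool:
    """True only for the update that this thread reports as broken (8u252)."""
    return java8_update(version_line) == 252


# Example version lines.
checks = {
    line: is_reported_incompatible(line)
    for line in (
        'openjdk version "1.8.0_252"',
        'openjdk version "1.8.0_242"',
        'openjdk version "11.0.7" 2020-04-14',
    )
}
```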

Wrote on Wed, May 27, 2020 at 1:38 PM:

> hi, yang
>
> The image I am using is the 1.10.1 image from Docker Hub. So the options are:
> 1. Wait for version 1.11
> 2. Build a Flink 1.10.1 image myself with a lower JDK version?
>
> Looking forward to your reply and help.
>
> Best
>
> a511955993
> Email: a511955...@163.com
>
> On 2020-05-27 at 13:25, Yang Wang wrote:
> "Broken pipe" is a bug in fabric8's kubernetes-client.
> Your image's JDK version is Java 8u252, right? Flink on K8s currently does
> not work with Java 8u252.
> The workaround is to use a JDK version below 8u252 or to upgrade to JDK 11.
>
> Flink 1.11 will upgrade the fabric8 kubernetes-client dependency to the
> latest version to fix this.
>
>
> Best,
> Yang
>
> Wrote on Wed, May 27, 2020 at 12:52 PM:
>
> >
> > Following the documentation [1], I configured things so that the detailed
> > logs are visible. The start command is:
> >
> > /usr/local/flink/flink-1.10.1/bin/kubernetes-session.sh \
> >  -Dkubernetes.cluster-id=ipcode \
> >  -Dkubernetes.jobmanager.service-account=flink \
> >  -Dtaskmanager.memory.process.size=4096m \
> >  -Dkubernetes.taskmanager.cpu=2 \
> >  -Dtaskmanager.numberOfTaskSlots=4 \
> >  -Dkubernetes.namespace=flink-ipcode \
> >  -Dkubernetes.rest-service.exposed.type=NodePort \
> >  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
> >  -Dakka.framesize=104857600b \
> >  -Dkubernetes.container.image=flink:1.10.1
> >
> >
> > The corresponding service, deployment, and ConfigMap have all been created:
> >
> >
> > kubectl get svc -n flink-ipcode
> > NAME          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
> > ipcode        ClusterIP   x                          8081/TCP,6123/TCP,6124/TCP   21s
> > ipcode-rest   NodePort                               8081:30803/TCP               21s
> >
> >
> > It fails on Kubernetes v1.17.4 and succeeds on v1.15.1.
> >
> > The jobmanager error log on 1.17.4 is as follows:
> >
> >
> > 2020-05-27 04:37:44,225 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
> > org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@ipcode.flink-ipcode:6123/user/resourcemanager
> >  at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:193)
> >  at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:185)
> >  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:549)
> >  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
> >  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> >  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> >  at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> >  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> >  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> >  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >  at akka.actor.Actor.aroundReceive(Actor.scala:517)
> >  at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> >  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> >  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> >  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> >  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> >  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> >  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> >  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [list]  for kind: [Pod]  with name: [null]  in namespace: [flink-ipcode]  failed.
> >  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
> >  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
> >  at io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:151)
> >  at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:614)
> >  at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:63)
> >  at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getPodsWithLabels(Fabric8FlinkKubeClient.java:211)
> >  at org.apache.flink.kubernetes.KubernetesResourceManager.recoverWorkerNodesFromPreviousAttempts(KubernetesResourceManager.java:233)
> >  at org.apache.flink.kubernetes.KubernetesResourceManager.initialize(KubernetesResourceManager.java:145)
> >  at

Re: RocksDB savepoint recovery performance improvements

2020-05-27 Thread Yun Tang
@Joey Pereira I think you might need to create a new JIRA ticket and link your 
PR to it, as FLINK-17288 mainly focuses on bulk load options while your 
solution focuses on an SST generator. If your solution behaves better, we could 
tag FLINK-17288 as "won't do".

@Steven Wu Sure, the Flink community always suggests using savepoints to 
restore, but checkpoints currently support it as well. I mentioned that as a 
quick fix for his scenario.

Best
Yun Tang

From: Steven Wu 
Sent: Wednesday, May 27, 2020 0:36
To: Joey Pereira 
Cc: user@flink.apache.org; Yun Tang; Mike Mintz; Shahid Chohan; Aaron Levin
Subject: Re: RocksDB savepoint recovery performance improvements

Yun, you mentioned that checkpoint also supports rescale. I thought the 
recommendation [1] is to use savepoint for rescale.

[1] 
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira <j...@stripe.com> wrote:
Following up: I've put together the implementation, 
https://github.com/apache/flink/pull/12345. It's passing tests but is only 
partially complete, as it still needs some clean-up and configuration. I still 
need to try running this against a production cluster to check the performance, 
as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <j...@stripe.com> wrote:
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with 
that in mind.

We have already begun using checkpoints for recovery. Having these improvements 
would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <myas...@live.com> wrote:
Hi Joey

Previously, I also looked at the mechanism for creating on-disk SSTables, as I 
planned to use RocksDB's benchmark to mock the scenario in Flink. However, I 
found that the main challenge is ensuring the keys are inserted in strictly 
increasing order. The key order in Java could differ from the byte order in 
RocksDB. In your case, I think it could be much easier, as 
RocksFullSnapshotStrategy writes data per column family per key group, which 
should be in strictly increasing order [1].
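
To illustrate how the two orders can diverge, here is a small sketch in plain Python (not Flink or RocksDB code). It assumes integer keys serialized as 4-byte big-endian two's complement and a comparator that compares serialized keys as unsigned bytes; both are assumptions made for the example, and the helper names are hypothetical.

```python
import struct


def serialize_int(value: int) -> bytes:
    """Serialize a 32-bit signed int as big-endian two's complement."""
    return struct.pack(">i", value)


def is_strictly_increasing(byte_keys) -> bool:
    """Check the ordering an SST-style writer would require of its input."""
    return all(a < b for a, b in zip(byte_keys, byte_keys[1:]))


keys = [-1, 0, 1, 256]

# Order seen by application code comparing the integers themselves.
numeric_order = sorted(keys)

# Order seen by an unsigned bytewise comparison of the serialized keys:
# -1 serializes to ff ff ff ff and therefore sorts last.
bytewise_order = sorted(keys, key=serialize_int)

numeric_ok = is_strictly_increasing([serialize_int(k) for k in numeric_order])
bytewise_ok = is_strictly_increasing([serialize_int(k) for k in bytewise_order])
```

Feeding keys in `numeric_order` would violate the strictly increasing requirement on the serialized bytes, while `bytewise_order` satisfies it.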

FLINK-17288 could mitigate the performance problem, and your solution could 
improve the performance much further (and could integrate with the 
state-processor-api story).

On the other hand, as an out-of-the-box option for production in your 
scenario, how about using checkpoints to recover? They also support rescaling 
and normal recovery.

[1] 
https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308


Best
Yun Tang

From: Joey Pereira <j...@stripe.com>
Sent: Tuesday, May 19, 2020 2:27
To: user@flink.apache.org <user@flink.apache.org>
Cc: Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <cho...@stripe.com>; 
Aaron Levin <aaronle...@stripe.com>
Subject: RocksDB savepoint recovery performance improvements

Hey,

While running a Flink application with large state, savepoint recovery has 
been a painful part of operating the application because recovery time can be 
several hours. During some profiling that chohan (cc'd) had done, a red flag 
stood out: savepoint recovery consisted mostly of RocksDB Get and Put 
operations.

When Flink is bootstrapping state for RocksDB instances this is not what I 
would have expected, as RocksDB supports direct ingestion of the on-disk format 
(SSTables): 
https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This 
was also recently reported on Jira: 
https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and 
RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

* RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will 
provide atomicity of batches as well as performance benefits for batching, 
compared to individual Puts, but it will still involve RocksDB’s insert paths 
which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, 
writes can be batched even further and avoid expensive operations in RocksDB. 
This is commonly utilized by other systems for restoration or import processes, 
such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some 
restrictions on being able to generate SSTables, as well as limitations for 
ingestion to be performant. Unfortunately, it’s all not 

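Re: Flink cluster start-up reports many running instances; StandaloneSessionClusterEntrypoint is killed after starting

2020-05-27 Thread smq
It still doesn't work, so there may be some other cause. It was fine yesterday, 
and I don't know what I did today that left it like this. The log says the 
resourcemanager refused the connection.


--- Original message ---
From: Lijie Wang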