Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 文章 Jingsong Li
CC to the Paimon community.

Best,
Jingsong

On Mon, May 20, 2024 at 9:55 AM Jingsong Li  wrote:
>
> Amazing, congrats!
>
> Best,
> Jingsong
>
> On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
> >
> > Unsubscribe
> >
> >
> >
> >
> >
> >
> >
> > Original Email
> >
> >
> >
> > Sender: "gongzhongqiang" < gongzhongqi...@apache.org >
> >
> > Sent Time: 2024/5/17 23:10
> >
> > To: "Qingsheng Ren" < re...@apache.org >
> >
> > Cc recipient: "dev" < d...@flink.apache.org >, "user" < u...@flink.apache.org >,
> > "user-zh" < user-zh@flink.apache.org >, "Apache Announce List" < annou...@apache.org >
> >
> > Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
> >
> >
> > Congratulations !
> > Thanks for all contributors.
> >
> >
> > Best,
> >
> > Zhongqiang Gong
> >
> > Qingsheng Ren  于 2024年5月17日周五 17:33写道:
> >
> >  The Apache Flink community is very happy to announce the release of
> >  Apache Flink CDC 3.1.0.
> > 
> >  Apache Flink CDC is a distributed data integration tool for real time
> >  data and batch data, bringing the simplicity and elegance of data
> >  integration via YAML to describe the data movement and transformation
> >  in a data pipeline.
> > 
> >  Please check out the release blog post for an overview of the release:
> > 
> >  
> > https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> > 
> >  The release is available for download at:
> >  https://flink.apache.org/downloads.html
> > 
> >  Maven artifacts for Flink CDC can be found at:
> >  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> > 
> >  The full release notes are available in Jira:
> > 
> >  
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
> > 
> >  We would like to thank all contributors of the Apache Flink community
> >  who made this release possible!
> > 
> >  Regards,
> >  Qingsheng Ren
> > 


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 文章 Jingsong Li
Amazing, congrats!

Best,
Jingsong

On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
>
> Unsubscribe
>
>
>
>
>
>
>
> Original Email
>
>
>
> Sender: "gongzhongqiang" < gongzhongqi...@apache.org >
>
> Sent Time: 2024/5/17 23:10
>
> To: "Qingsheng Ren" < re...@apache.org >
>
> Cc recipient: "dev" < d...@flink.apache.org >, "user" < u...@flink.apache.org >,
> "user-zh" < user-zh@flink.apache.org >, "Apache Announce List" < annou...@apache.org >
>
> Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
>
>
> Congratulations !
> Thanks for all contributors.
>
>
> Best,
>
> Zhongqiang Gong
>
> Qingsheng Ren  于 2024年5月17日周五 17:33写道:
>
>  The Apache Flink community is very happy to announce the release of
>  Apache Flink CDC 3.1.0.
> 
>  Apache Flink CDC is a distributed data integration tool for real time
>  data and batch data, bringing the simplicity and elegance of data
>  integration via YAML to describe the data movement and transformation
>  in a data pipeline.
> 
>  Please check out the release blog post for an overview of the release:
> 
>  
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> 
>  The release is available for download at:
>  https://flink.apache.org/downloads.html
> 
>  Maven artifacts for Flink CDC can be found at:
>  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> 
>  The full release notes are available in Jira:
> 
>  
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
> 
>  We would like to thank all contributors of the Apache Flink community
>  who made this release possible!
> 
>  Regards,
>  Qingsheng Ren
> 


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 文章 Jingsong Li
Congratulations!

On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Congratulations, thanks for the great work!
>
> Best,
> Rui
>
> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.19.0, which is the first release for the Apache Flink 1.19 series.
>>
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>>
>> Best,
>> Yun, Jing, Martijn and Lincoln


[ANNOUNCE] Apache Flink Table Store 0.3.0 released

2023-01-13 文章 Jingsong Li
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.3.0.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2023/01/13/release-table-store-0.3.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://central.sonatype.dev/search?q=flink-table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352111

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Best,
Jingsong Lee


Re: Re: table store 和connector-kafka包冲突吗?

2022-10-11 文章 Jingsong Li
P.S. Link to the fix: https://github.com/apache/flink-table-store/commit/c1b28985ce8bc8fb80fac96380edf3b34e4126b8

Best,
Jingsong

On Tue, Oct 11, 2022 at 3:27 PM Jingsong Li  wrote:
>
> Hi RS,
>
> This is a bug, and it has already been fixed.
>
> We recommend using the upcoming 0.2.1 release:
> https://lists.apache.org/thread/n1yzpbxprnsh2m8swpsr40glt8h2b93v
>
> The jar itself is here:
> https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.2.1-rc2/
>
> Best,
> Jingsong
>
> On Tue, Oct 11, 2022 at 3:17 PM 李宇彬  wrote:
> >
> > Could you paste the SQL? I could not reproduce this problem with flink 1.15 + fts 0.3.0-SNAPSHOT.
> >
> >
> > On 2022-10-11 09:19, RS wrote:
> > Hi,
> >
> >
> > It runs fine after removing the jar. For this kind of lib conflict, could a note be added to the official website so that others do not run into the same problem?
> >
> >
> > Thanks
> >
> > At 2022-10-10 12:50:33, "yanfei lei" wrote:
> > Hi, judging from the table store pom, the table store dist jar shades a copy of connector-kafka.
> > https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
> > Try removing flink-connector-kafka-1.15.1.jar and see if the problem goes away?
> >
> >
> > RS wrote on Sat, Oct 8, 2022 at 17:19:
> >
> > Hi,
> > The error is as follows:
> >
> >
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.flink.table.api.ValidationException: Multiple factories for
> > identifier 'kafka' that implement
> > 'org.apache.flink.table.factories.DynamicTableFactory' found in the
> > classpath.
> >
> >
> > Ambiguous factory classes are:
> >
> >
> > org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
> > org.apache.flink.table.store.kafka.KafkaLogStoreFactory
> >
> > org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
> >
> >
> > Thanks
> >
> >
> >
> >
> >
> > At 2022-10-08 13:38:20, "Shammon FY" wrote:
> > Hi RS
> > Could you provide the full stack trace of the conflict error?
> >
> > On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
> >
> > Hi,
> >
> >
> > Version: flink-1.15.1
> > To use table store, flink-table-store-dist-0.2.0.jar has to be placed under lib. The cluster's lib already contained flink-connector-kafka-1.15.1.jar, and when defining a kafka source table in sql-client the connectors conflicted.
> >
> >
> > Does having flink-table-store-dist-0.2.0.jar under lib mean flink-connector-kafka-1.15.1.jar cannot be there as well?
> >
> >
> > Thanks
> >

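For context, the kind of Kafka source table whose creation triggers the factory lookup above can be declared like the sketch below (illustrative only; the table name, topic and fields are invented). Resolving 'connector' = 'kafka' goes through DynamicTableFactory service discovery, which is exactly what becomes ambiguous when both the shaded copy inside flink-table-store-dist and flink-connector-kafka sit in lib.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // With two 'kafka' factories on the classpath, this DDL fails with the
        // "Multiple factories for identifier 'kafka'" error shown above.
        tEnv.executeSql(
                "CREATE TABLE kafka_source (" +
                "  user_id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'my_topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");
    }
}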

Re: Re: table store 和connector-kafka包冲突吗?

2022-10-11 文章 Jingsong Li
Hi RS,

This is a bug, and it has already been fixed.

We recommend using the upcoming 0.2.1 release: https://lists.apache.org/thread/n1yzpbxprnsh2m8swpsr40glt8h2b93v

The jar itself is here:
https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.2.1-rc2/

Best,
Jingsong

On Tue, Oct 11, 2022 at 3:17 PM 李宇彬  wrote:
>
> Could you paste the SQL? I could not reproduce this problem with flink 1.15 + fts 0.3.0-SNAPSHOT.
>
>
> On 2022-10-11 09:19, RS wrote:
> Hi,
>
>
> It runs fine after removing the jar. For this kind of lib conflict, could a note be added to the official website so that others do not run into the same problem?
>
>
> Thanks
>
> At 2022-10-10 12:50:33, "yanfei lei" wrote:
> Hi, judging from the table store pom, the table store dist jar shades a copy of connector-kafka.
> https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
> Try removing flink-connector-kafka-1.15.1.jar and see if the problem goes away?
>
>
> RS wrote on Sat, Oct 8, 2022 at 17:19:
>
> Hi,
> The error is as follows:
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Multiple factories for
> identifier 'kafka' that implement
> 'org.apache.flink.table.factories.DynamicTableFactory' found in the
> classpath.
>
>
> Ambiguous factory classes are:
>
>
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
> org.apache.flink.table.store.kafka.KafkaLogStoreFactory
>
> org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
>
>
> Thanks
>
>
>
>
>
> At 2022-10-08 13:38:20, "Shammon FY" wrote:
> Hi RS
> Could you provide the full stack trace of the conflict error?
>
> On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
>
> Hi,
>
>
> Version: flink-1.15.1
> To use table store, flink-table-store-dist-0.2.0.jar has to be placed under lib. The cluster's lib already contained flink-connector-kafka-1.15.1.jar, and when defining a kafka source table in sql-client the connectors conflicted.
>
>
> Does having flink-table-store-dist-0.2.0.jar under lib mean flink-connector-kafka-1.15.1.jar cannot be there as well?
>
>
> Thanks
>


Re: [ANNOUNCE] Apache Flink 1.14.6 released

2022-09-28 文章 Jingsong Li
Thanks Xingbo for releasing it.

Best,
Jingsong

On Wed, Sep 28, 2022 at 10:52 AM Xingbo Huang  wrote:
>
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.14.6, which is the fifth bugfix release for the Apache Flink 1.14 
> series.
>
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2022/09/28/release-1.14.6.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351834
>
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
>
> Regards,
> Xingbo


[ANNOUNCE] Apache Flink Table Store 0.2.0 released

2022-08-28 文章 Jingsong Li
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.2.0.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351570

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Jingsong Lee


Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 文章 Jingsong Li
Thanks Xingtong, Jark, Martijn and Robert for making this possible!

Best,
Jingsong


On Thu, Jun 2, 2022 at 5:32 PM Jark Wu  wrote:

> Thank Xingtong for making this possible!
>
> Cheers,
> Jark Wu
>
> On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:
>
> > Hi everyone,
> >
> > I'm very happy to announce that the Apache Flink community has created a
> > dedicated Slack workspace [1]. Welcome to join us on Slack.
> >
> > ## Join the Slack workspace
> >
> > You can join the Slack workspace by either of the following two ways:
> > 1. Click the invitation link posted on the project website [2].
> > 2. Ask anyone who already joined the Slack workspace to invite you.
> >
> > We recommend 2), if available. Due to Slack limitations, the invitation
> > link in 1) expires and needs manual updates after every 100 invites. If
> it
> > is expired, please reach out to the dev / user mailing lists.
> >
> > ## Community rules
> >
> > When using the community Slack workspace, please follow these community
> > rules:
> > * *Be respectful* - This is the most important rule!
> > * All important decisions and conclusions *must be reflected back to the
> > mailing lists*. "If it didn’t happen on a mailing list, it didn’t
> happen."
> > - The Apache Mottos [3]
> > * Use *Slack threads* to keep parallel conversations from overwhelming a
> > channel.
> > * Please *do not direct message* people for troubleshooting, Jira
> assigning
> > and PR review. These should be picked-up voluntarily.
> >
> >
> > ## Maintenance
> >
> >
> > Committers can refer to this wiki page [4] for information needed for
> > maintaining the Slack workspace.
> >
> >
> > Thanks Jark, Martijn and Robert for helping setting up the Slack
> workspace.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> > [1] https://apache-flink.slack.com/
> >
> > [2] https://flink.apache.org/community.html#slack
> >
> > [3] http://theapacheway.com/on-list/
> >
> > [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
> >
>


Re: Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误

2022-05-26 文章 Jingsong Li
For example, for 1.13.6:

delete /lib/flink-table_2.11-1.13.6.jar under the flink directory

Best,
Jingsong

On Thu, May 26, 2022 at 2:54 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> OK, understood. I will use this Chinese mailing list from now on!
> The flink-table-legacy module should be unused, but I also could not find where it is. Is it a standalone module?
>
> --
> hdxg1101300...@163.com
>
>
> *From:* Jingsong Li 
> *Sent:* 2022-05-26 14:47
> *To:* hdxg1101300123 
> *Cc:* dev 
> *Subject:* Re: Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误
> Please don't use Chinese on the dev mailing list to discuss issues, I've
> replied on user-zh@flink.apache.org.
>
> Best,
> Jingsong
>
> On Thu, May 26, 2022 at 2:43 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
>>
>>
>> --
>> hdxg1101300...@163.com
>>
>>
>> *From:* Jingsong Li 
>> *Sent:* 2022-05-26 14:20
>> *To:* dev 
>> *Subject:* Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误
>> If the flink-table-legacy jar is not actually used, deleting it will fix your problem
>>
>> Best,
>> Jingsong
>>
>> On Thu, May 26, 2022 at 2:16 PM hdxg1101300...@163.com <
>> hdxg1101300...@163.com> wrote:
>>
>> > Hi:
>> > I ran into the error below while using flink 1.12.4 with hive 1.1.0:
>> >
>> >
>> The scenario uses a hive catalog to manage metadata; flinksql registers Kafka data as an input table and joins a hive dimension table to widen the data. The following error occurs when the job is submitted to yarn.
>> >The sql is as follows:
>> >create view if not exists dwm_ai_robot_contact_view as select
>> >
>> CALLER,CALLEE,CUST_LEVEL,PRO_ID,REASON_CODE,INTENT_NAME,GEN_CENTER,BUS_PRO_ID
>> > from realtime_robot_contact table_a left join dc_dim.dim_province_code
>> /*+
>> > OPTIONS('lookup.join.cache.ttl'='12 h') */ for SYSTEM_TIME as of
>> > table_a.pro_time as dim on table_a.PRO_ID = dim.code;
>> > Error message:
>> >
>> >
>> >  The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
>> SPECIAL
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> > at
>> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> > at
>> >
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> > at
>> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> > at
>> >
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> > at
>> >
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> > at java.security.AccessController.doPrivileged(Native Method)
>> > at javax.security.auth.Subject.doAs(Subject.java:422)
>> > at
>> >
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> > at
>> >
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> > at
>> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> > Caused by: java.lang.UnsupportedOperationException: class
>> > org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>> > at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>> > at
>> org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
>> > at
>> org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329)
>> > at
>> > org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> > at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> > at
>> >
>> org.apache.calcite.sql.SqlSnapshot$SqlSnapshotOperator.unparse(SqlSnapshot.java:128)
>> > at
>> org.apache.calcite.sql.SqlSnapshot.unparse(SqlSnapshot.java:78)
>> > at
>> > org.apache.calcite.sql.SqlAsOperator.unparse(SqlAsOperator.java:76)
>> > at
>> > org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> > at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> > at
>> > org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:229)
>>

Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误

2022-05-26 文章 Jingsong Li
Please send to  user-zh@flink.apache.org

Best,
Jingsong

On Thu, May 26, 2022 at 2:20 PM Jingsong Li  wrote:

> If the flink-table-legacy jar is not actually used, deleting it will fix your problem
>
> Best,
> Jingsong
>
> On Thu, May 26, 2022 at 2:16 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
>> Hi:
>> I ran into the error below while using flink 1.12.4 with hive 1.1.0:
>>
>>
>> The scenario uses a hive catalog to manage metadata; flinksql registers Kafka data as an input table and joins a hive dimension table to widen the data. The following error occurs when the job is submitted to yarn.
>>The sql is as follows:
>>create view if not exists dwm_ai_robot_contact_view as select
>> CALLER,CALLEE,CUST_LEVEL,PRO_ID,REASON_CODE,INTENT_NAME,GEN_CENTER,BUS_PRO_ID
>> from realtime_robot_contact table_a left join dc_dim.dim_province_code /*+
>> OPTIONS('lookup.join.cache.ttl'='12 h') */ for SYSTEM_TIME as of
>> table_a.pro_time as dim on table_a.PRO_ID = dim.code;
>> Error message:
>>
>>
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> Caused by: java.lang.UnsupportedOperationException: class
>> org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>> at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>> at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
>> at
>> org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> at
>> org.apache.calcite.sql.SqlSnapshot$SqlSnapshotOperator.unparse(SqlSnapshot.java:128)
>> at org.apache.calcite.sql.SqlSnapshot.unparse(SqlSnapshot.java:78)
>> at
>> org.apache.calcite.sql.SqlAsOperator.unparse(SqlAsOperator.java:76)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> at
>> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:229)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> at
>> org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlSelect.unparse(SqlSelect.java:246)
>> at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:154)
>> at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:176)
>> at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:185)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.getQuotedSqlString(SqlToOperationConverter.java:984)
>> at
>> org.apache.flink.table.planner.utils.Expander$Expanded.substitute(Expander.java:183)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:854)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:823)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>> 

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-18 文章 Jingsong Li
Thanks~ Very nice~

Best,
Jingsong

On Mon, May 16, 2022 at 5:24 PM 18579099...@163.com <18579099...@163.com>
wrote:

> This is my first time doing this, I am not sure whether I wrote it correctly
>
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-27604
>
>
>
> 18579099...@163.com
>
> From: Jingsong Li
> Sent: 2022-05-13 15:06
> To: user-zh
> Subject: Re: Re: flink sql无法读取Hive映射的HBase表
> Hi, I recommend https://www.deepl.com/translator
> It works very well
>
> As far as I remember, there is indeed a problem with Hive Custom Storage Handler (hbase)
>
> Best,
> Jingsong
>
> On Fri, May 13, 2022 at 2:12 PM 18579099...@163.com <18579099...@163.com>
> wrote:
>
> > My English is not good enough for that
> >
> >
> >
> > 18579099...@163.com
> >
> > From: yuxia
> > Sent: 2022-05-11 15:11
> > To: user-zh
> > Subject: Re: flink sql无法读取Hive映射的HBase表
> > Sorry, I tried to reproduce your problem, but I do not have an hbase environment. It looks like the problem only occurs with STORED BY
> >   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'?
> > I will debug it when I have some time.
> >
> > I did look at the flink code, though. From the flink side, it seems that after getting this hive table, the inputformat of its
> > StorageDescriptor is null, and Class.forName(inputformat) then throws an
> > NPE.
> > That part of the code looks buggy.
> > If it is convenient for you, could you help create a jira~
> > https://issues.apache.org/jira/projects/FLINK/summary
> >
> >
> >
> > Best regards,
> > Yuxia
> >
> > - 原始邮件 -
> > 发件人: 18579099...@163.com
> > 收件人: "user-zh" 
> > 发送时间: 星期二, 2022年 5 月 10日 上午 10:39:16
> > 主题: Re: Re: flink sql无法读取Hive映射的HBase表
> >
> > 版本:
> > flink:1.13.6
> > hive:2.1.1-cdh6.2.0
> > hbase:2.1.0-cdh6.2.0
> > flinksql执行工具:flink sql client
> > sql 提交模式:yarn-per-job
> >
> >
> -
> > Jars under the flink lib directory
> > antlr-runtime-3.5.2.jar
> > flink-csv-1.13.6.jar
> > flink-dist_2.11-1.13.6.jar
> > flink-json-1.13.6.jar
> > flink-shaded-zookeeper-3.4.14.jar
> > flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> > flink-table_2.11-1.13.6.jar
> > flink-table-blink_2.11-1.13.6.jar
> > guava-14.0.1.jar
> > hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> > hbase-client-2.1.0-cdh6.2.0.jar
> > hbase-common-2.1.0-cdh6.2.0.jar
> > hbase-protocol-2.1.0-cdh6.2.0.jar
> > hbase-server-2.1.0-cdh6.2.0.jar
> > hive-exec-2.1.1-cdh6.2.0.jar
> > hive-hbase-handler-2.1.1-cdh6.2.0.jar
> > htrace-core4-4.1.0-incubating.jar
> > log4j-1.2-api-2.17.1.jar
> > log4j-api-2.17.1.jar
> > log4j-core-2.17.1.jar
> > log4j-slf4j-impl-2.17.1.jar
> > protobuf-java-2.5.0.jar
> >
> >
> 
> > Hive CREATE TABLE statement
> > CREATE EXTERNAL TABLE `ods.student`(
> >   `row_key` string,
> >   `name` string,
> >   `age` int,
> >   `addr` string
> > )
> > ROW FORMAT SERDE
> >   'org.apache.hadoop.hive.hbase.HBaseSerDe'
> > STORED BY
> >   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> > WITH SERDEPROPERTIES (
> >
> >
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR,'serialization.format'='1')
> > TBLPROPERTIES (
> >   'hbase.table.name'='ODS:STUDENT') ;
> > catalog: hive catalog
> > sql: select * from ods.student;
> > After reading the error message I added some jars under flink lib; the earlier errors were about missing dependencies. Now a new error appears.
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.flink.connectors.hive.FlinkHiveException: Unable to
> instantiate
> > the hadoop input format
> >
> >
> --
> > Full stack trace
> > org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> > execute SQL statement.
> > at
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:3
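For reference, the failure path described in the quoted analysis boils down to the small sketch below (simplified Java, not the actual Flink source): for a table created with STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler', the metastore's StorageDescriptor carries no input format, so the planner's Class.forName call fails.

public class StorageHandlerNpeSketch {
    public static void main(String[] args) throws Exception {
        // What sd.getInputFormat() effectively returns for such storage-handler tables:
        String inputFormat = null;
        // Throws NullPointerException, surfaced as
        // "FlinkHiveException: Unable to instantiate the hadoop input format".
        Class<?> clazz = Class.forName(inputFormat);
    }
}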

Re: 退订

2022-05-15 文章 Jingsong Li
To unsubscribe, please reply to user-zh-unsubscr...@flink.apache.org

Best,
Jingsong

On Sun, May 15, 2022 at 1:04 PM cq <17691150...@163.com> wrote:

> Unsubscribe
>
>
>
> Best Regards,
>
> Jacob.Q.Cao
>
>
> TEL:17691150986


Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-13 文章 Jingsong Li
Hi, I recommend https://www.deepl.com/translator
It works very well

As far as I remember, there is indeed a problem with Hive Custom Storage Handler (hbase)

Best,
Jingsong

On Fri, May 13, 2022 at 2:12 PM 18579099...@163.com <18579099...@163.com>
wrote:

> My English is not good enough for that
>
>
>
> 18579099...@163.com
>
> From: yuxia
> Sent: 2022-05-11 15:11
> To: user-zh
> Subject: Re: flink sql无法读取Hive映射的HBase表
> Sorry, I tried to reproduce your problem, but I do not have an hbase environment. It looks like the problem only occurs with STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'?
> I will debug it when I have some time.
>
> I did look at the flink code, though. From the flink side, it seems that after getting this hive table, the inputformat of its
> StorageDescriptor is null, and Class.forName(inputformat) then throws an
> NPE.
> That part of the code looks buggy.
> If it is convenient for you, could you help create a jira~
> https://issues.apache.org/jira/projects/FLINK/summary
>
>
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: 18579099...@163.com
> 收件人: "user-zh" 
> 发送时间: 星期二, 2022年 5 月 10日 上午 10:39:16
> 主题: Re: Re: flink sql无法读取Hive映射的HBase表
>
> 版本:
> flink:1.13.6
> hive:2.1.1-cdh6.2.0
> hbase:2.1.0-cdh6.2.0
> flinksql执行工具:flink sql client
> sql 提交模式:yarn-per-job
>
> -
> Jars under the flink lib directory
> antlr-runtime-3.5.2.jar
> flink-csv-1.13.6.jar
> flink-dist_2.11-1.13.6.jar
> flink-json-1.13.6.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> flink-table_2.11-1.13.6.jar
> flink-table-blink_2.11-1.13.6.jar
> guava-14.0.1.jar
> hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> hbase-client-2.1.0-cdh6.2.0.jar
> hbase-common-2.1.0-cdh6.2.0.jar
> hbase-protocol-2.1.0-cdh6.2.0.jar
> hbase-server-2.1.0-cdh6.2.0.jar
> hive-exec-2.1.1-cdh6.2.0.jar
> hive-hbase-handler-2.1.1-cdh6.2.0.jar
> htrace-core4-4.1.0-incubating.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> protobuf-java-2.5.0.jar
>
> 
> Hive CREATE TABLE statement
> CREATE EXTERNAL TABLE `ods.student`(
>   `row_key` string,
>   `name` string,
>   `age` int,
>   `addr` string
> )
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.hbase.HBaseSerDe'
> STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES (
>
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR,'serialization.format'='1')
> TBLPROPERTIES (
>   'hbase.table.name'='ODS:STUDENT') ;
> catalog: hive catalog
> sql: select * from ods.student;
> After reading the error message I added some jars under flink lib; the earlier errors were about missing dependencies. Now a new error appears.
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate
> the hadoop input format
>
> --
> Full stack trace
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_191]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to
> instantiate the hadoop input format
> at
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
> ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
> at
> 

Re: 对Flink Table Store咨询

2022-04-28 文章 Jingsong Li
Hello, thanks for your interest.

Tison is right: Table Store is a library. We have not published maven artifacts yet.

For now you can debug in two ways:

1. Debug inside the Table Store project itself.
2. Take the pre-bundled jar under flink-table-store-dist, put it on your project's classpath, and debug there.

The entry point is via SQL:

TableEnvironment.executeSql("CREATE TABLE XX (...)");

When the table store jar is on the classpath, service discovery finds the TableStore factory and execution then proceeds into the table store code.

Your requirement is reasonable; we will consider providing official maven artifacts later, as well as a DataStream API.

Best,
Jingsong

On Sun, Apr 24, 2022 at 10:32 AM tison  wrote:
>
> Flink Table Store is not an application but a library. My understanding is that it is used together with Flink.
> As for breakpoint debugging, it depends on your needs; if you only have questions about a piece of code, running a test with a breakpoint is enough.
>
> Best,
> tison.
>
>
> 陈卓宇 <2572805...@qq.com.invalid> 于2022年4月24日周日 09:59写道:
>
> > Hello:
> >  I am very interested in Flink Table Store and would like to ask how to do breakpoint debugging together with flink, because after looking around I could not find an entry class.
> >
> > 陈卓宇
> >
> >
> > 

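A minimal sketch of the SQL entry point described in the reply above (illustrative only; the exact table options and whether a catalog or warehouse path must be configured first depend on the Table Store version):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableStoreDebugSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // With the pre-bundled flink-table-store-dist jar on the classpath, the DDL below is
        // resolved to the Table Store factory via service discovery, so a breakpoint set inside
        // the table store code will be hit when these statements execute.
        tEnv.executeSql("CREATE TABLE word_count (word STRING, cnt BIGINT)");
        tEnv.executeSql("INSERT INTO word_count VALUES ('hello', 1)");
    }
}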

Re: 回撤流优化

2021-12-16 文章 Jingsong Li
In theory, mini-batch can already optimize retraction streams.

At the moment it is the join operator that does not support mini-batch.

On Thu, Dec 16, 2021 at 5:12 PM casel.chen  wrote:
>
> I read "Oceanus的实时流式计算实践与优化" https://jishuin.proginn.com/p/763bfbd5acbf
> and would like to ask whether the community intends to implement the retraction-stream optimizations described there.
> In practice a lot of data is ingested from mysql binlog cdc, and computing on retraction streams is a common scenario. Could these optimizations be supported in flink sql?



-- 
Best, Jingsong Lee
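
For reference, the mini-batch options referred to in the reply above are enabled like this (a minimal sketch; the values are only examples):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Mini-batch buffers input records and processes them in small batches, which reduces
        // state accesses and the amount of retraction traffic produced by aggregations.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "2 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");
        // As noted above, regular joins do not benefit from mini-batch yet.
    }
}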


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 文章 Jingsong Li
Hi Yingjie,

Thanks for your explanation. I have no more questions. +1

On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao  wrote:
>
> Hi Jingsong,
>
> Thanks for your feedback.
>
> >>> My question is, what is the maximum parallelism a job can have with the 
> >>> default configuration? (Does this break out of the box)
>
> Yes, you are right, these two options are related to network memory and 
> framework off-heap memory. Generally, these changes will not break out of the 
> box experience, but for some extreme cases, for example, there are too many 
> ResultPartitions per task, users may need to increase network memory to avoid 
> "insufficient network buffer" error. For framework off-head, I believe that 
> user do not need to change the default value.
>
> In fact, I have a basic goal when changing these config values: when running 
> TPCDS of medium parallelism with the default value, all queries must pass 
> without any error and at the same time, the performance can be improved. I 
> think if we achieve this goal, most common use cases can be covered.
>
> Currently, for the default configuration, the exclusive buffers required at 
> input gate side is still parallelism relevant (though since 1.14, we can 
> decouple the network buffer consumption from parallelism by setting a config 
> value, it has slight performance influence on streaming jobs), which means 
> that no large parallelism can be supported by the default configuration. 
> Roughly, I would say the default value can support jobs of several hundreds 
> of parallelism.
>
> >>> I do feel that this correspondence is a bit difficult to control at the 
> >>> moment, and it would be best if a rough table could be provided.
>
> I think this is a good suggestion, we can provide those suggestions in the 
> document.
>
> Best,
> Yingjie
>
> Jingsong Li  于2021年12月14日周二 14:39写道:
>>
>> Hi  Yingjie,
>>
>> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>> of batch jobs.
>>
>> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>> network memory and framework.off-heap.size.
>>
>> My question is, what is the maximum parallelism a job can have with
>> the default configuration? (Does this break out of the box)
>>
>> How much network memory and framework.off-heap.size are required for
>> how much parallelism in the default configuration?
>>
>> I do feel that this correspondence is a bit difficult to control at
>> the moment, and it would be best if a rough table could be provided.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao  wrote:
>> >
>> > Hi Jiangang,
>> >
>> > Thanks for your suggestion.
>> >
>> > >>> The config can affect the memory usage. Will the related memory 
>> > >>> configs be changed?
>> >
>> > I think we will not change the default network memory settings. My best 
>> > expectation is that the default value can work for most cases (though may 
>> > not the best) and for other cases, user may need to tune the memory 
>> > settings.
>> >
>> > >>> Can you share the tpcds results for different configs? Although we 
>> > >>> change the default values, it is helpful to change them for different 
>> > >>> users. In this case, the experience can help a lot.
>> >
>> > I did not keep all previous TPCDS results, but from the results, I can 
>> > tell that on HDD, always using the sort-shuffle is a good choice. For 
>> > small jobs, using sort-shuffle may not bring much performance gain, this 
>> > may because that all shuffle data can be cached in memory (page cache), 
>> > this is the case if the cluster have enough resources. However, if the 
>> > whole cluster is under heavy burden or you are running large scale jobs, 
>> > the performance of those small jobs can also be influenced. For 
>> > large-scale jobs, the configurations suggested to be tuned are 
>> > taskmanager.network.sort-shuffle.min-buffers and 
>> > taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase 
>> > these values for large-scale batch jobs.
>> >
>> > BTW, I am still running TPCDS tests these days and I can share these 
>> > results soon.
>> >
>> > Best,
>> > Yingjie
>> >
>> > 刘建刚  于2021年12月10日周五 18:30写道:
>> >>
>> >> Glad to 

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 文章 Jingsong Li
Hi  Yingjie,

+1 for this FLIP. I'm pretty sure this will greatly improve the ease
of batch jobs.

Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
and "taskmanager.network.sort-shuffle.min-buffers" are related to
network memory and framework.off-heap.size.

My question is, what is the maximum parallelism a job can have with
the default configuration? (Does this break out of the box)

How much network memory and framework.off-heap.size are required for
how much parallelism in the default configuration?

I do feel that this correspondence is a bit difficult to control at
the moment, and it would be best if a rough table could be provided.

Best,
Jingsong

On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao  wrote:
>
> Hi Jiangang,
>
> Thanks for your suggestion.
>
> >>> The config can affect the memory usage. Will the related memory configs 
> >>> be changed?
>
> I think we will not change the default network memory settings. My best 
> expectation is that the default value can work for most cases (though may not 
> the best) and for other cases, user may need to tune the memory settings.
>
> >>> Can you share the tpcds results for different configs? Although we change 
> >>> the default values, it is helpful to change them for different users. In 
> >>> this case, the experience can help a lot.
>
> I did not keep all previous TPCDS results, but from the results, I can tell 
> that on HDD, always using the sort-shuffle is a good choice. For small jobs, 
> using sort-shuffle may not bring much performance gain, this may because that 
> all shuffle data can be cached in memory (page cache), this is the case if 
> the cluster have enough resources. However, if the whole cluster is under 
> heavy burden or you are running large scale jobs, the performance of those 
> small jobs can also be influenced. For large-scale jobs, the configurations 
> suggested to be tuned are taskmanager.network.sort-shuffle.min-buffers and 
> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase 
> these values for large-scale batch jobs.
>
> BTW, I am still running TPCDS tests these days and I can share these results 
> soon.
>
> Best,
> Yingjie
>
> 刘建刚  于2021年12月10日周五 18:30写道:
>>
>> Glad to see the suggestion. In our test, we found that small jobs with the 
>> changing configs can not improve the performance much just as your test. I 
>> have some suggestions:
>>
>> The config can affect the memory usage. Will the related memory configs be 
>> changed?
>> Can you share the tpcds results for different configs? Although we change 
>> the default values, it is helpful to change them for different users. In 
>> this case, the experience can help a lot.
>>
>> Best,
>> Liu Jiangang
>>
>> Yun Gao  于2021年12月10日周五 17:20写道:
>>>
>>> Hi Yingjie,
>>>
>>> Very thanks for drafting the FLIP and initiating the discussion!
>>>
>>> May I have a double confirmation for 
>>> taskmanager.network.sort-shuffle.min-parallelism that
>>> since other frameworks like Spark have used sort-based shuffle for all the 
>>> cases, does our
>>> current circumstance still have difference with them?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>>
>>> --
>>> From:Yingjie Cao 
>>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>> To:dev ; user ; user-zh 
>>> 
>>> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>>>
>>> Hi dev & users:
>>>
>>> I have created a FLIP [1] for it, feedbacks are highly appreciated.
>>>
>>> Best,
>>> Yingjie
>>>
>>> [1] 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>> Yingjie Cao  于2021年12月3日周五 17:02写道:
>>>
>>> Hi dev & users,
>>>
>>> We propose to change some default values of blocking shuffle to improve the 
>>> user out-of-box experience (not influence streaming). The default values we 
>>> want to change are as follows:
>>>
>>> 1. Data compression 
>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the 
>>> default value is 'false'.  Usually, data compression can reduce both disk 
>>> and network IO which is good for performance. At the same time, it can save 
>>> storage space. We propose to change the default value to true.
>>>
>>> 2. Default shuffle implementation 
>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default 
>>> value is 'Integer.MAX', which means by default, Flink jobs will always use 
>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for 
>>> both stability and performance. So we propose to reduce the default value 
>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and 
>>> 1024 with a tpc-ds and 128 is the best one.)
>>>
>>> 3. Read buffer of sort-shuffle 
>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the 
>>> default value is '32M'. Previously, when choosing the default value, both 
>>> ‘32M' and '64M' 
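
For reference, the options discussed in this thread can be set explicitly as in the sketch below, using the values proposed in FLIP-199 (on a real cluster these are TaskManager settings and normally go into flink-conf.yaml; passing them to a local environment is only for illustration):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BlockingShuffleConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Compress shuffle data to reduce both disk and network IO.
        conf.setString("taskmanager.network.blocking-shuffle.compression.enabled", "true");
        // Switch to sort-shuffle once parallelism reaches this threshold (proposed default: 128).
        conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "128");
        // Read buffer used by sort-shuffle; larger values help large-scale batch jobs.
        conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m");
        conf.setString("taskmanager.network.sort-shuffle.min-buffers", "512");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}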

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 文章 Jingsong Li
Amazing!

Thanks Yingjie and all contributors for your great work.

Best,
Jingsong

On Wed, Dec 1, 2021 at 10:52 AM Yun Tang  wrote:
>
> Great news!
> Thanks for all the guys who contributed in this project.
>
> Best
> Yun Tang
>
> On 2021/11/30 16:30:52 Till Rohrmann wrote:
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open source of remote shuffle project [1] for
> > > Flink. The project is originated in Alibaba and the main motivation is to
> > > improve batch data processing for both performance & stability and further
> > > embrace cloud native. For more features about the project, please refer to
> > > [1].
> > >
> > > Before going open source, the project has been used widely in production
> > > and it behaves well on both stability and performance. We hope you enjoy
> > > it. Collaborations and feedbacks are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >



-- 
Best, Jingsong Lee


Re: Flinksql 多表进行full join 出现异常

2021-11-11 文章 Jingsong Li
Hi,

Sorry, this will not be cherry-picked to 1.12, because it is a feature; it is supported in 1.14 and later versions

Best,
Jingsong

On Fri, Nov 12, 2021 at 3:06 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:
>
> Hello community, I have located the problem by debugging the code:
>
>
> In flink 1.12.5, in the flink-orc_2.11 module, createFlinkVector in the file org/apache/flink/orc/vector/AbstractOrcColumnVector.java
> has no implementation for ListColumnVector. On flink master I can see it was implemented in a PR submitted by wangwei1025 on 2021/5/12. May I ask whether the community plans to patch this problem in 1.12.5 based on wangwei1025's commit?
>
>
> Table fields:
>
>string_tag   string
>number_tag   number
>boolean_tag   boolean
>datetime_tag   datetime
>arr_tag   array
>
> I converted the fields and generated this SQL; note that it contains an array column. SQL: CREATE TABLE smarttag_base_table_5 (
>  distinct_id BIGINT,
>  xwho VARCHAR,
> string_tag string,
> number_tag decimal,
> boolean_tag integer,
> datetime_tag bigint,
> arr_tag ARRAY ds INTEGER
> ) WITH (
>  'connector' = 'filesystem',  -- required: the connector type
>  'path' = 'hdfs://ark1:8020/tmp/usertag/20211029/db_31abd9593e9983ec/orcfile/smarttag_base_table_5/',  -- required: path to the directory
>  'format' = 'orc'  -- required: the filesystem connector needs a format; see the table formats section for details
> )
>
>
>
>
>
> Error: Unsupport vector: org.apache.hadoop.hive.ql.exec.vector.ListColumnVector
> I think it is because of the array column; the source is an ORC table on HDFS
>
> 陈卓宇
>
>
> 
>
>
>
>
> -- 原始邮件 --
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2021年11月12日(星期五) 上午10:59
> 收件人:"flink中文邮件组"
> 主题:Re: Flinksql 多表进行full join 出现异常
>
>
>
> Hi!
>
Thanks for reporting the problem. This actually looks unrelated to the join; it should be related to the source. If convenient, could you share the source
table's DDL (including each field's type; feel free to rename fields if the names are sensitive) and other information (for example, what format the source table is stored in) in the email?

陈卓宇 <2572805...@qq.com.invalid> wrote on Thu, Nov 11, 2021 at 9:44 PM:
>
 Scenario: a full join across multiple tables fails


 Error:
>  java.lang.RuntimeException: Failed to fetch next result
> 
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
    at TableAPI.envReadFileSysteam(TableAPI.java:441)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)

Re: flinkSQL写hive表,timestamp-pattern设置,分区是yyyyMMdd而不是yyyy-MM-dd的情况怎么搞。

2021-11-10 文章 Jingsong Li
Thanks!

+1 to pattern

Best,
Jingsong

On Wed, Nov 10, 2021 at 7:52 PM yidan zhao  wrote:
>
> I replied on the jira. I still feel making this configurable would be better; liwei seems to have only added a basicDate kind now, which is too limited.
>
> Jingsong Li  于2021年11月4日周四 下午12:18写道:
>
> > You can implement a custom partition.time-extractor.class to parse it yourself.
> >
> > Flink should add a corresponding partition.time-extractor.kind to support your case by default.
> > I created a JIRA: https://issues.apache.org/jira/browse/FLINK-24758
> >
> > Best,
> > Jingsong
> >
> > On Thu, Nov 4, 2021 at 11:47 AM yidan zhao  wrote:
> > >
> > > As in the subject: I currently do select date_format(xxx, 'yyyyMMdd') as dt...
> > >
> > > partition.time-extractor.timestamp-pattern is set to $dt $hour:00:00.
> > >
> > > But this causes an error; it seems this setting must be of the yyyy-MM-dd hh:mm:ss form?
> >
> >
> >
> > --
> > Best, Jingsong Lee
> >



-- 
Best, Jingsong Lee


Re: flinkSQL写hive表,timestamp-pattern设置,分区是yyyyMMdd而不是yyyy-MM-dd的情况怎么搞。

2021-11-03 文章 Jingsong Li
You can implement a custom partition.time-extractor.class to parse it yourself.

Flink should add a corresponding partition.time-extractor.kind to support your case by default.
I created a JIRA: https://issues.apache.org/jira/browse/FLINK-24758

Best,
Jingsong

On Thu, Nov 4, 2021 at 11:47 AM yidan zhao  wrote:
>
> As in the subject: I currently do select date_format(xxx, 'yyyyMMdd') as dt...
>
> partition.time-extractor.timestamp-pattern is set to $dt $hour:00:00.
>
> But this causes an error; it seems this setting must be of the yyyy-MM-dd hh:mm:ss form?



-- 
Best, Jingsong Lee
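
A sketch of the custom extractor approach suggested above, for partitions written as yyyyMMdd. The interface package shown is the one used around Flink 1.13/1.14 and may differ in other versions; the class and package names are assumptions for the example.

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

import org.apache.flink.table.filesystem.PartitionTimeExtractor;

// Configure it on the sink table with:
//   'partition.time-extractor.kind' = 'custom',
//   'partition.time-extractor.class' = 'com.example.BasicDateTimeExtractor'
public class BasicDateTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter DT = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // Assumes partitions like dt=20211104/hour=13; adjust the indexes to your layout.
        LocalDate date = LocalDate.parse(partitionValues.get(0), DT);
        int hour = partitionValues.size() > 1 ? Integer.parseInt(partitionValues.get(1)) : 0;
        return date.atStartOfDay().plusHours(hour);
    }
}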


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-10 文章 Jingsong Li
Thanks Yun Tang and everyone!

Best,
Jingsong

On Tue, Aug 10, 2021 at 9:37 AM Xintong Song  wrote:

> Thanks Yun and everyone~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 9, 2021 at 10:14 PM Till Rohrmann 
> wrote:
>
> > Thanks Yun Tang for being our release manager and the great work! Also
> > thanks a lot to everyone who contributed to this release.
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:
> >
> >> Thanks Yun Tang for being our release manager and everyone else who made
> >> the release possible!
> >>
> >> Best Regards,
> >> Yu
> >>
> >>
> >> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
> >>
> >>>
> >>> The Apache Flink community is very happy to announce the release of
> >>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
> >>> Flink 1.13 series.
> >>>
> >>> Apache Flink® is an open-source stream processing framework for
> >>> distributed, high-performing, always-available, and accurate data
> streaming
> >>> applications.
> >>>
> >>> The release is available for download at:
> >>> https://flink.apache.org/downloads.html
> >>>
> >>> Please check out the release blog post for an overview of the
> >>> improvements for this bugfix release:
> >>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
> >>>
> >>> The full release notes are available in Jira:
> >>>
> >>>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&projectId=12315522
> >>>
> >>> We would like to thank all contributors of the Apache Flink community
> >>> who made this release possible!
> >>>
> >>> Regards,
> >>> Yun Tang
> >>>
> >>
>


-- 
Best, Jingsong Lee


Re: Re: filesystem connector不支持跨subtask合并小文件

2021-08-05 文章 Jingsong Li
This option is the maximum size of the merged file. Each of your files is a bit over 1k, so two files already exceed 2k, which is why compaction is never triggered.

On Fri, Aug 6, 2021 at 11:59 AM Rui Li  wrote:

> You could try increasing this option to something much larger than the size of a single file.
>
> On Thu, Aug 5, 2021 at 1:43 PM lixin58...@163.com 
> wrote:
>
> > Hi,
> > The three generated files are quite small, less than 2kb, a bit over 1k each. I set it to 2kb so that the merged file would be bigger than 2k.
> >
> > --
> > lixin58...@163.com
> >
> >
> > *From:* Rui Li 
> > *Sent:* 2021-08-05 11:42
> > *To:* user-zh 
> > *Subject:* Re: filesystem connector不支持跨subtask合并小文件
> > Hi,
> >
> > I see your compaction.file-size is configured as 2kb. Do you really want the target size of the merged files to be only 2kb?
> >
> > On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com 
> > wrote:
> >
> > > Hi,
> > > While using the filesystem connector I enabled compaction with parquet (columnar) files and a parallelism of 3, but compaction is never triggered. Columnar files only roll when a checkpoint fires, so each checkpoint produces as many files as the parallelism; at that point there is already more than one file, so why is nothing merged?
> > >
> > > create table fs_parquet_compact
> > > (userid bigint, name string, part string)
> > > PARTITIONED BY (part)
> > > with(
> > > 'connector' = 'filesystem',
> > > 'path' = 'hdfs:///data/fs_parquet_compact',
> > > 'format' = 'parquet',
> > > 'auto-compaction' = 'true',
> > > 'compaction.file-size' = '2kb',
> > > 'sink.rolling-policy.file-size' = '500b',
> > > 'sink.rolling-policy.rollover-interval' = '800s',
> > > 'sink.rolling-policy.check-interval' = '60s'
> > > );
> > >
> > >
> > >
> > > lixin58...@163.com
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
> >
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee
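
Following the advice above, a configuration where compaction can actually kick in would look like the sketch below (the value 128MB is only an example; compaction.file-size should sit well above the size of the individual files written per checkpoint):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactionConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // compaction.file-size is the target size of the merged files, so it must be much
        // larger than the per-checkpoint files for a merge to be triggered at all.
        tEnv.executeSql(
                "CREATE TABLE fs_parquet_compact (" +
                "  userid BIGINT, name STRING, part STRING" +
                ") PARTITIONED BY (part) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs:///data/fs_parquet_compact'," +
                "  'format' = 'parquet'," +
                "  'auto-compaction' = 'true'," +
                "  'compaction.file-size' = '128MB'" +
                ")");
    }
}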


Re: 1.14啥时候出呀

2021-08-04 文章 Jingsong Li
1.14 is still 1-2 months away.
1.13.2 is about to be released, probably tomorrow, the day after, or Monday.

On Wed, Aug 4, 2021 at 4:48 PM yidan zhao  wrote:

> As in the subject: when will 1.14 or 1.13.2 come out? Does anyone know?
>


-- 
Best, Jingsong Lee


Re: Re: filesystem table parquet 滚动问题

2021-07-26 文章 Jingsong Li
Yes, it is similar.

On Tue, Jul 27, 2021 at 10:42 AM lixin58...@163.com 
wrote:

> Hi,
>
> Thanks for the reply. I read the documentation below; it says that for columnar formats like parquet only the OnCheckpointRollingPolicy is used, i.e. files only roll when a checkpoint is taken.
> Is writing parquet (columnar) files through the flink filesystem table the same?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/streamfile_sink.html
>
>
>
>
> lixin58...@163.com
>
> From: Jingsong Li
> Sent: 2021-07-27 10:30
> To: user-zh
> Subject: Re: filesystem table parquet 滚动问题
> Because parquet accumulates a buffer in memory, the file-size cannot be enforced very precisely; it only takes effect once the buffer is flushed.
>
> On Sun, Jul 25, 2021 at 9:47 AM lixin58...@163.com 
> wrote:
>
> > Hi all,
> >
> > The checkpoint interval is 120s, rollover-interval 800s, rolling file size 1kb, and parallelism 2.
> >
> >
> > However, while running I found that no matter how fast data is written, there is only ever one in-progress file, and the final files are produced strictly every 120s. That is strange: it looks like only checkpoint-based rolling takes effect, unlike with the json format. Is that really the case? The official documentation does not say so.
> >
> > Could someone clarify this?
> > 求大佬们解惑!
> >
> > create table fs_parquet
> > (userid bigint, name string, part string)
> > PARTITIONED BY (part)
> > with(
> > 'connector' = 'filesystem',
> > 'path' = 'hdfs:///data/fs_parquet',
> > 'format' = 'parquet',
> > 'sink.rolling-policy.file-size' = '1kb',
> > 'sink.rolling-policy.rollover-interval' = '800s',
> > 'sink.rolling-policy.check-interval' = '60s'
> > );
> >
> >
> >
> >
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: filesystem table parquet 滚动问题

2021-07-26 文章 Jingsong Li
Because parquet accumulates a buffer in memory, the file-size cannot be enforced very precisely; it only takes effect once the buffer is flushed.
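
For reference, since parquet parts only roll on checkpoints as explained above, the checkpoint interval is what effectively controls how often files appear (a minimal sketch; the 120s value matches the question below):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParquetRollingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // For bulk formats such as parquet, a part file is finalized on every checkpoint,
        // so this interval drives file cadence; sink.rolling-policy.file-size has little
        // effect because the data is buffered in memory until flush.
        env.enableCheckpointing(120_000L);
    }
}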

On Sun, Jul 25, 2021 at 9:47 AM lixin58...@163.com 
wrote:

> Hi all,
>
> The checkpoint interval is 120s, rollover-interval 800s, rolling file size 1kb, and parallelism 2.
>
>
> However, while running I found that no matter how fast data is written, there is only ever one in-progress file, and the final files are produced strictly every 120s. That is strange: it looks like only checkpoint-based rolling takes effect, unlike with the json format. Is that really the case? The official documentation does not say so.
>
> Could someone clarify this?
>
> create table fs_parquet
> (userid bigint, name string, part string)
> PARTITIONED BY (part)
> with(
> 'connector' = 'filesystem',
> 'path' = 'hdfs:///data/fs_parquet',
> 'format' = 'parquet',
> 'sink.rolling-policy.file-size' = '1kb',
> 'sink.rolling-policy.rollover-interval' = '800s',
> 'sink.rolling-policy.check-interval' = '60s'
> );
>
>
>
>

-- 
Best, Jingsong Lee


Re: 参与开发社区

2021-07-16 文章 Jingsong Li
What is the Flink-Hudi version?
What is the Flink cluster version?
Please give the exact version down to the third digit.

On Thu, Jul 15, 2021 at 11:39 PM Page  wrote:

> Could you paste the dependencies and the relevant code?
>
>
> | |
> Page
> |
> |
> lj879933...@163.com
> |
> Signature customized by NetEase Mail Master
> On 2021-07-13 18:36, 沉黙dē羔羊<736036...@qq.com.INVALID> wrote:
> Hi all, a question: writing data with flink hudi reports the following error:
> Caused by: org.apache.flink.util.FlinkException: Error from
> OperatorCoordinator
> ... 41 more
> Caused by: java.lang.AbstractMethodError: Method
> org/apache/hudi/sink/StreamWriteOperatorCoordinator.subtaskReady(ILorg/apache/flink/runtime/operators/coordination/OperatorCoordinator$SubtaskGateway;)V
> is abstract
> at
> org.apache.hudi.sink.StreamWriteOperatorCoordinator.subtaskReady(StreamWriteOperatorCoordinator.java)
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.notifySubtaskReady(OperatorCoordinatorHolder.java:416)
> ... 40 more



-- 
Best, Jingsong Lee


Re: flink大窗口性能问题

2021-07-14 文章 Jingsong Li
Are you not using rocksdb?

On Thu, Jul 15, 2021 at 10:46 AM Michael Ran  wrote:

> Either increase the memory or the parallelism, or make the window smaller and keep data for a shorter time.
> At 2021-07-15 10:23:25, "Hui Wang" <463329...@qq.com.INVALID> wrote:
> >With large windows flink caches too much data, causing frequent jvm full GCs, extremely low throughput and finally an OOM. How should this be tuned?
>


-- 
Best, Jingsong Lee
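
For reference, the RocksDB state backend asked about above is enabled like this (a minimal sketch for Flink 1.13+; earlier versions use RocksDBStateBackend instead):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keeps large window state on disk instead of the JVM heap, which avoids the
        // full-GC/OOM pattern described above (equivalent to 'state.backend: rocksdb').
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
        env.enableCheckpointing(60_000L);
    }
}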


Re: 退订

2021-06-28 文章 Jingsong Li
To unsubscribe, please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Mon, Jun 28, 2021 at 5:56 PM luoye <13033709...@163.com> wrote:

> Unsubscribe



-- 
Best, Jingsong Lee


Re: Flink Sql 1.13 UDF ERROR

2021-06-28 文章 Jingsong Li
Hi,

You could create a JIRA and let Timo take a look; UDAF got new type inference and there may be a problem there

Best,
Jingsong

On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal  wrote:

>
>
> Hi, All.
>
>
> A question about a problem I ran into while investigating the api upgrade to the latest 1.13.1. Thanks, everyone:
>
>
> Version: 1.13.1
> Run mode: IDE-application
> ---
> about udf define...
>
>
> public static class UDFAggregateFunction extends
> AggregateFunction {
>
>
> //return the final result
> @Override
> public Double getValue(AccumulatorBean acc) {
> return acc.totalPrice / acc.totalNum;
> }
>
>
> //create the object that holds the intermediate result
> @Override
> public AccumulatorBean createAccumulator() {
> return new AccumulatorBean();
> }
>
>
> //subtract the value to be retracted
> public void retract(AccumulatorBean acc, double price, long num) {
> acc.totalPrice -= price * num;
> acc.totalNum -= num;
> }
>
>
> //take the data from each partition and merge it
> public void merge(AccumulatorBean acc, Iterable
> it) {
>
>
> Iterator iter = it.iterator();
> while (iter.hasNext()) {
> AccumulatorBean a = iter.next();
> this.accumulate(acc, a.totalPrice, a.totalNum);
> }
> }
>
>
> //called when the in-memory value is reset
> public void resetAccumulator(AccumulatorBean acc) {
> acc.totalNum = 0;
> acc.totalPrice = 0;
> }
>
>
> //the logic that combines the accumulator with incoming data
> public void accumulate(AccumulatorBean acc, double price, long
> num) {
> acc.totalPrice += price * num;
> acc.totalNum += num;
> }
> }
>
>
>
> 
> About main calling
> //TODO unified stream/batch Table API
> TableEnvironment tableEnvironment =
> TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
> List dataList = new ArrayList<>();
> dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
> dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
> dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
> Table table = tableEnvironment.fromValues(DataTypes.ROW(
> DataTypes.FIELD("user", DataTypes.STRING()),
> DataTypes.FIELD("name", DataTypes.STRING()),
> DataTypes.FIELD("price", DataTypes.DOUBLE()),
> DataTypes.FIELD("num", DataTypes.BIGINT())
> ),
> dataList);
> tableEnvironment.createTemporaryView("orders", table);
>
>
> tableEnvironment.createTemporaryFunction("c_agg", new
> UDFAggregateFunction());
>
>
> tableEnvironment.executeSql("select user, c_agg(price, num) as
> udf_field from orders group by user").print();
>
>
>
>
>
>
>
> Exception stack trace -
>
>
>
>
> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at
> com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> function call:
> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
> at
> 

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Jingsong Li
Hi, could you check your versions? Are the cluster and flink-parquet on the same version?

BEST,
Jingsong

On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 
wrote:

> Hi,
> It is not the sql client; the job runs on YARN. The jar that we run includes the dependency above, and then this error shows up.
>
>

-- 
Best, Jingsong Lee


Re: flink-1.13.1 sql error

2021-06-20 文章 Jingsong Li
This error means that one of your fields is declared as ROW in the DDL, but the corresponding column in the real data is not a ROW.

Please check whether the declaration is correct; if such records are expected, you can skip those rows via ignoreParseErrors

Best,
Jingsong
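A minimal sketch of the ignoreParseErrors route, assuming a kafka/json source (table, topic and field names are placeholders):

CREATE TABLE src_sketch (
  app_time STRING,
  user_id  BIGINT,
  behavior ROW<item_id BIGINT, category_id BIGINT>   -- declare ROW only if the column really is a nested JSON object
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',                          -- placeholder
  'properties.bootstrap.servers' = '...',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'                 -- unparsable records are skipped instead of failing the job
);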

On Mon, Jun 21, 2021 at 11:46 AM zhuxiaoshang 
wrote:

> JSON deserialization failed; it looks like a data problem
> {\n  \"app_time\": \"2021-06-14 10:00:00\",\n 
> \"category_id\": 1,\n  \"item_id\": 1,\n 
> \"user_id\": 1,\n  \"behavior\": \"pv\"\n}
>
> > 2021年6月20日 下午9:08,kcz <573693...@qq.com.INVALID> 写道:
> >
> > 大佬们 帮看下这个是为什么提示那个错误
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>

-- 
Best, Jingsong Lee


Re: 退订

2021-06-17 文章 Jingsong Li
To unsubscribe, please send a mail to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 2:16 PM 金晓龙  wrote:

> 退订



-- 
Best, Jingsong Lee


Re: 退訂

2021-06-17 文章 Jingsong Li
To unsubscribe, please send a mail to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 9:51 AM Chongaih Hau 
wrote:

> 郵箱更換,退訂
>
> Regards,
> Hau ChongAih
>


-- 
Best, Jingsong Lee


Re: 邮件退订

2021-06-17 文章 Jingsong Li
To unsubscribe, please send a mail to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 9:29 AM wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
> 邮箱变更,退订!
>
>
>
>

-- 
Best, Jingsong Lee


Re: 如何使用kafka的KafkaDynamicTableFactory ,替换他的json format

2021-06-17 文章 Jingsong Li
No, not unless you create a new kafka connector yourself.

That said,
the kafka offset, partition and so on can be obtained through metadata.

Do you need the offset/partition inside the DeserializationFormat, or is it enough to access them in the downstream SQL?

Best,
Jingsong
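As a rough illustration of the metadata route (supported by the kafka SQL connector since Flink 1.12; table and topic names are placeholders), offset and partition can be declared as metadata columns and then used directly in SQL:

CREATE TABLE binlog_src (
  payload        STRING,                                -- raw value, parsed by your custom format or as a plain string
  `topic`        STRING  METADATA VIRTUAL,
  `partition_id` BIGINT  METADATA FROM 'partition' VIRTUAL,
  `offset`       BIGINT  METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'binlog_topic',                             -- placeholder
  'properties.bootstrap.servers' = '...',
  'format' = 'raw',                                     -- or your custom format
  'scan.startup.mode' = 'earliest-offset'
);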

On Thu, Jun 17, 2021 at 2:35 PM Michael Ran  wrote:

> dear all :
> We have a small requirement: because the binlog data differs, it is not convenient to use format="json" directly, so we want to handle it with a custom format.
> But a format defined via "implements DeserializationFormatFactory,
> SerializationFormatFactory"
> can only process the raw binlog payload, while we also want the kafka offset, partition and so on. Kafka's own DynamicKafkaDeserializationSchema
>   has the method
> deserialize(ConsumerRecord record,
> Collector collector).
> The object that wraps the offset is ConsumerRecord
>  ; we would like to use it too, or replace that part with our custom format. Is that possible, or does the connector have to be redefined?
>



-- 
Best, Jingsong Lee


Re: 退订

2021-06-15 文章 Jingsong Li
To unsubscribe, please send a mail to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Tue, Jun 15, 2021 at 5:05 PM 1049961436 <1049961...@qq.com> wrote:

> 退订



-- 
Best, Jingsong Lee


Re: hbase async lookup能否保证输出结果有序?

2021-06-15 文章 Jingsong Li
It is ordered.

An unordered mode is not supported at the moment; it could affect the correctness of streaming computation

Best,
Jingsong

On Tue, Jun 15, 2021 at 3:42 PM zilong xiao  wrote:

> hi, a question for the community: can hbase async lookup in flink 1.13 guarantee that the output is ordered?
>


-- 
Best, Jingsong Lee


Re: 退订

2021-06-14 文章 Jingsong Li
Please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Tue, Jun 15, 2021 at 12:28 AM 张保淇  wrote:

> 退订



-- 
Best, Jingsong Lee


Re: (无主题)

2021-06-14 文章 Jingsong Li
Please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Tue, Jun 15, 2021 at 12:32 PM 1049961436 <1049961...@qq.com> wrote:

> 退订



-- 
Best, Jingsong Lee


Re: 退订

2021-06-14 文章 Jingsong Li
Please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Mon, Jun 14, 2021 at 7:51 PM 周超 <769699...@qq.com> wrote:

> 退订



-- 
Best, Jingsong Lee


Re: flink sql支持Common Table Expression (CTE)吗?

2021-05-23 文章 Jingsong Li
Yes, it is supported.

If you only reuse an expression within a single sql statement, it is basically the same as a temporary view; the difference is small.

They differ on some optimization paths, but usually with no material impact.

Best,
Jingsong
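For reference, a minimal sketch of the two equivalent spellings, reusing the names from the example quoted below:

-- CTE form
WITH toronto_ppl AS (
  SELECT DISTINCT name FROM population WHERE country = 'Canada' AND city = 'Toronto'
)
SELECT name FROM People WHERE name IN (SELECT name FROM toronto_ppl);

-- roughly equivalent temporary-view form
CREATE TEMPORARY VIEW toronto_ppl AS
SELECT DISTINCT name FROM population WHERE country = 'Canada' AND city = 'Toronto';

SELECT name FROM People WHERE name IN (SELECT name FROM toronto_ppl);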

On Fri, May 21, 2021 at 11:32 PM casel.chen  wrote:

> Does flink sql support Common Table Expressions (CTE)? Can it be achieved via create temporary view
> xxx? What is the difference between a CTE and a temporary view?
> For example
>
>
> with toronto_ppl as (
> SELECT DISTINCT name
> FROM population
> WHERE country = "Canada"
>   AND city = "Toronto"
> )
>, avg_female_salary as (
> SELECT AVG(salary) as avgSalary
> FROM salaries
> WHERE gender = "Female"
> )
> SELECT name
>  , salary
> FROM People
> WHERE name in (SELECT DISTINCT FROM toronto_ppl)
>   AND salary >= (SELECT avgSalary FROM avg_female_salary)



-- 
Best, Jingsong Lee


Re: 关于filesystem connector的一点疑问

2020-11-12 文章 Jingsong Li
For the earliest possible visibility, just set the delay to 0 (leave the other options at their defaults)
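A minimal sketch of that configuration for an hourly-partitioned filesystem table (names, path and columns are placeholders; the source must produce a watermark for partition-time to work):

CREATE TABLE fs_hourly_sink (
  a STRING,
  b BIGINT,
  `day` STRING,
  `hour` STRING
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs:///data/fs_hourly_sink',                       -- placeholder
  'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '0 s',                        -- commit as soon as the watermark passes the partition end
  'sink.partition-commit.policy.kind' = 'success-file'
);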

On Thu, Nov 12, 2020 at 5:17 PM admin <17626017...@163.com> wrote:

> Hi,jingsong
> So with partition-time, partitions can still be committed repeatedly even when data is very late, and no data is lost, right?
> Then for an hourly-partitioned scenario, what is the best configuration to make partitions queryable as early as possible,
> e.g. sink.partition-commit.trigger = partition-time
> sink.partition-commit.delay = 10 min
>
> > 2020年11月12日 下午3:22,Jingsong Li  写道:
> >
> > Hi admin,
> >
> > No data is dropped; the partition is committed again (which is why partition commits are now idempotent operations)
> >
> > On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:
> >
> >> 补充一下不用partition time trigger的原因,partition
> >> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的
> >>
> >>> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> >>>
> >>> Hi ,kandy
> >>> 我没有基于partition time 提交分区,我是基于默认的process
> >> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> >>>
> >>>> 2020年11月12日 下午12:46,kandy.wang  写道:
> >>>>
> >>>> hi:
> >>>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  +
> >> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay
> >>>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> >>>>
> >>>>
> >>>> https://cloud.tencent.com/developer/article/1707182
> >>>>
> >>>> 这个连接可以看一下
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
> >>>>> Hi,all
> >>>>> Flink 1.11的filesystem connector,partition
> trigger[1]都是使用的默认值,所以分区可以多次提交
> >>>>> 现在有这样的场景:
> >>>>> 消费kafka数据写入hdfs中,分区字段是 day + hour
> >> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
> >>>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
> >>>>> 有大佬知道吗,有实际验证过吗
> >>>>> 感谢
> >>>>>
> >>>>> 附上简单sql:
> >>>>> CREATE TABLE kafka (
> >>>>> a STRING,
> >>>>> b STRING,
> >>>>> c BIGINT,
> >>>>> process_time BIGINT,
> >>>>> e STRING,
> >>>>> f STRING,
> >>>>> g STRING,
> >>>>> h INT,
> >>>>> i STRING
> >>>>> ) WITH (
> >>>>> 'connector' = 'kafka',
> >>>>> 'topic' = 'topic',
> >>>>> 'properties.bootstrap.servers' = 'x',
> >>>>> 'properties.group.id' = 'test-1',
> >>>>> 'scan.startup.mode' = 'latest-offset',
> >>>>> 'format' = 'json',
> >>>>> 'properties.flink.partition-discovery.interval-millis' = '30'
> >>>>> );
> >>>>>
> >>>>> CREATE TABLE filesystem (
> >>>>> `day` STRING,
> >>>>> `hour` STRING,
> >>>>> a STRING,
> >>>>> b STRING,
> >>>>> c BIGINT,
> >>>>> d BIGINT,
> >>>>> e STRING,
> >>>>> f STRING,
> >>>>> g STRING,
> >>>>> h INT,
> >>>>> i STRING
> >>>>> ) PARTITIONED BY (`day`, `hour`) WITH (
> >>>>> 'connector' = 'filesystem',
> >>>>> 'format' = 'parquet',
> >>>>> 'path' = 'hdfs://xx',
> >>>>> 'parquet.compression'='SNAPPY',
> >>>>> 'sink.partition-commit.policy.kind' = 'success-file'
> >>>>> );
> >>>>>
> >>>>> insert into filesystem
> >>>>> select
> >>>>> from_unixtime(process_time,'-MM-dd') as `day`,
> >>>>> from_unixtime(process_time,'HH') as `hour`,
> >>>>> a,
> >>>>> b,
> >>>>> c,
> >>>>> d,
> >>>>> e,
> >>>>> f,
> >>>>> g,
> >>>>> h,
> >>>>> i
> >>>>> from kafka;
> >>>>>
> >>>>>
> >>>>>
> >>>>> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> >>>
> >>
> >>
> >
> > --
> > Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee


Re: 关于filesystem connector的一点疑问

2020-11-11 文章 Jingsong Li
Hi admin,

No data is dropped; the partition is committed again (which is why partition commits are now idempotent operations)

On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:

> 补充一下不用partition time trigger的原因,partition
> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的
>
> > 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> >
> > Hi ,kandy
> > 我没有基于partition time 提交分区,我是基于默认的process
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> >
> >> 2020年11月12日 下午12:46,kandy.wang  写道:
> >>
> >> hi:
> >> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  +
> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay
> >> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> >>
> >>
> >> https://cloud.tencent.com/developer/article/1707182
> >>
> >> 这个连接可以看一下
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
> >>> Hi,all
> >>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
> >>> 现在有这样的场景:
> >>> 消费kafka数据写入hdfs中,分区字段是 day + hour
> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
> >>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
> >>> 有大佬知道吗,有实际验证过吗
> >>> 感谢
> >>>
> >>> 附上简单sql:
> >>> CREATE TABLE kafka (
> >>>  a STRING,
> >>>  b STRING,
> >>>  c BIGINT,
> >>>  process_time BIGINT,
> >>>  e STRING,
> >>>  f STRING,
> >>>  g STRING,
> >>>  h INT,
> >>>  i STRING
> >>> ) WITH (
> >>>  'connector' = 'kafka',
> >>>  'topic' = 'topic',
> >>>  'properties.bootstrap.servers' = 'x',
> >>>  'properties.group.id' = 'test-1',
> >>>  'scan.startup.mode' = 'latest-offset',
> >>>  'format' = 'json',
> >>>  'properties.flink.partition-discovery.interval-millis' = '30'
> >>> );
> >>>
> >>> CREATE TABLE filesystem (
> >>>  `day` STRING,
> >>>  `hour` STRING,
> >>>  a STRING,
> >>>  b STRING,
> >>>  c BIGINT,
> >>>  d BIGINT,
> >>>  e STRING,
> >>>  f STRING,
> >>>  g STRING,
> >>>  h INT,
> >>>  i STRING
> >>> ) PARTITIONED BY (`day`, `hour`) WITH (
> >>>  'connector' = 'filesystem',
> >>>  'format' = 'parquet',
> >>>  'path' = 'hdfs://xx',
> >>>  'parquet.compression'='SNAPPY',
> >>>  'sink.partition-commit.policy.kind' = 'success-file'
> >>> );
> >>>
> >>> insert into filesystem
> >>> select
> >>>  from_unixtime(process_time,'-MM-dd') as `day`,
> >>>  from_unixtime(process_time,'HH') as `hour`,
> >>>  a,
> >>>  b,
> >>>  c,
> >>>  d,
> >>>  e,
> >>>  f,
> >>>  g,
> >>>  h,
> >>>  i
> >>> from kafka;
> >>>
> >>>
> >>>
> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> >
>
>

-- 
Best, Jingsong Lee


Re: flink mysql cdc + hive streaming疑问

2020-11-01 文章 Jingsong Li
- You can use proc-time
- Or add a **UTC watermark** on your Source; note it must be **UTC**: SQL watermarks are all interpreted as **UTC**
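A minimal sketch of the proc-time route for the Hive table in this thread (only the commit-related properties matter; the column list is shortened):

SET table.sql-dialect=hive;

CREATE TABLE team_sketch (
  team_id INT,
  team_name STRING
) PARTITIONED BY (dt STRING, hr STRING, mi STRING)
STORED AS PARQUET TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'process-time',   -- commit by wall clock, no watermark required
  'sink.partition-commit.delay' = '0 s',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);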

On Mon, Nov 2, 2020 at 10:38 AM Rui Li  wrote:

> Hi,
>
> 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
> file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
>
> LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
>
> LOG.info("Committed partition {} to metastore", partitionSpec);
>
> LOG.info("Committed partition {} with success file",
> context.partitionSpec());
>
>
> On Sun, Nov 1, 2020 at 5:36 PM 陈帅  wrote:
>
> > 最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
> > streaming不能自动注册hive分区吗?还是我使用的姿势不对?
> >
> > 陈帅  于2020年11月1日周日 下午5:24写道:
> >
> > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> > > ") STORED AS TEXTFILE TBLPROPERTIES ("
> > >
> > > 这是生成的hive表建表语句
> > >
> > > hive> show create table team;
> > > OK
> > > CREATE TABLE `team`(
> > >   `team_id` int,
> > >   `team_name` string,
> > >   `create_time` string,
> > >   `update_time` string,
> > >   `op` string)
> > > PARTITIONED BY (
> > >   `dt` string,
> > >   `hr` string,
> > >   `mi` string)
> > > ROW FORMAT SERDE
> > >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > > STORED AS INPUTFORMAT
> > >   'org.apache.hadoop.mapred.TextInputFormat'
> > > OUTPUTFORMAT
> > >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > > LOCATION
> > >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > > TBLPROPERTIES (
> > >   'is_generic'='false',
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> > >   'sink.partition-commit.delay'='1 min',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'transient_lastDdlTime'='160466')
> > > Time taken: 0.252 seconds, Fetched: 25 row(s)
> > >
> > > 另外,下载了hive文件内容如下
> > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> > 11:25:38<0x01>INSERT
> > >
> > > 还是查询不到结果
> > > hive> select * from team;
> > > OK
> > > Time taken: 0.326 seconds
> > >
> > > 陈帅  于2020年11月1日周日 下午5:10写道:
> > >
> > >>
> > >>
> >
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> > >> 生成的hive分区文件路径类似于
> > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> > >>
> > >> 陈帅  于2020年11月1日周日 下午4:43写道:
> > >>
> > >>>
> >
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> > >>> shell查不到数据。
> > >>>
> > >>> import com.alibaba.fastjson.JSON;
> > >>> import com.alibaba.fastjson.JSONObject;
> > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> > >>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >>> import org.apache.flink.api.common.typeinfo.Types;
> > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > >>> import org.apache.flink.streaming.api.CheckpointingMode;
> > >>> import org.apache.flink.streaming.api.TimeCharacteristic;
> > >>> import org.apache.flink.streaming.api.datastream.DataStream;
> > >>> import
> > >>>
> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> > >>> import
> > >>>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >>> import
> > >>>
> >
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> > >>> import org.apache.flink.streaming.api.windowing.time.Time;
> > >>> import
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> > >>> import
> > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> > >>> import org.apache.flink.table.api.EnvironmentSettings;
> > >>> import org.apache.flink.table.api.SqlDialect;
> > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > >>> import org.apache.flink.table.catalog.hive.HiveCatalog;
> > >>> import org.apache.flink.types.Row;
> > >>> import org.apache.flink.types.RowKind;
> > >>>
> > >>> import java.time.Duration;
> > >>> import java.time.Instant;
> > >>> import java.time.LocalDateTime;
> > >>> import java.time.ZoneId;
> > >>> import java.time.format.DateTimeFormatter;
> > >>> import java.util.Properties;
> > >>>
> > >>> public class MysqlCDC2Hive {
> > >>>
> > >>> private static final DateTimeFormatter dtf =
> > >>> DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
> > >>>
> > >>> public static void main(String[] args) throws Exception {
> > >>> StreamExecutionEnvironment streamEnv =
> > >>> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>>
> > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >>> streamEnv.setParallelism(3);
> > >>> streamEnv.enableCheckpointing(6);
> > >>>
> > >>> EnvironmentSettings tableEnvSettings =
> > >>> EnvironmentSettings.newInstance()
> > >>> .useBlinkPlanner()
> > >>> 

Re: Re: flink hive Streaming查询不到数据的问题

2020-10-28 文章 Jingsong Li
Mind the time zone: the SQL layer uses UTC-based long values by default

On Thu, Oct 29, 2020 at 12:12 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> I set sink.partition-commit.trigger to process-time and can now see the data;
> but when I later made the source emit a watermark, it still did not work;
> val dataStream = streamEnv.addSource(new MySource)
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
>   .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
> override def extractTimestamp(element: UserInfo, recordTimestamp:
> Long): Long = element.getTs.getTime
>   }))
> 生成的userinfo类型的ts是时间戳,所以watermark 使用的是他提取的
>
>
>
> hdxg1101300...@163.com
>
> 发件人: Jingsong Li
> 发送时间: 2020-10-28 16:29
> 收件人: user-zh
> 主题: Re: flink hive Streaming查询不到数据的问题
> Hi,
>
> 你的Source看起来并没有产出watermark,所以:
>
> 你可以考虑使得Source产出正确的watermark,或者使用'sink.partition-commit.trigger'的默认值proc-time。
>
> Best,
> Jingsong
>
> On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
> > 你好:
> > 我现在在使用flink 1.11.2版本 hive1.1.0 版本;
> > 当我在使用flink hive streaming的使用发现按照 示例写数据到hive
> > 可以看到指定目录下已经生产了文件,但是使用hive查询没有数据;
> > 好像是分区信息没有提交到hive  meta store;但是官网已经说实现了这个功能;我操作却不行
> > 下面是我的代码
> >  object StreamMain {
> >   def main(args: Array[String]): Unit = {
> > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > streamEnv.setParallelism(3)
> >
> > val tableEnvSettings = EnvironmentSettings.newInstance()
> >   .useBlinkPlanner()
> >   .inStreamingMode()
> >   .build()
> >
> > val tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
> >
> > val dataStream = streamEnv.addSource(new MySource)
> >
> > val catalogName = "my_catalog"
> > val catalog = new HiveCatalog(
> >   catalogName,  // catalog name
> >   "yutest",// default database
> >
> >   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive
> config (hive-site.xml) directory
> >   "1.1.0"   // Hive version
> > )
> > tableEnv.registerCatalog(catalogName, catalog)
> > tableEnv.useCatalog(catalogName)
> >
> > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > tableEnv.useDatabase("yutest")
> >
> >
> > tableEnv.createTemporaryView("users", dataStream)
> > tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> > //  如果hive中已经存在了相应的表,则这段代码省略
> > val hiveSql = """CREATE external TABLE fs_table (
> > user_id STRING,
> > order_amount DOUBLE
> >   )
> >   partitioned by(
> >   dt string,
> >   h string,
> >   m string) stored as parquet
> >   TBLPROPERTIES (
> >
> > 'partition.time-extractor.timestamp-pattern'='$dt
> $h:$m:00',
> > 'sink.partition-commit.delay'='0s',
> > 'sink.partition-commit.trigger'='partition-time',
> >
> >
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> >   )""".stripMargin
> > tableEnv.executeSql(hiveSql)
> >
> >
> > val insertSql = "insert into  fs_table SELECT userId, amount, " + "
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM users"
> > tableEnv.executeSql(insertSql)
> >   }
> > }
> > public class MySource implements SourceFunction {
> > private volatile boolean run = true;
> > String userids[] = {
> >
> > "4760858d-2bec-483c-a535-291de04b2247",
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
> >
> > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
> >
> > "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
> >
> > "3ebfb9602ac07779||3ebfe9612a007979

Re: flink hive Streaming查询不到数据的问题

2020-10-28 文章 Jingsong Li
Hi,

Your Source does not seem to emit a watermark, so:
either make the Source emit a proper watermark, or use the default value of 'sink.partition-commit.trigger', which is proc-time.

Best,
Jingsong
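If the event-time route is preferred instead, a minimal sketch of a source with a watermark (connector options are placeholders; the resulting watermark is interpreted as UTC-based epoch millis):

CREATE TABLE users_src (
  user_id      STRING,
  order_amount DOUBLE,
  ts           TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'users',                      -- placeholder
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);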

On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> Hello:
> I am using flink 1.11.2 with hive 1.1.0;
> While using flink hive streaming and writing data to hive following the example,
> I can see that files have been produced under the target directory, but a hive query returns no data;
> It looks as if the partition information was never committed to the hive metastore, even though the website says this feature is implemented; it just does not work for me
> Here is my code
>  object StreamMain {
>   def main(args: Array[String]): Unit = {
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20))
>
> val dataStream = streamEnv.addSource(new MySource)
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
>   catalogName,  // catalog name
>   "yutest",// default database
>
>   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive 
> config (hive-site.xml) directory
>   "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.useDatabase("yutest")
>
>
> tableEnv.createTemporaryView("users", dataStream)
> tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> //  if the corresponding table already exists in hive, this block can be omitted
> val hiveSql = """CREATE external TABLE fs_table (
> user_id STRING,
> order_amount DOUBLE
>   )
>   partitioned by(
>   dt string,
>   h string,
>   m string) stored as parquet
>   TBLPROPERTIES (
>
> 'partition.time-extractor.timestamp-pattern'='$dt 
> $h:$m:00',
> 'sink.partition-commit.delay'='0s',
> 'sink.partition-commit.trigger'='partition-time',
>
> 
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   )""".stripMargin
> tableEnv.executeSql(hiveSql)
>
>
> val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
> FROM users"
> tableEnv.executeSql(insertSql)
>   }
> }
> public class MySource implements SourceFunction {
> private volatile boolean run = true;
> String userids[] = {
>
> "4760858d-2bec-483c-a535-291de04b2247", 
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
> "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
> "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
> "3ebfb9602ac07779||3ebfe9612a007979", 
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
> "e7e896cd939685d7||e7e8e6c1930689d7", 
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> };
>
> @Override
>
> public void run(SourceFunction.SourceContext sourceContext) 
> throws Exception {
>
> while (run) {
>
> String userid = userids[(int) (Math.random() * (userids.length - 
> 1))];
> UserInfo userInfo = new UserInfo();
> userInfo.setUserId(userid);
> userInfo.setAmount(Math.random() * 100);
> userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> sourceContext.collect(userInfo);
> Thread.sleep(100);
> }
> }
>
> @Override
> public void cancel() {
> run = false;
> }
> }
> public class UserInfo implements Serializable {
> private String userId;
> private Double amount;
> private Timestamp ts;
>
> public String getUserId() {
> return userId;
> }
>
> public void setUserId(String userId) {
> this.userId = userId;
> }
>
> public Double getAmount() {
> return amount;
> }
>
> public void setAmount(Double amount) {
> this.amount = amount;
> }
>
> public Timestamp getTs() {
> return ts;
> }
>
> public void setTs(Timestamp ts) {
> this.ts = ts;
> }
> }
>
> hive (yutest)>
>  >
>  > show partitions fs_table;
> OK
> partition
> Time taken: 20.214 seconds
>
> --
> hdxg1101300...@163.com
>


-- 
Best, Jingsong Lee


Re: flink sql 写入hive问题

2020-10-22 文章 Jingsong Li
The writer's parallelism follows the parallelism of the upstream operator.

Only the committer has parallelism 1.
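So one workaround is to raise the upstream parallelism, which the writer then inherits. A rough sketch, assuming the SQL client (depending on the version this may instead have to be set on the TableConfig or in flink-conf.yaml):

-- raise the default parallelism used by the planner; the file writer follows its upstream,
-- while the partition committer stays at parallelism 1
SET table.exec.resource.default-parallelism = 40;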

On Thu, Oct 22, 2020 at 5:22 PM 酷酷的浑蛋  wrote:

> When writing to a hive table in real time with flink sql, I found the sink parallelism is 1?
> Line 226 of FileSystemTableSink indeed sets it to 1. Why is that? Writing with parallelism 1 is very slow
>
>
>
>

-- 
Best, Jingsong Lee


Re: flink1.11流式写入hive速度慢的问题

2020-10-09 文章 Jingsong Li
Hi,
是Hive表吧?
https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的

可以下载最新的1.11分支的Hive依赖来试下:
https://repository.apache.org/snapshots/org/apache/flink/
(比如你用hive-1.2.2依赖,你可以下载
https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
)

Best,
Jingsong

On Fri, Oct 9, 2020 at 3:50 PM me  wrote:

> The dataStream reads at about 5000 records per second; there is no other processing logic, it only converts the datastream to a table
>
>
>  原始邮件
> 发件人: me
> 收件人: user-zh
> 发送时间: 2020年10月9日(周五) 15:34
> 主题: flink1.11流式写入hive速度慢的问题
>
>
> In flink1.11, after converting a datastream to a table and streaming it into a hive table, the write rate is only a few dozen records per second
> val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
> chaitin_test.printSchema()
> tableEnv.executeSql("insert into chaitin_test select test from " +
> chaitin_test)



-- 
Best, Jingsong Lee


[DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-23 文章 Jingsong Li
Hi devs and users:

After the 1.11 release, I have heard from several users that they could not find
Hive's documentation under "Table & SQL Connectors".

Actually, Hive's documents are in the "Table API & SQL". Since the "Table &
SQL Connectors" document was extracted separately, Hive is a little out of
place.
And Hive's code is also in "flink-connector-hive", which should be a
connector.
Hive also includes the concept of HiveCatalog. Is catalog a part of the
connector? I think so.

What do you think? If you don't object, I think we can move it.

Best,
Jingsong Lee


Re: Re: Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-18 文章 Jingsong Li
Hi,

Sorry for the trouble, could you try again with
the Hive dependency built from the latest release-1.11 branch (it contains changes in flink-connector-hive)

> By the way, will your 1.12 release bring any improvement for small-file compaction?

That is a goal for 1.12; the JIRA and the design proposal will come out in the next couple of days. Roughly, an "auto-compaction" option will be added so that the sink merges small files automatically

Best,
Jingsong
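A rough sketch of what that looks like in the 1.12 filesystem sink (option names per 1.12; treat them as illustrative only if you are still on 1.11):

CREATE TABLE compacted_sink (
  a  STRING,
  b  BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///data/compacted_sink',   -- placeholder
  'format' = 'parquet',
  'auto-compaction' = 'true',               -- merge small files when the checkpoint completes
  'compaction.file-size' = '128MB'          -- target size of the compacted files
);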


On Fri, Sep 18, 2020 at 10:18 AM kandy.wang  wrote:

>
>
>
>
>
>
> @Jingsong Li
>   I tested it: 1.11.2 still behaves the same as before. Setting table.exec.hive.fallback-mapred-writer=false is what makes the clear difference.
> Our flink environment is a jar we built ourselves from the flink 1.11 branch sources. I assume your StreamingFileWriter
> changes have all been committed to the flink 1.11 branch.
> By the way, will your 1.12 release bring any improvement for small-file compaction?
>
>
> 在 2020-09-17 14:19:42,"Jingsong Li"  写道:
> >是的,可以测一下,理论上 mr writer不应该有较大性能差距。
> >
> >> 为何要强制滚动文件
> >
> >因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。
> >
> >On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >> ok. 就是用hadoop mr writer vs  flink 自实现的native
> >> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
> >> 改成false是可以满足我们的写hive需求了
> >> 还有一个问题,之前问过你,你还没回复:
> >> HiveRollingPolicy为什么 shouldRollOnCheckpoint true
> 为何要强制滚动文件,这个可以抽取成一个配置参数么?
> >> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、
> >>
> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
> >> 在 2020-09-17 13:43:04,"Jingsong Li"  写道:
> >> >可以再尝试下最新的1.11.2吗?
> >> >
> >> >https://flink.apache.org/downloads.html
> >> >
> >> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
> >> >
> >> >> 是master分支代码
> >> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
> >> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
> >> >> if (userMrWriter) {
> >> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> assigner,
> >> >> rollingPolicy, outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> >> >> } else {
> >> >>Optional> bulkFactory =
> >> >> createBulkWriterFactory(partitionColumns, sd);
> >> >>if (bulkFactory.isPresent()) {
> >> >>   builder = StreamingFileSink.forBulkFormat(
> >> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
> >> >> new
> >> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
> >> partComputer))
> >> >> .withBucketAssigner(assigner)
> >> >> .withRollingPolicy(rollingPolicy)
> >> >> .withOutputFileConfig(outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use native parquet writer.");
> >> >> } else {
> >> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> >> assigner, rollingPolicy, outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer
> because
> >> >> BulkWriter Factory not available.");
> >> >> }
> >> >> }
> >> >> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >> >> >是最新的代码吗?
> >> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >> >> >它是影响性能的,1.11.2已经投票通过,即将发布
> >> >> >
> >> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang 
> wrote:
> >> >> >
> >> >> >> @Jingsong Li
> >> >> >>
> >> >> >> public TableSink createTableSink(TableSinkFactory.Context
> context) {
> >> >> >>CatalogTable table = checkNotNull(context.getTable());
> >> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >> >> >>
> >> >> >>boolean isGeneric =
> >> >> >>
> >> >>
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >> >> >>
> >> >> >>if (!isGeneric) {
> >> >> >> return new HiveTableSink(
> >> >> >> context.getConfiguration().get(
> >> >> >>
> >>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> >> >> context.isBounded(),
> >> >> >> new JobConf(hiveConf),
> >> >> >> context.getObjectIdentifier(),
> >> >> >> table);
> >> >> >> } else {
> >> >> >> return TableFactoryUtil.findAndC

Re: Flink SQL TableSource复用问题,相同数据源聚合多个指标,引擎创建多个相同的数据源

2020-09-17 文章 Jingsong Li
Take a careful look at whether the two sources differ in any way
If there is even a single difference, Blink cannot reuse them
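For reference, a rough sketch of the two options mentioned in the question (how they are set, SQL client SET or TableConfig, depends on the version); note that reuse only happens when the two source definitions are completely identical, with the same options, computed columns and hints:

-- both default to true in the Blink planner; reuse still requires identical source definitions
SET table.optimizer.reuse-source-enabled = true;
SET table.optimizer.reuse-sub-plan-enabled = true;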

On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:

> Scenario:
> Two Kafka sources are created via Flink SQL, deduplicated, then combined with UNION ALL and registered as a temporary view
> Flink SQL then reads the temporary view to compute aggregated metrics and writes the results to Redis
> Problem:
> The Flink SQL planner creates the same two sources again for every aggregation
>
> According to the description of the following Blink planner options, the engine should optimize and reuse identical sources
> - table.optimizer.reuse-source-enabled
> - table.optimizer.reuse-sub-plan-enabled
>
> Has anyone run into a similar problem?
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best, Jingsong Lee


Re: Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-17 文章 Jingsong Li
Yes, it is worth measuring; in theory the mr writer should not show a large performance gap.

> why is the file forcibly rolled

Because of Exactly-Once: formats like Orc and parquet cannot split the writing of one file across multiple attempts.
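For reference, a rough sketch of the knobs discussed in this thread; the values are the ones reported here, not general recommendations, and how they are set (SQL client SET, TableConfig, or flink-conf.yaml) depends on the version:

-- use Flink's native ORC/Parquet writers instead of the Hadoop mapred record writer
SET table.exec.hive.fallback-mapred-writer = false;

-- rolling options still bound size/age between checkpoints; a roll is always forced at each
-- checkpoint for ORC/Parquet because such files cannot be continued across checkpoints
-- 'sink.rolling-policy.rollover-interval' = '10min'
-- 'sink.rolling-policy.file-size' = '128MB'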

On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:

>
>
>
> ok. 就是用hadoop mr writer vs  flink 自实现的native
> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
> 改成false是可以满足我们的写hive需求了
> 还有一个问题,之前问过你,你还没回复:
> HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么?
> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、
> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
> 在 2020-09-17 13:43:04,"Jingsong Li"  写道:
> >可以再尝试下最新的1.11.2吗?
> >
> >https://flink.apache.org/downloads.html
> >
> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
> >
> >> 是master分支代码
> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
> >> if (userMrWriter) {
> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> assigner,
> >> rollingPolicy, outputFileConfig);
> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> >> } else {
> >>Optional> bulkFactory =
> >> createBulkWriterFactory(partitionColumns, sd);
> >>if (bulkFactory.isPresent()) {
> >>   builder = StreamingFileSink.forBulkFormat(
> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
> >> new
> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
> partComputer))
> >> .withBucketAssigner(assigner)
> >> .withRollingPolicy(rollingPolicy)
> >> .withOutputFileConfig(outputFileConfig);
> >> LOG.info("Hive streaming sink: Use native parquet writer.");
> >> } else {
> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> assigner, rollingPolicy, outputFileConfig);
> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
> >> BulkWriter Factory not available.");
> >> }
> >> }
> >> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >> >是最新的代码吗?
> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >> >它是影响性能的,1.11.2已经投票通过,即将发布
> >> >
> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
> >> >
> >> >> @Jingsong Li
> >> >>
> >> >> public TableSink createTableSink(TableSinkFactory.Context context) {
> >> >>CatalogTable table = checkNotNull(context.getTable());
> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >> >>
> >> >>boolean isGeneric =
> >> >>
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >> >>
> >> >>if (!isGeneric) {
> >> >> return new HiveTableSink(
> >> >> context.getConfiguration().get(
> >> >>
>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> >> context.isBounded(),
> >> >> new JobConf(hiveConf),
> >> >> context.getObjectIdentifier(),
> >> >> table);
> >> >> } else {
> >> >> return TableFactoryUtil.findAndCreateTableSink(context);
> >> >> }
> >> >> }
> >> >>
> >> >>
> >>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
> >> >>
> >> >> If it is false, using flink native writer to write parquet and orc
> >> files;
> >> >>
> >> >> If it is true, using hadoop mapred record writer to write parquet and
> >> orc
> >> >> files
> >> >>
> >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
> >> >>
> >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> >> >> 一些相关的参数 ?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >> >> >Sink并行度
> >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >> >> >
> >> >> >HDFS性能
> >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >> >> >
> >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 
> wrote:
>

Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-16 文章 Jingsong Li
Could you try again with the latest 1.11.2?

https://flink.apache.org/downloads.html

On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:

> It is the master branch code
> The situation you describe is exactly what happens when table.exec.hive.fallback-mapred-writer is at its default of true;
> after changing it to false the else branch is taken and the problem is gone for now
> if (userMrWriter) {
>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner,
> rollingPolicy, outputFileConfig);
> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> } else {
>Optional> bulkFactory =
> createBulkWriterFactory(partitionColumns, sd);
>if (bulkFactory.isPresent()) {
>   builder = StreamingFileSink.forBulkFormat(
> new org.apache.flink.core.fs.Path(sd.getLocation()),
> new
> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
> .withBucketAssigner(assigner)
> .withRollingPolicy(rollingPolicy)
> .withOutputFileConfig(outputFileConfig);
> LOG.info("Hive streaming sink: Use native parquet writer.");
> } else {
>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> assigner, rollingPolicy, outputFileConfig);
> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
> BulkWriter Factory not available.");
> }
> }
> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >是最新的代码吗?
> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >它是影响性能的,1.11.2已经投票通过,即将发布
> >
> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
> >
> >> @Jingsong Li
> >>
> >> public TableSink createTableSink(TableSinkFactory.Context context) {
> >>CatalogTable table = checkNotNull(context.getTable());
> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >>
> >>boolean isGeneric =
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >>
> >>if (!isGeneric) {
> >> return new HiveTableSink(
> >> context.getConfiguration().get(
> >>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> context.isBounded(),
> >> new JobConf(hiveConf),
> >> context.getObjectIdentifier(),
> >> table);
> >> } else {
> >> return TableFactoryUtil.findAndCreateTableSink(context);
> >> }
> >> }
> >>
> >>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
> >>
> >> If it is false, using flink native writer to write parquet and orc
> files;
> >>
> >> If it is true, using hadoop mapred record writer to write parquet and
> orc
> >> files
> >>
> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
> >>
> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> >> 一些相关的参数 ?
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >> >Sink并行度
> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >> >
> >> >HDFS性能
> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >> >
> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
> >> >
> >> >> 场景很简单,就是kafka2hive
> >> >> --5min入仓Hive
> >> >>
> >> >> INSERT INTO  hive.temp_.hive_5min
> >> >>
> >> >> SELECT
> >> >>
> >> >>  arg_service,
> >> >>
> >> >> time_local
> >> >>
> >> >> .
> >> >>
> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
> >> >>
> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('
> properties.group.id
> >> '='kafka_hive_test',
> >> >> 'scan.startup.mode'='earliest-offset') */;
> >> >>
> >> >>
> >> >>
> >> >> --kafka source表定义
> >> >>
> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >> >>
> >> >> arg_service string COMMENT 'arg_service',
> >> >>
> >> >> 
> >> >>
> >> >> )WITH (
> >> >>
> >> >>   'connector' = 'kafka',
> >> >>
> >> >>   'topic' = '...',
> >> >>
> >> >>   'properties.bootstrap.servers' = '...',
> >> >>

Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-16 文章 Jingsong Li
Thanks ZhuZhu for driving the release.

Best,
Jingsong

On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Zhu
>


-- 
Best, Jingsong Lee


Re: Re: Re: StreamingFileWriter 压测性能

2020-09-16 文章 Jingsong Li
Is this the latest code?
1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
It affects performance; the 1.11.2 vote has passed and it will be released soon

On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:

> @Jingsong Li
>
> public TableSink createTableSink(TableSinkFactory.Context context) {
>CatalogTable table = checkNotNull(context.getTable());
> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>
>boolean isGeneric =
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>
>if (!isGeneric) {
> return new HiveTableSink(
> context.getConfiguration().get(
>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> context.isBounded(),
> new JobConf(hiveConf),
> context.getObjectIdentifier(),
> table);
> } else {
> return TableFactoryUtil.findAndCreateTableSink(context);
> }
> }
>
> In HiveTableFactory there is an option, table.exec.hive.fallback-mapred-writer, default true, which controls whether Hadoop's
> own mr writer or flink's native writer is used to write the orc/parquet format.
>
> If it is false, using flink native writer to write parquet and orc files;
>
> If it is true, using hadoop mapred record writer to write parquet and orc
> files
>
> After changing this parameter to false, with the same resources, the tps reaches 300k
>
> The two ORC implementations probably just differ in performance themselves? Also, our storage format is orc; are there any orc parameters worth tuning, e.g. async flush
> and related parameters?
>
>
>
>
>
> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >Sink并行度
> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >
> >HDFS性能
> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >
> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
> >
> >> 场景很简单,就是kafka2hive
> >> --5min入仓Hive
> >>
> >> INSERT INTO  hive.temp_.hive_5min
> >>
> >> SELECT
> >>
> >>  arg_service,
> >>
> >> time_local
> >>
> >> .
> >>
> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
> >>
> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
> '='kafka_hive_test',
> >> 'scan.startup.mode'='earliest-offset') */;
> >>
> >>
> >>
> >> --kafka source表定义
> >>
> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >>
> >> arg_service string COMMENT 'arg_service',
> >>
> >> 
> >>
> >> )WITH (
> >>
> >>   'connector' = 'kafka',
> >>
> >>   'topic' = '...',
> >>
> >>   'properties.bootstrap.servers' = '...',
> >>
> >>   'properties.group.id' = 'flink_etl_kafka_hive',
> >>
> >>   'scan.startup.mode' = 'group-offsets',
> >>
> >>   'format' = 'json',
> >>
> >>   'json.fail-on-missing-field' = 'false',
> >>
> >>   'json.ignore-parse-errors' = 'true'
> >>
> >> );
> >> --sink hive表定义
> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> >> 
> >> )
> >> PARTITIONED BY (dt string , hm string) stored as orc location
> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
> >>   'sink.partition-commit.trigger'='process-time',
> >>   'sink.partition-commit.delay'='0 min',
> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >>   'sink.rolling-policy.check-interval' ='30s',
> >>   'sink.rolling-policy.rollover-interval'='10min',
> >>   'sink.rolling-policy.file-size'='128MB'
> >> );
> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
> >> 就是flink sql可以
> >>
> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
> >> 这块,有没有什么可以提升性能相关的优化参数?
> >>
> >>
> >>
> >>
> >> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
> >> >Hi,
> >> >
> >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
> >> >
> >> >另外,压测时是否可以看下jstack?
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
> >> >
> >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
> >> ,source
> >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> >> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
> >> >
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Re: StreamingFileWriter 压测性能

2020-09-16 文章 Jingsong Li
Sink parallelism
I understand this as configuring the sink parallelism; that has been under discussion for a while with no conclusion yet

HDFS performance
It is worth checking what exactly the HDFS bottleneck is: network, request rate, connection count, or disk IO

On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:

> 场景很简单,就是kafka2hive
> --5min入仓Hive
>
> INSERT INTO  hive.temp_.hive_5min
>
> SELECT
>
>  arg_service,
>
> time_local
>
> .
>
> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>
> FROM hive.temp_.kafka_source_pageview/*+ 
> OPTIONS('properties.group.id'='kafka_hive_test',
> 'scan.startup.mode'='earliest-offset') */;
>
>
>
> --kafka source表定义
>
> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>
> arg_service string COMMENT 'arg_service',
>
> 
>
> )WITH (
>
>   'connector' = 'kafka',
>
>   'topic' = '...',
>
>   'properties.bootstrap.servers' = '...',
>
>   'properties.group.id' = 'flink_etl_kafka_hive',
>
>   'scan.startup.mode' = 'group-offsets',
>
>   'format' = 'json',
>
>   'json.fail-on-missing-field' = 'false',
>
>   'json.ignore-parse-errors' = 'true'
>
> );
> --sink hive表定义
> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> 
> )
> PARTITIONED BY (dt string , hm string) stored as orc location
> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>   'sink.partition-commit.trigger'='process-time',
>   'sink.partition-commit.delay'='0 min',
>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>   'sink.rolling-policy.check-interval' ='30s',
>   'sink.rolling-policy.rollover-interval'='10min',
>   'sink.rolling-policy.file-size'='128MB'
> );
> From a first look, the bottleneck seems to be writing to hdfs, and this is already an ssd hdfs cluster; with 40 kafka partitions
> and operator parallelism 40, tps only reaches around 60-70k, and increasing the parallelism does not improve it.
> Also, can flink sql
> change the parallelism of a single operator? I would like to change only the parallelism of the StreamingFileWriter operator; is there a good way to do that? And for StreamingFileWriter
> itself, are there any tuning parameters that improve performance?
>
>
>
>
> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
> >Hi,
> >
> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
> >
> >另外,压测时是否可以看下jstack?
> >
> >Best,
> >Jingsong
> >
> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
> >
> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
> ,source
> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
> >
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: FlinkSQL1.11.1读取kafka写入Hive(parquet) OOM问题

2020-09-16 文章 Jingsong Li
You probably mean controlling the sink parallelism; that is still being discussed

On Wed, Sep 16, 2020 at 10:26 PM wangenbao <156827...@qq.com> wrote:

> Thanks for the reply
> We do use keyBy now, which raises the parallelism and spreads the data across multiple TaskManagers, but we ran into a problem
> <
> http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916221935.png>
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916222005.png>
>
>
> I wonder whether the parallelism of the Insert statement can be controlled directly
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


Re: StreamingFileWriter 压测性能

2020-09-16 文章 Jingsong Li
Hi,

Could you share the concrete test scenario? Is there a comparison, e.g. against a hand-written DataStream job, to see the performance gap?

Also, could you take a jstack during the load test?

Best,
Jingsong

On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:

> In our load test, streaming writes to hive via StreamingFileWriter, with kafka partition=40 and source
> writer operator parallelism=40, consuming kafka from the beginning, only reach a tps of about 70k
> I would like to know whether this streaming-to-Hive path has been load tested, and what throughput it can reach



-- 
Best, Jingsong Lee


Re: FlinkSQL1.11.1读取kafka写入Hive(parquet) OOM问题

2020-09-15 文章 Jingsong Li
Could you consider doing a keyBy on hashtid before writing?

Best,
Jingsong

On Wed, Sep 16, 2020 at 9:36 AM wangenbao <156827...@qq.com> wrote:

> A question for the community:
> Has anyone run into the following problem?
>
> 1. I first read PB-format data from Kafka via the Table API, convert it to POJOs, and register it as a View;
> 2. Then I insert into a Hive table with three partition columns (day, hour, hashtid), stored as Parquet with Snappy compression;
> 3. As soon as the data is spread across relatively many partitions, an OOM occurs, which shows up as
> parquet.hadoop.MemoryManager: Total allocation exceeds 50.00%
> (2,102,394,880
> bytes) of heap memory
> Scaling row group sizes to 13.62% for 115 writers
> 随后就会出现java.lang.OutOfMemoryError: Java heap space
>
> I think it is because there are too many Parquet writers. Has anyone seen a similar problem, and how can it be solved?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best, Jingsong Lee


Re: Re: Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-09-08 文章 Jingsong Li
Hi kandy~

It may be caused by https://issues.apache.org/jira/browse/FLINK-19166
which will be fixed in the upcoming 1.11.2; could you confirm and retry with that?

Best,
Jingsong

On Fri, Aug 14, 2020 at 7:22 PM kandy.wang  wrote:

> @Jingsong  It is the orc format; I have checked everything and the partition is still not committed. I think you should test this scenario on your side
>
> 在 2020-08-12 16:04:13,"Jingsong Li"  写道:
> >另外问一下,是什么格式?csv还是parquet。
> >有等到10分钟(rollover-interval)过后和下一次checkpoint后再看吗?
> >
> >On Wed, Aug 12, 2020 at 2:45 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >> 有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉,
> >> 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。
> >> 就是感觉停止之前正在写的那个分区,没有触发commit
> >>
> >>
> >>
> >>
> >> 在 2020-08-12 14:26:53,"Jingsong Li"  写道:
> >> >那你之前的分区除了in-progress文件,有已完成的文件吗?
> >> >
> >> >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
> >> >
> >> >>
> >> >>
> >> >>
> >> >> source就是kafka
> >> >>
> >>
> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >>
> >> >>
> >> >>
> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >>
> >> >> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
> >> >> >你的source是exactly-once的source吗?
> >> >> >
> >> >> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >> >
> >> >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang 
> wrote:
> >> >> >
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> >@ Jingsong
> >> >> >>
> >> >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> >> >> >> 用presto查询查不了
> >> >> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
> >> >> >>  'sink.partition-commit.trigger'='process-time',
> >> >> >>   'sink.partition-commit.delay'='0 min',
> >> >> >>
> >>  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >> >> >>   'sink.rolling-policy.check-interval'='30s',
> >> >> >>   'sink.rolling-policy.rollover-interval'='10min',
> >> >> >>   'sink.rolling-policy.file-size'='128MB'
> >> >> >>如果是12:39分 05秒左右做一次savepoint,然后
> >> >> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive
> add
> >> >> >> partition,就导致有数据,但是确查不 了。
> >> >> >>
> >> >>
> >>
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> >> >> >> partition 也能查了。
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >> >> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >> >> >> >>
> >> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu 
> wrote:
> >> >> >> >>
> >> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >> >> >> >>>
> >> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang 
> >> wrote:
> >> >> >> >>>
> >> >> >> >>> > 1.StreamingFileWriter
> >> >> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >> >> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >> >> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >> >> >> >>> > =2100分区的数据还存在很多的in-progress文件。
> >> >> >> >>> >
> >> 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >> >> >> >>>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>--
> >> >> >> >>Best, Jingsong Lee
> >> >> >>
> >> >> >
> >> >> >
> >> >> >--
> >> >> >Best, Jingsong Lee
> >> >>
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-08 文章 Jingsong Li
Thank you very much for the feedback; it does look like a real problem, I have created a JIRA to track it

https://issues.apache.org/jira/browse/FLINK-19166

It will be included in the upcoming 1.11.2 release

Best,
Jingsong

On Wed, Sep 9, 2020 at 10:44 AM MuChen <9329...@qq.com> wrote:

> hi,Rui Li:
> The files under the directories of the partitions that were not committed are in committed state, and after manually adding the partition they can be queried normally
>
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-19/hour=07/part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1031
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> lirui.fu...@gmail.com;
> 发送时间:2020年9月8日(星期二) 晚上9:43
> 收件人:"MuChen"<9329...@qq.com;
> 抄送:"user-zh" ;
> 主题:Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
>
>
>
> 另外也list一下没有提交的分区目录吧,看看里面的文件是什么状态
>
> On Tue, Sep 8, 2020 at 9:19 PM Rui Li 
>  作业有发生failover么?还是说作业能成功结束但是某些partition始终没提交?
> 
>  On Tue, Sep 8, 2020 at 5:20 PM MuChen <9329...@qq.com wrote:
> 
>  hi, Rui Li:
>  如你所说,的确有类似日志,但是只有成功增加的分区的日志,没有失败分区的日志:
>  2020-09-04 17:17:10,548 INFO
> org.apache.flink.streaming.api.operators.
>  AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=18} of
> table
>  `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
>  2020-09-04 17:17:10,716 INFO org.apache.flink.table.filesystem.
>  MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=18}
>  to metastore
>  2020-09-04 17:17:10,720 INFO org.apache.flink.table.filesystem.
>  SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=18}
>  with success file
>  2020-09-04 17:17:19,652 INFO
> org.apache.flink.streaming.api.operators.
>  AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=19} of
> table
>  `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
>  2020-09-04 17:17:19,820 INFO org.apache.flink.table.filesystem.
>  MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=19}
>  to metastore
>  2020-09-04 17:17:19,824 INFO org.apache.flink.table.filesystem.
>  SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=19}
>  with success file
> 
>  写hdfs的日志是都有的:
>  2020-09-04 17:16:04,100 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
>  ParquetRecordWriterWrapper [] - creating real writer to write at
> hdfs://
> 
> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
>  08-22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140
>  .inprogress.1631ac6c-a07c-4ad7-86ff-cf0d4375d1de
>  2020-09-04 17:16:04,126 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
>  ParquetRecordWriterWrapper [] - creating real writer to write at
> hdfs://
> 
> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
>  08-22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142
>  .inprogress.2700eded-5ed0-4794-8ee9-21721c0c2ffd
> 
>  -- 原始邮件 --
>  *发件人:* "Rui Li"   *发送时间:* 2020年9月8日(星期二) 中午12:09
>  *收件人:* "user-zh" jkill...@dingtalk.com;
>  *抄送:* "MuChen"<9329...@qq.com;
>  *主题:* Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> 
>  streaming file committer在提交分区之前会打印这样的日志:
> 
>  LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
> 
>  partition commit policy会在成功提交分区以后打印这样的日志:
> 
>  LOG.info("Committed partition {} to metastore", partitionSpec);
> 
>  LOG.info("Committed partition {} with success file",
> context.partitionSpec());
> 
>  可以检查一下这样的日志,看是不是卡在什么地方了
> 
>  On Tue, Sep 8, 2020 at 11:02 AM 夏帅  wrote:
> 
>  就第二次提供的日志看,好像是你的namenode出现的问题
> 
> 
> 
> --
>  发件人:MuChen <9329...@qq.com
>  发送时间:2020年9月8日(星期二) 10:56
>  收件人:user-zh@flink.apache.org 夏帅  user-zh <
>  user-zh@flink.apache.org
>  主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> 
>  在checkpoint失败的时间,tm上还有一些info和warn级别的日志:
>  2020-09-04 17:17:59,520 INFO
>  org.apache.hadoop.io.retry.RetryInvocationHandler [] -
> Exception while
>  invoking create of class ClientNamenodeProtocolTranslatorPB
> over
>  uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over
> attempts.
>  Trying to fail over immediately.
>  java.io.IOException: java.lang.InterruptedException
>  at
> org.apache.hadoop.ipc.Client.call(Client.java:1449)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> org.apache.hadoop.ipc.Client.call(Client.java:1401)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
>  at
> 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>  ~[?:?]
>  at
> 
> 

Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-08 文章 Jingsong Li
Could you also share the SQL that inserts into the Hive table?

On Tue, Sep 8, 2020 at 9:44 PM Rui Li  wrote:

> Could you also list the directories of the partitions that were not committed, to see what state the files inside are in?
>
> On Tue, Sep 8, 2020 at 9:19 PM Rui Li  wrote:
>
> > Did the job experience a failover? Or does the job finish successfully but some partitions are just never committed?
> >
> > On Tue, Sep 8, 2020 at 5:20 PM MuChen <9329...@qq.com> wrote:
> >
> >> hi, Rui Li:
> >> 如你所说,的确有类似日志,但是只有成功增加的分区的日志,没有失败分区的日志:
> >> 2020-09-04 17:17:10,548 INFO org.apache.flink.streaming.api.operators.
> >> AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=18} of table
> >> `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
> >> 2020-09-04 17:17:10,716 INFO org.apache.flink.table.filesystem.
> >> MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=18}
> >> to metastore
> >> 2020-09-04 17:17:10,720 INFO org.apache.flink.table.filesystem.
> >> SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=18}
> >> with success file
> >> 2020-09-04 17:17:19,652 INFO org.apache.flink.streaming.api.operators.
> >> AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=19} of table
> >> `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
> >> 2020-09-04 17:17:19,820 INFO org.apache.flink.table.filesystem.
> >> MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=19}
> >> to metastore
> >> 2020-09-04 17:17:19,824 INFO org.apache.flink.table.filesystem.
> >> SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=19}
> >> with success file
> >>
> >> 写hdfs的日志是都有的:
> >> 2020-09-04 17:16:04,100 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
> >> ParquetRecordWriterWrapper [] - creating real writer to write at hdfs://
> >> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
> >> 08-22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140
> >> .inprogress.1631ac6c-a07c-4ad7-86ff-cf0d4375d1de
> >> 2020-09-04 17:16:04,126 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
> >> ParquetRecordWriterWrapper [] - creating real writer to write at hdfs://
> >> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
> >> 08-22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142
> >> .inprogress.2700eded-5ed0-4794-8ee9-21721c0c2ffd
> >>
> >> -- 原始邮件 --
> >> *发件人:* "Rui Li" ;
> >> *发送时间:* 2020年9月8日(星期二) 中午12:09
> >> *收件人:* "user-zh";"夏帅";
> >> *抄送:* "MuChen"<9329...@qq.com>;
> >> *主题:* Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> >>
> >> streaming file committer在提交分区之前会打印这样的日志:
> >>
> >> LOG.info("Partition {} of table {} is ready to be committed", partSpec,
> tableIdentifier);
> >>
> >> partition commit policy会在成功提交分区以后打印这样的日志:
> >>
> >> LOG.info("Committed partition {} to metastore", partitionSpec);
> >>
> >> LOG.info("Committed partition {} with success file",
> context.partitionSpec());
> >>
> >> 可以检查一下这样的日志,看是不是卡在什么地方了
> >>
> >> On Tue, Sep 8, 2020 at 11:02 AM 夏帅 
> wrote:
> >>
> >>> 就第二次提供的日志看,好像是你的namenode出现的问题
> >>>
> >>>
> >>> --
> >>> 发件人:MuChen <9329...@qq.com>
> >>> 发送时间:2020年9月8日(星期二) 10:56
> >>> 收件人:user-zh@flink.apache.org 夏帅 ; user-zh <
> >>> user-zh@flink.apache.org>
> >>> 主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> >>>
> >>> 在checkpoint失败的时间,tm上还有一些info和warn级别的日志:
> >>> 2020-09-04 17:17:59,520 INFO
> >>> org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while
> >>> invoking create of class ClientNamenodeProtocolTranslatorPB over
> >>> uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts.
> >>> Trying to fail over immediately.
> >>> java.io.IOException: java.lang.InterruptedException
> >>> at org.apache.hadoop.ipc.Client.call(Client.java:1449)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at org.apache.hadoop.ipc.Client.call(Client.java:1401)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at
> >>>
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
> >>> at
> >>>
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> >>> ~[?:?]
> >>> at
> >>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> ~[?:1.8.0_144]
> >>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144]
> >>> at
> >>>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at
> >>>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >>> 

Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08 文章 Jingsong Li
Hi,

flink-sql-orc_2.11-1.11.0.jar 和 flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar
目前是不能共存的,不然会冲突,你试试去掉flink-sql-orc看看?

On Tue, Sep 8, 2020 at 4:55 PM 大罗  wrote:

> Hi ,我例子中的hive orc表,不是事务表,如图:
>
> createtab_stmt
> CREATE TABLE `dest_orc`(
>   `i` int)
> PARTITIONED BY (
>   `ts` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
> LOCATION
>   'hdfs://nameservice1/opt/user/hive/warehouse/dw.db/dest_orc'
> TBLPROPERTIES (
>   'is_generic'='false',
>   'orc.compress'='SNAPPY',
>   'transient_lastDdlTime'='1599555226')
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


Re: flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

2020-09-07 文章 Jingsong Li
Hi,

The OrcBulkWriterFactory implemented in flink-orc goes fairly "deep": it rewrites part of the ORC code, so it is hard to keep it compatible across ORC/Hive versions.

You could consider using Hive streaming writing instead; it uses the native Hive ORC writer [1], so it can match the exact Hive version you need.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_streaming.html#streaming-writing
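A minimal sketch of that approach, under the following assumptions: the catalog parameters, table and column names are placeholders, and a streaming source table person_source with a TIMESTAMP column ts is assumed to exist. It only illustrates the shape of the documented Hive streaming write:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveStreamingWriteSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // written files only become visible after a checkpoint
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Point Flink at the Hive metastore; all four arguments are placeholders.
        HiveCatalog hive = new HiveCatalog("myhive", "dw", "/etc/hive/conf", "2.1.1");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // Create the target table with the Hive dialect, so Hive's own ORC writer is used.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(
            "CREATE TABLE ods_person_hive (name STRING, age INT) " +
            "PARTITIONED BY (dt_day STRING, dt_hour STRING) STORED AS ORC " +
            "TBLPROPERTIES (" +
            " 'sink.partition-commit.trigger'='process-time'," +
            " 'sink.partition-commit.policy.kind'='metastore,success-file')");

        // Back to the default dialect for the streaming INSERT from the (assumed) source table.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql(
            "INSERT INTO ods_person_hive " +
            "SELECT name, age, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') " +
            "FROM person_source");
    }
}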

Best,
Jingsong

On Mon, Sep 7, 2020 at 2:11 PM 大罗  wrote:

> Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink
> sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下:
>
> 我所使用的版本如下:
>
> Hadoop 3.0.0+cdh6.3.2
>
> HDFS 3.0.0+cdh6.3.2
>
> HBase 2.1.0+cdh6.3.2
>
> Hive 2.1.1+cdh6.3.2
>
> Flink 1.11.1
>
> 我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下:
>
> TestStreamFileSinkViaCustomVectorizer.java
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/TestStreamFileSinkViaCustomVectorizer.java>
>
>
>
> 然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14
>
> 那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下:
>
> CREATE TABLE ods_person_test_os(
> name string, age int)
> partitioned by (dt_day string, dt_hour string)
> STORED AS ORC
> LOCATION 'hdfs://nameservice1/tmp/person_orc/'
> TBLPROPERTIES(
>  'orc.compress'='SNAPPY'
> );
>
> 当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06',
> dt_hour='16')
> ",
> 运行查询语句 "select * from ods_person_test_os"后,报错,
> hive-error.txt
> 
>
> 其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。
>
> 经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。
>
>
> 'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.ORC_517,也就是第7个。
>
> ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0),
> HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1),
> HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2),
> HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3),
> HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4),
> ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5),
> ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6),
> ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7),
> ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6),
> PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6),
> SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO,
> 6),
> FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647);
>
>
> 而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.HIVE_13083,  并且不存在第7个version。
> ORIGINAL(0),
> HIVE_8732(1),
> HIVE_4243(2),
> HIVE_12055(3),
> HIVE_13083(4),
> FUTURE(2147483647);
>
> 所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析!
>
> 为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude
>
> 'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER
> =
>
> OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错:
> 1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法;
> 2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter';
>
> 那么,为了匹配不同版本hive使用的orc writer
> version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢?
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-06 文章 Jingsong Li
另外,可能和使用本地文件系统有关?换成HDFS试试?

On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li  wrote:

> Hi,
>
> 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里?
>
> On Sat, Sep 5, 2020 at 11:11 PM Peihui He  wrote:
>
>> Hi, all
>>
>> 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。
>> 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。
>>
>> 请问有什么好的解决方式没呢?
>>
>> Best Wishes.
>>
>> Peihui He  于2020年9月4日周五 下午6:25写道:
>>
>>> Hi, all
>>>
>>> 当指定partition的时候这个问题通过path 也没法解决了
>>>
>>> CREATE TABLE MyUserTable (
>>>   column_name1 INT,
>>>   column_name2 STRING,  dt string,) PARTITIONED BY (dt) WITH (
>>>   'connector' = 'filesystem',   -- required: specify the connector
>>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>>   'format' = 'json', -- required: file system connector)
>>>
>>>
>>> select  * from  MyUserTable  limit 10;
>>>
>>> job 会一直卡在一个地方
>>> [image: image.png]
>>>
>>> 这种改怎么解决呢?
>>>
>>> Peihui He  于2020年9月4日周五 下午6:02写道:
>>>
>>>> hi, all
>>>> 我这边用flink sql client 创建表的时候
>>>>
>>>> CREATE TABLE MyUserTable (
>>>>   column_name1 INT,
>>>>   column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
>>>>   'connector' = 'filesystem',   -- required: specify the connector
>>>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>>>   'format' = 'json', -- required: file system 
>>>> connector)
>>>>
>>>> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
>>>> sql client 提交job会很慢,最后会报错
>>>>
>>>> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
>>>> [Internal server error., <Exception on server side:
>>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>>> already been submitted. at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
>>>> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498) at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
>>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
>>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>>> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
>>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
>>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
>>>> akka.actor.ActorCell.invoke(ActorCell.scala:561) at
>>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
>>>> akka.dispatch.Mailbox.run(Mailbox.scala:225) at
>>>> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
>>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> End of exception on server side>] at
>>>> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>>>> at
>>>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>>>>
>>>>
>>>> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。
>>>>
>>>> 这种情况不知道有没有遇到过?
>>>>
>>>> Best Wishes.
>>>>
>>>>
>>>>
>>>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-06 文章 Jingsong Li
Hi,

可以在JobMaster里面看一下jstack吗?看下具体卡在哪里?

On Sat, Sep 5, 2020 at 11:11 PM Peihui He  wrote:

> Hi, all
>
> 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。
> 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。
>
> 请问有什么好的解决方式没呢?
>
> Best Wishes.
>
> Peihui He  于2020年9月4日周五 下午6:25写道:
>
>> Hi, all
>>
>> 当指定partition的时候这个问题通过path 也没法解决了
>>
>> CREATE TABLE MyUserTable (
>>   column_name1 INT,
>>   column_name2 STRING,  dt string,) PARTITIONED BY (dt) WITH (
>>   'connector' = 'filesystem',   -- required: specify the connector
>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>   'format' = 'json', -- required: file system connector)
>>
>>
>> select  * from  MyUserTable  limit 10;
>>
>> job 会一直卡在一个地方
>> [image: image.png]
>>
>> 这种改怎么解决呢?
>>
>> Peihui He  于2020年9月4日周五 下午6:02写道:
>>
>>> hi, all
>>> 我这边用flink sql client 创建表的时候
>>>
>>> CREATE TABLE MyUserTable (
>>>   column_name1 INT,
>>>   column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
>>>   'connector' = 'filesystem',   -- required: specify the connector
>>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>>   'format' = 'json', -- required: file system connector)
>>>
>>> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
>>> sql client 提交job会很慢,最后会报错
>>>
>>> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
>>> [Internal server error., <Exception on server side:
>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>> already been submitted. at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
>>> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498) at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
>>> akka.actor.ActorCell.invoke(ActorCell.scala:561) at
>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
>>> akka.dispatch.Mailbox.run(Mailbox.scala:225) at
>>> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> End of exception on server side>] at
>>> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>>> at
>>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>>> at
>>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>>> at
>>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>>>
>>>
>>> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。
>>>
>>> 这种情况不知道有没有遇到过?
>>>
>>> Best Wishes.
>>>
>>>
>>>
>>

-- 
Best, Jingsong Lee


Re: 使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-06 文章 Jingsong Li
失败的图没有呢。。具体什么异常?

On Mon, Sep 7, 2020 at 10:23 AM MuChen <9329...@qq.com> wrote:

> hi, all:
> 麻烦大佬们帮看个问题,多谢!
>
> 处理逻辑如下
> 1. 使用DataStream API读取kafka中的数据,写入DataStream ds1中
> 2. 新建一个tableEnv,并注册hive catalog:
> tableEnv.registerCatalog(catalogName, catalog);
> tableEnv.useCatalog(catalogName);
> 3. 声明以ds1为数据源的table
> Table sourcetable = tableEnv.fromDataStream(ds1);
> String souceTableName = "music_source";
> tableEnv.createTemporaryView(souceTableName, sourcetable);
> 4. 创建一张hive表:
>
> CREATE TABLE `dwd_music_copyright_test`(
>   `url` string COMMENT 'url',
>   `md5` string COMMENT 'md5',
>   `utime` bigint COMMENT '时间',
>   `title` string COMMENT '歌曲名',
>   `singer` string COMMENT '演唱者',
>   `company` string COMMENT '公司',
>   `level` int COMMENT '置信度.0是标题切词,1是acrcloud返回的结果,3是人工标准')
> PARTITIONED BY (
>   `dt` string,
>   `hour` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION
>   'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
> TBLPROPERTIES (
>   'connector'='HiveCatalog',
>   'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
>   'sink.partition-commit.delay'='1 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.rolling-policy.check-interval'='30s',
>   'sink.rolling-policy.rollover-interval'='1min',
>   'sink.rolling-policy.file-size'='1MB');
>
>
> 5. 将step3表中的数据插入dwd_music_copyright_test
>
> 环境
>
> flink:1.11
> kafka:1.1.1
> hadoop:2.6.0
> hive:1.2.0
>
> 问题
> 程序运行后,发现hive catalog部分分区未成功创建,如下未成功创建hour=02和hour=03分区:
>
> show partitions rt_dwd.dwd_music_copyright_test;
>
> | dt=2020-08-29/hour=00  |
> | dt=2020-08-29/hour=01  |
> | dt=2020-08-29/hour=04  |
> | dt=2020-08-29/hour=05  |
>
>  但是hdfs目录下有文件生成:
>
> $ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
> 4.5 K   13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
> 2.0 K   6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
> 1.7 K   5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
> 1.3 K   3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
> 3.1 K   9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04
>
>
> 且手动add partition后可以正常读取数据。
>
> 通过flink WebUI可以看到,过程中有checkpoint在StreamingFileCommitter时失败的情况发生:
>
>
>
>
>
> 请问:
>
> 1. exactly-once只能保证写sink文件,不能保证更新catalog吗?
> 2. 是的话有什么方案解决这个问题吗?
> 3.
> EXACTLY_ONCE有没有必要指定kafka参数isolation.level=read_committed和enable.auto.commit=false?是不是有了如下设置就可以保证EXACTLY_ONCE?
>
> streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE);
>
>

-- 
Best, Jingsong Lee


Re: flink orc与hive2.1.1版本冲突怎么解决

2020-08-20 文章 Jingsong Li
是的

On Fri, Aug 21, 2020 at 1:30 PM  wrote:

> flink hive表的方式是什么意思?hive streaming吗?
>
> 发自我的iPhone
>
> > 在 2020年8月21日,13:27,Jingsong Li  写道:
> >
> > Flink filesystem connector 或者 DataStream用flink-orc
> 的版本是比较新的版本,所以老版本的ORC读不了。
> >
> > 建议你用Flink hive表的方式来写orc
> >
> >> On Fri, Aug 21, 2020 at 12:25 PM  wrote:
> >>
> >> Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。
> >>
> >> 发自我的iPhone
> >>
> >>>> 在 2020年8月21日,12:15,Jingsong Li  写道:
> >>>
> >>> 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive sql写orc是一样的。
> >>> 确定这个版本hive写出的数据可以被读取吗?
> >>>
> >>>> On Fri, Aug 21, 2020 at 10:17 AM  wrote:
> >>>>
> >>>> 使用版本是flink 1.11
> >>>> Hive 2.1.1
> >>>> flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬?
> >>>>
> >>>>
> >>>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee


Re: flink orc与hive2.1.1版本冲突怎么解决

2020-08-20 文章 Jingsong Li
The Flink filesystem connector and the DataStream flink-orc writer are built against a fairly new ORC version, so readers based on an old ORC version cannot read those files.

I'd suggest writing the ORC data through a Flink Hive table instead.
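A minimal sketch of that suggestion, assuming a Hive ORC table dw.dest_orc like the one quoted below already exists in the metastore, a StreamTableEnvironment tableEnv is set up with checkpointing enabled, and a source table named src is registered (src and the catalog parameters are placeholders):

// Sketch only: route the write through the HiveCatalog so the table's own Hive ORC serde
// (and therefore Hive's ORC version) is used, instead of the flink-orc bulk writer.
HiveCatalog hive = new HiveCatalog("myhive", "dw", "/etc/hive/conf", "2.1.1");
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

// dest_orc is the existing Hive ORC table from the quoted DDL; ts is its partition column.
tableEnv.executeSql("INSERT INTO dw.dest_orc SELECT i, ts FROM src");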

On Fri, Aug 21, 2020 at 12:25 PM  wrote:

> Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。
>
> 发自我的iPhone
>
> > 在 2020年8月21日,12:15,Jingsong Li  写道:
> >
> > 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive sql写orc是一样的。
> > 确定这个版本hive写出的数据可以被读取吗?
> >
> >> On Fri, Aug 21, 2020 at 10:17 AM  wrote:
> >>
> >> 使用版本是flink 1.11
> >> Hive 2.1.1
> >> flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬?
> >>
> >>
> >>
> >
> > --
> > Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: flink orc与hive2.1.1版本冲突怎么解决

2020-08-20 文章 Jingsong Li
如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive sql写orc是一样的。
确定这个版本hive写出的数据可以被读取吗?

On Fri, Aug 21, 2020 at 10:17 AM  wrote:

> 使用版本是flink 1.11
> Hive 2.1.1
> flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬?
>
>
>

-- 
Best, Jingsong Lee


Re: Re: Re:Re: 用hive streaming写 orc文件的问题

2020-08-20 文章 Jingsong Li
这是bug,已经修复了,待发布

On Fri, Aug 14, 2020 at 6:05 PM flink小猪 <18579099...@163.com> wrote:

> 根据我在IDE上面的测试,如果是写入parquet表的话,不添加您发的这段代码,程序依然在运行,并且每间隔checkpoint-interval的时间
> 会打印parquet相关的日志,但是如果是写入orc表的话,则没有任何日志输出,程序依然在运行。另外我通过sql client提交相同的任务,
> parquet表依然没有任何问题,而orc表任务无限重启。并报错。
>
> java.io.FileNotFoundException: File does not exist:
> hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
> ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
> at
> org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
> ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
> ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at 

Re: flink集成到cdh

2020-08-20 文章 Jingsong Li
具体什么错呢

On Tue, Aug 18, 2020 at 8:34 PM smq <374060...@qq.com> wrote:

>
> 大家好,在网上找了个制作parcel的工具,flink1.9版本打好之后可以正常通过cm安装运行,但是1.10和1.11安装之后都是启动不了,请问大家有这方面的经验可以传授下吗,感激不尽!



-- 
Best, Jingsong Lee


Re: 【bug】flink 1.11使用连接器时schema丢失DataType指定的物理信息

2020-08-20 文章 Jingsong Li
1.11 就用新的source sink接口吧

On Wed, Aug 19, 2020 at 12:43 AM 赵 建云  wrote:

> 补充图片链接
> 创建连接器
> http://image.zhaojianyun.com/mweb/bug1.png
> TableSourceSinkFactory中的创建sink
> http://image.zhaojianyun.com/mweb/bug2.png
> TableSchema的运行时物理信息
> http://image.zhaojianyun.com/mweb/bug3.png
>
>
>
> 2020年8月18日 下午10:09,赵 建云  zhaojianyu...@outlook.com>> 写道:
>
> hello all:
> 我在为flink 1.11开发新的连接器时,发现了问题。
> 连接器的旧版本是支持flink1.9的,最近升级了flink
> 1.11后,test中,发现创建连接器需要声明schema,schema需要使用TableSchema信息,TableSchema包含的DataType,DataType指定物理类型后,在TableSourceSinkFactory中,获得的schema中,丢失了前面指定的物理类型。
> 这个问题影响了source、sink。导致了启动时,检查类型不能通过。
> 例如
> DataTypes.DATE().bridgedTo(java.sql.Date.class);中,在运行时物理类型java.sql.Date丢失了,实际使用的是java.time.LocalDate。
>
>  *
> 创建连接器
> [创建连接器]
>  *
> TableSourceSinkFactory中的创建sink
> [TableSourceSinkFactory中的创建sink]
>  *
> TableSchema的运行时物理信息
> [TableSchema的运行时物理信息]
>
> 我在flink的jira没找到提交问题的按钮,so,就把问题发在了中文组里,请大家支持下这个问题~
> 赵建云
> 2020年8月18日
>
>

-- 
Best, Jingsong Lee


Re: Flink SQL血缘关系

2020-08-20 文章 Jingsong Li
取决于你为啥要做血缘关系

On Wed, Aug 19, 2020 at 1:17 AM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:

> 哪位大佬知道,如果要做Flink SQL血缘关系是在sqlNode中拿表之间关系好,还是在Transformation 算子中拿血缘关系好
>
>
>
> guaishushu1...@163.com
>


-- 
Best, Jingsong Lee


Re: Flink StreamingFileSink滚动策略

2020-08-20 文章 Jingsong Li
As long as you extend CheckpointRollingPolicy, you are free to implement shouldRollOnEvent and shouldRollOnProcessingTime however you like; a sketch follows below.
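A minimal sketch of such a policy. The class and method signatures follow the 1.11 API as I recall it (PartFileInfo exposes getSize() and getLastUpdateTime()); please verify against your Flink version:

import java.io.IOException;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

// Rolls on every checkpoint (inherited behavior), and additionally when the part file
// exceeds a size threshold or has not been updated for a configured interval.
public class SizeAndInactivityRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;      // e.g. 128 * 1024 * 1024
    private final long inactivityIntervalMs;  // e.g. 5 * 60 * 1000

    public SizeAndInactivityRollingPolicy(long maxPartSizeBytes, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
        return currentTime - partFileState.getLastUpdateTime() >= inactivityIntervalMs;
    }
}

If this matches your version, the policy can then be passed to the bulk-format builder via StreamingFileSink.forBulkFormat(...).withRollingPolicy(...), which for bulk formats accepts a CheckpointRollingPolicy.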

On Wed, Aug 19, 2020 at 6:20 PM guoliang_wang1335 
wrote:

> 请问,Flink StreamingFileSink使用批量写Hadoop SequenceFile
> format,能自定义滚动策略吗?我想指定文件大小、文件最长未更新时间和checponit来进行滚动,可以通过实现RollingPolicy接口来定制吗?谢谢!
>
>
> 看文档<
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
> >备注,批量编码默认情况下仅仅有OnCheckpointRollingPolicy,在每次checkpoint时候进行切分。如果设置checkpoint时间不合理,这样会产生蛮多小文件的。
>
>
>
>
>
>

-- 
Best, Jingsong Lee


Re: Orc文件问题请教

2020-08-20 文章 Jingsong Li
你可以贴下异常栈,
估计是ORC版本问题,如果你用file system的orc writer,那是比较新的版本。
建议你用下Hive的表来写,这样你可以选版本。

Best,
Jingsong

On Thu, Aug 20, 2020 at 12:10 PM  wrote:

> 使用flink sql写到orc文件,以后,flink能读取出来,但是spark和hive均不能读取出来,impala能读取。
>
> 发自我的iPhone



-- 
Best, Jingsong Lee


Re: flink sql 数据异常导致任务失败

2020-08-18 文章 Jingsong Li
Hi,

The latest versions (1.11+) already have options for this:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#format-options
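Concretely, the two relevant options are json.fail-on-missing-field and json.ignore-parse-errors. A sketch of a Kafka source table that skips dirty records instead of failing the job; topic, servers, group id and schema are placeholders, and a StreamTableEnvironment tableEnv is assumed:

tableEnv.executeSql(
    "CREATE TABLE user_behavior (" +
    "  user_id BIGINT," +
    "  item_id STRING," +
    "  behavior STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_behavior'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'testGroup'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'," +
    "  'json.fail-on-missing-field' = 'false'," +  // missing fields become NULL
    "  'json.ignore-parse-errors' = 'true'" +      // unparsable rows are skipped
    ")");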

Best,
Jingsong

On Tue, Aug 18, 2020 at 2:42 PM 赵一旦  wrote:

> 我刚刚接触flinksql,主要是感觉这个问题很明显,大家如果生产中使用的话,应该都已经有方案才对,但是好像没啥人有回应。
>
> shizk233  于2020年8月18日周二 下午2:26写道:
>
> > 考虑修改一下json解析的逻辑来处理异常数据?
> >
> > 赵一旦  于2020年8月18日周二 上午11:59写道:
> >
> > > 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
> > > 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
> > > api,然后捕获所有异常即可。
> > >
> > > 赵一旦  于2020年8月17日周一 下午7:15写道:
> > >
> > > > kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
> > > >
> > > > 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
> > > >
> > > > 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
> > > >
> > >
> >
>


-- 
Best, Jingsong Lee


Re: Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 文章 Jingsong Li
另外问一下,是什么格式?csv还是parquet。
有等到10分钟(rollover-interval)过后和下一次checkpoint后再看吗?

On Wed, Aug 12, 2020 at 2:45 PM kandy.wang  wrote:

>
>
>
>
>
>
> 有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉,
> 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。
> 就是感觉停止之前正在写的那个分区,没有触发commit
>
>
>
>
> 在 2020-08-12 14:26:53,"Jingsong Li"  写道:
> >那你之前的分区除了in-progress文件,有已完成的文件吗?
> >
> >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >> source就是kafka
> >>
> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
> >>
> >>
> >>
> >>
> >>
> >>
> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >>
> >>
> >>
> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >>
> >> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
> >> >你的source是exactly-once的source吗?
> >> >
> >> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >
> >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
> >> >
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> >@ Jingsong
> >> >>
> >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> >> >> 用presto查询查不了
> >> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
> >> >>  'sink.partition-commit.trigger'='process-time',
> >> >>   'sink.partition-commit.delay'='0 min',
> >> >>
>  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >> >>   'sink.rolling-policy.check-interval'='30s',
> >> >>   'sink.rolling-policy.rollover-interval'='10min',
> >> >>   'sink.rolling-policy.file-size'='128MB'
> >> >>如果是12:39分 05秒左右做一次savepoint,然后
> >> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
> >> >> partition,就导致有数据,但是确查不 了。
> >> >>
> >>
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> >> >> partition 也能查了。
> >> >> >
> >> >> >
> >> >> >
> >> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >> >> >>
> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
> >> >> >>
> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >> >> >>>
> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang 
> wrote:
> >> >> >>>
> >> >> >>> > 1.StreamingFileWriter
> >> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >> >> >>> > =2100分区的数据还存在很多的in-progress文件。
> >> >> >>> >
> 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >> >> >>>
> >> >> >>
> >> >> >>
> >> >> >>--
> >> >> >>Best, Jingsong Lee
> >> >>
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 文章 Jingsong Li
那你之前的分区除了in-progress文件,有已完成的文件吗?

On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:

>
>
>
> source就是kafka
> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
>
>
>
>
>
>
> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>
>
>
> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>
> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
> >你的source是exactly-once的source吗?
> >
> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >
> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> >@ Jingsong
> >>
> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> >> 用presto查询查不了
> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
> >>  'sink.partition-commit.trigger'='process-time',
> >>   'sink.partition-commit.delay'='0 min',
> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >>   'sink.rolling-policy.check-interval'='30s',
> >>   'sink.rolling-policy.rollover-interval'='10min',
> >>   'sink.rolling-policy.file-size'='128MB'
> >>如果是12:39分 05秒左右做一次savepoint,然后
> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
> >> partition,就导致有数据,但是确查不 了。
> >>
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> >> partition 也能查了。
> >> >
> >> >
> >> >
> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >> >>
> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
> >> >>
> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >> >>>
> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
> >> >>>
> >> >>> > 1.StreamingFileWriter
> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >> >>> > =2100分区的数据还存在很多的in-progress文件。
> >> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >> >>> >
> >> >>> >
> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >> >>> >
> >> >>> >
> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >> >>>
> >> >>
> >> >>
> >> >>--
> >> >>Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 Jingsong Li
你的source是exactly-once的source吗?

in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?

On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> >@ Jingsong
>
> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> 用presto查询查不了
> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
>  'sink.partition-commit.trigger'='process-time',
>   'sink.partition-commit.delay'='0 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>   'sink.rolling-policy.check-interval'='30s',
>   'sink.rolling-policy.rollover-interval'='10min',
>   'sink.rolling-policy.file-size'='128MB'
>如果是12:39分 05秒左右做一次savepoint,然后
> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
> partition,就导致有数据,但是确查不 了。
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> partition 也能查了。
> >
> >
> >
> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >>
> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
> >>
> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >>>
> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
> >>>
> >>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >>> > =2100分区的数据还存在很多的in-progress文件。
> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >>> >
> >>> >
> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >>> >
> >>> >
> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >>>
> >>
> >>
> >>--
> >>Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-11 文章 Jingsong Li
in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响

On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:

> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>
> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>
> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> > =2100分区的数据还存在很多的in-progress文件。
> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >
> >
> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >
> >
> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>


-- 
Best, Jingsong Lee


Re: 关于FlinkSQL的文档中flinksql的connector和catalog划分以及hive catalog和hive connector是否必须配合使用的问题。

2020-08-10 文章 Jingsong Li
Hi,

我觉得是时候考虑把hive文档移到connector里了,我们没必要割裂它们

Best,
Jingsong

On Tue, Aug 11, 2020 at 10:39 AM Zhao,Yi(SEC)  wrote:

> 是的。我更多是纠结文档结构容易造成混淆。我认为catalog和connector是相对独立的概念。最对算是有点关系。
> 但是根据其他人的回答,目前来看,这2者还真没办法完全独立。比如jdbc connector就是不支持hive表。读写hive表还就是需要hive
> catalog。于是我刚刚回了另一封邮件写到,这种case下,我认为实践中,可以单独搞一个hive
> metastore仅仅服务于flink,hive和spark-sql则使用另一个hive
> metastore。这样去独立出来,避免出现流表被spark,hive可见。
> __
>
> 在 2020/8/11 上午10:35,“Rui Li” 写入:
>
>
> 你是想问Flink通过HiveCatalog创建的流式表在SparkSQL中是不是可见么?Flink通过HiveCatalog创建的流式表在HMS中也是作为一张普通的表存在的,所以我理解SparkSQL如果对接同一个HMS的话也是可以看到这张表的。但不管是Hive还是SparkSQL,尝试查询这个流式表应该都会出错,目前这一点是需要用户自己保证的,比如可以通过不同的DB来做划分。
>
> On Mon, Aug 10, 2020 at 8:43 PM Zhao,Yi(SEC) 
> wrote:
>
> > 如果使用了Hive
> >
> catalog,我创建一个流式表,然后返回基于同一个HiveCatalog的spark-sql中,那个表能看到吗?如果尝试查询是不是会出错?
> > 无法实验:我现在还没搞定,因为简单的配置ok,连接到了hive metastore,也通过 show
> >
> tables看到了表,但select会出错(这个问题后续再说,现在就是想知道这种基于已有catalog的情况时是不是不太好,比较flink-sql特有流表)。
> >
> > 在 2020/8/10 下午8:24,“Danny Chan” 写入:
> >
> > 你好 ~
> >
> > 1. 你是只文档结构吗 ?catalog 是 flink SQL 管理表元数据信息的组件,通过注册 catalog
> 用户可以直接访问
> > catalog 中的已存表,当然用户也可以通过 CREATE TABLE DDL 来创建对应的 connector 表
> > 2. 访问 hive metastore 中的表示一定要用 hive catalog
> 的,如果是新建临时表(不持久化),也可以使用内置的
> > catalog
> >
> > Best,
> > Danny Chan
> > 在 2020年8月10日 +0800 PM8:14,Zhao,Yi(SEC) ,写道:
> > > 1 为什么flinksql 1.11中,JDBC
> > Catalog通过简单的链接转给了connector,catalog和connector并不是同一个概念。我认为应该将jdbc
> > connectior和jdbc catalog分开放入各自目录。
> > >
> > > 2 为什么flinksql1.11中,connector部分没有hive connector。而是在hive
> > integration部分,以及catalogs中介绍。而且在 Table API & SQL/Hive
> Integration/Hive Read
> > & Write 部分,第一句是“Using the HiveCatalog and Flink’s connector to Hive,
> Flink
> > can read and write from Hive data as an alternative to Hive’s batch
> >
> engine.”。难道不使用hivecatalog就不能读取hive数据?是这个意思嘛。感觉好像意思是hive读取特别,这种情况必须使用hive
> > catalog。不可以使用jdbc catalog,但使用hive connector嘛?
> >
> >
> >
>
> --
> Best regards!
> Rui Li
>
>
>

-- 
Best, Jingsong Lee


Re: flink 1.11 streaming 写入hive 5min表相关问题

2020-07-29 文章 Jingsong Li
Hi,

1. A checkpoint forces the in-progress files to roll.
2. The simplest way to reduce small files is to use a larger checkpoint interval; another idea is to trigger a Hive compaction at partition-commit time (a configuration sketch follows below).
3. The _SUCCESS file depends on the checkpoint interval, so it is published with some delay.
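A minimal configuration sketch of the first suggestion, assuming an existing StreamExecutionEnvironment env and StreamTableEnvironment tableEnv; the interval, the placeholder columns and the property values are illustrative only, not recommendations:

// Sketch only: checkpoint less often, since every checkpoint cuts a new file regardless of
// the rolling-policy thresholds. 30s checkpoints with a 5min/128MB rolling policy will
// still produce roughly one file per checkpoint per open bucket.
env.enableCheckpointing(5 * 60 * 1000L);  // e.g. 5 min instead of 30 s

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
    "CREATE TABLE test.xxx_5min (url STRING, md5 STRING) " +   // placeholder columns
    "PARTITIONED BY (dt STRING, hm STRING) STORED AS ORC " +
    "TBLPROPERTIES (" +
    " 'sink.partition-commit.trigger'='process-time'," +
    " 'sink.partition-commit.delay'='5 min'," +
    " 'sink.partition-commit.policy.kind'='metastore,success-file'," +
    " 'sink.rolling-policy.rollover-interval'='5min'," +
    " 'sink.rolling-policy.file-size'='128MB')");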

Best,
Jingsong

On Thu, Jul 30, 2020 at 1:14 PM kandy.wang  wrote:

> 现象:
> CREATE TABLE test.xxx_5min (
>
> ..
>
> ) PARTITIONED BY (dt string , hm string) stored as orc  TBLPROPERTIES(
>
>   'sink.partition-commit.trigger'='process-time',
>
>   'sink.partition-commit.delay'='5 min',
>
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>
>   'sink.rolling-policy.file-size'='128MB',
>
>   'sink.rolling-policy.check-interval' ='30s',
>
>   'sink.rolling-policy.rollover-interval'='5min'
> );
> dt = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd')
> hm = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')
> 5min产生一个分区, ,checkpoint频率:30s
> 问题:
> 1.flink 1.11 
> steaming写入为什么是1min产生一个文件,而且文件大小没有到128M,如果参数sink.rolling-policy.rollover-interval'='5min
> 文件滚动时间 5min 滚动大小128M生效的话,就不应该产生这么小的问题,文件大小没有按照预期控制,为啥?
>  2.小文件问题该如何解决?有什么好的思路
> 3. 标记文件_Success文件为啥上报延迟? 如果是
> 12:30的分区,5min的分区,理论上应该12:35左右的时候就应该提交partition?
>
>
>
>


-- 
Best, Jingsong Lee


Re: flink row 类型

2020-07-23 文章 Jingsong Li
Take a look at the UDF type inference added in Flink 1.11.

The TypeInference gives you the input DataTypes, and those types should carry the field names.
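Once you have the DataType of the ROW argument (for example through the 1.11 type-inference hooks), the field names can be read from its logical type. A small sketch, using APIs that should exist as written but are worth double-checking:

import java.util.List;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

// Given the DataType of a ROW-typed input, return its field names, e.g. [rule_key1, rule_key2].
public static List<String> rowFieldNames(DataType rowDataType) {
    RowType rowType = (RowType) rowDataType.getLogicalType();
    return rowType.getFieldNames();
}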

Best,
Jingsong

On Thu, Jul 23, 2020 at 2:09 PM Dream-底限  wrote:

> hi
> 是的,我们的数据场景比较尴尬,那我想其他方式实现一下
>
> Benchao Li  于2020年7月23日周四 下午12:55写道:
>
> > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。
> > 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。
> >
> > Dream-底限  于2020年7月22日周三 下午7:22写道:
> >
> > > hi、
> > > 我这面定义row数据,类型为ROW,可以通过
> > > row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口
> > >
> > > rule_key  转换为rule_key1,rulekey2
> > > 1
> > > 2
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 
Best, Jingsong Lee


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jingsong Li
相同操作我也没有复现。。是可以成功执行的

你的HDFS是什么版本?是否可以考虑换个来测试下

On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang 
wrote:

> hi,jinsong:
>
> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>
> Jun Zhang  于2020年7月23日周四 上午11:15写道:
>
>> hi,夏帅:
>>
>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>
>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>
>> 夏帅  于2020年7月10日周五 下午5:39写道:
>>
>>> 你好,
>>> 我这边同样的代码,并没有出现类似的问题
>>> 是本地跑么,可以提供下日志信息么?
>>>
>>>

-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 文章 Jingsong Li
Thanks for being the release manager for the 1.11.1 release, Dian.

Best,
Jingsong

On Thu, Jul 23, 2020 at 10:12 AM Zhijiang 
wrote:

> Thanks for being the release manager and the efficient work, Dian!
>
> Best,
> Zhijiang
>
> --
> From:Konstantin Knauf 
> Send Time:2020年7月22日(星期三) 19:55
> To:Till Rohrmann 
> Cc:dev ; Yangze Guo ; Dian Fu <
> dia...@apache.org>; user ; user-zh <
> user-zh@flink.apache.org>
> Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released
>
> Thank you for managing the quick follow up release. I think this was very
> important for Table & SQL users.
>
> On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann 
> wrote:
> Thanks for being the release manager for the 1.11.1 release, Dian. Thanks
> a lot to everyone who contributed to this release.
>
> Cheers,
> Till
>
> On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
> Thanks Dian for the great work and thanks to everyone who makes this
> release possible!
>
> Best, Hequn
>
> On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:
>
> > Congratulations! Thanks Dian for the great work and to be the release
> > manager!
> >
> > Best,
> > Jark
> >
> > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
> >
> > > Congrats!
> > >
> > > Thanks Dian Fu for being release manager, and everyone involved!
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
> > wrote:
> > > >
> > > > Congratulations! Thanks Dian for the great work!
> > > >
> > > > Best,
> > > > Wei
> > > >
> > > > > 在 2020年7月22日,15:09,Leonard Xu  写道:
> > > > >
> > > > > Congratulations!
> > > > >
> > > > > Thanks Dian Fu for the great work as release manager, and thanks
> > > everyone involved!
> > > > >
> > > > > Best
> > > > > Leonard Xu
> > > > >
> > > > >> 在 2020年7月22日,14:52,Dian Fu  写道:
> > > > >>
> > > > >> The Apache Flink community is very happy to announce the release
> of
> > > Apache Flink 1.11.1, which is the first bugfix release for the Apache
> > Flink
> > > 1.11 series.
> > > > >>
> > > > >> Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > streaming
> > > applications.
> > > > >>
> > > > >> The release is available for download at:
> > > > >> https://flink.apache.org/downloads.html
> > > > >>
> > > > >> Please check out the release blog post for an overview of the
> > > improvements for this bugfix release:
> > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> > > > >>
> > > > >> The full release notes are available in Jira:
> > > > >>
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348323
> > > > >>
> > > > >> We would like to thank all contributors of the Apache Flink
> > community
> > > who made this release possible!
> > > > >>
> > > > >> Regards,
> > > > >> Dian
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>
>
>

-- 
Best, Jingsong Lee


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 Jingsong Li
Hi 首维,

非常感谢你的信息,我们试图 去掉太灵活的“DataStream”,但是也逐渐发现一些额外的需求,再考虑和评估下你的需求。

CC: @Jark Wu 

Best,
Jingsong

On Wed, Jul 22, 2020 at 1:49 PM 刘首维  wrote:

> Hi JingSong,
>
>
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
>   下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
>
>
>   1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
>   2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
>   3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
>   4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
>
>
> 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
>
>
> 
> 发件人: Jingsong Li 
> 发送时间: 2020年7月22日 13:26:00
> 收件人: user-zh
> 抄送: imj...@gmail.com
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
>
> Best
> Jingsong
>
> On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:
>
> > Hi all,
> >
> >
> >
> > 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >
> > 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >
> >
> >
> > 所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 Jingsong Li
可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?

Best
Jingsong

On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:

> Hi all,
>
>
>
> 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>
> 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>
>
>
> 所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>


-- 
Best, Jingsong Lee


Re: Flink catalog的几个疑问

2020-07-21 文章 Jingsong Li
Hi,

HiveCatalog is currently the only official persistent catalog that can store all kinds of tables, including tables for the kafka, jdbc, hbase, etc. connectors.

> 后续有可能转正为flink 默认的catalog实现吗?

Not likely at the moment. Flink does not even bundle the Hadoop dependencies, let alone the Hive ones; without those dependencies bundled, it cannot become the default.

> hive catalog是不支持大小写敏感的

Yes. As Godfrey said, this mainly matters when connecting to a case-sensitive database via JDBC, where column names may then fail to match.
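A minimal sketch of the first point: register a HiveCatalog and create a Kafka table in it, so the table definition is persisted in the Hive metastore and survives session restarts. Catalog parameters, topic and schema are placeholders, and a StreamTableEnvironment tableEnv is assumed:

HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.1.1");
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

tableEnv.executeSql(
    "CREATE TABLE kafka_events (" +
    "  id BIGINT," +
    "  msg STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'flink-demo'," +
    "  'format' = 'json'" +
    ")");
// Hive/Spark connected to the same metastore will see 'kafka_events' as metadata only;
// only Flink can actually query it.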

Best,
Jingsong

On Wed, Jul 22, 2020 at 10:39 AM godfrey he  wrote:

> hi Xingxing,
>
> 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
> postgres catalog,
> 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
> 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
> catalog写新的meta。
> 是否会转为默认catalog,据我所知,目前没有。
> 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
>
> Best,
> Godfrey
>
> dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:
>
> > Hi Flink社区:
> > 有几个疑问希望社区小伙伴们帮忙解答一下:
> >
> >
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> > 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> > 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
> >
> >
> >
> >
> > Best,
> > Xingxing Di
> >
>


-- 
Best, Jingsong Lee


Re: flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 Jingsong Li
How is your source table defined? Are you sure the watermark is advancing? (You can check it in the Flink UI.)

Could you try removing 'sink.partition-commit.trigger'='partition-time'?
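For context: with 'partition-time' a partition is only committed once the watermark passes the partition's time, so the source table needs an event-time attribute and a watermark. A sketch of such a source definition; topic, servers, group id and field names are placeholders, and a StreamTableEnvironment tableEnv is assumed:

tableEnv.executeSql(
    "CREATE TABLE source_table (" +
    "  host STRING," +
    "  url STRING," +
    "  public_date TIMESTAMP(3)," +
    "  WATERMARK FOR public_date AS public_date - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'logs'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'flink-demo'," +
    "  'scan.startup.mode' = 'latest-offset'," +
    "  'format' = 'json'" +
    ")");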

Best,
Jingsong

On Wed, Jul 22, 2020 at 12:02 AM Leonard Xu  wrote:

> HI,
>
> Hive 表时在flink里建的吗? 如果是建表时使用了hive dialect吗?可以参考[1]设置下
>
> Best
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
> >
>
> > 在 2020年7月21日,22:57,kcz <573693...@qq.com> 写道:
> >
> > 一直都木有数据 我也不知道哪里不太对 hive有这个表了已经。我测试写ddl hdfs 是OK的
> >
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: JasonLee <17610775...@163.com 
> > 发送时间: 2020年7月21日 20:39
> > 收件人: user-zh mailto:user-zh@flink.apache.org
> >
> > 主题: 回复:flink-1.11 ddl kafka-to-hive问题
> >
> >
> >
> > hi
> > hive表是一直没有数据还是过一段时间就有数据了?
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:17610775...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年07月21日 19:09,kcz 写道:
> > hive-1.2.1
> > chk 已经成功了(去chk目录查看了的确有chk数据,kafka也有数据),但是hive表没有数据,我是哪里缺少了什么吗?
> > String hiveSql = "CREATE TABLE stream_tmp.fs_table (\n" +
> >  " host STRING,\n" +
> >  " url STRING," +
> >  " public_date STRING" +
> >  ") partitioned by (public_date
> string) " +
> >  "stored as PARQUET " +
> >  "TBLPROPERTIES (\n" +
> >  "
> 'sink.partition-commit.delay'='0 s',\n" +
> >  "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> >  "
> 'sink.partition-commit.policy.kind'='metastore,success-file'" +
> >  ")";
> > tableEnv.executeSql(hiveSql);
> >
> >
> > tableEnv.executeSql("INSERT INTO stream_tmp.fs_table SELECT host,
> url, DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");
>
>

-- 
Best, Jingsong Lee


Re: Flink整合hive之后,通过flink创建的表,hive beeline可见表,不可见字段?

2020-07-20 文章 Jingsong Li
By default that creates a generic Flink table, whose schema is not visible on the Hive side.
If you want a table that Hive itself can use, create it with the Hive dialect.
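A minimal sketch, assuming a HiveCatalog is already registered and selected as the current catalog (the table name and storage format are placeholders):

// Switch to the HIVE dialect so the table is created as a native Hive table whose
// columns are visible in beeline, then switch back for regular Flink SQL.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE orders_hive (product STRING, amount INT) STORED AS ORC");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);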

Best,
Jingsong

On Tue, Jul 21, 2020 at 11:31 AM felixzh  wrote:

> 参照文档
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
> 通过flink创建表:CREATE TABLE Orders (product STRING, amount INT)
> 在beeline端可见表,但是desc看不到字段,select * from orders也不可用
>
>

-- 
Best, Jingsong Lee


Re: flink1.11 run

2020-07-20 文章 Jingsong Li
Yes.

But however the files roll, they only become visible after the covering checkpoint completes.

On Mon, Jul 20, 2020 at 7:10 PM Dream-底限  wrote:

> hi、
> 对于下面这两个的滚动方式,是选优先到达的吗,就是1min的checkpoint和128mb的file size,不管哪个先到都会滚动生成新的文件
>
> 》可以,默认下 128MB 滚动,Checkpoint 滚动
>
> Jingsong Li  于2020年7月20日周一 下午6:12写道:
>
> > Hi Dream,
> >
> > > 1.一定要在flink内部先建立hive表吗?
> >
> > 不用,哪边建无所谓
> >
> > > 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
> >
> > 可以,默认下 128MB 滚动,Checkpoint 滚动。
> >
> > Best,
> > Jingsong
> >
> > On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:
> >
> > >  hi
> > > 好的,想问一下stream写hive表的时候:
> > > 1、一定要在flink内部先建立hive表吗?
> > > 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
> > >
> > > Rui Li  于2020年7月20日周一 下午4:44写道:
> > >
> > > > tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
> > > >
> > > > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限 
> wrote:
> > > >
> > > > > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> > > > >
> > > > > 异常:
> > > > > The program finished with the following exception:
> > > > >
> > > > > org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > > method
> > > > > caused an error: No operators defined in streaming topology. Cannot
> > > > > generate StreamGraph.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > > > at
> > > >
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > > > at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > > at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > > > streaming topology. Cannot generate StreamGraph.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > &

Re: flink1.11 run

2020-07-20 文章 Jingsong Li
Hi Dream,

> 1.一定要在flink内部先建立hive表吗?

No, it doesn't matter on which side the table is created.

> 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗

Yes, that works; by default files roll at 128MB and on every checkpoint.
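A minimal sketch of writing into a Hive table that was created outside Flink (for example via hue). The catalog parameters and the source table come from the quoted code below; existing_hive_table and the selected columns are placeholders:

// No Flink-side CREATE TABLE is needed for the Hive table; files roll at 128MB by default
// and on each checkpoint, and only become visible once the checkpoint completes.
env.enableCheckpointing(60_000);

HiveCatalog hive = new HiveCatalog("myhive", "tmp", "/etc/alternatives/hive-conf/", "1.1.0");
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

tableEnv.executeSql(
    "INSERT INTO tmp.existing_hive_table " +
    "SELECT user_id, item_id, behavior FROM tmp.user_behavior");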

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:

>  hi
> 好的,想问一下stream写hive表的时候:
> 1、一定要在flink内部先建立hive表吗?
> 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
>
> Rui Li  于2020年7月20日周一 下午4:44写道:
>
> > tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
> >
> > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> >
> > > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> > >
> > > 异常:
> > > The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot generate StreamGraph.
> > > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
> > > at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > at com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > ... 11 more
> > > Code:
> > >
> > > StreamExecutionEnvironment environment =
> > >         StreamExecutionEnvironment.getExecutionEnvironment();
> > > EnvironmentSettings settings =
> > >         EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > StreamTableEnvironment tableEnv =
> > >         StreamTableEnvironment.create(environment, settings);
> > >
> > > environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > environment.setStateBackend(new MemoryStateBackend());
> > > environment.getCheckpointConfig().setCheckpointInterval(5000);
> > >
> > > String name = "myhive";
> > > String defaultDatabase = "tmp";
> > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > String version = "1.1.0";
> > >
> > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
> > > tableEnv.registerCatalog("myhive", hive);
> > > tableEnv.useCatalog("myhive");
> > >
> > > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > >         "  user_id BIGINT,\n" +
> > >         "  item_id STRING,\n" +
> > >         "  behavior STRING,\n" +
> > >         "  ts AS PROCTIME()\n" +
> > >         ") WITH (\n" +
> > >         " 'connector' = 'kafka-0.11',\n" +
> > >         " 'topic' = 'user_behavior',\n" +
> > >         " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> > >         " 'properties.group.id' = 'testGroup',\n" +
> > >         " 'scan.startup.mode' = 'earliest-offset',\n" +
> > >         " 'format' = 'json',\n" +
> > >         " 'json.fail-on-missing-field' = 'false',\n" +
> > >         " 'json.ignore-parse-errors' = 'true'\n" +
> > >         ")");
> > >
> > > //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> > > //" 

Re: Flink 1.11 Hive Streaming Write issue

2020-07-20 文章 Jingsong Li
Hi Dream,

Could you describe your test scenario in more detail?
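
One note that may help with the "no data in Hive" symptom: the streaming Hive sink only commits files when a checkpoint completes, so checkpointing has to be enabled. A minimal sketch, reusing the bsEnv variable from the quoted code below (the 60 s interval is just an example):

    // Without checkpointing, in-progress files are never committed, so the Hive
    // table stays empty even though the job keeps running.
    bsEnv.enableCheckpointing(60_000); // checkpoint (and therefore commit files) every 60 s
    bsEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);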

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:40 PM Dream-底限  wrote:

> hi,
> How was this issue resolved in the end? Is your data rolling into Hive now? On my side, even after enabling checkpoints there is still no data in Hive.
>
> 李佳宸  wrote on Thu, Jul 16, 2020 at 10:39 PM:
>
> > OK, thanks~~~
> >
> > JasonLee <17610775...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:
> >
> > > hi
> > > You need to enable checkpointing.
> > >
> > >
> > > JasonLee
> > > Email: 17610775...@163.com
> > >
> > > On 2020-07-16 18:03, 李佳宸 wrote:
> > > I'd like to ask what configuration is required for Hive streaming writes. I'm not sure why my job runs fine but no data is written to Hive.
> > > Batch writes to Hive and reads in the streaming environment both work normally.
> > >
> > > The code is attached; it's quite short:
> > >
> > > public class KafkaToHiveStreaming {
> > >     public static void main(String[] arg) throws Exception {
> > >         StreamExecutionEnvironment bsEnv =
> > >                 StreamExecutionEnvironment.getExecutionEnvironment();
> > >         EnvironmentSettings bsSettings =
> > >                 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >         StreamTableEnvironment bsTableEnv =
> > >                 StreamTableEnvironment.create(bsEnv, bsSettings);
> > >         String name = "myhive";
> > >         String defaultDatabase = "default";
> > >         String hiveConfDir =
> > >                 "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local path
> > >         String version = "3.1.2";
> > >
> > >         HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
> > >         bsTableEnv.registerCatalog("myhive", hive);
> > >         bsTableEnv.useCatalog("myhive");
> > >         bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >         bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> > >                 "  id BIGINT ," +
> > >                 "  order_id STRING," +
> > >                 "  amount DECIMAL(10, 2)," +
> > >                 "  create_time TIMESTAMP " +
> > >                 ") WITH (" +
> > >                 " 'connector' = 'kafka'," +
> > >                 " 'topic' = 'order.test'," +
> > >                 " 'properties.bootstrap.servers' = 'localhost:9092'," +
> > >                 " 'properties.group.id' = 'testGroup'," +
> > >                 " 'scan.startup.mode' = 'earliest-offset', " +
> > >                 " 'format' = 'json'  " +
> > >                 ")");
> > >         bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >
> > >         bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
> > >                 "  id BIGINT ," +
> > >                 "  order_id STRING," +
> > >                 "  amount DECIMAL(10, 2)" +
> > >                 "  )");
> > >         bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >         bsTableEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')" +
> > >                 "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");
> > >
> > >         bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >         bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
> > >                 "id, " +
> > >                 "order_id, " +
> > >                 "amount " +
> > >                 "FROM topic_products");
> > >
> > >         Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> > >         table1.executeInsert("print_table");
> > >     }
> > > }
> > >
> >
>


-- 
Best, Jingsong Lee


Re: connector hive dependency conflict

2020-07-17 文章 Jingsong Li
Can you get it working with the bundled Hive jar?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
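
A sketch of that approach (the jar name below is an assumption for a Hive 2.3.x setup such as the hive-exec-2.3.4 quoted below; pick the bundle matching your Hive version from the download page): instead of adding flink-connector-hive and hive-exec to the job separately, drop a single bundled connector jar into Flink's lib/ directory, for example:

    // bundled Hive connector placed in flink/lib/, replacing the separate connector and hive-exec jars
    flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar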

Best,
Jingsong

On Fri, Jul 17, 2020 at 5:14 PM Dream-底限  wrote:

> hi:
>
> Folks, which transitive dependency of the Hive connector packages below is causing the jar conflict? From 1.9 to 1.11, every time I add the dependencies in Maven following the official documentation I run into a dependency conflict. When 1.9 was first released the docs excluded some transitive dependencies for these packages, but the documentation was changed later.
>
> // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
> flink-connector-hive_2.11-1.11.0.jar
> // Hive dependencies
> hive-exec-2.3.4.jar
>


-- 
Best, Jingsong Lee


Re: flink1.11 writing to hive

2020-07-17 文章 Jingsong Li
CREATE TEMPORARY TABLE kafka_table...
It seems this isn't documented yet; I've created a JIRA to track it:
https://issues.apache.org/jira/browse/FLINK-18624
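
A minimal sketch of that approach (placeholder table, topic, and column names, assuming a tableEnv with your HiveCatalog registered): a temporary table lives only in the session and is not persisted into the Hive Metastore, while the target Hive table stays in the HiveCatalog as usual.

    // Session-scoped Kafka source: not written into the Hive Metastore,
    // unlike a regular CREATE TABLE executed while the HiveCatalog is current.
    tableEnv.executeSql(
            "CREATE TEMPORARY TABLE kafka_source (" +
            "  id BIGINT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'my_topic'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'properties.group.id' = 'testGroup'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json'" +
            ")");
    // The Hive sink table itself is still looked up from the HiveCatalog.
    tableEnv.executeSql("INSERT INTO my_hive_table SELECT id, name FROM kafka_source");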

Best,
Jingsong

On Fri, Jul 17, 2020 at 5:05 PM Dream-底限  wrote:

> hi:
>
> I registered a HiveCatalog in Flink and want to stream Kafka data into a Hive table. Right now, however, creating the Kafka table persists its metadata into Hive by default. Is there a way to avoid persisting this Kafka metadata table? Without registering the HiveCatalog there is no way to write data into Hive, right?
>


-- 
Best, Jingsong Lee


Re: flink-1.11 issue executing DDL statements with executeSql()

2020-07-14 文章 Jingsong Li
You also need to add the flink-json dependency.
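
For example (artifact versions are assumed to match the 1.11.0 setup described below), the JSON format factory ships in a separate artifact from the Kafka connector, so both need to be on the classpath:

    // Kafka connector (already added)
    flink-connector-kafka_2.11-1.11.0.jar
    // JSON format: provides the DeserializationFormatFactory the exception complains about
    flink-json-1.11.0.jar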

Best,
Jingsong

On Tue, Jul 14, 2020 at 2:38 PM amen...@163.com  wrote:

> hi, everyone
>
> Environment: flink-1.11.0, blink-planner, local IDE development and testing (IDEA)
>
> Problem: when executing a DDL statement with executeSql(), the console prints the exception below. (The flink-connector-kafka_2.11 dependency has already been added.)
>
> I'm not sure whether some other required dependency is missing, or whether I've overlooked something else. Any advice would be appreciated.
>
>
> -----separator-----
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.kafka_out'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='flink-1.11'
> 'scan.startup.mode'='group-offsets'
> 'topic'='flink-kafka'
> at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at example.Example.main(Example.java:77)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
> at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
> at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
> at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
> at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
> at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ... 25 more
>
>
> -----separator-----
>
> Best regards!
> amenhub
>


-- 
Best, Jingsong Lee


Re: (no subject)

2020-07-13 文章 Jingsong Li
Hi

To unsubscribe, please send an email to: user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Tue, Jul 14, 2020 at 12:36 PM 成欢晴  wrote:

> Unsubscribe
>
>
> chq19970719
> Email: chq19970...@163.com



-- 
Best, Jingsong Lee

