Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 Thread Jingsong Li
CC to the Paimon community.

Best,
Jingsong

On Mon, May 20, 2024 at 9:55 AM Jingsong Li  wrote:
>
> Amazing, congrats!
>
> Best,
> Jingsong
>
> On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
> >
> > 退订
> >
> >
> >
> >
> >
> >
> >
> > Original Email
> >
> >
> >
> > Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
> >
> > Sent Time:2024/5/17 23:10
> >
> > To:"Qingsheng Ren"< re...@apache.org ;
> >
> > Cc recipient:"dev"< d...@flink.apache.org ;"user"< 
> > u...@flink.apache.org ;"user-zh"< user-zh@flink.apache.org ;"Apache 
> > Announce List"< annou...@apache.org ;
> >
> > Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
> >
> >
> > Congratulations !
> > Thanks for all contributors.
> >
> >
> > Best,
> >
> > Zhongqiang Gong
> >
> > Qingsheng Ren  于 2024年5月17日周五 17:33写道:
> >
> >  The Apache Flink community is very happy to announce the release of
> >  Apache Flink CDC 3.1.0.
> > 
> >  Apache Flink CDC is a distributed data integration tool for real time
> >  data and batch data, bringing the simplicity and elegance of data
> >  integration via YAML to describe the data movement and transformation
> >  in a data pipeline.
> > 
> >  Please check out the release blog post for an overview of the release:
> > 
> >  
> > https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> > 
> >  The release is available for download at:
> >  https://flink.apache.org/downloads.html
> > 
> >  Maven artifacts for Flink CDC can be found at:
> >  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> > 
> >  The full release notes are available in Jira:
> > 
> >  
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522version=12354387
> > 
> >  We would like to thank all contributors of the Apache Flink community
> >  who made this release possible!
> > 
> >  Regards,
> >  Qingsheng Ren
> > 


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 Thread Jingsong Li
CC to the Paimon community.

Best,
Jingsong

On Mon, May 20, 2024 at 9:55 AM Jingsong Li  wrote:
>
> Amazing, congrats!
>
> Best,
> Jingsong
>
> On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
> >
> > 退订
> >
> >
> >
> >
> >
> >
> >
> > Original Email
> >
> >
> >
> > Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
> >
> > Sent Time:2024/5/17 23:10
> >
> > To:"Qingsheng Ren"< re...@apache.org ;
> >
> > Cc recipient:"dev"< d...@flink.apache.org ;"user"< 
> > user@flink.apache.org ;"user-zh"< user...@flink.apache.org ;"Apache 
> > Announce List"< annou...@apache.org ;
> >
> > Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
> >
> >
> > Congratulations !
> > Thanks for all contributors.
> >
> >
> > Best,
> >
> > Zhongqiang Gong
> >
> > Qingsheng Ren  于 2024年5月17日周五 17:33写道:
> >
> >  The Apache Flink community is very happy to announce the release of
> >  Apache Flink CDC 3.1.0.
> > 
> >  Apache Flink CDC is a distributed data integration tool for real time
> >  data and batch data, bringing the simplicity and elegance of data
> >  integration via YAML to describe the data movement and transformation
> >  in a data pipeline.
> > 
> >  Please check out the release blog post for an overview of the release:
> > 
> >  
> > https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> > 
> >  The release is available for download at:
> >  https://flink.apache.org/downloads.html
> > 
> >  Maven artifacts for Flink CDC can be found at:
> >  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> > 
> >  The full release notes are available in Jira:
> > 
> >  
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522version=12354387
> > 
> >  We would like to thank all contributors of the Apache Flink community
> >  who made this release possible!
> > 
> >  Regards,
> >  Qingsheng Ren
> > 


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 Thread Jingsong Li
Amazing, congrats!

Best,
Jingsong

On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
>
> 退订
>
>
>
>
>
>
>
> Original Email
>
>
>
> Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
>
> Sent Time:2024/5/17 23:10
>
> To:"Qingsheng Ren"< re...@apache.org ;
>
> Cc recipient:"dev"< d...@flink.apache.org ;"user"< u...@flink.apache.org 
> ;"user-zh"< user-zh@flink.apache.org ;"Apache Announce List"< 
> annou...@apache.org ;
>
> Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
>
>
> Congratulations !
> Thanks for all contributors.
>
>
> Best,
>
> Zhongqiang Gong
>
> Qingsheng Ren  于 2024年5月17日周五 17:33写道:
>
>  The Apache Flink community is very happy to announce the release of
>  Apache Flink CDC 3.1.0.
> 
>  Apache Flink CDC is a distributed data integration tool for real time
>  data and batch data, bringing the simplicity and elegance of data
>  integration via YAML to describe the data movement and transformation
>  in a data pipeline.
> 
>  Please check out the release blog post for an overview of the release:
> 
>  
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> 
>  The release is available for download at:
>  https://flink.apache.org/downloads.html
> 
>  Maven artifacts for Flink CDC can be found at:
>  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> 
>  The full release notes are available in Jira:
> 
>  
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522version=12354387
> 
>  We would like to thank all contributors of the Apache Flink community
>  who made this release possible!
> 
>  Regards,
>  Qingsheng Ren
> 


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Jingsong Li
Congratulations!

On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Congratulations, thanks for the great work!
>
> Best,
> Rui
>
> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.19.0, which is the fisrt release for the Apache Flink 1.19 series.
>>
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>>
>> Best,
>> Yun, Jing, Martijn and Lincoln


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Jingsong Li
Congratulations!

On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Congratulations, thanks for the great work!
>
> Best,
> Rui
>
> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.19.0, which is the fisrt release for the Apache Flink 1.19 series.
>>
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>>
>> Best,
>> Yun, Jing, Martijn and Lincoln


Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-05-30 Thread Jingsong Li
+1, the fallback looks weird now, it is outdated.

But, it is good to provide an option. I don't know if there are some
users who depend on this fallback.

Best,
Jingsong

On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
>
> +1, the fallback was just intended as a temporary workaround to run 
> catalog/module related statements with hive dialect.
>
> On Mon, May 29, 2023 at 3:59 PM Benchao Li  wrote:
>>
>> Big +1 on this, thanks yuxia for driving this!
>>
>> yuxia  于2023年5月29日周一 14:55写道:
>>
>> > Hi, community.
>> >
>> > I want to start the discussion about Hive dialect shouldn't fall back to
>> > Flink's default dialect.
>> >
>> > Currently, when the HiveParser fail to parse the sql in Hive dialect,
>> > it'll fall back to Flink's default parser[1] to handle flink-specific
>> > statements like "CREATE CATALOG xx with (xx);".
>> >
>> > As I‘m involving with Hive dialect and have some communication with
>> > community users who use Hive dialectrecently,  I'm thinking throw exception
>> > directly instead of falling back to Flink's default dialect when fail to
>> > parse the sql in Hive dialect
>> >
>> > Here're some reasons:
>> >
>> > First of all, it'll hide some error with Hive dialect. For example, we
>> > found we can't use Hive dialect any more with Flink sql client in release
>> > validation phase[2], finally we find a modification in Flink sql client
>> > cause it, but our test case can't find it earlier for although HiveParser
>> > faill to parse it but then it'll fall back to default parser and pass test
>> > case successfully.
>> >
>> > Second, conceptually, Hive dialect should be do nothing with Flink's
>> > default dialect. They are two totally different dialect. If we do need a
>> > dialect mixing Hive dialect and default dialect , may be we need to propose
>> > a new hybrid dialect and announce the hybrid behavior to users.
>> > Also, It made some users confused for the fallback behavior. The fact
>> > comes from I had been ask by community users. Throw an excpetioin directly
>> > when fail to parse the sql statement in Hive dialect will be more 
>> > intuitive.
>> >
>> > Last but not least, it's import to decouple Hive with Flink planner[3]
>> > before we can externalize Hive connector[4]. If we still fall back to Flink
>> > default dialct, then we will need depend on `ParserImpl` in Flink planner,
>> > which will block us removing the provided dependency of Hive dialect as
>> > well as externalizing Hive connector.
>> >
>> > Although we hadn't announced the fall back behavior ever, but some users
>> > may implicitly depend on this behavior in theirs sql jobs. So, I hereby
>> > open the dicussion about abandoning the fall back behavior to make Hive
>> > dialect clear and isoloted.
>> > Please remember it won't break the Hive synatax but the syntax specified
>> > to Flink may fail after then. But for the failed sql, you can use `SET
>> > table.sql-dialect=default;` to switch to Flink dialect.
>> > If there's some flink-specific statements we found should be included in
>> > Hive dialect to be easy to use, I think we can still add them as specific
>> > cases to Hive dialect.
>> >
>> > Look forwards to your feedback. I'd love to listen the feedback from
>> > community to take the next steps.
>> >
>> > [1]:
>> > https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
>> > [2]:https://issues.apache.org/jira/browse/FLINK-26681
>> > [3]:https://issues.apache.org/jira/browse/FLINK-31413
>> > [4]:https://issues.apache.org/jira/browse/FLINK-30064
>> >
>> >
>> >
>> > Best regards,
>> > Yuxia
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>
>
>
> --
> Best regards!
> Rui Li


[ANNOUNCE] Apache Flink Table Store 0.3.0 released

2023-01-13 Thread Jingsong Li
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.3.0.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2023/01/13/release-table-store-0.3.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://central.sonatype.dev/search?q=flink-table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352111

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Best,
Jingsong Lee


[ANNOUNCE] Apache Flink Table Store 0.3.0 released

2023-01-13 Thread Jingsong Li
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.3.0.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2023/01/13/release-table-store-0.3.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://central.sonatype.dev/search?q=flink-table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352111

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Best,
Jingsong Lee


Re: Re: table store 和connector-kafka包冲突吗?

2022-10-11 Thread Jingsong Li
附:修复连接:https://github.com/apache/flink-table-store/commit/c1b28985ce8bc8fb80fac96380edf3b34e4126b8

Best,
Jingsong

On Tue, Oct 11, 2022 at 3:27 PM Jingsong Li  wrote:
>
> Hi RS,
>
> 这是bug,已经修复了
>
> 建议使用即将发布的:0.2.1 
> https://lists.apache.org/thread/n1yzpbxprnsh2m8swpsr40glt8h2b93v
>
> 具体的 jar 
> 包在这里:https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.2.1-rc2/
>
> Best,
> Jingsong
>
> On Tue, Oct 11, 2022 at 3:17 PM 李宇彬  wrote:
> >
> > 方便贴下sql吗,我在flink 1.15 + fts 0.3.0-SNAPSHOT没复现出这个问题
> >
> >
> > 在2022年10月11日 09:19,RS 写道:
> > Hi,
> >
> >
> > 去除掉后运行是没有问题的,所以这种lib的冲突问题,能在官网上加个说明吗,避免后续其他人也遇到这种问题。
> >
> >
> > Thanks
> >
> > 在 2022-10-10 12:50:33,"yanfei lei"  写道:
> > Hi, 从table store的pom中看,table store的dist包shade了一份connector-kafka。
> > https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
> > 把flink-connector-kafka-1.15.1.jar 去掉再试试?
> >
> >
> > RS  于2022年10月8日周六 17:19写道:
> >
> > Hi,
> > 报错如下:
> >
> >
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.flink.table.api.ValidationException: Multiple factories for
> > identifier 'kafka' that implement
> > 'org.apache.flink.table.factories.DynamicTableFactory' found in the
> > classpath.
> >
> >
> > Ambiguous factory classes are:
> >
> >
> > org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
> > org.apache.flink.table.store.kafka.KafkaLogStoreFactory
> >
> > org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
> >
> >
> > Thanks
> >
> >
> >
> >
> >
> > 在 2022-10-08 13:38:20,"Shammon FY"  写道:
> > Hi RS
> > 你这边能提供一下具体的冲突错误栈吗?
> >
> > On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
> >
> > Hi,
> >
> >
> > 版本:flink-1.15.1
> > 使用table
> >
> > store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了
> >
> >
> > 是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar?
> >
> >
> > Thanks
> >


Re: Re: table store 和connector-kafka包冲突吗?

2022-10-11 Thread Jingsong Li
Hi RS,

这是bug,已经修复了

建议使用即将发布的:0.2.1 https://lists.apache.org/thread/n1yzpbxprnsh2m8swpsr40glt8h2b93v

具体的 jar 
包在这里:https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.2.1-rc2/

Best,
Jingsong

On Tue, Oct 11, 2022 at 3:17 PM 李宇彬  wrote:
>
> 方便贴下sql吗,我在flink 1.15 + fts 0.3.0-SNAPSHOT没复现出这个问题
>
>
> 在2022年10月11日 09:19,RS 写道:
> Hi,
>
>
> 去除掉后运行是没有问题的,所以这种lib的冲突问题,能在官网上加个说明吗,避免后续其他人也遇到这种问题。
>
>
> Thanks
>
> 在 2022-10-10 12:50:33,"yanfei lei"  写道:
> Hi, 从table store的pom中看,table store的dist包shade了一份connector-kafka。
> https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
> 把flink-connector-kafka-1.15.1.jar 去掉再试试?
>
>
> RS  于2022年10月8日周六 17:19写道:
>
> Hi,
> 报错如下:
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Multiple factories for
> identifier 'kafka' that implement
> 'org.apache.flink.table.factories.DynamicTableFactory' found in the
> classpath.
>
>
> Ambiguous factory classes are:
>
>
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
> org.apache.flink.table.store.kafka.KafkaLogStoreFactory
>
> org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
>
>
> Thanks
>
>
>
>
>
> 在 2022-10-08 13:38:20,"Shammon FY"  写道:
> Hi RS
> 你这边能提供一下具体的冲突错误栈吗?
>
> On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
>
> Hi,
>
>
> 版本:flink-1.15.1
> 使用table
>
> store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了
>
>
> 是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar?
>
>
> Thanks
>


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-09 Thread Jingsong Li
Thanks for driving, Qingsheng.

+1 for reverting sink metric name.

We often forget that metric is also one of the important APIs.

+1 for releasing 1.15.3 to fix this.

Best,
Jingsong

On Sun, Oct 9, 2022 at 11:35 PM Becket Qin  wrote:
>
> Thanks for raising the discussion, Qingsheng,
>
> +1 on reverting the breaking changes.
>
> In addition, we might want to release a 1.15.3 to fix this and update the 
> previous release docs with this known issue, so that users can upgrade to 
> 1.15.3 when they hit it. It would also be good to add some backwards 
> compatibility tests on metrics to avoid unintended breaking changes like this 
> in the future.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren  wrote:
>>
>> Hi devs and users,
>>
>> I’d like to start a discussion about reverting a breaking change about sink 
>> metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
>>
>> TL;DR
>>
>> All sink metrics with name “numXXXOut” defined in FLIP-33 are replace by 
>> “numXXXSend” in FLINK-26126 and FLINK-26492. Considering metric names are 
>> public APIs, this is a breaking change to end users and not backward 
>> compatible. Also unfortunately this breaking change was not discussed in the 
>> mailing list before.
>>
>> Background
>>
>> As defined previously in FLIP-33 (the FLIP page has been changed so please 
>> refer to the old version [3] ), metric “numRecordsOut” is used for reporting 
>> the total number of output records since the sink started (number of records 
>> written to the external system), and similarly for “numRecordsOutPerSecond”, 
>> “numBytesOut”, “numBytesOutPerSecond” and “numRecordsOutError”. Most sinks 
>> are following this naming and definition. However, these metrics are 
>> ambiguous in the new Sink API as “numXXXOut” could be used by the output of 
>> SinkWriterOperator for reporting number of Committables delivered to 
>> SinkCommitterOperator. In order to resolve the conflict, FLINK-26126 and 
>> FLINK-26492 changed names of these metrics with “numXXXSend”.
>>
>> Necessity of reverting this change
>>
>> - Metric names are actually public API, as end users need to configure 
>> metric collecting and alerting system with metric names. Users have to reset 
>> all configurations related to affected metrics.
>> - This could also affect custom and external sinks not maintained by Flink, 
>> which might have implemented with numXXXOut metrics.
>> - The number of records sent to external system is way more important than 
>> the number of Committables sent to SinkCommitterOperator, as the latter one 
>> is just an internal implementation of sink. We could have a new metric name 
>> for the latter one instead.
>> - We could avoid splitting the project by version (like “plz use numXXXOut 
>> before 1.15 and use numXXXSend after”) if we revert it ASAP, cosidering 1.16 
>> is still not released for now.
>>
>> As a consequence, I’d like to hear from devs and users about your opinion on 
>> changing these metrics back to “numXXXOut”.
>>
>> Looking forward to your reply!
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-26126
>> [2] https://issues.apache.org/jira/browse/FLINK-26492
>> [1] FLIP-33, version 18: 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883136
>>
>> Best,
>> Qingsheng


Re: [ANNOUNCE] Apache Flink 1.14.6 released

2022-09-28 Thread Jingsong Li
Thanks Xingbo for releasing it.

Best,
Jingsong

On Wed, Sep 28, 2022 at 10:52 AM Xingbo Huang  wrote:
>
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.14.6, which is the fifth bugfix release for the Apache Flink 1.14 
> series.
>
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2022/09/28/release-1.14.6.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351834
>
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
>
> Regards,
> Xingbo


Re: [ANNOUNCE] Apache Flink 1.14.6 released

2022-09-28 Thread Jingsong Li
Thanks Xingbo for releasing it.

Best,
Jingsong

On Wed, Sep 28, 2022 at 10:52 AM Xingbo Huang  wrote:
>
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.14.6, which is the fifth bugfix release for the Apache Flink 1.14 
> series.
>
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2022/09/28/release-1.14.6.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351834
>
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
>
> Regards,
> Xingbo


[ANNOUNCE] Apache Flink Table Store 0.2.0 released

2022-08-28 Thread Jingsong Li
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.2.0.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351570

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Jingsong Lee


[ANNOUNCE] Apache Flink Table Store 0.2.0 released

2022-08-28 Thread Jingsong Li
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.2.0.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351570

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Jingsong Lee


Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Jingsong Li
Thanks Xingtong, Jark, Martijn and Robert for making this possible!

Best,
Jingsong


On Thu, Jun 2, 2022 at 5:32 PM Jark Wu  wrote:

> Thank Xingtong for making this possible!
>
> Cheers,
> Jark Wu
>
> On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:
>
> > Hi everyone,
> >
> > I'm very happy to announce that the Apache Flink community has created a
> > dedicated Slack workspace [1]. Welcome to join us on Slack.
> >
> > ## Join the Slack workspace
> >
> > You can join the Slack workspace by either of the following two ways:
> > 1. Click the invitation link posted on the project website [2].
> > 2. Ask anyone who already joined the Slack workspace to invite you.
> >
> > We recommend 2), if available. Due to Slack limitations, the invitation
> > link in 1) expires and needs manual updates after every 100 invites. If
> it
> > is expired, please reach out to the dev / user mailing lists.
> >
> > ## Community rules
> >
> > When using the community Slack workspace, please follow these community
> > rules:
> > * *Be respectful* - This is the most important rule!
> > * All important decisions and conclusions *must be reflected back to the
> > mailing lists*. "If it didn’t happen on a mailing list, it didn’t
> happen."
> > - The Apache Mottos [3]
> > * Use *Slack threads* to keep parallel conversations from overwhelming a
> > channel.
> > * Please *do not direct message* people for troubleshooting, Jira
> assigning
> > and PR review. These should be picked-up voluntarily.
> >
> >
> > ## Maintenance
> >
> >
> > Committers can refer to this wiki page [4] for information needed for
> > maintaining the Slack workspace.
> >
> >
> > Thanks Jark, Martijn and Robert for helping setting up the Slack
> workspace.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> > [1] https://apache-flink.slack.com/
> >
> > [2] https://flink.apache.org/community.html#slack
> >
> > [3] http://theapacheway.com/on-list/
> >
> > [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
> >
>


Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Jingsong Li
Thanks Xingtong, Jark, Martijn and Robert for making this possible!

Best,
Jingsong


On Thu, Jun 2, 2022 at 5:32 PM Jark Wu  wrote:

> Thank Xingtong for making this possible!
>
> Cheers,
> Jark Wu
>
> On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:
>
> > Hi everyone,
> >
> > I'm very happy to announce that the Apache Flink community has created a
> > dedicated Slack workspace [1]. Welcome to join us on Slack.
> >
> > ## Join the Slack workspace
> >
> > You can join the Slack workspace by either of the following two ways:
> > 1. Click the invitation link posted on the project website [2].
> > 2. Ask anyone who already joined the Slack workspace to invite you.
> >
> > We recommend 2), if available. Due to Slack limitations, the invitation
> > link in 1) expires and needs manual updates after every 100 invites. If
> it
> > is expired, please reach out to the dev / user mailing lists.
> >
> > ## Community rules
> >
> > When using the community Slack workspace, please follow these community
> > rules:
> > * *Be respectful* - This is the most important rule!
> > * All important decisions and conclusions *must be reflected back to the
> > mailing lists*. "If it didn’t happen on a mailing list, it didn’t
> happen."
> > - The Apache Mottos [3]
> > * Use *Slack threads* to keep parallel conversations from overwhelming a
> > channel.
> > * Please *do not direct message* people for troubleshooting, Jira
> assigning
> > and PR review. These should be picked-up voluntarily.
> >
> >
> > ## Maintenance
> >
> >
> > Committers can refer to this wiki page [4] for information needed for
> > maintaining the Slack workspace.
> >
> >
> > Thanks Jark, Martijn and Robert for helping setting up the Slack
> workspace.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> > [1] https://apache-flink.slack.com/
> >
> > [2] https://flink.apache.org/community.html#slack
> >
> > [3] http://theapacheway.com/on-list/
> >
> > [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
> >
>


Re: Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误

2022-05-26 Thread Jingsong Li
比如对于 1.13.6

把flink下面的/lib/flink-table_2.11-1.13.6.jar给删掉

Best,
Jingsong

On Thu, May 26, 2022 at 2:54 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> 好的,知道了,以后用这个中文邮件!
> flink-table-legacy 这个模块应该没用,但是我也没找到他在那,是一个独立的模块吗?
>
> --
> hdxg1101300...@163.com
>
>
> *发件人:* Jingsong Li 
> *发送时间:* 2022-05-26 14:47
> *收件人:* hdxg1101300123 
> *抄送:* dev 
> *主题:* Re: Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误
> Please don't use Chinese on the dev mailing list to discuss issues, I've
> replied on user-zh@flink.apache.org.
>
> Best,
> Jingsong
>
> On Thu, May 26, 2022 at 2:43 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
>>
>>
>> --
>> hdxg1101300...@163.com
>>
>>
>> *发件人:* Jingsong Li 
>> *发送时间:* 2022-05-26 14:20
>> *收件人:* dev 
>> *主题:* Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误
>> 如果flink-table-legacy包没用到,把它删掉即可修复你的问题
>>
>> Best,
>> Jingsong
>>
>> On Thu, May 26, 2022 at 2:16 PM hdxg1101300...@163.com <
>> hdxg1101300...@163.com> wrote:
>>
>> > 你好:
>> > 我在使用flink1.12.4版本和hive1.1.0版本时遇到下面的错误:
>> >
>> >
>> 场景时使用hive的catalog管理元数据,用flinksql把Kafka的数据注册成输入表,关联hive维表做数据拉宽;提交任务到yarn时遇到如下错误;
>> >sql如下:
>> >create view if not exists dwm_ai_robot_contact_view as select
>> >
>> CALLER,CALLEE,CUST_LEVEL,PRO_ID,REASON_CODE,INTENT_NAME,GEN_CENTER,BUS_PRO_ID
>> > from realtime_robot_contact table_a left join dc_dim.dim_province_code
>> /*+
>> > OPTIONS('lookup.join.cache.ttl'='12 h') */ for SYSTEM_TIME as of
>> > table_a.pro_time as dim on table_a.PRO_ID = dim.code;
>> > 错误信息:
>> >
>> >
>> >  The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
>> SPECIAL
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> > at
>> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> > at
>> >
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> > at
>> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> > at
>> >
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> > at
>> >
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> > at java.security.AccessController.doPrivileged(Native Method)
>> > at javax.security.auth.Subject.doAs(Subject.java:422)
>> > at
>> >
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> > at
>> >
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> > at
>> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> > Caused by: java.lang.UnsupportedOperationException: class
>> > org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>> > at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>> > at
>> org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
>> > at
>> org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329)
>> > at
>> > org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> > at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> > at
>> >
>> org.apache.calcite.sql.SqlSnapshot$SqlSnapshotOperator.unparse(SqlSnapshot.java:128)
>> > at
>> org.apache.calcite.sql.SqlSnapshot.unparse(SqlSnapshot.java:78)
>> > at
>> > org.apache.calcite.sql.SqlAsOperator.unparse(SqlAsOperator.java:76)
>> > at
>> > org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> > at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> > at
>> > org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:229)
>>

Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误

2022-05-26 Thread Jingsong Li
Please send to  user-zh@flink.apache.org

Best,
Jingsong

On Thu, May 26, 2022 at 2:20 PM Jingsong Li  wrote:

> 如果flink-table-legacy包没用到,把它删掉即可修复你的问题
>
> Best,
> Jingsong
>
> On Thu, May 26, 2022 at 2:16 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
>> 你好:
>> 我在使用flink1.12.4版本和hive1.1.0版本时遇到下面的错误:
>>
>>  
>> 场景时使用hive的catalog管理元数据,用flinksql把Kafka的数据注册成输入表,关联hive维表做数据拉宽;提交任务到yarn时遇到如下错误;
>>sql如下:
>>create view if not exists dwm_ai_robot_contact_view as select
>> CALLER,CALLEE,CUST_LEVEL,PRO_ID,REASON_CODE,INTENT_NAME,GEN_CENTER,BUS_PRO_ID
>> from realtime_robot_contact table_a left join dc_dim.dim_province_code /*+
>> OPTIONS('lookup.join.cache.ttl'='12 h') */ for SYSTEM_TIME as of
>> table_a.pro_time as dim on table_a.PRO_ID = dim.code;
>> 错误信息:
>>
>>
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> Caused by: java.lang.UnsupportedOperationException: class
>> org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>> at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
>> at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
>> at
>> org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> at
>> org.apache.calcite.sql.SqlSnapshot$SqlSnapshotOperator.unparse(SqlSnapshot.java:128)
>> at org.apache.calcite.sql.SqlSnapshot.unparse(SqlSnapshot.java:78)
>> at
>> org.apache.calcite.sql.SqlAsOperator.unparse(SqlAsOperator.java:76)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> at
>> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:229)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
>> at
>> org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176)
>> at
>> org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
>> at org.apache.calcite.sql.SqlSelect.unparse(SqlSelect.java:246)
>> at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:154)
>> at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:176)
>> at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:185)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.getQuotedSqlString(SqlToOperationConverter.java:984)
>> at
>> org.apache.flink.table.planner.utils.Expander$Expanded.substitute(Expander.java:183)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:854)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:823)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>> 

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-18 Thread Jingsong Li
Thanks~ 非常好~

Best,
Jingsong

On Mon, May 16, 2022 at 5:24 PM 18579099...@163.com <18579099...@163.com>
wrote:

> 第一次弄,不知道这么写的对不对
>
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-27604
>
>
>
> 18579099...@163.com
>
> 发件人: Jingsong Li
> 发送时间: 2022-05-13 15:06
> 收件人: user-zh
> 主题: Re: Re: flink sql无法读取Hive映射的HBase表
> Hi, 推荐 https://www.deepl.com/translator
> 非常好用
>
> 我记得对Hive Custom Storage Handler(hbase)是有问题的
>
> Best,
> Jingsong
>
> On Fri, May 13, 2022 at 2:12 PM 18579099...@163.com <18579099...@163.com>
> wrote:
>
> > 我英文能力不允许啊
> >
> >
> >
> > 18579099...@163.com
> >
> > 发件人: yuxia
> > 发送时间: 2022-05-11 15:11
> > 收件人: user-zh
> > 主题: Re: flink sql无法读取Hive映射的HBase表
> > 不好意思,我尝试复现你的问题,但是我没有 hbase 环境,不过看起来是只有当 STORED BY
> >   'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 有问题?
> > 我之后空了再debug 看看。
> >
> > 不过我看了一下 flink 这块的代码,从 flink 这块的代码来看,应该是 get 这个 hive 表之后,它的
> > StorageDescriptor 的 inputformat 为 null,然后 Class.forName(inputformat) 就报错
> > NPE了。
> > 应该是这块代码有点问题。
> > 如果你方便的话,可以辛苦帮忙建一个 jira~
> > https://issues.apache.org/jira/projects/FLINK/summary
> >
> >
> >
> > Best regards,
> > Yuxia
> >
> > - 原始邮件 -
> > 发件人: 18579099...@163.com
> > 收件人: "user-zh" 
> > 发送时间: 星期二, 2022年 5 月 10日 上午 10:39:16
> > 主题: Re: Re: flink sql无法读取Hive映射的HBase表
> >
> > 版本:
> > flink:1.13.6
> > hive:2.1.1-cdh6.2.0
> > hbase:2.1.0-cdh6.2.0
> > flinksql执行工具:flink sql client
> > sql 提交模式:yarn-per-job
> >
> >
> -
> > flink lib目录下的包
> > antlr-runtime-3.5.2.jar
> > flink-csv-1.13.6.jar
> > flink-dist_2.11-1.13.6.jar
> > flink-json-1.13.6.jar
> > flink-shaded-zookeeper-3.4.14.jar
> > flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> > flink-table_2.11-1.13.6.jar
> > flink-table-blink_2.11-1.13.6.jar
> > guava-14.0.1.jar
> > hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> > hbase-client-2.1.0-cdh6.2.0.jar
> > hbase-common-2.1.0-cdh6.2.0.jar
> > hbase-protocol-2.1.0-cdh6.2.0.jar
> > hbase-server-2.1.0-cdh6.2.0.jar
> > hive-exec-2.1.1-cdh6.2.0.jar
> > hive-hbase-handler-2.1.1-cdh6.2.0.jar
> > htrace-core4-4.1.0-incubating.jar
> > log4j-1.2-api-2.17.1.jar
> > log4j-api-2.17.1.jar
> > log4j-core-2.17.1.jar
> > log4j-slf4j-impl-2.17.1.jar
> > protobuf-java-2.5.0.jar
> >
> >
> 
> > hive建表语句
> > CREATE EXTERNAL TABLE `ods.student`(
> >   `row_key` string,
> >   `name` string,
> >   `age` int,
> >   `addr` string
> > )
> > ROW FORMAT SERDE
> >   'org.apache.hadoop.hive.hbase.HBaseSerDe'
> > STORED BY
> >   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> > WITH SERDEPROPERTIES (
> >
> >
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR,'serialization.format'='1')
> > TBLPROPERTIES (
> >   'hbase.table.name'='ODS:STUDENT') ;
> > catalog:hive catalog
> > sql: select * from ods.student;
> > 我看了报错信息之后添加了一些jar包到flink lib下,之前报的错跟缺少依赖有关。现在又出现了新的错误。
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.flink.connectors.hive.FlinkHiveException: Unable to
> instantiate
> > the hadoop input format
> >
> >
> --
> > 详细的堆栈
> > org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> > execute SQL statement.
> > at
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
> > ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> > at
> >
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:3

Re: 退订

2022-05-15 Thread Jingsong Li
退订请回复到 user-zh-unsubscr...@flink.apache.org

Best,
Jingsong

On Sun, May 15, 2022 at 1:04 PM cq <17691150...@163.com> wrote:

> 退订
>
>
>
> Best Regards,
>
> Jacob.Q.Cao
>
>
> TEL:17691150986


Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-13 Thread Jingsong Li
Hi, 推荐 https://www.deepl.com/translator
非常好用

我记得对Hive Custom Storage Handler(hbase)是有问题的

Best,
Jingsong

On Fri, May 13, 2022 at 2:12 PM 18579099...@163.com <18579099...@163.com>
wrote:

> 我英文能力不允许啊
>
>
>
> 18579099...@163.com
>
> 发件人: yuxia
> 发送时间: 2022-05-11 15:11
> 收件人: user-zh
> 主题: Re: flink sql无法读取Hive映射的HBase表
> 不好意思,我尝试复现你的问题,但是我没有 hbase 环境,不过看起来是只有当 STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 有问题?
> 我之后空了再debug 看看。
>
> 不过我看了一下 flink 这块的代码,从 flink 这块的代码来看,应该是 get 这个 hive 表之后,它的
> StorageDescriptor 的 inputformat 为 null,然后 Class.forName(inputformat) 就报错
> NPE了。
> 应该是这块代码有点问题。
> 如果你方便的话,可以辛苦帮忙建一个 jira~
> https://issues.apache.org/jira/projects/FLINK/summary
>
>
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: 18579099...@163.com
> 收件人: "user-zh" 
> 发送时间: 星期二, 2022年 5 月 10日 上午 10:39:16
> 主题: Re: Re: flink sql无法读取Hive映射的HBase表
>
> 版本:
> flink:1.13.6
> hive:2.1.1-cdh6.2.0
> hbase:2.1.0-cdh6.2.0
> flinksql执行工具:flink sql client
> sql 提交模式:yarn-per-job
>
> -
> flink lib目录下的包
> antlr-runtime-3.5.2.jar
> flink-csv-1.13.6.jar
> flink-dist_2.11-1.13.6.jar
> flink-json-1.13.6.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> flink-table_2.11-1.13.6.jar
> flink-table-blink_2.11-1.13.6.jar
> guava-14.0.1.jar
> hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> hbase-client-2.1.0-cdh6.2.0.jar
> hbase-common-2.1.0-cdh6.2.0.jar
> hbase-protocol-2.1.0-cdh6.2.0.jar
> hbase-server-2.1.0-cdh6.2.0.jar
> hive-exec-2.1.1-cdh6.2.0.jar
> hive-hbase-handler-2.1.1-cdh6.2.0.jar
> htrace-core4-4.1.0-incubating.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> protobuf-java-2.5.0.jar
>
> 
> hive建表语句
> CREATE EXTERNAL TABLE `ods.student`(
>   `row_key` string,
>   `name` string,
>   `age` int,
>   `addr` string
> )
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.hbase.HBaseSerDe'
> STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES (
>
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR,'serialization.format'='1')
> TBLPROPERTIES (
>   'hbase.table.name'='ODS:STUDENT') ;
> catalog:hive catalog
> sql: select * from ods.student;
> 我看了报错信息之后添加了一些jar包到flink lib下,之前报的错跟缺少依赖有关。现在又出现了新的错误。
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate
> the hadoop input format
>
> --
> 详细的堆栈
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_191]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to
> instantiate the hadoop input format
> at
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
> ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
> at
> 

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-11 Thread Jingsong Li
>>>>>>> On Mon, May 9, 2022 at 3:38 PM Martijn Visser <
>>>>>>> martijnvis...@apache.org> wrote:
>>>>>>>
>>>>>>>> As far as I recall you can't sign up for the ASF instance of Slack,
>>>>>>>> you can
>>>>>>>> only get there if you're a committer or if you're invited by a
>>>>>>>> committer.
>>>>>>>>
>>>>>>>> On Mon, 9 May 2022 at 15:15, Robert Metzger 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > Sorry for joining this discussion late, and thanks for the
>>>>>>>> summary Xintong!
>>>>>>>> >
>>>>>>>> > Why are we considering a separate slack instance instead of using
>>>>>>>> the ASF
>>>>>>>> > Slack instance?
>>>>>>>> > The ASF instance is paid, so all messages are retained forever,
>>>>>>>> and quite
>>>>>>>> > a few people are already on that Slack instance.
>>>>>>>> > There is already a #flink channel on that Slack instance, that we
>>>>>>>> could
>>>>>>>> > leave as passive as it is right now, or put some more effort into
>>>>>>>> it, on a
>>>>>>>> > voluntary basis.
>>>>>>>> > We could add another #flink-dev channel to that Slack for
>>>>>>>> developer
>>>>>>>> > discussions, and a private flink-committer and flink-pmc chat.
>>>>>>>> >
>>>>>>>> > If we are going that path, we should rework the "Community" and
>>>>>>>> "Getting
>>>>>>>> > Help" pages and explain that the mailing lists are the "ground
>>>>>>>> truth tools"
>>>>>>>> > in Flink, and Slack is only there to facilitate faster
>>>>>>>> communication, but
>>>>>>>> > it is optional / voluntary (e.g. a committers won't respond to
>>>>>>>> DMs)
>>>>>>>> >
>>>>>>>> > All public #flink-* channels should be archived and
>>>>>>>> google-indexable.
>>>>>>>> > I've asked Jarek from Airflow who's maintaining
>>>>>>>> > http://apache-airflow.slack-archives.org.
>>>>>>>> > If we can't use slack-archives.org, it would be nice to find some
>>>>>>>> > volunteers in the Flink community to hack a simple indexing tool.
>>>>>>>> > The indexing part is very important for me, because of some bad
>>>>>>>> > experiences with the Kubernetes experience, where most of the
>>>>>>>> advanced
>>>>>>>> > stuff is hidden in their Slack, and it took me a few weeks to
>>>>>>>> find that
>>>>>>>> > goldmine of information.
>>>>>>>> >
>>>>>>>> > Overall, I see this as an experiment worth doing, but I would
>>>>>>>> suggest
>>>>>>>> > revisiting it in 6 to 12 months: We should check if really all
>>>>>>>> important
>>>>>>>> > decisions are mirrored to the right mailing lists, and that we
>>>>>>>> get the
>>>>>>>> > benefits we hoped for (more adoption, better experience for users
>>>>>>>> and
>>>>>>>> > developers), and that we can handle the concerns (DMs to
>>>>>>>> developers,
>>>>>>>> > indexing).
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Sat, May 7, 2022 at 12:22 PM Xintong Song <
>>>>>>>> tonysong...@gmail.com>
>>>>>>>> > wrote:
>>>>>>>> >
>>>>>>>> >> Thanks all for the valuable feedback.
>>>>>>>> >>
>>>>>>>> >> It seems most people are overall positive about using Slack for
>>>>>>>> dev
>>>>>>>> >

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-06 Thread Jingsong Li
Most of the open source communities I know have set up their slack
channels, such as Apache Iceberg [1], Apache Druid [2], etc.
So I think slack can be worth trying.

David is right, there are some cases that need to communicate back and
forth, slack communication will be more effective.

But back to the question, ultimately it's about whether there are
enough core developers willing to invest time in the slack, to
discuss, to answer questions, to communicate.
And whether there will be enough time to reply to the mailing list and
stackoverflow after we put in the slack (which we need to do).

[1] https://iceberg.apache.org/community/#slack
[2] https://druid.apache.org/community/

On Fri, May 6, 2022 at 10:06 PM David Anderson  wrote:
>
> I have mixed feelings about this.
>
> I have been rather visible on stack overflow, and as a result I get a lot of 
> DMs asking for help. I enjoy helping, but want to do it on a platform where 
> the responses can be searched and shared.
>
> It is currently the case that good questions on stack overflow frequently go 
> unanswered because no one with the necessary expertise takes the time to 
> respond. If the Flink community has the collective energy to do more user 
> outreach, more involvement on stack overflow would be a good place to start. 
> Adding slack as another way for users to request help from those who are 
> already actively providing support on the existing communication channels 
> might just lead to burnout.
>
> On the other hand, there are rather rare, but very interesting cases where 
> considerable back and forth is needed to figure out what's going on. This can 
> happen, for example, when the requirements are unusual, or when a difficult 
> to diagnose bug is involved. In these circumstances, something like slack is 
> much better suited than email or stack overflow.
>
> David
>
> On Fri, May 6, 2022 at 3:04 PM Becket Qin  wrote:
>>
>> Thanks for the proposal, Xintong.
>>
>> While I share the same concerns as those mentioned in the previous 
>> discussion thread, admittedly there are benefits of having a slack channel 
>> as a supplementary way to discuss Flink. The fact that this topic is raised 
>> once a while indicates lasting interests.
>>
>> Personally I am open to having such a slack channel. Although it has 
>> drawbacks, it serves a different purpose. I'd imagine that for people who 
>> prefer instant messaging, in absence of the slack channel, a lot of 
>> discussions might just take place offline today, which leaves no public 
>> record at all.
>>
>> One step further, if the channel is maintained by the Flink PMC, some kind 
>> of code of conduct might be necessary. I think the suggestions of ad-hoc 
>> conversations, reflecting back to the emails are good starting points. I am 
>> +1 to give it a try and see how it goes. In the worst case, we can just stop 
>> doing this and come back to where we are right now.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, May 6, 2022 at 8:55 PM Martijn Visser  wrote:
>>>
>>> Hi everyone,
>>>
>>> While I see Slack having a major downside (the results are not indexed by 
>>> external search engines, you can't link directly to Slack content unless 
>>> you've signed up), I do think that the open source space has progressed and 
>>> that Slack is considered as something that's invaluable to users. There are 
>>> other Apache programs that also run it, like Apache Airflow [1]. I also see 
>>> it as a potential option to create a more active community.
>>>
>>> A concern I can see is that users will start DMing well-known 
>>> reviewers/committers to get a review or a PR merged. That can cause a lot 
>>> of noise. I can go +1 for Slack, but then we need to establish a set of 
>>> community rules.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1] https://airflow.apache.org/community/
>>>
>>> On Fri, 6 May 2022 at 13:59, Piotr Nowojski  wrote:

 Hi Xintong,

 I'm not sure if slack is the right tool for the job. IMO it works great as
 an adhoc tool for discussion between developers, but it's not searchable
 and it's not persistent. Between devs, it works fine, as long as the result
 of the ad hoc discussions is backported to JIRA/mailing list/design doc.
 For users, that simply would be extremely difficult to achieve. In the
 result, I would be afraid we are answering the same questions over, and
 over and over again, without even a way to provide a link to the previous
 thread, because nobody can search for it .

 I'm +1 for having an open and shared slack space/channel for the
 contributors, but I think I would be -1 for such channels for the users.

 For users, I would prefer to focus more on, for example, stackoverflow.
 With upvoting, clever sorting of the answers (not the oldest/newest at top)
 it's easily searchable - those features make it fit our use case much
 better IMO.

 Best,
 Piotrek




Re: 对Flink Table Store咨询

2022-04-28 Thread Jingsong Li
哈喽,感谢你的关注

Tison是对的,Table Store是一个库。我们目前并没有发布maven依赖。

目前你可以有两种方式来调试:

1.在Table Store的工程里调试
2.在拿到flink-table-store-dist下的 pre bundled jar,放到你工程的classpath下来调试

入口类的话是通过SQL的方式:

TableEnvironment.executeSql("CREATE TABLE XX (...)");

当classpath下包含 table store 的 jar 时,会服务发现到 TableStore 的 factory,进而走到
table store 的代码。

你的需求是合理的,我们后续会考虑提供官方的maven依赖支持,且提供DataStream API

Best,
Jingsong

On Sun, Apr 24, 2022 at 10:32 AM tison  wrote:
>
> Flink Table Store 不是应用,而是库。我理解是要配合 Flink
> 来使用的,断点调试的话,看你的需求,如果只是对一段代码有疑问,跑测试打断点就行了。
>
> Best,
> tison.
>
>
> 陈卓宇 <2572805...@qq.com.invalid> 于2022年4月24日周日 09:59写道:
>
> > 您好大佬:
> >  我对Flink Table
> > Store非常感兴趣,想请教您一下怎么结合flink做断点调试,因为看了一下没有找到入口类
> >
> > 陈卓宇
> >
> >
> > 


Re: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-08 Thread Jingsong Li
Thanks all for your discussions.

I'll share my opinion here:

1. Hive SQL and Hive-like SQL are the absolute mainstay of current
Batch ETL in China. Hive+Spark (HiveSQL-like)+Databricks also occupies
a large market worldwide.

- Unlike OLAP SQL (such as presto, which is ansi-sql rather than hive
sql), Batch ETL is run periodically, which means that a large number
of Batch Pipelines have already been built, and if they need to be
migrated to a new system, it will be extremely costly to migrate the
SQLs.

2. Our current Hive dialect is immature and we need to put more effort
to decouple it from the flink planner.

Best,
Jingsong

On Tue, Mar 8, 2022 at 4:27 PM Zou Dan  wrote:
>
> Hi Martijn,
> Thanks for bringing this up.
> Hive SQL (using in Hive & Spark) plays an important role in batch processing, 
> it has almost become de facto standard in batch processing. In our company, 
> there are hundreds of thousands of spark jobs each day.
> IMO, if we want to promote Flink batch, Hive syntax compatibility is a 
> crucial point of it.
> Thanks to this feature, we have migrated 800+ Spark jobs to Flink smoothly.
>
> So, I quite agree with putting more effort into Hive syntax compatibility.
>
> Best,
> Dan Zou
>
> 2022年3月7日 19:23,Martijn Visser  写道:
>
> query
>
>


Re: [ANNOUNCE] Apache Flink 1.14.2 / 1.13.5 / 1.12.7 / 1.11.6 released

2021-12-16 Thread Jingsong Li
Not found in 
https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java/

I guess too many people sent versions, resulting in maven central
repository synchronization being slower.

Best,
Jingsong

On Fri, Dec 17, 2021 at 2:00 PM casel.chen  wrote:
>
> I can NOT find flink 1.13.5 related jar in maven central repository, did you 
> upload them onto there already? Thanks!
>
>
>
>
>
>
>
> At 2021-12-17 01:26:19, "Chesnay Schepler"  wrote:
> >The Apache Flink community has released emergency bugfix versions of
> >Apache Flink for the 1.11, 1.12, 1.13 and 1.14 series.
> >
> >These releases include a version upgrade for Log4j to address
> >[CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and
> >[CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).
> >
> >We highly recommend all users to upgrade to the respective patch release.
> >
> >The releases are available for download at:
> >https://flink.apache.org/downloads.html
> >
> >Please check out the release blog post for further details:
> >https://flink.apache.org/news/2021/12/16/log4j-patch-releases.html
> >
> >
> >Regards,
> >Chesnay
>
>
>
>



-- 
Best, Jingsong Lee


Re: 回撤流优化

2021-12-16 Thread Jingsong Li
理论上mini-batch就可以优化回撤流。

目前是join没有支持mini-batch。

On Thu, Dec 16, 2021 at 5:12 PM casel.chen  wrote:
>
> 看了《Oceanus的实时流式计算实践与优化》https://jishuin.proginn.com/p/763bfbd5acbf 
> 想问一下社区是否有意实现这里说的回撤流优化功能呢?
> 实际业务很多数据是从mysql binlog cdc接入的,在回撤流上做计算是常见的场景,能否在flink sql中支持这些优化呢?



-- 
Best, Jingsong Lee


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Jingsong Li
Hi Yingjie,

Thanks for your explanation. I have no more questions. +1

On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao  wrote:
>
> Hi Jingsong,
>
> Thanks for your feedback.
>
> >>> My question is, what is the maximum parallelism a job can have with the 
> >>> default configuration? (Does this break out of the box)
>
> Yes, you are right, these two options are related to network memory and 
> framework off-heap memory. Generally, these changes will not break out of the 
> box experience, but for some extreme cases, for example, there are too many 
> ResultPartitions per task, users may need to increase network memory to avoid 
> "insufficient network buffer" error. For framework off-head, I believe that 
> user do not need to change the default value.
>
> In fact, I have a basic goal when changing these config values: when running 
> TPCDS of medium parallelism with the default value, all queries must pass 
> without any error and at the same time, the performance can be improved. I 
> think if we achieve this goal, most common use cases can be covered.
>
> Currently, for the default configuration, the exclusive buffers required at 
> input gate side is still parallelism relevant (though since 1.14, we can 
> decouple the network buffer consumption from parallelism by setting a config 
> value, it has slight performance influence on streaming jobs), which means 
> that no large parallelism can be supported by the default configuration. 
> Roughly, I would say the default value can support jobs of several hundreds 
> of parallelism.
>
> >>> I do feel that this correspondence is a bit difficult to control at the 
> >>> moment, and it would be best if a rough table could be provided.
>
> I think this is a good suggestion, we can provide those suggestions in the 
> document.
>
> Best,
> Yingjie
>
> Jingsong Li  于2021年12月14日周二 14:39写道:
>>
>> Hi  Yingjie,
>>
>> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>> of batch jobs.
>>
>> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>> network memory and framework.off-heap.size.
>>
>> My question is, what is the maximum parallelism a job can have with
>> the default configuration? (Does this break out of the box)
>>
>> How much network memory and framework.off-heap.size are required for
>> how much parallelism in the default configuration?
>>
>> I do feel that this correspondence is a bit difficult to control at
>> the moment, and it would be best if a rough table could be provided.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao  wrote:
>> >
>> > Hi Jiangang,
>> >
>> > Thanks for your suggestion.
>> >
>> > >>> The config can affect the memory usage. Will the related memory 
>> > >>> configs be changed?
>> >
>> > I think we will not change the default network memory settings. My best 
>> > expectation is that the default value can work for most cases (though may 
>> > not the best) and for other cases, user may need to tune the memory 
>> > settings.
>> >
>> > >>> Can you share the tpcds results for different configs? Although we 
>> > >>> change the default values, it is helpful to change them for different 
>> > >>> users. In this case, the experience can help a lot.
>> >
>> > I did not keep all previous TPCDS results, but from the results, I can 
>> > tell that on HDD, always using the sort-shuffle is a good choice. For 
>> > small jobs, using sort-shuffle may not bring much performance gain, this 
>> > may because that all shuffle data can be cached in memory (page cache), 
>> > this is the case if the cluster have enough resources. However, if the 
>> > whole cluster is under heavy burden or you are running large scale jobs, 
>> > the performance of those small jobs can also be influenced. For 
>> > large-scale jobs, the configurations suggested to be tuned are 
>> > taskmanager.network.sort-shuffle.min-buffers and 
>> > taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase 
>> > these values for large-scale batch jobs.
>> >
>> > BTW, I am still running TPCDS tests these days and I can share these 
>> > results soon.
>> >
>> > Best,
>> > Yingjie
>> >
>> > 刘建刚  于2021年12月10日周五 18:30写道:
>> >>
>> >> Glad to 

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Jingsong Li
Hi Yingjie,

Thanks for your explanation. I have no more questions. +1

On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao  wrote:
>
> Hi Jingsong,
>
> Thanks for your feedback.
>
> >>> My question is, what is the maximum parallelism a job can have with the 
> >>> default configuration? (Does this break out of the box)
>
> Yes, you are right, these two options are related to network memory and 
> framework off-heap memory. Generally, these changes will not break out of the 
> box experience, but for some extreme cases, for example, there are too many 
> ResultPartitions per task, users may need to increase network memory to avoid 
> "insufficient network buffer" error. For framework off-head, I believe that 
> user do not need to change the default value.
>
> In fact, I have a basic goal when changing these config values: when running 
> TPCDS of medium parallelism with the default value, all queries must pass 
> without any error and at the same time, the performance can be improved. I 
> think if we achieve this goal, most common use cases can be covered.
>
> Currently, for the default configuration, the exclusive buffers required at 
> input gate side is still parallelism relevant (though since 1.14, we can 
> decouple the network buffer consumption from parallelism by setting a config 
> value, it has slight performance influence on streaming jobs), which means 
> that no large parallelism can be supported by the default configuration. 
> Roughly, I would say the default value can support jobs of several hundreds 
> of parallelism.
>
> >>> I do feel that this correspondence is a bit difficult to control at the 
> >>> moment, and it would be best if a rough table could be provided.
>
> I think this is a good suggestion, we can provide those suggestions in the 
> document.
>
> Best,
> Yingjie
>
> Jingsong Li  于2021年12月14日周二 14:39写道:
>>
>> Hi  Yingjie,
>>
>> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>> of batch jobs.
>>
>> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>> network memory and framework.off-heap.size.
>>
>> My question is, what is the maximum parallelism a job can have with
>> the default configuration? (Does this break out of the box)
>>
>> How much network memory and framework.off-heap.size are required for
>> how much parallelism in the default configuration?
>>
>> I do feel that this correspondence is a bit difficult to control at
>> the moment, and it would be best if a rough table could be provided.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao  wrote:
>> >
>> > Hi Jiangang,
>> >
>> > Thanks for your suggestion.
>> >
>> > >>> The config can affect the memory usage. Will the related memory 
>> > >>> configs be changed?
>> >
>> > I think we will not change the default network memory settings. My best 
>> > expectation is that the default value can work for most cases (though may 
>> > not the best) and for other cases, user may need to tune the memory 
>> > settings.
>> >
>> > >>> Can you share the tpcds results for different configs? Although we 
>> > >>> change the default values, it is helpful to change them for different 
>> > >>> users. In this case, the experience can help a lot.
>> >
>> > I did not keep all previous TPCDS results, but from the results, I can 
>> > tell that on HDD, always using the sort-shuffle is a good choice. For 
>> > small jobs, using sort-shuffle may not bring much performance gain, this 
>> > may because that all shuffle data can be cached in memory (page cache), 
>> > this is the case if the cluster have enough resources. However, if the 
>> > whole cluster is under heavy burden or you are running large scale jobs, 
>> > the performance of those small jobs can also be influenced. For 
>> > large-scale jobs, the configurations suggested to be tuned are 
>> > taskmanager.network.sort-shuffle.min-buffers and 
>> > taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase 
>> > these values for large-scale batch jobs.
>> >
>> > BTW, I am still running TPCDS tests these days and I can share these 
>> > results soon.
>> >
>> > Best,
>> > Yingjie
>> >
>> > 刘建刚  于2021年12月10日周五 18:30写道:
>> >>
>> >> Glad to 

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Jingsong Li
Hi  Yingjie,

+1 for this FLIP. I'm pretty sure this will greatly improve the ease
of batch jobs.

Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
and "taskmanager.network.sort-shuffle.min-buffers" are related to
network memory and framework.off-heap.size.

My question is, what is the maximum parallelism a job can have with
the default configuration? (Does this break out of the box)

How much network memory and framework.off-heap.size are required for
how much parallelism in the default configuration?

I do feel that this correspondence is a bit difficult to control at
the moment, and it would be best if a rough table could be provided.

Best,
Jingsong

On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao  wrote:
>
> Hi Jiangang,
>
> Thanks for your suggestion.
>
> >>> The config can affect the memory usage. Will the related memory configs 
> >>> be changed?
>
> I think we will not change the default network memory settings. My best 
> expectation is that the default value can work for most cases (though may not 
> the best) and for other cases, user may need to tune the memory settings.
>
> >>> Can you share the tpcds results for different configs? Although we change 
> >>> the default values, it is helpful to change them for different users. In 
> >>> this case, the experience can help a lot.
>
> I did not keep all previous TPCDS results, but from the results, I can tell 
> that on HDD, always using the sort-shuffle is a good choice. For small jobs, 
> using sort-shuffle may not bring much performance gain, this may because that 
> all shuffle data can be cached in memory (page cache), this is the case if 
> the cluster have enough resources. However, if the whole cluster is under 
> heavy burden or you are running large scale jobs, the performance of those 
> small jobs can also be influenced. For large-scale jobs, the configurations 
> suggested to be tuned are taskmanager.network.sort-shuffle.min-buffers and 
> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase 
> these values for large-scale batch jobs.
>
> BTW, I am still running TPCDS tests these days and I can share these results 
> soon.
>
> Best,
> Yingjie
>
> 刘建刚  于2021年12月10日周五 18:30写道:
>>
>> Glad to see the suggestion. In our test, we found that small jobs with the 
>> changing configs can not improve the performance much just as your test. I 
>> have some suggestions:
>>
>> The config can affect the memory usage. Will the related memory configs be 
>> changed?
>> Can you share the tpcds results for different configs? Although we change 
>> the default values, it is helpful to change them for different users. In 
>> this case, the experience can help a lot.
>>
>> Best,
>> Liu Jiangang
>>
>> Yun Gao  于2021年12月10日周五 17:20写道:
>>>
>>> Hi Yingjie,
>>>
>>> Very thanks for drafting the FLIP and initiating the discussion!
>>>
>>> May I have a double confirmation for 
>>> taskmanager.network.sort-shuffle.min-parallelism that
>>> since other frameworks like Spark have used sort-based shuffle for all the 
>>> cases, does our
>>> current circumstance still have difference with them?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>>
>>> --
>>> From:Yingjie Cao 
>>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>> To:dev ; user ; user-zh 
>>> 
>>> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>>>
>>> Hi dev & users:
>>>
>>> I have created a FLIP [1] for it, feedbacks are highly appreciated.
>>>
>>> Best,
>>> Yingjie
>>>
>>> [1] 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>> Yingjie Cao  于2021年12月3日周五 17:02写道:
>>>
>>> Hi dev & users,
>>>
>>> We propose to change some default values of blocking shuffle to improve the 
>>> user out-of-box experience (not influence streaming). The default values we 
>>> want to change are as follows:
>>>
>>> 1. Data compression 
>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the 
>>> default value is 'false'.  Usually, data compression can reduce both disk 
>>> and network IO which is good for performance. At the same time, it can save 
>>> storage space. We propose to change the default value to true.
>>>
>>> 2. Default shuffle implementation 
>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default 
>>> value is 'Integer.MAX', which means by default, Flink jobs will always use 
>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for 
>>> both stability and performance. So we propose to reduce the default value 
>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and 
>>> 1024 with a tpc-ds and 128 is the best one.)
>>>
>>> 3. Read buffer of sort-shuffle 
>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the 
>>> default value is '32M'. Previously, when choosing the default value, both 
>>> ‘32M' and '64M' 

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Jingsong Li
Hi  Yingjie,

+1 for this FLIP. I'm pretty sure this will greatly improve the ease
of batch jobs.

Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
and "taskmanager.network.sort-shuffle.min-buffers" are related to
network memory and framework.off-heap.size.

My question is, what is the maximum parallelism a job can have with
the default configuration? (Does this break out of the box)

How much network memory and framework.off-heap.size are required for
how much parallelism in the default configuration?

I do feel that this correspondence is a bit difficult to control at
the moment, and it would be best if a rough table could be provided.

Best,
Jingsong

On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao  wrote:
>
> Hi Jiangang,
>
> Thanks for your suggestion.
>
> >>> The config can affect the memory usage. Will the related memory configs 
> >>> be changed?
>
> I think we will not change the default network memory settings. My best 
> expectation is that the default value can work for most cases (though may not 
> the best) and for other cases, user may need to tune the memory settings.
>
> >>> Can you share the tpcds results for different configs? Although we change 
> >>> the default values, it is helpful to change them for different users. In 
> >>> this case, the experience can help a lot.
>
> I did not keep all previous TPCDS results, but from the results, I can tell 
> that on HDD, always using the sort-shuffle is a good choice. For small jobs, 
> using sort-shuffle may not bring much performance gain, this may because that 
> all shuffle data can be cached in memory (page cache), this is the case if 
> the cluster have enough resources. However, if the whole cluster is under 
> heavy burden or you are running large scale jobs, the performance of those 
> small jobs can also be influenced. For large-scale jobs, the configurations 
> suggested to be tuned are taskmanager.network.sort-shuffle.min-buffers and 
> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase 
> these values for large-scale batch jobs.
>
> BTW, I am still running TPCDS tests these days and I can share these results 
> soon.
>
> Best,
> Yingjie
>
> 刘建刚  于2021年12月10日周五 18:30写道:
>>
>> Glad to see the suggestion. In our test, we found that small jobs with the 
>> changing configs can not improve the performance much just as your test. I 
>> have some suggestions:
>>
>> The config can affect the memory usage. Will the related memory configs be 
>> changed?
>> Can you share the tpcds results for different configs? Although we change 
>> the default values, it is helpful to change them for different users. In 
>> this case, the experience can help a lot.
>>
>> Best,
>> Liu Jiangang
>>
>> Yun Gao  于2021年12月10日周五 17:20写道:
>>>
>>> Hi Yingjie,
>>>
>>> Very thanks for drafting the FLIP and initiating the discussion!
>>>
>>> May I have a double confirmation for 
>>> taskmanager.network.sort-shuffle.min-parallelism that
>>> since other frameworks like Spark have used sort-based shuffle for all the 
>>> cases, does our
>>> current circumstance still have difference with them?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>>
>>> --
>>> From:Yingjie Cao 
>>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>> To:dev ; user ; user-zh 
>>> 
>>> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>>>
>>> Hi dev & users:
>>>
>>> I have created a FLIP [1] for it, feedbacks are highly appreciated.
>>>
>>> Best,
>>> Yingjie
>>>
>>> [1] 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>> Yingjie Cao  于2021年12月3日周五 17:02写道:
>>>
>>> Hi dev & users,
>>>
>>> We propose to change some default values of blocking shuffle to improve the 
>>> user out-of-box experience (not influence streaming). The default values we 
>>> want to change are as follows:
>>>
>>> 1. Data compression 
>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the 
>>> default value is 'false'.  Usually, data compression can reduce both disk 
>>> and network IO which is good for performance. At the same time, it can save 
>>> storage space. We propose to change the default value to true.
>>>
>>> 2. Default shuffle implementation 
>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default 
>>> value is 'Integer.MAX', which means by default, Flink jobs will always use 
>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for 
>>> both stability and performance. So we propose to reduce the default value 
>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and 
>>> 1024 with a tpc-ds and 128 is the best one.)
>>>
>>> 3. Read buffer of sort-shuffle 
>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the 
>>> default value is '32M'. Previously, when choosing the default value, both 
>>> ‘32M' and '64M' 

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Jingsong Li
Amazing!

Thanks Yingjie and all contributors for your great work.

Best,
Jingsong

On Wed, Dec 1, 2021 at 10:52 AM Yun Tang  wrote:
>
> Great news!
> Thanks for all the guys who contributed in this project.
>
> Best
> Yun Tang
>
> On 2021/11/30 16:30:52 Till Rohrmann wrote:
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open source of remote shuffle project [1] for
> > > Flink. The project is originated in Alibaba and the main motivation is to
> > > improve batch data processing for both performance & stability and further
> > > embrace cloud native. For more features about the project, please refer to
> > > [1].
> > >
> > > Before going open source, the project has been used widely in production
> > > and it behaves well on both stability and performance. We hope you enjoy
> > > it. Collaborations and feedbacks are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >



-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Jingsong Li
Amazing!

Thanks Yingjie and all contributors for your great work.

Best,
Jingsong

On Wed, Dec 1, 2021 at 10:52 AM Yun Tang  wrote:
>
> Great news!
> Thanks for all the guys who contributed in this project.
>
> Best
> Yun Tang
>
> On 2021/11/30 16:30:52 Till Rohrmann wrote:
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open source of remote shuffle project [1] for
> > > Flink. The project is originated in Alibaba and the main motivation is to
> > > improve batch data processing for both performance & stability and further
> > > embrace cloud native. For more features about the project, please refer to
> > > [1].
> > >
> > > Before going open source, the project has been used widely in production
> > > and it behaves well on both stability and performance. We hope you enjoy
> > > it. Collaborations and feedbacks are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >



-- 
Best, Jingsong Lee


Re: Flinksql 多表进行full join 出现异常

2021-11-11 Thread Jingsong Li
Hi,

不好意思,不会cherry-pick到1.12了,因为这是个feature,在1.14及其之后的版本支持

Best,
Jingsong

On Fri, Nov 12, 2021 at 3:06 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:
>
> 社区您好 我通过代码debug已经定位到问题:
>
>
> 在flink1.12.5版本下flink-orc_2.11模块下的org/apache/flink/orc/vector/AbstractOrcColumnVector.java文件
> 下createFlinkVector中是没有对ListColumnVector进行实现的,我到flink的master上看在2021/5/12由wangwei1025提交的pr进行了实现,现在想请问社区有没有打算对1.12.5版本的此次问题根据wangwei1025的提交进行补丁的修复
>
>
> 表字段:
>
>
>
>
>string_tag   string
>
>
>
>
>number_tag   number
>
>
>
>
>boolean_tag   boolean
>
>
>
>
>datetime_tag   datetime
>
>
>
>
>arr_tag   array
> 字段这里我进行了转换,生成这个SQL ,我发现具有array SQL:CREATE TABLE smarttag_base_table_5 (
>  distinct_id BIGINT,
>  xwho VARCHAR,
> string_tag string,
> number_tag decimal,
> boolean_tag integer,
> datetime_tag bigint,
> arr_tag ARRAY ds INTEGER
> ) WITH (
>  'connector' = 
> 'filesystem',  -- 必选: 
> 指定连接器类型
>  'path' = 
> 'hdfs://ark1:8020/tmp/usertag/20211029/db_31abd9593e9983ec/orcfile/smarttag_base_table_5/',
>  -- 必选: 指向目录的路径
>  'format' = 
> 'orc'
>   -- 必选: 文件系统连接器需要指定格式,请查阅 表格式 部分以获取更多细节
> )
>
>
>
>
>
> 报错:Unsupport vector: org.apache.hadoop.hive.ql.exec.vector.ListColumnVector
> 我看是因为array source到hdfs的一张orc的表
>
> 陈卓宇
>
>
> 
>
>
>
>
> -- 原始邮件 --
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2021年11月12日(星期五) 上午10:59
> 收件人:"flink中文邮件组"
> 主题:Re: Flinksql 多表进行full join 出现异常
>
>
>
> Hi!
>
> 感谢反馈问题。这看起来其实和 join 无关,应该是与 source 有关。方便的话,能否把 source 表的
> ddl(包含每个字段的类型,字段名如果敏感可以重命名一下)和其他信息(例如 source 表以什么格式存储)分享在邮件里?
>
> 陈卓宇 <2572805...@qq.com.invalid 于2021年11月11日周四 下午9:44写道:
>
>  场景:进行多表的full join失败
> 
> 
>  报错:
>  java.lang.RuntimeException: Failed to fetch next result
> 
>  nbsp;nbsp; nbsp;at
>  
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>  nbsp;nbsp; nbsp;at
>  
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>  nbsp;nbsp; nbsp;at
>  
> org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
>  nbsp;nbsp; nbsp;at
>  
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
>  nbsp;nbsp; nbsp;at
>  
> org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
>  nbsp;nbsp; nbsp;at
>  
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
>  nbsp;nbsp; nbsp;at 
> TableAPI.envReadFileSysteam(TableAPI.java:441)
>  nbsp;nbsp; nbsp;at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>  nbsp;nbsp; nbsp;at
>  
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  nbsp;nbsp; nbsp;at
>  
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  nbsp;nbsp; nbsp;at 
> java.lang.reflect.Method.invoke(Method.java:498)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  nbsp;nbsp; nbsp;at
>  
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  nbsp;nbsp; nbsp;at
>  org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>  nbsp;nbsp; nbsp;at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  nbsp;nbsp; nbsp;at
>  org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  

Re: flinkSQL写hive表,timestamp-pattern设置,分区是yyyyMMdd而不是yyyy-MM-dd的情况怎么搞。

2021-11-10 Thread Jingsong Li
Thanks!

+1 to pattern

Best,
Jingsong

On Wed, Nov 10, 2021 at 7:52 PM yidan zhao  wrote:
>
> 我在jira回复了下,我感觉还是能配置化好一些,那个liwei貌似现在加了个basicDate这个太单一了。
>
> Jingsong Li  于2021年11月4日周四 下午12:18写道:
>
> > 你可以自定义个partition.time-extractor.class来自己解析
> >
> > Flink应该搞个对应的partition.time-extractor.kind来默认支持你的需求。
> > 建了个JIRA: https://issues.apache.org/jira/browse/FLINK-24758
> >
> > Best,
> > Jingsong
> >
> > On Thu, Nov 4, 2021 at 11:47 AM yidan zhao  wrote:
> > >
> > > 如题,我当前是select date_format(xxx, 'MMdd') as dt...
> > >
> > > partition.time-extractor.timestamp-pattern是$dt $hour:00:00这样。
> > >
> > > 但是这样会导致报错,貌似这个地方必须是 -MM-dd hh:mm:ss这种吗。
> >
> >
> >
> > --
> > Best, Jingsong Lee
> >



-- 
Best, Jingsong Lee


Re: "sink.partition-commit.success-file.name" option in the FileSystem connector does not work

2021-11-07 Thread Jingsong Li
Hi,

yidan is correct.

The success-file is not the data-file. [1]

At present, there is no configuration with data file name. You can
create a JIRA for this.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy

Best,
Jingsong

On Mon, Nov 8, 2021 at 9:59 AM yidan zhao  wrote:
>
> Actually, the success file is another file which is written done to the dir 
> when the partition is done. It's content have nothing to do with your 
> bussiness.
>
> Long Nguyễn  于2021年11月5日周五 下午5:07写道:
>>
>> Thank you, Paul.
>>
>> The answer is so clear and helpful. But I'm still wondering what is the 
>> purpose of the sink.partition-commit.success-file.name option if the sink 
>> files must be named in that specific way.
>>
>> On Fri, Nov 5, 2021 at 3:23 PM Fabian Paul  wrote:
>>>
>>> Hi,
>>>
>>> Currently this is expected because the FileSink is built to support running 
>>> with higher parallelism. Therefore it needs to periodically write files. The
>>> respective file names always have a descriptor that the File Sink knows 
>>> which files have already been written. You can read more about the FileSink
>>> here [1].
>>>
>>> Best,
>>> Fabian
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/
>>
>>
>>
>> --
>> --
>> Nguyen Dich Long,
>> School of Information and Communication Technology (SoICT - 
>> https://www.soict.hust.edu.vn)
>> Hanoi University of Science and Technology (https://www.hust.edu.vn)
>> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha Noi, 
>> Vietnam
>> Tel: +84 (0)3.54.41.76.76
>> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com



-- 
Best, Jingsong Lee


Re: flinkSQL写hive表,timestamp-pattern设置,分区是yyyyMMdd而不是yyyy-MM-dd的情况怎么搞。

2021-11-03 Thread Jingsong Li
你可以自定义个partition.time-extractor.class来自己解析

Flink应该搞个对应的partition.time-extractor.kind来默认支持你的需求。
建了个JIRA: https://issues.apache.org/jira/browse/FLINK-24758

Best,
Jingsong

On Thu, Nov 4, 2021 at 11:47 AM yidan zhao  wrote:
>
> 如题,我当前是select date_format(xxx, 'MMdd') as dt...
>
> partition.time-extractor.timestamp-pattern是$dt $hour:00:00这样。
>
> 但是这样会导致报错,貌似这个地方必须是 -MM-dd hh:mm:ss这种吗。



-- 
Best, Jingsong Lee


Re: Question about flink sql

2021-11-01 Thread Jingsong Li
Hi,

If you are using sql-client, you can try:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
If you are using TableEnvironment, you can try statement set too:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#translate-and-execute-a-query

Best,
Jingsong

On Fri, Oct 29, 2021 at 7:01 PM Jake  wrote:
>
> Hi
>
> You can use like this:
>
> ```java
>
> val calciteParser = new 
> CalciteParser(SqlUtil.getSqlParserConfig(tableEnv.getConfig))
> sqlArr
> .foreach(item => {
> println(item)
> val itemNode = calciteParser.parse(item)
>
> itemNode match {
> case sqlSet: SqlSet => {
> configuration.setString(sqlSet.getKeyString, 
> sqlSet.getValueString)
> }
> case _: RichSqlInsert => insertSqlBuffer += item
> case _ => {
> println(item)
> val itemResult = tableEnv.executeSql(item)
> itemResult.print()
> }
> }
> })
>
> // execute batch inserts
> if (insertSqlBuffer.size > 0) {
> insertSqlBuffer.foreach(item => {
> println("insert sql: " + item)
> statementSet.addInsertSql(item)
> })
> val explain = statementSet.explain()
> println(explain)
> statementSet.execute()
> }
>
>
>
> ```
>
>
> On Oct 29, 2021, at 18:50, wx liao  wrote:
>
> Hi:
> I use flink sql,and run a script that has one souce an two sink,I can see 2 
> jobs runing through webUI,is that normal?
> Can some way to ensure only run on job that has one source and two sink? 
> Thank you
>
>


-- 
Best, Jingsong Lee


Re: flink1.12.5 hivecatalog error

2021-11-01 Thread Jingsong Li
Hi,

It seems that there is a jar conflict. You can check your
dependencies. Some guava dependencies conflict with the corresponding
Hadoop version.

You can try to exclude all guava dependencies.

Best,
Jingsong

On Mon, Nov 1, 2021 at 6:07 PM 方汉云  wrote:
>
> Hive version2.3.8
> Flink version 1.12.5
>
> error message:
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:215)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
> not create execution context.
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:972)
> at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:225)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: java.lang.NoSuchMethodError: 
> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
> at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:518)
> at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:536)
> at org.apache.hadoop.mapred.JobConf.(JobConf.java:430)
> at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:4045)
> at org.apache.hadoop.hive.conf.HiveConf.(HiveConf.java:4013)
> at 
> org.apache.flink.table.catalog.hive.HiveCatalog.createHiveConf(HiveCatalog.java:230)
> at 
> org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:169)
> at 
> org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:97)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:396)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:684)
> at java.util.HashMap.forEach(HashMap.java:1289)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:681)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:265)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:677)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:565)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:187)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:138)
> at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:961)
> 重要声明:此邮件中包含的信息为特许和保密信息,只能用于上述收件人以及其他已获得接收授权的收件人。如果您不是此邮件的预期收件人,请勿阅读、复制、转发或存储此邮件。如果已误收此邮件,请将其转发到发件人,并从您的计算机系统彻底删除此邮件。感谢您。



-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Apache Flink 1.13.3 released

2021-10-21 Thread Jingsong Li
Thanks, Chesnay & Martijn

1.13.3 really solves many problems.

Best,
Jingsong

On Thu, Oct 21, 2021 at 6:46 PM Konstantin Knauf  wrote:
>
> Thank you, Chesnay & Martijn, for managing this release!
>
> On Thu, Oct 21, 2021 at 10:29 AM Chesnay Schepler 
> wrote:
>
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.13.3, which is the third bugfix release for the Apache
> > Flink 1.13 series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> > streaming applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> > improvements for this bugfix release:
> > https://flink.apache.org/news/2021/10/19/release-1.13.3.html
> >
> > The full release notes are available in Jira:
> >
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350329
> >
> > We would like to thank all contributors of the Apache Flink community
> > who made this release possible!
> >
> > Regards,
> > Chesnay
> >
> >
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk



-- 
Best, Jingsong Lee


Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Jingsong Li
Hi Yik,

The **batch** Hive sink does not support `sink.partition-commit.policy.kind`.

Default **batch** Hive sink will commit metastore without success-file.

You can create a JIRA for this.

Best,
Jingsong

On Fri, Aug 20, 2021 at 11:01 AM Caizhi Weng  wrote:
>
> Hi!
>
> As far as I know Flink batch jobs will not add the _SUCCESS file. However for 
> batch jobs you can register a JobListener and add the _SUCCESS file by 
> yourself in JobListener#onJobExecuted. See registerJobListener method in 
> StreamExecutionEnvironment.
>
> Yik San Chan  于2021年8月20日周五 上午10:26写道:
>>
>> Hi community,
>>
>> According to the 
>> [docs](https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy),
>>  if I create a Hive table with config 
>> sink.partition-commit.policy.kind="metastore,success-file", once the write 
>> to the **streaming** Hive sink is finished:
>>
>> - The HDFS directory will be registered to the Hive metastore,
>> - There will be a _SUCCESS file written to the directory when the job 
>> finishes.
>>
>> An example result directory on HDFS looks like this:
>>
>> [10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
>> Found 9 items
>> -rw-r-   2 basedata aiinfra  0 2021-08-20 08:56 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
>> -rw-r-   2 basedata aiinfra   10684668 2021-08-20 08:49 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
>> -rw-r-   2 basedata aiinfra   10712792 2021-08-20 08:48 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
>> -rw-r-   2 basedata aiinfra   10759066 2021-08-20 08:46 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
>> -rw-r-   2 basedata aiinfra   10754886 2021-08-20 08:46 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
>> -rw-r-   2 basedata aiinfra   10681155 2021-08-20 08:45 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
>> -rw-r-   2 basedata aiinfra   10725101 2021-08-20 08:46 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
>> -rw-r-   2 basedata aiinfra   10717976 2021-08-20 08:56 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
>> -rw-r-   2 basedata aiinfra   10585453 2021-08-20 08:45 
>> /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0
>>
>> There are 8 part-* files because I set the flink run parallelism to 8. After 
>> all part-* are written, a _SUCCESS file is added (see the timestamp 08:56, 
>> which is later than all the rest).
>>
>> I wonder: can I do the same with **batch** Hive sink as well? Ideally, after 
>> the job finishes, I would like to have a _SUCCESS file added to the 
>> directory. However, I haven't figured out how to do it yet.
>>
>> Any help? Thanks!
>>
>> Best,
>> Yik San



-- 
Best, Jingsong Lee


Re: Apache/flink 1.13.1 images re-pushed and in arm64 arch?

2021-08-11 Thread Jingsong Li
Hi Oran and Chesnay,

I think it should be my problem. The docker image I generated on the
computer with the macbook M1 will lead to the image of arm64 (When
releasing 1.12.5).

We will regenerate the image of 1.13.1 on the Intel x86 machine.

I'm very sorry.

Best,
Jingsong

On Tue, Aug 10, 2021 at 10:25 PM Chesnay Schepler  wrote:
>
> Ah, yes I was looking at the wrong one. I'll figure out what happened
> and get the situation fixed.
>
> Overall though, I'd recommend to rely on https://hub.docker.com/_/flink.
>
> On 10/08/2021 16:04, Oran Shuster wrote:
> > Are you sure you are looking at the correct repo?
> > flink has 2 docker repos for it's images - flink and apache/flink
> > for flink, everything is fine 
> > https://hub.docker.com/_/flink?tab=tags=1=last_updated=1.13.1
> > But for apache/flink (which is still official, and updated faster then the 
> > "regular" flink one), the 1.13.1 images are still arm64 
> > https://hub.docker.com/r/apache/flink/tags?page=1=last_updated=1.13.1
> >
> > On 2021/08/10 07:13:21, Chesnay Schepler  wrote:
> >> Looking at the dockerhub page know, all 1.13.1 images are listed as
> >> amd64, with the last push being 5 days ago.
> >> Perhaps some error happened during the rebuild, which has since been
> >> corrected?
> >>
> >> On 10/08/2021 00:43, Oran Shuster wrote:
> >>> Our 1.13.1 k8s deployments started failing with
> >>> tandard_init_linux.go:228: exec user process caused: exec format error
> >>> Not understanding how an old image with an old depoyment starts failing 
> >>> like that i looked at the apache/flink dockerhub page.
> >>> Although 1.13.1 was released more than a month ago, all the images (apart 
> >>> from 1.13.1-scala_2.11-java11) were pushed 6 days ago and their 
> >>> architecture changed from linux/amd64 to
> >>> linux/arm64
> >>>
> >>> Luckily the "official" flink images (that were also pushed 5-6 days ago) 
> >>> are still amd64 and can be used
> >>>
> >>
>


-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-10 Thread Jingsong Li
Thanks Yun Tang and everyone!

Best,
Jingsong

On Tue, Aug 10, 2021 at 9:37 AM Xintong Song  wrote:

> Thanks Yun and everyone~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 9, 2021 at 10:14 PM Till Rohrmann 
> wrote:
>
> > Thanks Yun Tang for being our release manager and the great work! Also
> > thanks a lot to everyone who contributed to this release.
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:
> >
> >> Thanks Yun Tang for being our release manager and everyone else who made
> >> the release possible!
> >>
> >> Best Regards,
> >> Yu
> >>
> >>
> >> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
> >>
> >>>
> >>> The Apache Flink community is very happy to announce the release of
> >>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
> >>> Flink 1.13 series.
> >>>
> >>> Apache Flink® is an open-source stream processing framework for
> >>> distributed, high-performing, always-available, and accurate data
> streaming
> >>> applications.
> >>>
> >>> The release is available for download at:
> >>> https://flink.apache.org/downloads.html
> >>>
> >>> Please check out the release blog post for an overview of the
> >>> improvements for this bugfix release:
> >>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
> >>>
> >>> The full release notes are available in Jira:
> >>>
> >>>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218==12315522
> >>>
> >>> We would like to thank all contributors of the Apache Flink community
> >>> who made this release possible!
> >>>
> >>> Regards,
> >>> Yun Tang
> >>>
> >>
>


-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-10 Thread Jingsong Li
Thanks Yun Tang and everyone!

Best,
Jingsong

On Tue, Aug 10, 2021 at 9:37 AM Xintong Song  wrote:

> Thanks Yun and everyone~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 9, 2021 at 10:14 PM Till Rohrmann 
> wrote:
>
> > Thanks Yun Tang for being our release manager and the great work! Also
> > thanks a lot to everyone who contributed to this release.
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:
> >
> >> Thanks Yun Tang for being our release manager and everyone else who made
> >> the release possible!
> >>
> >> Best Regards,
> >> Yu
> >>
> >>
> >> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
> >>
> >>>
> >>> The Apache Flink community is very happy to announce the release of
> >>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
> >>> Flink 1.13 series.
> >>>
> >>> Apache Flink® is an open-source stream processing framework for
> >>> distributed, high-performing, always-available, and accurate data
> streaming
> >>> applications.
> >>>
> >>> The release is available for download at:
> >>> https://flink.apache.org/downloads.html
> >>>
> >>> Please check out the release blog post for an overview of the
> >>> improvements for this bugfix release:
> >>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
> >>>
> >>> The full release notes are available in Jira:
> >>>
> >>>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218==12315522
> >>>
> >>> We would like to thank all contributors of the Apache Flink community
> >>> who made this release possible!
> >>>
> >>> Regards,
> >>> Yun Tang
> >>>
> >>
>


-- 
Best, Jingsong Lee


Re: Re: filesystem connector不支持跨subtask合并小文件

2021-08-05 Thread Jingsong Li
这个参数的意思是合并后的文件最大的size,你每个文件1k多,两个文件就大于2k了,所以没有触发合并

On Fri, Aug 6, 2021 at 11:59 AM Rui Li  wrote:

> 可以把这个参数调大点试试呢,调成远大于单个文件的size
>
> On Thu, Aug 5, 2021 at 1:43 PM lixin58...@163.com 
> wrote:
>
> > 你好,
> > 生成的三个文件挺小的,不到2kb,1k多一点,配这个是为了合并后比2k大
> >
> > --
> > lixin58...@163.com
> >
> >
> > *发件人:* Rui Li 
> > *发送时间:* 2021-08-05 11:42
> > *收件人:* user-zh 
> > *主题:* Re: filesystem connector不支持跨subtask合并小文件
> > 你好,
> >
> > 看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么
> >
> > On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com 
> > wrote:
> >
> > > 你好,
> > > 在使用filesystem
> > >
> >
> connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢?
> > >
> > > create table fs_parquet_compact
> > > (userid bigint, name string, part string)
> > > PARTITIONED BY (part)
> > > with(
> > > 'connector' = 'filesystem',
> > > 'path' = 'hdfs:///data/fs_parquet_compact',
> > > 'format' = 'parquet',
> > > 'auto-compaction' = 'true',
> > > 'compaction.file-size' = '2kb',
> > > 'sink.rolling-policy.file-size' = '500b',
> > > 'sink.rolling-policy.rollover-interval' = '800s',
> > > 'sink.rolling-policy.check-interval' = '60s'
> > > );
> > >
> > >
> > >
> > > lixin58...@163.com
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
> >
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee


Re: 1.14啥时候出呀

2021-08-04 Thread Jingsong Li
1.14还有1-2个月
1.13.2马上就出了,估计明天或后天或周一

On Wed, Aug 4, 2021 at 4:48 PM yidan zhao  wrote:

> 如题,1.14或1.13.2啥时候出呀,有人知道吗。
>


-- 
Best, Jingsong Lee


Re: Re: filesystem table parquet 滚动问题

2021-07-26 Thread Jingsong Li
是的,类似的

On Tue, Jul 27, 2021 at 10:42 AM lixin58...@163.com 
wrote:

> 你好,
>
> 感谢回复,看了下这个文档,提到对于parquet这种列式文件只会使用onCheckpointRollPolicy,也就是只在做检查点时会滚动。flink
> filesystem table这块的parquet列式文件写入是不是也这样呢?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/streamfile_sink.html
>
>
>
>
> lixin58...@163.com
>
> 发件人: Jingsong Li
> 发送时间: 2021-07-27 10:30
> 收件人: user-zh
> 主题: Re: filesystem table parquet 滚动问题
> parquet因为它会在内存中攒buffer,所以文件的file-size并不能很精确。。只能等它flush了才会生效。
>
> On Sun, Jul 25, 2021 at 9:47 AM lixin58...@163.com 
> wrote:
>
> > 大家好,
> >
> > 检查点配的是120s,滚动时长800s,滚动大小1kb,并行度配的2
> >
> >
> >
> 不过在跑的过程中发现不管写入的多快,同时只存在一个in-progress文件,且最终生成的文件是严格按照120s生成的,这个很奇怪,似乎只有按检查点滚动生效了,与json格式的不一样。真的是这样吗?不过看官方文档没有这样说
> >
> > 求大佬们解惑!
> >
> > create table fs_parquet
> > (userid bigint, name string, part string)
> > PARTITIONED BY (part)
> > with(
> > 'connector' = 'filesystem',
> > 'path' = 'hdfs:///data/fs_parquet',
> > 'format' = 'parquet',
> > 'sink.rolling-policy.file-size' = '1kb',
> > 'sink.rolling-policy.rollover-interval' = '800s',
> > 'sink.rolling-policy.check-interval' = '60s'
> > );
> >
> >
> >
> >
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: filesystem table parquet 滚动问题

2021-07-26 Thread Jingsong Li
parquet因为它会在内存中攒buffer,所以文件的file-size并不能很精确。。只能等它flush了才会生效。

On Sun, Jul 25, 2021 at 9:47 AM lixin58...@163.com 
wrote:

> 大家好,
>
> 检查点配的是120s,滚动时长800s,滚动大小1kb,并行度配的2
>
>
> 不过在跑的过程中发现不管写入的多快,同时只存在一个in-progress文件,且最终生成的文件是严格按照120s生成的,这个很奇怪,似乎只有按检查点滚动生效了,与json格式的不一样。真的是这样吗?不过看官方文档没有这样说
>
> 求大佬们解惑!
>
> create table fs_parquet
> (userid bigint, name string, part string)
> PARTITIONED BY (part)
> with(
> 'connector' = 'filesystem',
> 'path' = 'hdfs:///data/fs_parquet',
> 'format' = 'parquet',
> 'sink.rolling-policy.file-size' = '1kb',
> 'sink.rolling-policy.rollover-interval' = '800s',
> 'sink.rolling-policy.check-interval' = '60s'
> );
>
>
>
>

-- 
Best, Jingsong Lee


Re: 参与开发社区

2021-07-16 Thread Jingsong Li
Flink-Hudi版本是啥?
Flink集群版本是啥?
精确到第三位版本号

On Thu, Jul 15, 2021 at 11:39 PM Page  wrote:

> 能不能把依赖和相关代码贴一下
>
>
> | |
> Page
> |
> |
> lj879933...@163.com
> |
> 签名由网易邮箱大师定制
> 在2021年7月13日 18:36,沉黙dē羔羊<736036...@qq.com.INVALID> 写道:
> 大家好,请教下,用了flink hudi 写入数据报错,如下:
> Caused by: org.apache.flink.util.FlinkException: Error from
> OperatorCoordinator
> ... 41 more
> Caused by: java.lang.AbstractMethodError: Method
> org/apache/hudi/sink/StreamWriteOperatorCoordinator.subtaskReady(ILorg/apache/flink/runtime/operators/coordination/OperatorCoordinator$SubtaskGateway;)V
> is abstract
> at
> org.apache.hudi.sink.StreamWriteOperatorCoordinator.subtaskReady(StreamWriteOperatorCoordinator.java)
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.notifySubtaskReady(OperatorCoordinatorHolder.java:416)
> ... 40 more



-- 
Best, Jingsong Lee


Re: flink大窗口性能问题

2021-07-14 Thread Jingsong Li
没用rocksdb吗?

On Thu, Jul 15, 2021 at 10:46 AM Michael Ran  wrote:

> 要么内存增大,或者并行增大,要么窗口改小,同时保留数据时间减少
> 在 2021-07-15 10:23:25,"Hui Wang" <463329...@qq.com.INVALID> 写道:
> >flink大窗口缓存数据量过大导致jvm频烦full gc,并且处理速度极低,最终OOM,该如何调优
>


-- 
Best, Jingsong Lee


Re: 退订

2021-06-28 Thread Jingsong Li
退订请发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Mon, Jun 28, 2021 at 5:56 PM luoye <13033709...@163.com> wrote:

> 退订



-- 
Best, Jingsong Lee


Re: Flink Sql 1.13 UDF ERROR

2021-06-28 Thread Jingsong Li
Hi,

你可以创建个JIRA,让Timo看看,UDAF引入了新的类型推导,可能有问题

Best,
Jingsong

On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal  wrote:

>
>
> Hi, All.
>
>
> 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家:
>
>
> 版本: 1.13.1
> 运行模式: IDE-application
> ---
> about udf define...
>
>
> public static class UDFAggregateFunction extends
> AggregateFunction {
>
>
> //返回最终结果
> @Override
> public Double getValue(AccumulatorBean acc) {
> return acc.totalPrice / acc.totalNum;
> }
>
>
> //构建保存中间结果的对象
> @Override
> public AccumulatorBean createAccumulator() {
> return new AccumulatorBean();
> }
>
>
> //减去要撤回的值
> public void retract(AccumulatorBean acc, double price, long num) {
> acc.totalPrice -= price * num;
> acc.totalNum -= num;
> }
>
>
> //从每个分区把数据取出来然后合并
> public void merge(AccumulatorBean acc, Iterable
> it) {
>
>
> Iterator iter = it.iterator();
> while (iter.hasNext()) {
> AccumulatorBean a = iter.next();
> this.accumulate(acc, a.totalPrice, a.totalNum);
> }
> }
>
>
> //重置内存中值时调用
> public void resetAccumulator(AccumulatorBean acc) {
> acc.totalNum = 0;
> acc.totalPrice = 0;
> }
>
>
> //和传入数据进行计算的逻辑
> public void accumulate(AccumulatorBean acc, double price, long
> num) {
> acc.totalPrice += price * num;
> acc.totalNum += num;
> }
> }
>
>
>
> 
> About main calling
> //TODO 流批一体的 Table API
> TableEnvironment tableEnvironment =
> TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
> List dataList = new ArrayList<>();
> dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
> dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
> dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
> Table table = tableEnvironment.fromValues(DataTypes.ROW(
> DataTypes.FIELD("user", DataTypes.STRING()),
> DataTypes.FIELD("name", DataTypes.STRING()),
> DataTypes.FIELD("price", DataTypes.DOUBLE()),
> DataTypes.FIELD("num", DataTypes.BIGINT())
> ),
> dataList);
> tableEnvironment.createTemporaryView("orders", table);
>
>
> tableEnvironment.createTemporaryFunction("c_agg", new
> UDFAggregateFunction());
>
>
> tableEnvironment.executeSql("select user, c_agg(price, num) as
> udf_field from orders group by user").print();
>
>
>
>
>
>
>
> 异常堆栈-
>
>
>
>
> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at
> com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> function call:
> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
> at
> 

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Jingsong Li
Hi, 你的版本check下?集群和flink-parquet是同一个版本吗?

BEST,
Jingsong

On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 
wrote:

> 您好,
> 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
>
>

-- 
Best, Jingsong Lee


Re: Flink TPC-DS 3TB BenchMark result is not good.

2021-06-23 Thread Jingsong Li
Thanks Yingjie for pinging me.

Hi vtygoss,

Leonard is right, maybe you are using the wrong statistics information.

This caused the optimizer to select the **BROADCAST JOIN**
incorrectly. Unfortunately, Flink needs to broadcast a huge amount of data,
even gigabytes. This is really the performance killer.

I think you can:
- analyze your tables in Hive as Leonard said.
- Or just remove "TpcdsStatsProvider.registerTpcdsStats(tEnv)"

And I see your code: "table.optimizer.join.broadcast-threshold: 256 * 1024
* 1024".

I think this threshold is too large. More than 10MB is not recommended.

Best,
Jingsong

On Wed, Jun 23, 2021 at 11:08 AM Leonard Xu  wrote:

> Hi, vtygoss
>
> Thanks for the detail report,  a quick reply as I wrote the
> * org.apache.flink.table.tpcds.TpcdsTestProgram* in community, I guess
> you missed *table statistics information.*
>
> The* table statistics information* used in the TPC-DS e2e tests is
> constant for 1GB verification data set, I wrote this test for checking
> Flink Batch SQL works well for every PR as CI test rather than checking the
> performance. Please see
> *org.apache.flink.table.tpcds.stats.TpcdsStatsProvider*.
>
>  The *table statistics information *will be used by planner(CBO
> optimizer) to optimize the  sql plan, the incorrect  *table statistics
> information *even lead to the wrong plan and sql job may run unexpectedly.
>
> Thus if you want to run for 3TB TPC-DS tests, you should use the
> corresponding  *table statistics information *for your test data set, you
> can obtain the table statistics information by analyze your tables in Hive.
>
> Best,
> Leonard
>
> 在 2021年6月23日,10:42,Yingjie Cao  写道:
>
> Hi,
>
> I also have some experience of running TPC-DS benchmark with Flink (10T
> scale). But the data shuffle amount of Q1 has a really big difference with
> the numbers in the picture you shared. I am not sure what is going on,
> maybe you missed something? I attached the numbers of Q1 in my test (also
> with 500 max parallelism, though I used Flink version 1.13 instead of
> 1.12), the running time is 20s for 10T TPC-DS.
>
> There are some points I known which may influence the test results, hope
> these can help you:
> 1. Shuffle data compression. (Disabled in Flink by default, can be enabled
> by setting taskmanager.network.blocking-shuffle.compression.enabled to
> true);
> 2. Blocking shuffle type used. See [1] for more information. (To used
> sort-shuffle, the minimum version is 1.13);
> 3. Memory configuration, including network and managed memory size.
> 4. Sql optimization configuration. (I am not familiar with this,
> maybe @Jingsong Li has more knowledge about that).
>
> BTW, could you please share more information? For example, how many nodes
> in your cluster? Which type of disk do you use, SSD or HDD? How many
> available cores, disks and memory of each node? Could you also share the
> numbers of shuffle write and shuffle read of all stages of Spark?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
>
> 
>
> Best,
> Yingjie
>
> vtygoss  于2021年6月22日周二 下午4:46写道:
>
>> Hi,
>>
>> I am doing performance tests on 3TB TPC-DS using
>> flink/flink-end-to-end-tests/flink-tpcds-test module, but the test results
>> are not good.
>>
>> scenario:
>> tpc-ds location: hive 2.3.5
>> tpc-ds scala: 3TB, parquet + snappy
>> flink version: flink-1.12-SNAPSHOT
>> resource configuration: slots per task manager=5, parallesm=500, job
>> manager memory=10GB, task manager memory=10GB, task manager number=100.
>>
>> difference of my branch and community branch:
>> 1. tpc-ds stored in hive, so i change Source from CSV to Hive.
>> 2. I add some optimizations explicitly about  join-reorder,broadcast…
>> shown as below
>> 3. community  tpcds test main class is org.apache.flink.table.tpcds.
>> TpcdsTestProgram, my is org.apache.flink.table.tpcds.My
>> <http://org.apache.flink.table.tpcds.my/>TpcdsTestProgram. Both are in
>> the attachment.
>>
>> Do I make something wrong? please help to offer some advices. thanks very
>> much!
>>
>> Best Regards
>>
>>
>> ```
>> [MyTpcdsTestProgram.java] (My Branch)
>>
>> private static TableEnvironment prepareTableEnv(String sourceTablePath, 
>> Boolean useTableStats) {
>> // init Table Env
>> EnvironmentSettings environmentSettings =
>> 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);

Re: flink-1.13.1 sql error

2021-06-20 Thread Jingsong Li
你这个报错信息,意思是有个字段,你在DDL里声明是ROW,但是真实数据的这列却不是一个ROW。

你应该排查下是否声明对了,如果在期望中,可以通过ignoreParseErrors来跳过这行

Best,
Jingsong

On Mon, Jun 21, 2021 at 11:46 AM zhuxiaoshang 
wrote:

> Json反序列化失败了,像是数据问题
> {\n  \"app_time\": \"2021-06-14 10:00:00\",\n 
> \"category_id\": 1,\n  \"item_id\": 1,\n 
> \"user_id\": 1,\n  \"behavior\": \"pv\"\n}
>
> > 2021年6月20日 下午9:08,kcz <573693...@qq.com.INVALID> 写道:
> >
> > 大佬们 帮看下这个是为什么提示那个错误
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>

-- 
Best, Jingsong Lee


Re: 退订

2021-06-17 Thread Jingsong Li
退订请发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 2:16 PM 金晓龙  wrote:

> 退订



-- 
Best, Jingsong Lee


Re: 退訂

2021-06-17 Thread Jingsong Li
退订请发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 9:51 AM Chongaih Hau 
wrote:

> 郵箱更換,退訂
>
> Regards,
> Hau ChongAih
>


-- 
Best, Jingsong Lee


Re: 邮件退订

2021-06-17 Thread Jingsong Li
退订请发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 9:29 AM wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
> 邮箱变更,退订!
>
>
>
>

-- 
Best, Jingsong Lee


Re: 如何使用kafka的KafkaDynamicTableFactory ,替换他的json format

2021-06-17 Thread Jingsong Li
不能,除非你自己创建一个新的kafka connector。

不过,
kafka的offset、partition等信息是可以通过metadata的方式拿到的。

你是需要在DeserializationFormat里面拿到offset、partition?还是说后续的SQL拿到就行了?

Best,
Jingsong

On Thu, Jun 17, 2021 at 2:35 PM Michael Ran  wrote:

> dear all :
> 目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。
> 但是根据 “implements DeserializationFormatFactory,
> SerializationFormatFactory”
> 这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema
>   有方法
> deserialize(ConsumerRecord record,
> Collector collector) 。
> 包装了offset 的对象:ConsumerRecord
>  ,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?
>



-- 
Best, Jingsong Lee


Re: 退订

2021-06-15 Thread Jingsong Li
退订请发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Tue, Jun 15, 2021 at 5:05 PM 1049961436 <1049961...@qq.com> wrote:

> 退订



-- 
Best, Jingsong Lee


Re: hbase async lookup能否保证输出结果有序?

2021-06-15 Thread Jingsong Li
是有序的。

无序的mode目前并没有支持, 目前可能会影响流计算的正确性

Best,
Jingsong

On Tue, Jun 15, 2021 at 3:42 PM zilong xiao  wrote:

> hi,社区大佬们好,想问下flink 1.13中hbase async lookup能否保证输出结果有序?
>


-- 
Best, Jingsong Lee


Re: 退订

2021-06-14 Thread Jingsong Li
发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Tue, Jun 15, 2021 at 12:28 AM 张保淇  wrote:

> 退订



-- 
Best, Jingsong Lee


Re: (无主题)

2021-06-14 Thread Jingsong Li
发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Tue, Jun 15, 2021 at 12:32 PM 1049961436 <1049961...@qq.com> wrote:

> 退订



-- 
Best, Jingsong Lee


Re: 退订

2021-06-14 Thread Jingsong Li
发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Mon, Jun 14, 2021 at 7:51 PM 周超 <769699...@qq.com> wrote:

> 退订



-- 
Best, Jingsong Lee


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Jingsong Li
Thanks Yingjie for the great effort!

This is really helpful to Flink Batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao  wrote:

> Hi devs & users,
>
> The FLIP-148[1] has been released with Flink 1.13 and the final
> implementation has some differences compared with the initial proposal in
> the FLIP document. To avoid potential misunderstandings, I have updated the
> FLIP document[1] accordingly and I also drafted another document[2] which
> contains more implementation details.  FYI.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> [2]
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>
> Best,
> Yingjie
>
> Yingjie Cao  于2020年10月15日周四 上午11:02写道:
>
>> Hi devs,
>>
>> Currently, Flink adopts a hash-style blocking shuffle implementation
>> which writes data sent to different reducer tasks into separate files
>> concurrently. Compared to sort-merge based approach writes those data
>> together into a single file and merges those small files into bigger ones,
>> hash-based approach has several weak points when it comes to running large
>> scale batch jobs:
>>
>>1. *Stability*: For high parallelism (tens of thousands) batch job,
>>current hash-based blocking shuffle implementation writes too many files
>>concurrently which gives high pressure to the file system, for example,
>>maintenance of too many file metas, exhaustion of inodes or file
>>descriptors. All of these can be potential stability issues. Sort-Merge
>>based blocking shuffle don’t have the problem because for one result
>>partition, only one file is written at the same time.
>>2. *Performance*: Large amounts of small shuffle files and random IO
>>can influence shuffle performance a lot especially for hdd (for ssd,
>>sequential read is also important because of read ahead and cache). For
>>batch jobs processing massive data, small amount of data per subpartition
>>is common because of high parallelism. Besides, data skew is another cause
>>of small subpartition files. By merging data of all subpartitions together
>>in one file, more sequential read can be achieved.
>>3. *Resource*: For current hash-based implementation, each
>>subpartition needs at least one buffer. For large scale batch shuffles, 
>> the
>>memory consumption can be huge. For example, we need at least 320M network
>>memory per result partition if parallelism is set to 1 and because of
>>the huge network consumption, it is hard to config the network memory for
>>large scale batch job and  sometimes parallelism can not be increased just
>>because of insufficient network memory  which leads to bad user 
>> experience.
>>
>> To improve Flink’s capability of running large scale batch jobs, we would
>> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> feedback is appreciated.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>
>> Best,
>> Yingjie
>>
>

-- 
Best, Jingsong Lee


Re: flink sql支持Common Table Expression (CTE)吗?

2021-05-23 Thread Jingsong Li
支持。

如果只是在单个sql中复用expression,和temporary view基本一样,区别不大。

在某些优化路径上不同,一般没有实质影响。

Best,
Jingsong

On Fri, May 21, 2021 at 11:32 PM casel.chen  wrote:

> flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view
> xxx 来实现?CTE和temporary view的区别是什么?
> 例如
>
>
> with toronto_ppl as (
> SELECT DISTINCT name
> FROM population
> WHERE country = "Canada"
>   AND city = "Toronto"
> )
>, avg_female_salary as (
> SELECT AVG(salary) as avgSalary
> FROM salaries
> WHERE gender = "Female"
> )
> SELECT name
>  , salary
> FROM People
> WHERE name in (SELECT DISTINCT FROM toronto_ppl)
>   AND salary >= (SELECT avgSalary FROM avg_female_salary)



-- 
Best, Jingsong Lee


Re: java.io.StreamCorruptedException: unexpected block data

2021-04-21 Thread Jingsong Li
Hi Alokh,

Maybe this is related to https://issues.apache.org/jira/browse/FLINK-20241

We can improve `SerializableConfiguration` to throw better exceptions.

So the true reason may be "ClassNotFoundException"

Can you check your dependencies? Like Hadoop related dependencies?

Best,
Jingsong

On Fri, Apr 16, 2021 at 4:31 PM Alokh P  wrote:

> The flink version is 1.12.1
>
> On Fri, Apr 16, 2021 at 1:59 PM Alokh P  wrote:
>
>> Hi Community,
>> Facing this error when trying to query Parquet data using flink SQL
>> Client
>>
>> Create Table command
>>
>> CREATE TABLE test(
>>   `username` STRING,
>>   `userid` INT) WITH ('connector' = 'filesystem',  'path' =
>> '/home/centos/test/0016_part_00.parquet',  'format' = 'parquet' );
>>
>> Select command :
>>
>> select * from test limit 10;
>>
>> Getting the following exception
>>
>> [ERROR] Could not execute SQL statement. 
>> Reason:org.apache.flink.runtime.client.JobInitializationException:
>> Could not instantiate JobManager.
>>
>>  at 
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
>>  at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the 
>> coordinator for operator Source: TableSourceScan(table=[[default_catalog, 
>> default_database, test]], fields=[username, userid]) -> 
>> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:231)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257)
>>  at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:318)
>>  at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:272)
>>  at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:245)
>>  at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:133)
>>  at 
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:330)
>>  at 
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
>>  at 
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:162)
>>  at 
>> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
>>  at 
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
>>  ... 4 more
>> Caused by: java.io.StreamCorruptedException: unexpected block data
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
>>  at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
>>  at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>>  at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>>  at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>>  at 

Re: Upgrade calcite version

2021-03-11 Thread Jingsong Li
Hi,

Yes, as Danny said, it is very hard work...

A suggestion is that you can cherry-pick some bugfixs from the new Calcite
version to your own internal Calcite branch, if you just want to fix some
bugs.

Best,
Jingsong

On Thu, Mar 11, 2021 at 2:28 PM Danny Chan  wrote:

> Hi Sheng ~
>
> It is a hard work to upgrade Calcite version because that means you need
> to:
>
> - Fix all the bug introduced by the new planner
> - Fix all the logical plan to have correct semantics
> - Replace the deprecate APIs to new
>
> In order to achieve this, you need to have good knowledge of Calcite basic
> and SQL planning (the relational algebra) that really few people have,
> so i would not suggest to do that at all for yourself.
>
> What is the purpose to must upgrade Calcite, can you share something ~
>
> Best,
> Danny Chan
>
>
> 盛森林  于2021年2月4日周四 下午10:47写道:
>
>> Hi,
>> I want to upgrade calcite to 1.22 in the flink branch that fork from
>> apache release 1.9.
>> Can community give me some suggestion.
>
>

-- 
Best, Jingsong Lee


Re: 关于filesystem connector的一点疑问

2020-11-12 Thread Jingsong Li
尽早的可查,直接把delay设为0即可 (其它默认值)

On Thu, Nov 12, 2020 at 5:17 PM admin <17626017...@163.com> wrote:

> Hi,jingsong
> 所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。
> 所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的,
> 比如sink.partition-commit.trigger = partition-time
> sink.partition-commit.delay = 10 min
>
> > 2020年11月12日 下午3:22,Jingsong Li  写道:
> >
> > Hi admin,
> >
> > 不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作)
> >
> > On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:
> >
> >> 补充一下不用partition time trigger的原因,partition
> >> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的
> >>
> >>> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> >>>
> >>> Hi ,kandy
> >>> 我没有基于partition time 提交分区,我是基于默认的process
> >> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> >>>
> >>>> 2020年11月12日 下午12:46,kandy.wang  写道:
> >>>>
> >>>> hi:
> >>>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  +
> >> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay
> >>>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> >>>>
> >>>>
> >>>> https://cloud.tencent.com/developer/article/1707182
> >>>>
> >>>> 这个连接可以看一下
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
> >>>>> Hi,all
> >>>>> Flink 1.11的filesystem connector,partition
> trigger[1]都是使用的默认值,所以分区可以多次提交
> >>>>> 现在有这样的场景:
> >>>>> 消费kafka数据写入hdfs中,分区字段是 day + hour
> >> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
> >>>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
> >>>>> 有大佬知道吗,有实际验证过吗
> >>>>> 感谢
> >>>>>
> >>>>> 附上简单sql:
> >>>>> CREATE TABLE kafka (
> >>>>> a STRING,
> >>>>> b STRING,
> >>>>> c BIGINT,
> >>>>> process_time BIGINT,
> >>>>> e STRING,
> >>>>> f STRING,
> >>>>> g STRING,
> >>>>> h INT,
> >>>>> i STRING
> >>>>> ) WITH (
> >>>>> 'connector' = 'kafka',
> >>>>> 'topic' = 'topic',
> >>>>> 'properties.bootstrap.servers' = 'x',
> >>>>> 'properties.group.id' = 'test-1',
> >>>>> 'scan.startup.mode' = 'latest-offset',
> >>>>> 'format' = 'json',
> >>>>> 'properties.flink.partition-discovery.interval-millis' = '30'
> >>>>> );
> >>>>>
> >>>>> CREATE TABLE filesystem (
> >>>>> `day` STRING,
> >>>>> `hour` STRING,
> >>>>> a STRING,
> >>>>> b STRING,
> >>>>> c BIGINT,
> >>>>> d BIGINT,
> >>>>> e STRING,
> >>>>> f STRING,
> >>>>> g STRING,
> >>>>> h INT,
> >>>>> i STRING
> >>>>> ) PARTITIONED BY (`day`, `hour`) WITH (
> >>>>> 'connector' = 'filesystem',
> >>>>> 'format' = 'parquet',
> >>>>> 'path' = 'hdfs://xx',
> >>>>> 'parquet.compression'='SNAPPY',
> >>>>> 'sink.partition-commit.policy.kind' = 'success-file'
> >>>>> );
> >>>>>
> >>>>> insert into filesystem
> >>>>> select
> >>>>> from_unixtime(process_time,'-MM-dd') as `day`,
> >>>>> from_unixtime(process_time,'HH') as `hour`,
> >>>>> a,
> >>>>> b,
> >>>>> c,
> >>>>> d,
> >>>>> e,
> >>>>> f,
> >>>>> g,
> >>>>> h,
> >>>>> i
> >>>>> from kafka;
> >>>>>
> >>>>>
> >>>>>
> >>>>> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> >>>
> >>
> >>
> >
> > --
> > Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee


Re: 关于filesystem connector的一点疑问

2020-11-11 Thread Jingsong Li
Hi admin,

不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作)

On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:

> 补充一下不用partition time trigger的原因,partition
> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的
>
> > 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> >
> > Hi ,kandy
> > 我没有基于partition time 提交分区,我是基于默认的process
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> >
> >> 2020年11月12日 下午12:46,kandy.wang  写道:
> >>
> >> hi:
> >> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  +
> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay
> >> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> >>
> >>
> >> https://cloud.tencent.com/developer/article/1707182
> >>
> >> 这个连接可以看一下
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
> >>> Hi,all
> >>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
> >>> 现在有这样的场景:
> >>> 消费kafka数据写入hdfs中,分区字段是 day + hour
> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
> >>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
> >>> 有大佬知道吗,有实际验证过吗
> >>> 感谢
> >>>
> >>> 附上简单sql:
> >>> CREATE TABLE kafka (
> >>>  a STRING,
> >>>  b STRING,
> >>>  c BIGINT,
> >>>  process_time BIGINT,
> >>>  e STRING,
> >>>  f STRING,
> >>>  g STRING,
> >>>  h INT,
> >>>  i STRING
> >>> ) WITH (
> >>>  'connector' = 'kafka',
> >>>  'topic' = 'topic',
> >>>  'properties.bootstrap.servers' = 'x',
> >>>  'properties.group.id' = 'test-1',
> >>>  'scan.startup.mode' = 'latest-offset',
> >>>  'format' = 'json',
> >>>  'properties.flink.partition-discovery.interval-millis' = '30'
> >>> );
> >>>
> >>> CREATE TABLE filesystem (
> >>>  `day` STRING,
> >>>  `hour` STRING,
> >>>  a STRING,
> >>>  b STRING,
> >>>  c BIGINT,
> >>>  d BIGINT,
> >>>  e STRING,
> >>>  f STRING,
> >>>  g STRING,
> >>>  h INT,
> >>>  i STRING
> >>> ) PARTITIONED BY (`day`, `hour`) WITH (
> >>>  'connector' = 'filesystem',
> >>>  'format' = 'parquet',
> >>>  'path' = 'hdfs://xx',
> >>>  'parquet.compression'='SNAPPY',
> >>>  'sink.partition-commit.policy.kind' = 'success-file'
> >>> );
> >>>
> >>> insert into filesystem
> >>> select
> >>>  from_unixtime(process_time,'-MM-dd') as `day`,
> >>>  from_unixtime(process_time,'HH') as `hour`,
> >>>  a,
> >>>  b,
> >>>  c,
> >>>  d,
> >>>  e,
> >>>  f,
> >>>  g,
> >>>  h,
> >>>  i
> >>> from kafka;
> >>>
> >>>
> >>>
> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> >
>
>

-- 
Best, Jingsong Lee


Re: Question about processing a 3-level List data type in parquet

2020-11-03 Thread Jingsong Li
Hi Naehee, sorry for the late reply.

I think you are right, there are bugs here. We didn't think about nested
structures very well before.

Now we mainly focus on the new BulkFormat implementation, which we need to
consider when implementing the new ParquetBulkFormat.

Best,
Jingsong

On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim  wrote:

> Hi Jingsong,
>
> I am forwarding the email below to you, thinking you will have a good idea
> about my questions below. I'd appreciate it if you give your thoughts.
>
> Thanks,
> Naehee
>
>
> -- Forwarded message -
> From: Naehee Kim 
> Date: Thu, Oct 29, 2020 at 4:38 PM
> Subject: Question about processing a 3-level List data type in parquet
> To: 
>
>
> Hi Flink Dev Community,
>
> I've found RowConverter.java in flink-parquet module doesn't support
> reading a 3-level list type in parquet though it is able to process a
> 2-level list type.
>
> 3-level
>
> optional group my_list (LIST) {
>   repeated group element {
> required binary str (UTF8);
>   };
> }
>
>
>   2-level
>
> optional group my_list (LIST) {
>   repeated int32 element;
> }
>
> Reference:
> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>
> The parquet file I am testing with was written by Spark job and it has a
> 3-level list type. When I try to process the parquet file, it runs into
> 'java.lang.ClassCastException: Expected instance of group converter but got
> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
> error.
>
> I've tested with Flink 1.9 and checked RowConverter.java still remains the
> same in v1.11. To process a 3-level list, I think RowConverter.java should
> be updated with a new TypeInfo, instead of BasicArrayTypeInfo. (A 2-level
> list is able to be processed with BasicArrayTypeInfo.). I wonder if my
> understanding is correct and if you have any plan to support a 3-level List
> datatype in parquet.
>
> For your reference, here are code snippet along with stack trace.
>
> MessageType readSchema = (new 
> AvroSchemaConverter()).convert(REPORTING_SCHEMA);
> RowTypeInfo rowTypeInfo = (RowTypeInfo) 
> ParquetSchemaConverter.fromParquetType(readSchema);
> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new 
> Path("file:///test-file.snappy.parquet"), readSchema);
> DataStreamSource dataSource = env.createInput(parquetInputFormat, 
> rowTypeInfo);
>
> -- stack trace
>
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException:
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
>   at 
> com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at 

Re: flink mysql cdc + hive streaming疑问

2020-11-01 Thread Jingsong Li
- 你可以用 proc-time
- 或者在你的Source上添加 **UTC时区的Watermark**,注意是 **UTC**,SQL的watermark都是 **UTC**的

On Mon, Nov 2, 2020 at 10:38 AM Rui Li  wrote:

> Hi,
>
> 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
> file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
>
> LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
>
> LOG.info("Committed partition {} to metastore", partitionSpec);
>
> LOG.info("Committed partition {} with success file",
> context.partitionSpec());
>
>
> On Sun, Nov 1, 2020 at 5:36 PM 陈帅  wrote:
>
> > 最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
> > streaming不能自动注册hive分区吗?还是我使用的姿势不对?
> >
> > 陈帅  于2020年11月1日周日 下午5:24写道:
> >
> > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> > > ") STORED AS TEXTFILE TBLPROPERTIES ("
> > >
> > > 这是生成的hive表建表语句
> > >
> > > hive> show create table team;
> > > OK
> > > CREATE TABLE `team`(
> > >   `team_id` int,
> > >   `team_name` string,
> > >   `create_time` string,
> > >   `update_time` string,
> > >   `op` string)
> > > PARTITIONED BY (
> > >   `dt` string,
> > >   `hr` string,
> > >   `mi` string)
> > > ROW FORMAT SERDE
> > >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > > STORED AS INPUTFORMAT
> > >   'org.apache.hadoop.mapred.TextInputFormat'
> > > OUTPUTFORMAT
> > >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > > LOCATION
> > >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > > TBLPROPERTIES (
> > >   'is_generic'='false',
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> > >   'sink.partition-commit.delay'='1 min',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'transient_lastDdlTime'='160466')
> > > Time taken: 0.252 seconds, Fetched: 25 row(s)
> > >
> > > 另外,下载了hive文件内容如下
> > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> > 11:25:38<0x01>INSERT
> > >
> > > 还是查询不到结果
> > > hive> select * from team;
> > > OK
> > > Time taken: 0.326 seconds
> > >
> > > 陈帅  于2020年11月1日周日 下午5:10写道:
> > >
> > >>
> > >>
> >
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> > >> 生成的hive分区文件路径类似于
> > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> > >>
> > >> 陈帅  于2020年11月1日周日 下午4:43写道:
> > >>
> > >>>
> >
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> > >>> shell查不到数据。
> > >>>
> > >>> import com.alibaba.fastjson.JSON;
> > >>> import com.alibaba.fastjson.JSONObject;
> > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> > >>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >>> import org.apache.flink.api.common.typeinfo.Types;
> > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > >>> import org.apache.flink.streaming.api.CheckpointingMode;
> > >>> import org.apache.flink.streaming.api.TimeCharacteristic;
> > >>> import org.apache.flink.streaming.api.datastream.DataStream;
> > >>> import
> > >>>
> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> > >>> import
> > >>>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >>> import
> > >>>
> >
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> > >>> import org.apache.flink.streaming.api.windowing.time.Time;
> > >>> import
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> > >>> import
> > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> > >>> import org.apache.flink.table.api.EnvironmentSettings;
> > >>> import org.apache.flink.table.api.SqlDialect;
> > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > >>> import org.apache.flink.table.catalog.hive.HiveCatalog;
> > >>> import org.apache.flink.types.Row;
> > >>> import org.apache.flink.types.RowKind;
> > >>>
> > >>> import java.time.Duration;
> > >>> import java.time.Instant;
> > >>> import java.time.LocalDateTime;
> > >>> import java.time.ZoneId;
> > >>> import java.time.format.DateTimeFormatter;
> > >>> import java.util.Properties;
> > >>>
> > >>> public class MysqlCDC2Hive {
> > >>>
> > >>> private static final DateTimeFormatter dtf =
> > >>> DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
> > >>>
> > >>> public static void main(String[] args) throws Exception {
> > >>> StreamExecutionEnvironment streamEnv =
> > >>> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>>
> > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >>> streamEnv.setParallelism(3);
> > >>> streamEnv.enableCheckpointing(6);
> > >>>
> > >>> EnvironmentSettings tableEnvSettings =
> > >>> EnvironmentSettings.newInstance()
> > >>> .useBlinkPlanner()
> > >>> 

Re: Connecting the new DataSource API (FLIP-27) with DynamicTableSource

2020-11-01 Thread Jingsong Li
Hi Yuval,

Yes, The new table source does not support the new Source API in Flink
1.11. The integration is introduced in Flink master (1.12):

https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces

Best,
Jingsong

On Sun, Nov 1, 2020 at 10:54 PM Yuval Itzchakov  wrote:

> Hi,
>
> I've implemented a new custom source using the new DataSource API (
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/sources.html)
> and I want to connect it to the new DynamicTableSource API.
>
> However, in ScanTableSource, the getScanRuntimeProvider method returns a
> ScanRuntimeProvider interface, which only has an implementation for the old
> SourceFunction[T], and not for the new Source[T] class:
>
> [image: image.png]
>
> Does the Table API not yet support the new Source API? or am I missing
> something?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Best, Jingsong Lee


Re: Re: flink hive Streaming查询不到数据的问题

2020-10-28 Thread Jingsong Li
注意时区哦,SQL层默认使用UTC的long值

On Thu, Oct 29, 2020 at 12:12 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> 我把sink.partition-commit.trigger 设置成process-time 可以看到数据;
> 但是我后来设置source 产生出watermark 还是不行;
> val dataStream = streamEnv.addSource(new MySource)
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
>   .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
> override def extractTimestamp(element: UserInfo, recordTimestamp:
> Long): Long = element.getTs.getTime
>   }))
> 生成的userinfo类型的ts是时间戳,所以watermark 使用的是他提取的
>
>
>
> hdxg1101300...@163.com
>
> 发件人: Jingsong Li
> 发送时间: 2020-10-28 16:29
> 收件人: user-zh
> 主题: Re: flink hive Streaming查询不到数据的问题
> Hi,
>
> 你的Source看起来并没有产出watermark,所以:
>
> 你可以考虑使得Source产出正确的watermark,或者使用'sink.partition-commit.trigger'的默认值proc-time。
>
> Best,
> Jingsong
>
> On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
> > 你好:
> > 我现在在使用flink 1.11.2版本 hive1.1.0 版本;
> > 当我在使用flink hive streaming的使用发现按照 示例写数据到hive
> > 可以看到指定目录下已经生产了文件,但是使用hive查询没有数据;
> > 好像是分区信息没有提交到hive  meta store;但是官网已经说实现了这个功能;我操作却不行
> > 下面是我的代码
> >  object StreamMain {
> >   def main(args: Array[String]): Unit = {
> > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > streamEnv.setParallelism(3)
> >
> > val tableEnvSettings = EnvironmentSettings.newInstance()
> >   .useBlinkPlanner()
> >   .inStreamingMode()
> >   .build()
> >
> > val tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
> >
> > val dataStream = streamEnv.addSource(new MySource)
> >
> > val catalogName = "my_catalog"
> > val catalog = new HiveCatalog(
> >   catalogName,  // catalog name
> >   "yutest",// default database
> >
> >   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive
> config (hive-site.xml) directory
> >   "1.1.0"   // Hive version
> > )
> > tableEnv.registerCatalog(catalogName, catalog)
> > tableEnv.useCatalog(catalogName)
> >
> > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > tableEnv.useDatabase("yutest")
> >
> >
> > tableEnv.createTemporaryView("users", dataStream)
> > tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> > //  如果hive中已经存在了相应的表,则这段代码省略
> > val hiveSql = """CREATE external TABLE fs_table (
> > user_id STRING,
> > order_amount DOUBLE
> >   )
> >   partitioned by(
> >   dt string,
> >   h string,
> >   m string) stored as parquet
> >   TBLPROPERTIES (
> >
> > 'partition.time-extractor.timestamp-pattern'='$dt
> $h:$m:00',
> > 'sink.partition-commit.delay'='0s',
> > 'sink.partition-commit.trigger'='partition-time',
> >
> >
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> >   )""".stripMargin
> > tableEnv.executeSql(hiveSql)
> >
> >
> > val insertSql = "insert into  fs_table SELECT userId, amount, " + "
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM users"
> > tableEnv.executeSql(insertSql)
> >   }
> > }
> > public class MySource implements SourceFunction {
> > private volatile boolean run = true;
> > String userids[] = {
> >
> > "4760858d-2bec-483c-a535-291de04b2247",
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
> >
> > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
> >
> > "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
> >
> > "3ebfb9602ac07779||3ebfe9612a007979

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Jingsong Li
+1 to remove the Bucketing Sink.

Thanks for the effort on ORC and `HadoopPathBasedBulkFormatBuilder`, I
think it's safe to get rid of the old Bucketing API with them.

Best,
Jingsong

On Thu, Oct 29, 2020 at 3:06 AM Kostas Kloudas  wrote:

> Thanks for the discussion!
>
> From this thread I do not see any objection with moving forward with
> removing the sink.
> Given this I will open a voting thread tomorrow.
>
> Cheers,
> Kostas
>
> On Wed, Oct 28, 2020 at 6:50 PM Stephan Ewen  wrote:
> >
> > +1 to remove the Bucketing Sink.
> >
> > It has been very common in the past to remove code that was deprecated
> for multiple releases in favor of reducing baggage.
> > Also in cases that had no perfect drop-in replacement, but needed users
> to forward fit the code.
> > I am not sure I understand why this case is so different.
> >
> > Why the Bucketing Sink should be thrown out, in my opinion:
> >
> > The Bucketing sink makes it easier for users to add general Hadoop
> writes.
> > But the price is that it easily leads to dataloss, because it assumes
> flush()/sync() work reliably on Hadoop relicably, which they don't (HDFS
> works somewhat, S3 works not at all).
> > I think the Bucketing sink is a trap for users, that's why it was
> deprecated long ago.
> >
> > The StreamingFileSink covers the majority of cases from the Bucketing
> Sink.
> > It does have some friction when adding/wrapping some general Hadoop
> writers. Parts will be solved with the transactional sink work.
> > If something is missing and blocking users, we can prioritize adding it
> to the Streaming File Sink. Also that is something we did before and it
> helped being pragmatic with moving forward, rather than being held back by
> "maybe there is something we don't know".
> >
> >
> >
> >
> > On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler 
> wrote:
> >>
> >> Then we can't remove it, because there is no way for us to ascertain
> >> whether anyone is still using it.
> >>
> >> Sure, the user ML is the best we got, but you can't argue that we don't
> >> want any users to be affected and then use an imperfect mean to find
> users.
> >> If you are fine with relying on the user ML, then you _are_ fine with
> >> removing it at the cost of friction for some users.
> >>
> >> To be clear, I, personally, don't have a problem with removing it (we
> >> have removed other connectors in the past that did not have a migration
> >> plan), I just reject he argumentation.
> >>
> >> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
> >> > No, I do not think that "we are fine with removing it at the cost of
> >> > friction for some users".
> >> >
> >> > I believe that this can be another discussion that we should have as
> >> > soon as we establish that someone is actually using it. The point I am
> >> > trying to make is that if no user is using it, we should remove it and
> >> > not leave unmaintained code around.
> >> >
> >> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler 
> wrote:
> >> >> The alternative could also be to use a different argument than "no
> one
> >> >> uses it", e.g., we are fine with removing it at the cost of friction
> for
> >> >> some users because there are better alternatives.
> >> >>
> >> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
> >> >>> I think that the mailing lists is the best we can do and I would say
> >> >>> that they seem to be working pretty well (e.g. the recent Mesos
> >> >>> discussion).
> >> >>> Of course they are not perfect but the alternative would be to never
> >> >>> remove anything user facing until the next major release, which I
> find
> >> >>> pretty strict.
> >> >>>
> >> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler <
> ches...@apache.org> wrote:
> >>  If the conclusion is that we shouldn't remove it if _anyone_ is
> using
> >>  it, then we cannot remove it because the user ML obviously does not
> >>  reach all users.
> >> 
> >>  On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> >> > Hi all,
> >> >
> >> > I am bringing the up again to see if there are any users actively
> >> > using the BucketingSink.
> >> > So far, if I am not mistaken (and really sorry if I forgot
> anything),
> >> > it is only a discussion between devs about the potential problems
> of
> >> > removing it. I totally understand Chesnay's concern about not
> >> > providing compatibility with the StreamingFileSink (SFS) and if
> there
> >> > are any users, then we should not remove it without trying to
> find a
> >> > solution for them.
> >> >
> >> > But if there are no users then I would still propose to remove the
> >> > module, given that I am not aware of any efforts to provide
> >> > compatibility with the SFS any time soon.
> >> > The reasons for removing it also include the facts that we do not
> >> > actively maintain it and we do not add new features. As for
> potential
> >> > missing features in the SFS compared to the BucketingSink that was

Re: flink hive Streaming查询不到数据的问题

2020-10-28 Thread Jingsong Li
Hi,

你的Source看起来并没有产出watermark,所以:
你可以考虑使得Source产出正确的watermark,或者使用'sink.partition-commit.trigger'的默认值proc-time。

Best,
Jingsong

On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> 你好:
> 我现在在使用flink 1.11.2版本 hive1.1.0 版本;
> 当我在使用flink hive streaming的使用发现按照 示例写数据到hive
> 可以看到指定目录下已经生产了文件,但是使用hive查询没有数据;
> 好像是分区信息没有提交到hive  meta store;但是官网已经说实现了这个功能;我操作却不行
> 下面是我的代码
>  object StreamMain {
>   def main(args: Array[String]): Unit = {
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20))
>
> val dataStream = streamEnv.addSource(new MySource)
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
>   catalogName,  // catalog name
>   "yutest",// default database
>
>   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive 
> config (hive-site.xml) directory
>   "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.useDatabase("yutest")
>
>
> tableEnv.createTemporaryView("users", dataStream)
> tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> //  如果hive中已经存在了相应的表,则这段代码省略
> val hiveSql = """CREATE external TABLE fs_table (
> user_id STRING,
> order_amount DOUBLE
>   )
>   partitioned by(
>   dt string,
>   h string,
>   m string) stored as parquet
>   TBLPROPERTIES (
>
> 'partition.time-extractor.timestamp-pattern'='$dt 
> $h:$m:00',
> 'sink.partition-commit.delay'='0s',
> 'sink.partition-commit.trigger'='partition-time',
>
> 
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   )""".stripMargin
> tableEnv.executeSql(hiveSql)
>
>
> val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
> FROM users"
> tableEnv.executeSql(insertSql)
>   }
> }
> public class MySource implements SourceFunction {
> private volatile boolean run = true;
> String userids[] = {
>
> "4760858d-2bec-483c-a535-291de04b2247", 
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
> "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
> "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
> "3ebfb9602ac07779||3ebfe9612a007979", 
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
> "e7e896cd939685d7||e7e8e6c1930689d7", 
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> };
>
> @Override
>
> public void run(SourceFunction.SourceContext sourceContext) 
> throws Exception {
>
> while (run) {
>
> String userid = userids[(int) (Math.random() * (userids.length - 
> 1))];
> UserInfo userInfo = new UserInfo();
> userInfo.setUserId(userid);
> userInfo.setAmount(Math.random() * 100);
> userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> sourceContext.collect(userInfo);
> Thread.sleep(100);
> }
> }
>
> @Override
> public void cancel() {
> run = false;
> }
> }
> public class UserInfo implements Serializable {
> private String userId;
> private Double amount;
> private Timestamp ts;
>
> public String getUserId() {
> return userId;
> }
>
> public void setUserId(String userId) {
> this.userId = userId;
> }
>
> public Double getAmount() {
> return amount;
> }
>
> public void setAmount(Double amount) {
> this.amount = amount;
> }
>
> public Timestamp getTs() {
> return ts;
> }
>
> public void setTs(Timestamp ts) {
> this.ts = ts;
> }
> }
>
> hive (yutest)>
>  >
>  > show partitions fs_table;
> OK
> partition
> Time taken: 20.214 seconds
>
> --
> hdxg1101300...@163.com
>


-- 
Best, Jingsong Lee


Re: flink sql 写入hive问题

2020-10-22 Thread Jingsong Li
writer的并行度是根据上游并行度来的

committer的并行度才是1

On Thu, Oct 22, 2020 at 5:22 PM 酷酷的浑蛋  wrote:

> 我用flink sql实时写入hive表时发现sink的并行度为1?
> 我看了FileSystemTableSink类的226行,确实设置了1,这是为什么呢?  并行度1的写入速度很慢
>
>
>
>

-- 
Best, Jingsong Lee


Re: Streaming File Sink cannot generate _SUCCESS tag files

2020-10-18 Thread Jingsong Li
Hi, Yang,

"SUCCESSFUL_JOB_OUTPUT_DIR_MARKER" does not work in StreamingFileSink.

You can take a look to partition commit feature [1],

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit

Best,
Jingsong Lee

On Thu, Oct 15, 2020 at 3:11 PM highfei2011  wrote:

> Hi, everyone!
>   Currently experiencing a problem with the bucketing policy sink to
> hdfs using BucketAssigner of Streaming File Sink after consuming Kafka data
> with FLink -1.11.2, the _SUCCESS tag file is not generated by default.
>   I have added the following to the configuration
>
> val hadoopConf = new Configuration()
> hadoopConf.set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
> "true")
>
> But there is still no _SUCCESS file in the output directory, so why not
> support generating _SUCCESS files?
>
> Thank you.
>
>
> Best,
> Yang
>


-- 
Best, Jingsong Lee


Re: flink1.11流式写入hive速度慢的问题

2020-10-09 Thread Jingsong Li
Hi,
是Hive表吧?
https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的

可以下载最新的1.11分支的Hive依赖来试下:
https://repository.apache.org/snapshots/org/apache/flink/
(比如你用hive-1.2.2依赖,你可以下载
https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
)

Best,
Jingsong

On Fri, Oct 9, 2020 at 3:50 PM me  wrote:

> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table
>
>
>  原始邮件
> 发件人: me
> 收件人: user-zh
> 发送时间: 2020年10月9日(周五) 15:34
> 主题: flink1.11流式写入hive速度慢的问题
>
>
> flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
> val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
> chaitin_test.printSchema()
> tableEnv.executeSql("insert into chaitin_test select test from " +
> chaitin_test)



-- 
Best, Jingsong Lee


[DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-23 Thread Jingsong Li
Hi devs and users:

After the 1.11 release, I heard some voices recently: How can't Hive's
documents be found in the "Table & SQL Connectors".

Actually, Hive's documents are in the "Table API & SQL". Since the "Table &
SQL Connectors" document was extracted separately, Hive is a little out of
place.
And Hive's code is also in "flink-connector-hive", which should be a
connector.
Hive also includes the concept of HiveCatalog. Is catalog a part of the
connector? I think so.

What do you think? If you don't object, I think we can move it.

Best,
Jingsong Lee


[DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-23 Thread Jingsong Li
Hi devs and users:

After the 1.11 release, I heard some voices recently: How can't Hive's
documents be found in the "Table & SQL Connectors".

Actually, Hive's documents are in the "Table API & SQL". Since the "Table &
SQL Connectors" document was extracted separately, Hive is a little out of
place.
And Hive's code is also in "flink-connector-hive", which should be a
connector.
Hive also includes the concept of HiveCatalog. Is catalog a part of the
connector? I think so.

What do you think? If you don't object, I think we can move it.

Best,
Jingsong Lee


Re: Re: Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-18 Thread Jingsong Li
Hi,

不好意思,麻烦试下
试下最新的release-1.11分支编译出来的Hive依赖 (flink-connector-hive的 改动)

> 顺便问一下,你们1.12版本,针对小文件合并,会有改进么 ?

这是1.12的目标,这两天会出来JIRA和设计方案,类似会加上"auto-compaction"的配置,sink中自动合并

Best,
Jingsong


On Fri, Sep 18, 2020 at 10:18 AM kandy.wang  wrote:

>
>
>
>
>
>
> @Jingsong Li
>   测了一下,1.11.2还是和以前一样呢。 还是table.exec.hive.fallback-mapred-writer=false效果明显。
> 我们flink 环境是基于 flink 1.11 分支源码自己 打的jar 来测的。你们那边针对 StreamingFileWriter
> 修改应该都提交到flink 1.11分支了吧。
> 顺便问一下,你们1.12版本,针对小文件合并,会有改进么 ?
>
>
> 在 2020-09-17 14:19:42,"Jingsong Li"  写道:
> >是的,可以测一下,理论上 mr writer不应该有较大性能差距。
> >
> >> 为何要强制滚动文件
> >
> >因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。
> >
> >On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >> ok. 就是用hadoop mr writer vs  flink 自实现的native
> >> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
> >> 改成false是可以满足我们的写hive需求了
> >> 还有一个问题,之前问过你,你还没回复:
> >> HiveRollingPolicy为什么 shouldRollOnCheckpoint true
> 为何要强制滚动文件,这个可以抽取成一个配置参数么?
> >> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、
> >>
> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
> >> 在 2020-09-17 13:43:04,"Jingsong Li"  写道:
> >> >可以再尝试下最新的1.11.2吗?
> >> >
> >> >https://flink.apache.org/downloads.html
> >> >
> >> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
> >> >
> >> >> 是master分支代码
> >> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
> >> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
> >> >> if (userMrWriter) {
> >> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> assigner,
> >> >> rollingPolicy, outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> >> >> } else {
> >> >>Optional> bulkFactory =
> >> >> createBulkWriterFactory(partitionColumns, sd);
> >> >>if (bulkFactory.isPresent()) {
> >> >>   builder = StreamingFileSink.forBulkFormat(
> >> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
> >> >> new
> >> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
> >> partComputer))
> >> >> .withBucketAssigner(assigner)
> >> >> .withRollingPolicy(rollingPolicy)
> >> >> .withOutputFileConfig(outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use native parquet writer.");
> >> >> } else {
> >> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> >> assigner, rollingPolicy, outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer
> because
> >> >> BulkWriter Factory not available.");
> >> >> }
> >> >> }
> >> >> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >> >> >是最新的代码吗?
> >> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >> >> >它是影响性能的,1.11.2已经投票通过,即将发布
> >> >> >
> >> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang 
> wrote:
> >> >> >
> >> >> >> @Jingsong Li
> >> >> >>
> >> >> >> public TableSink createTableSink(TableSinkFactory.Context
> context) {
> >> >> >>CatalogTable table = checkNotNull(context.getTable());
> >> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >> >> >>
> >> >> >>boolean isGeneric =
> >> >> >>
> >> >>
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >> >> >>
> >> >> >>if (!isGeneric) {
> >> >> >> return new HiveTableSink(
> >> >> >> context.getConfiguration().get(
> >> >> >>
> >>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> >> >> context.isBounded(),
> >> >> >> new JobConf(hiveConf),
> >> >> >> context.getObjectIdentifier(),
> >> >> >> table);
> >> >> >> } else {
> >> >> >> return TableFactoryUtil.findAndC

Re: Flink SQL TableSource复用问题,相同数据源聚合多个指标,引擎创建多个相同的数据源

2020-09-17 Thread Jingsong Li
你仔细看看这两个数据源是不是有什么不同
只要有一点不同,Blink 就 reuse 不了

On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:

> 场景描述:
> 通过Flink SQL创建两个Kafka数据源,对数据源去重处理,然后Union ALL合并,并创建临时视图
> 然后通过Flink SQL读取临时视图进行聚合计算指标,结果写入Redis
> 问题描述:
> Flink SQL 解析器会为每个聚合运算创建相同的两个数据源
>
> 在下面Blink Plan的配置说明中,引擎应该会优化复用相同的数据源
> - table.optimizer.reuse-source-enabled
> - table.optimizer.reuse-sub-plan-enabled
>
> 请问下,有人碰到类似问题么?
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best, Jingsong Lee


Re: Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-17 Thread Jingsong Li
是的,可以测一下,理论上 mr writer不应该有较大性能差距。

> 为何要强制滚动文件

因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。

On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:

>
>
>
> ok. 就是用hadoop mr writer vs  flink 自实现的native
> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
> 改成false是可以满足我们的写hive需求了
> 还有一个问题,之前问过你,你还没回复:
> HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么?
> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、
> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
> 在 2020-09-17 13:43:04,"Jingsong Li"  写道:
> >可以再尝试下最新的1.11.2吗?
> >
> >https://flink.apache.org/downloads.html
> >
> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
> >
> >> 是master分支代码
> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
> >> if (userMrWriter) {
> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> assigner,
> >> rollingPolicy, outputFileConfig);
> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> >> } else {
> >>Optional> bulkFactory =
> >> createBulkWriterFactory(partitionColumns, sd);
> >>if (bulkFactory.isPresent()) {
> >>   builder = StreamingFileSink.forBulkFormat(
> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
> >> new
> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
> partComputer))
> >> .withBucketAssigner(assigner)
> >> .withRollingPolicy(rollingPolicy)
> >> .withOutputFileConfig(outputFileConfig);
> >> LOG.info("Hive streaming sink: Use native parquet writer.");
> >> } else {
> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> assigner, rollingPolicy, outputFileConfig);
> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
> >> BulkWriter Factory not available.");
> >> }
> >> }
> >> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >> >是最新的代码吗?
> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >> >它是影响性能的,1.11.2已经投票通过,即将发布
> >> >
> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
> >> >
> >> >> @Jingsong Li
> >> >>
> >> >> public TableSink createTableSink(TableSinkFactory.Context context) {
> >> >>CatalogTable table = checkNotNull(context.getTable());
> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >> >>
> >> >>boolean isGeneric =
> >> >>
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >> >>
> >> >>if (!isGeneric) {
> >> >> return new HiveTableSink(
> >> >> context.getConfiguration().get(
> >> >>
>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> >> context.isBounded(),
> >> >> new JobConf(hiveConf),
> >> >> context.getObjectIdentifier(),
> >> >> table);
> >> >> } else {
> >> >> return TableFactoryUtil.findAndCreateTableSink(context);
> >> >> }
> >> >> }
> >> >>
> >> >>
> >>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
> >> >>
> >> >> If it is false, using flink native writer to write parquet and orc
> >> files;
> >> >>
> >> >> If it is true, using hadoop mapred record writer to write parquet and
> >> orc
> >> >> files
> >> >>
> >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
> >> >>
> >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> >> >> 一些相关的参数 ?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >> >> >Sink并行度
> >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >> >> >
> >> >> >HDFS性能
> >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >> >> >
> >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 
> wrote:
>

Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-16 Thread Jingsong Li
可以再尝试下最新的1.11.2吗?

https://flink.apache.org/downloads.html

On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:

> 是master分支代码
> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
> if (userMrWriter) {
>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner,
> rollingPolicy, outputFileConfig);
> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> } else {
>Optional> bulkFactory =
> createBulkWriterFactory(partitionColumns, sd);
>if (bulkFactory.isPresent()) {
>   builder = StreamingFileSink.forBulkFormat(
> new org.apache.flink.core.fs.Path(sd.getLocation()),
> new
> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
> .withBucketAssigner(assigner)
> .withRollingPolicy(rollingPolicy)
> .withOutputFileConfig(outputFileConfig);
> LOG.info("Hive streaming sink: Use native parquet writer.");
> } else {
>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> assigner, rollingPolicy, outputFileConfig);
> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
> BulkWriter Factory not available.");
> }
> }
> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >是最新的代码吗?
> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >它是影响性能的,1.11.2已经投票通过,即将发布
> >
> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
> >
> >> @Jingsong Li
> >>
> >> public TableSink createTableSink(TableSinkFactory.Context context) {
> >>CatalogTable table = checkNotNull(context.getTable());
> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >>
> >>boolean isGeneric =
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >>
> >>if (!isGeneric) {
> >> return new HiveTableSink(
> >> context.getConfiguration().get(
> >>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> context.isBounded(),
> >> new JobConf(hiveConf),
> >> context.getObjectIdentifier(),
> >> table);
> >> } else {
> >> return TableFactoryUtil.findAndCreateTableSink(context);
> >> }
> >> }
> >>
> >>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
> >>
> >> If it is false, using flink native writer to write parquet and orc
> files;
> >>
> >> If it is true, using hadoop mapred record writer to write parquet and
> orc
> >> files
> >>
> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
> >>
> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> >> 一些相关的参数 ?
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >> >Sink并行度
> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >> >
> >> >HDFS性能
> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >> >
> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
> >> >
> >> >> 场景很简单,就是kafka2hive
> >> >> --5min入仓Hive
> >> >>
> >> >> INSERT INTO  hive.temp_.hive_5min
> >> >>
> >> >> SELECT
> >> >>
> >> >>  arg_service,
> >> >>
> >> >> time_local
> >> >>
> >> >> .
> >> >>
> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
> >> >>
> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('
> properties.group.id
> >> '='kafka_hive_test',
> >> >> 'scan.startup.mode'='earliest-offset') */;
> >> >>
> >> >>
> >> >>
> >> >> --kafka source表定义
> >> >>
> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >> >>
> >> >> arg_service string COMMENT 'arg_service',
> >> >>
> >> >> 
> >> >>
> >> >> )WITH (
> >> >>
> >> >>   'connector' = 'kafka',
> >> >>
> >> >>   'topic' = '...',
> >> >>
> >> >>   'properties.bootstrap.servers' = '...',
> >> >>

Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-16 Thread Jingsong Li
Thanks ZhuZhu for driving the release.

Best,
Jingsong

On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Zhu
>


-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-16 Thread Jingsong Li
Thanks ZhuZhu for driving the release.

Best,
Jingsong

On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Zhu
>


-- 
Best, Jingsong Lee


Re: Re: Re: StreamingFileWriter 压测性能

2020-09-16 Thread Jingsong Li
是最新的代码吗?
1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
它是影响性能的,1.11.2已经投票通过,即将发布

On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:

> @Jingsong Li
>
> public TableSink createTableSink(TableSinkFactory.Context context) {
>CatalogTable table = checkNotNull(context.getTable());
> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>
>boolean isGeneric =
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>
>if (!isGeneric) {
> return new HiveTableSink(
> context.getConfiguration().get(
>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> context.isBounded(),
> new JobConf(hiveConf),
> context.getObjectIdentifier(),
> table);
> } else {
> return TableFactoryUtil.findAndCreateTableSink(context);
> }
> }
>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>
> If it is false, using flink native writer to write parquet and orc files;
>
> If it is true, using hadoop mapred record writer to write parquet and orc
> files
>
> 将此参数调整成false后,同样的资源配置下,tps达到30W
>
> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> 一些相关的参数 ?
>
>
>
>
>
> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >Sink并行度
> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >
> >HDFS性能
> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >
> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
> >
> >> 场景很简单,就是kafka2hive
> >> --5min入仓Hive
> >>
> >> INSERT INTO  hive.temp_.hive_5min
> >>
> >> SELECT
> >>
> >>  arg_service,
> >>
> >> time_local
> >>
> >> .
> >>
> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
> >>
> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
> '='kafka_hive_test',
> >> 'scan.startup.mode'='earliest-offset') */;
> >>
> >>
> >>
> >> --kafka source表定义
> >>
> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >>
> >> arg_service string COMMENT 'arg_service',
> >>
> >> 
> >>
> >> )WITH (
> >>
> >>   'connector' = 'kafka',
> >>
> >>   'topic' = '...',
> >>
> >>   'properties.bootstrap.servers' = '...',
> >>
> >>   'properties.group.id' = 'flink_etl_kafka_hive',
> >>
> >>   'scan.startup.mode' = 'group-offsets',
> >>
> >>   'format' = 'json',
> >>
> >>   'json.fail-on-missing-field' = 'false',
> >>
> >>   'json.ignore-parse-errors' = 'true'
> >>
> >> );
> >> --sink hive表定义
> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> >> 
> >> )
> >> PARTITIONED BY (dt string , hm string) stored as orc location
> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
> >>   'sink.partition-commit.trigger'='process-time',
> >>   'sink.partition-commit.delay'='0 min',
> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >>   'sink.rolling-policy.check-interval' ='30s',
> >>   'sink.rolling-policy.rollover-interval'='10min',
> >>   'sink.rolling-policy.file-size'='128MB'
> >> );
> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
> >> 就是flink sql可以
> >>
> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
> >> 这块,有没有什么可以提升性能相关的优化参数?
> >>
> >>
> >>
> >>
> >> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
> >> >Hi,
> >> >
> >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
> >> >
> >> >另外,压测时是否可以看下jstack?
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
> >> >
> >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
> >> ,source
> >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> >> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
> >> >
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Re: StreamingFileWriter 压测性能

2020-09-16 Thread Jingsong Li
Sink并行度
我理解是配置Sink并行度,这个一直在讨论,还没结论

HDFS性能
具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO

On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:

> 场景很简单,就是kafka2hive
> --5min入仓Hive
>
> INSERT INTO  hive.temp_.hive_5min
>
> SELECT
>
>  arg_service,
>
> time_local
>
> .
>
> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>
> FROM hive.temp_.kafka_source_pageview/*+ 
> OPTIONS('properties.group.id'='kafka_hive_test',
> 'scan.startup.mode'='earliest-offset') */;
>
>
>
> --kafka source表定义
>
> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>
> arg_service string COMMENT 'arg_service',
>
> 
>
> )WITH (
>
>   'connector' = 'kafka',
>
>   'topic' = '...',
>
>   'properties.bootstrap.servers' = '...',
>
>   'properties.group.id' = 'flink_etl_kafka_hive',
>
>   'scan.startup.mode' = 'group-offsets',
>
>   'format' = 'json',
>
>   'json.fail-on-missing-field' = 'false',
>
>   'json.ignore-parse-errors' = 'true'
>
> );
> --sink hive表定义
> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> 
> )
> PARTITIONED BY (dt string , hm string) stored as orc location
> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>   'sink.partition-commit.trigger'='process-time',
>   'sink.partition-commit.delay'='0 min',
>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>   'sink.rolling-policy.check-interval' ='30s',
>   'sink.rolling-policy.rollover-interval'='10min',
>   'sink.rolling-policy.file-size'='128MB'
> );
> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
> 就是flink sql可以
> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
> 这块,有没有什么可以提升性能相关的优化参数?
>
>
>
>
> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
> >Hi,
> >
> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
> >
> >另外,压测时是否可以看下jstack?
> >
> >Best,
> >Jingsong
> >
> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
> >
> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
> ,source
> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
> >
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: FlinkSQL1.11.1读取kafka写入Hive(parquet) OOM问题

2020-09-16 Thread Jingsong Li
你指的可能是控制sink的并行度,这个一直在讨论中

On Wed, Sep 16, 2020 at 10:26 PM wangenbao <156827...@qq.com> wrote:

> 感谢回复
> 目前确实使用keyBy,能把并行度提高,分散数据到多个TaskManager中,但遇见个问题
> <
> http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916221935.png>
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916222005.png>
>
>
> 不知道能不能直接控制Insert语句的并行度
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


Re: StreamingFileWriter 压测性能

2020-09-16 Thread Jingsong Li
Hi,

可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?

另外,压测时是否可以看下jstack?

Best,
Jingsong

On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:

> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 ,source
> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少



-- 
Best, Jingsong Lee


Re: FlinkSQL1.11.1读取kafka写入Hive(parquet) OOM问题

2020-09-15 Thread Jingsong Li
可以考虑在写之前按照hashtid keyBy下吗?

Best,
Jingsong

On Wed, Sep 16, 2020 at 9:36 AM wangenbao <156827...@qq.com> wrote:

> 求教各位大佬:
> 有遇到如下问题的吗?
>
> 1、我首先通过TableAPI读取Kafka中PB格式数据,转换成POJO对象,然后注册成View;
> 2、然后Insert into到三分区(日,小时,hashtid)的Hive表(Parquet格式Snappy压缩)中;
> 3、数据的分区相对分散些就会出现OOM问题,具体表现为
> parquet.hadoop.MemoryManager: Total allocation exceeds 50.00%
> (2,102,394,880
> bytes) of heap memory
> Scaling row group sizes to 13.62% for 115 writers
> 随后就会出现java.lang.OutOfMemoryError: Java heap space
>
> 我认为是Parquet的Writer数比较多,不知道大佬遇见过类似问题吗,该如何解决啊
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best, Jingsong Lee


Re: Flink Table API and not recognizing s3 plugins

2020-09-14 Thread Jingsong Li
Hi Dan,

I think Arvid and Dawid are right, as a workaround, you can try making
S3Filesystem works in the client. But for a long term solution, we can fix
it.

I created https://issues.apache.org/jira/browse/FLINK-19228 for tracking
this.

Best,
Jingsong

On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> As far as I checked in the code, the FileSystemSink will try to create
> staging directories from the client. I think it might be problematic, as
> your case shows. We might need to revisit that part. I am cc'ing Jingsong
> who worked on the FileSystemSink.
>
> As a workaround you might try putting the s3 plugin on the CLI classpath
> (not sure if plugins work for the CLI through the /plugins directory).
>
> Best,
>
> Dawid
> On 10/09/2020 22:13, Dan Hill wrote:
>
> This is running on my local minikube and is trying to hit minio.
>
> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill  wrote:
>
>> I'm using this Helm chart
>> .  I
>> start the job by building an image with the job jar and using kubectl apply
>> to do a flink run with the jar.
>>
>> The log4j.properties on jobmanager and taskmanager have debug level set
>> and are pretty embedded into the Helm chart.  My log4j-cli.properties is
>> hacked on the CLI side.
>>
>> I thought I just needed the s3 plugins in the jobmanager and
>> taskmanager.  Do I need to have a similar plugin structure from the image
>> where I run 'flink run'?
>>
>>
>> On Thu, Sep 10, 2020 at 1:03 PM Dan Hill  wrote:
>>
>>> Copying more of the log
>>>
>>> 2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -
>>> 
>>>
>>> 2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  Starting Command Line Client (Version:
>>> 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
>>>
>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  OS current user: root
>>>
>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  Current Hadoop/Kerberos user: >> dependency found>
>>>
>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  JVM: OpenJDK 64-Bit Server VM - Oracle
>>> Corporation - 1.8/25.265-b01
>>>
>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  Maximum heap size: 2167 MiBytes
>>>
>>> tail: log/flink--client-flink-jobmanager-0.log: file truncated
>>>
>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  JAVA_HOME: /usr/local/openjdk-8
>>>
>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  No Hadoop Dependency available
>>>
>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  JVM Options:
>>>
>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -
>>> -Djava.security.properties=/opt/flink/conf/security.properties
>>>
>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -
>>> -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log
>>>
>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] - -Dlog4j.configuration=
>>> file:/opt/flink/conf/log4j-cli.properties
>>>
>>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] - -Dlog4j.configurationFile=
>>> file:/opt/flink/conf/log4j-cli.properties
>>>
>>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] - -Dlogback.configurationFile=
>>> file:/opt/flink/conf/logback.xml
>>>
>>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  Program Arguments:
>>>
>>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] - list
>>>
>>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] - --jobmanager
>>>
>>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] - localhost:8081
>>>
>>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>> [] -  Classpath:
>>> 

Re: Re: Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-09-08 Thread Jingsong Li
Hi kandy~

有可能是https://issues.apache.org/jira/browse/FLINK-19166
这个问题导致的,即将发布的1.11.2会Fix它,希望你可以确认重试下~

Best,
Jingsong

On Fri, Aug 14, 2020 at 7:22 PM kandy.wang  wrote:

> @Jingsong  orc格式,都看过了,还是没有commit。感觉你们可以测一下这个场景
>
> 在 2020-08-12 16:04:13,"Jingsong Li"  写道:
> >另外问一下,是什么格式?csv还是parquet。
> >有等到10分钟(rollover-interval)过后和下一次checkpoint后再看吗?
> >
> >On Wed, Aug 12, 2020 at 2:45 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >> 有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉,
> >> 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。
> >> 就是感觉停止之前正在写的那个分区,没有触发commit
> >>
> >>
> >>
> >>
> >> 在 2020-08-12 14:26:53,"Jingsong Li"  写道:
> >> >那你之前的分区除了in-progress文件,有已完成的文件吗?
> >> >
> >> >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
> >> >
> >> >>
> >> >>
> >> >>
> >> >> source就是kafka
> >> >>
> >>
> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >>
> >> >>
> >> >>
> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >>
> >> >> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
> >> >> >你的source是exactly-once的source吗?
> >> >> >
> >> >> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >> >
> >> >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang 
> wrote:
> >> >> >
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> >@ Jingsong
> >> >> >>
> >> >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> >> >> >> 用presto查询查不了
> >> >> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
> >> >> >>  'sink.partition-commit.trigger'='process-time',
> >> >> >>   'sink.partition-commit.delay'='0 min',
> >> >> >>
> >>  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >> >> >>   'sink.rolling-policy.check-interval'='30s',
> >> >> >>   'sink.rolling-policy.rollover-interval'='10min',
> >> >> >>   'sink.rolling-policy.file-size'='128MB'
> >> >> >>如果是12:39分 05秒左右做一次savepoint,然后
> >> >> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive
> add
> >> >> >> partition,就导致有数据,但是确查不 了。
> >> >> >>
> >> >>
> >>
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> >> >> >> partition 也能查了。
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >> >> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >> >> >> >>
> >> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu 
> wrote:
> >> >> >> >>
> >> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >> >> >> >>>
> >> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang 
> >> wrote:
> >> >> >> >>>
> >> >> >> >>> > 1.StreamingFileWriter
> >> >> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >> >> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >> >> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >> >> >> >>> > =2100分区的数据还存在很多的in-progress文件。
> >> >> >> >>> >
> >> 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >> >> >> >>>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>--
> >> >> >> >>Best, Jingsong Lee
> >> >> >>
> >> >> >
> >> >> >
> >> >> >--
> >> >> >Best, Jingsong Lee
> >> >>
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-08 Thread Jingsong Li
非常感谢你的反馈,应该是真的有问题,我建个JIRA追踪下

https://issues.apache.org/jira/browse/FLINK-19166

会包含在即将发布的1.11.2中

Best,
Jingsong

On Wed, Sep 9, 2020 at 10:44 AM MuChen <9329...@qq.com> wrote:

> hi,Rui Li:
> 没有提交分区的目录是commited状态,手动add partition是可以正常查询的
>
> /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-19/hour=07/part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1031
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> lirui.fu...@gmail.com;
> 发送时间:2020年9月8日(星期二) 晚上9:43
> 收件人:"MuChen"<9329...@qq.com;
> 抄送:"user-zh" ;
> 主题:Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
>
>
>
> 另外也list一下没有提交的分区目录吧,看看里面的文件是什么状态
>
> On Tue, Sep 8, 2020 at 9:19 PM Rui Li 
>  作业有发生failover么?还是说作业能成功结束但是某些partition始终没提交?
> 
>  On Tue, Sep 8, 2020 at 5:20 PM MuChen <9329...@qq.com wrote:
> 
>  hi, Rui Li:
>  如你所说,的确有类似日志,但是只有成功增加的分区的日志,没有失败分区的日志:
>  2020-09-04 17:17:10,548 INFO
> org.apache.flink.streaming.api.operators.
>  AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=18} of
> table
>  `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
>  2020-09-04 17:17:10,716 INFO org.apache.flink.table.filesystem.
>  MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=18}
>  to metastore
>  2020-09-04 17:17:10,720 INFO org.apache.flink.table.filesystem.
>  SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=18}
>  with success file
>  2020-09-04 17:17:19,652 INFO
> org.apache.flink.streaming.api.operators.
>  AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=19} of
> table
>  `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
>  2020-09-04 17:17:19,820 INFO org.apache.flink.table.filesystem.
>  MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=19}
>  to metastore
>  2020-09-04 17:17:19,824 INFO org.apache.flink.table.filesystem.
>  SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=19}
>  with success file
> 
>  写hdfs的日志是都有的:
>  2020-09-04 17:16:04,100 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
>  ParquetRecordWriterWrapper [] - creating real writer to write at
> hdfs://
> 
> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
>  08-22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140
>  .inprogress.1631ac6c-a07c-4ad7-86ff-cf0d4375d1de
>  2020-09-04 17:16:04,126 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
>  ParquetRecordWriterWrapper [] - creating real writer to write at
> hdfs://
> 
> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
>  08-22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142
>  .inprogress.2700eded-5ed0-4794-8ee9-21721c0c2ffd
> 
>  -- 原始邮件 --
>  *发件人:* "Rui Li"   *发送时间:* 2020年9月8日(星期二) 中午12:09
>  *收件人:* "user-zh" jkill...@dingtalk.com;
>  *抄送:* "MuChen"<9329...@qq.com;
>  *主题:* Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> 
>  streaming file committer在提交分区之前会打印这样的日志:
> 
>  LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
> 
>  partition commit policy会在成功提交分区以后打印这样的日志:
> 
>  LOG.info("Committed partition {} to metastore", partitionSpec);
> 
>  LOG.info("Committed partition {} with success file",
> context.partitionSpec());
> 
>  可以检查一下这样的日志,看是不是卡在什么地方了
> 
>  On Tue, Sep 8, 2020 at 11:02 AM 夏帅  wrote:
> 
>  就第二次提供的日志看,好像是你的namenode出现的问题
> 
> 
> 
> --
>  发件人:MuChen <9329...@qq.com
>  发送时间:2020年9月8日(星期二) 10:56
>  收件人:user-zh@flink.apache.org 夏帅  user-zh <
>  user-zh@flink.apache.org
>  主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> 
>  在checkpoint失败的时间,tm上还有一些info和warn级别的日志:
>  2020-09-04 17:17:59,520 INFO
>  org.apache.hadoop.io.retry.RetryInvocationHandler [] -
> Exception while
>  invoking create of class ClientNamenodeProtocolTranslatorPB
> over
>  uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over
> attempts.
>  Trying to fail over immediately.
>  java.io.IOException: java.lang.InterruptedException
>  at
> org.apache.hadoop.ipc.Client.call(Client.java:1449)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> org.apache.hadoop.ipc.Client.call(Client.java:1401)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
>  at
> 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
>  ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
>  at
> sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>  ~[?:?]
>  at
> 
> 

Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-08 Thread Jingsong Li
插入Hive表的SQL也发下?

On Tue, Sep 8, 2020 at 9:44 PM Rui Li  wrote:

> 另外也list一下没有提交的分区目录吧,看看里面的文件是什么状态
>
> On Tue, Sep 8, 2020 at 9:19 PM Rui Li  wrote:
>
> > 作业有发生failover么?还是说作业能成功结束但是某些partition始终没提交?
> >
> > On Tue, Sep 8, 2020 at 5:20 PM MuChen <9329...@qq.com> wrote:
> >
> >> hi, Rui Li:
> >> 如你所说,的确有类似日志,但是只有成功增加的分区的日志,没有失败分区的日志:
> >> 2020-09-04 17:17:10,548 INFO org.apache.flink.streaming.api.operators.
> >> AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=18} of table
> >> `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
> >> 2020-09-04 17:17:10,716 INFO org.apache.flink.table.filesystem.
> >> MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=18}
> >> to metastore
> >> 2020-09-04 17:17:10,720 INFO org.apache.flink.table.filesystem.
> >> SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=18}
> >> with success file
> >> 2020-09-04 17:17:19,652 INFO org.apache.flink.streaming.api.operators.
> >> AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=19} of table
> >> `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be
> committed
> >> 2020-09-04 17:17:19,820 INFO org.apache.flink.table.filesystem.
> >> MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=19}
> >> to metastore
> >> 2020-09-04 17:17:19,824 INFO org.apache.flink.table.filesystem.
> >> SuccessFileCommitPolicy [] - Committed partition {dt=2020-08-22,
> hour=19}
> >> with success file
> >>
> >> 写hdfs的日志是都有的:
> >> 2020-09-04 17:16:04,100 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
> >> ParquetRecordWriterWrapper [] - creating real writer to write at hdfs://
> >> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
> >> 08-22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140
> >> .inprogress.1631ac6c-a07c-4ad7-86ff-cf0d4375d1de
> >> 2020-09-04 17:16:04,126 INFO org.apache.hadoop.hive.ql.io
> .parquet.write.
> >> ParquetRecordWriterWrapper [] - creating real writer to write at hdfs://
> >> Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-
> >> 08-22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142
> >> .inprogress.2700eded-5ed0-4794-8ee9-21721c0c2ffd
> >>
> >> -- 原始邮件 --
> >> *发件人:* "Rui Li" ;
> >> *发送时间:* 2020年9月8日(星期二) 中午12:09
> >> *收件人:* "user-zh";"夏帅";
> >> *抄送:* "MuChen"<9329...@qq.com>;
> >> *主题:* Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> >>
> >> streaming file committer在提交分区之前会打印这样的日志:
> >>
> >> LOG.info("Partition {} of table {} is ready to be committed", partSpec,
> tableIdentifier);
> >>
> >> partition commit policy会在成功提交分区以后打印这样的日志:
> >>
> >> LOG.info("Committed partition {} to metastore", partitionSpec);
> >>
> >> LOG.info("Committed partition {} with success file",
> context.partitionSpec());
> >>
> >> 可以检查一下这样的日志,看是不是卡在什么地方了
> >>
> >> On Tue, Sep 8, 2020 at 11:02 AM 夏帅 
> wrote:
> >>
> >>> 就第二次提供的日志看,好像是你的namenode出现的问题
> >>>
> >>>
> >>> --
> >>> 发件人:MuChen <9329...@qq.com>
> >>> 发送时间:2020年9月8日(星期二) 10:56
> >>> 收件人:user-zh@flink.apache.org 夏帅 ; user-zh <
> >>> user-zh@flink.apache.org>
> >>> 主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
> >>>
> >>> 在checkpoint失败的时间,tm上还有一些info和warn级别的日志:
> >>> 2020-09-04 17:17:59,520 INFO
> >>> org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while
> >>> invoking create of class ClientNamenodeProtocolTranslatorPB over
> >>> uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts.
> >>> Trying to fail over immediately.
> >>> java.io.IOException: java.lang.InterruptedException
> >>> at org.apache.hadoop.ipc.Client.call(Client.java:1449)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at org.apache.hadoop.ipc.Client.call(Client.java:1401)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at
> >>>
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
> >>> at
> >>>
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> >>> ~[?:?]
> >>> at
> >>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> ~[?:1.8.0_144]
> >>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144]
> >>> at
> >>>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> >>> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> >>> at
> >>>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >>> 

Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08 Thread Jingsong Li
Hi,

flink-sql-orc_2.11-1.11.0.jar 和 flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar
目前是不能共存的,不然会冲突,你试试去掉flink-sql-orc看看?

On Tue, Sep 8, 2020 at 4:55 PM 大罗  wrote:

> Hi ,我例子中的hive orc表,不是事务表,如图:
>
> createtab_stmt
> CREATE TABLE `dest_orc`(
>   `i` int)
> PARTITIONED BY (
>   `ts` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
> LOCATION
>   'hdfs://nameservice1/opt/user/hive/warehouse/dw.db/dest_orc'
> TBLPROPERTIES (
>   'is_generic'='false',
>   'orc.compress'='SNAPPY',
>   'transient_lastDdlTime'='1599555226')
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


Re: flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

2020-09-07 Thread Jingsong Li
Hi,

flink-orc实现的OrcBulkWriterFactory,是有点“ 深入“的,重写了部分ORC的代码,所以没那么好做版本兼容。

你可以考虑使用Hive的streaming写,它使用native的hive orc writer[1],可以对应你需要的那个版本。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_streaming.html#streaming-writing

Best,
Jingsong

On Mon, Sep 7, 2020 at 2:11 PM 大罗  wrote:

> Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink
> sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下:
>
> 我所使用的版本如下:
>
> Hadoop 3.0.0+cdh6.3.2
>
> HDFS 3.0.0+cdh6.3.2
>
> HBase 2.1.0+cdh6.3.2
>
> Hive 2.1.1+cdh6.3.2
>
> Flink 1.11.1
>
> 我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下:
>
> TestStreamFileSinkViaCustomVectorizer.java
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/TestStreamFileSinkViaCustomVectorizer.java>
>
>
>
> 然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14
>
> 那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下:
>
> CREATE TABLE ods_person_test_os(
> name string, age int)
> partitioned by (dt_day string, dt_hour string)
> STORED AS ORC
> LOCATION 'hdfs://nameservice1/tmp/person_orc/'
> TBLPROPERTIES(
>  'orc.compress'='SNAPPY'
> );
>
> 当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06',
> dt_hour='16')
> ",
> 运行查询语句 "select * from ods_person_test_os"后,报错,
> hive-error.txt
> 
>
> 其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。
>
> 经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。
>
>
> 'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.ORC_517,也就是第7个。
>
> ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0),
> HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1),
> HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2),
> HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3),
> HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4),
> ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5),
> ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6),
> ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7),
> ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6),
> PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6),
> SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO,
> 6),
> FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647);
>
>
> 而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.HIVE_13083,  并且不存在第7个version。
> ORIGINAL(0),
> HIVE_8732(1),
> HIVE_4243(2),
> HIVE_12055(3),
> HIVE_13083(4),
> FUTURE(2147483647);
>
> 所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析!
>
> 为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude
>
> 'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER
> =
>
> OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错:
> 1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法;
> 2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter';
>
> 那么,为了匹配不同版本hive使用的orc writer
> version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢?
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-06 Thread Jingsong Li
另外,可能和使用本地文件系统有关?换成HDFS试试?

On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li  wrote:

> Hi,
>
> 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里?
>
> On Sat, Sep 5, 2020 at 11:11 PM Peihui He  wrote:
>
>> Hi, all
>>
>> 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。
>> 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。
>>
>> 请问有什么好的解决方式没呢?
>>
>> Best Wishes.
>>
>> Peihui He  于2020年9月4日周五 下午6:25写道:
>>
>>> Hi, all
>>>
>>> 当指定partition的时候这个问题通过path 也没法解决了
>>>
>>> CREATE TABLE MyUserTable (
>>>   column_name1 INT,
>>>   column_name2 STRING,  dt string,) PARTITIONED BY (dt) WITH (
>>>   'connector' = 'filesystem',   -- required: specify the connector
>>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>>   'format' = 'json', -- required: file system connector)
>>>
>>>
>>> select  * from  MyUserTable  limit 10;
>>>
>>> job 会一直卡在一个地方
>>> [image: image.png]
>>>
>>> 这种改怎么解决呢?
>>>
>>> Peihui He  于2020年9月4日周五 下午6:02写道:
>>>
>>>> hi, all
>>>> 我这边用flink sql client 创建表的时候
>>>>
>>>> CREATE TABLE MyUserTable (
>>>>   column_name1 INT,
>>>>   column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
>>>>   'connector' = 'filesystem',   -- required: specify the connector
>>>>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>>>>   'format' = 'json', -- required: file system 
>>>> connector)
>>>>
>>>> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
>>>> sql client 提交job会很慢,最后会报错
>>>>
>>>> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
>>>> [Internal server error., >>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>>> already been submitted. at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
>>>> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498) at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
>>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
>>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>>>> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
>>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
>>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
>>>> akka.actor.ActorCell.invoke(ActorCell.scala:561) at
>>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
>>>> akka.dispatch.Mailbox.run(Mailbox.scala:225) at
>>>> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
>>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> End of exception on server side>] at
>>>> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>>>> at
>>>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>>>>
>>>>
>>>> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。
>>>>
>>>> 这种情况不知道有没有遇到过?
>>>>
>>>> Best Wishes.
>>>>
>>>>
>>>>
>>>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


  1   2   3   4   >