Re: [DISCUSS] flink-connector-rabbitmq api changes

2020-04-29 Thread seneg...@gmail.com
Hello,

I am new to the mailing list and to contributing to big open-source projects
in general, and I don't know if I did something wrong or should just be more
patient :)

I put a topic up for discussion almost a week ago, as per the contribution
guide (https://flink.apache.org/contributing/how-to-contribute.html), and
since what I propose is not backward compatible it needs to be discussed
here before opening a ticket and moving forward.

So my question is: will someone pick the discussion up? Or would at least
someone say that this is not the way to go? Or should I assume from the
silence that it's not important / relevant to the project? Should I
contact the author of the connector directly?

Thank you for your time.

Regards,
Karim Mansour

On Fri, Apr 24, 2020 at 11:17 AM seneg...@gmail.com 
wrote:

> Dear All,
>
> I want to propose a change to the current RabbitMQ connector.
>
> Currently, the RMQSource extracts the body of the message (a byte array)
> and passes it to an instance of a user implementation of the
> DeserializationSchema class to deserialize the body of the message. It
> also uses the correlation id from the message properties to deduplicate
> messages.
>
> What I want to propose is that, instead of taking an implementation of a
> DeserializationSchema in the RMQSource constructor, the user implements an
> interface whose methods derive both the output of the RMQSource and the
> correlation id, not only from the body of the message but also from its
> metadata and properties, thus giving the connector much more power and
> flexibility (see the sketch after this message).
>
> This would of course mean a breaking API change for the RMQSource, since it
> would no longer take a DeserializationSchema but an implementation of a
> predefined interface that has methods to extract both the output of the
> RMQSource and the unique message id.
>
> The reason behind this is that in my company we were relying on another
> property, the message id, for deduplication of the messages; I also needed
> that information further down the pipeline, and there was absolutely no way
> of getting it other than modifying the RMQSource.
>
> I already have code written, but as the rules dictate I have to run it by
> you first before I attempt to create a Jira ticket :)
>
> Let me know what you think.
>
> Regards,
> Karim Mansour
>
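
A minimal sketch of what such an extraction interface could look like, assuming the
RabbitMQ Java client types Envelope and AMQP.BasicProperties; the interface and method
names below are illustrative only, not the actual API being proposed:

import java.io.IOException;
import java.io.Serializable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

/**
 * Hypothetical replacement for the plain DeserializationSchema used by RMQSource today.
 * It sees the whole delivery (envelope, properties, body), not just the body.
 */
public interface RMQMessageExtractor<T> extends Serializable {

    /** Builds the record emitted by the source from the complete RabbitMQ delivery. */
    T extractOutput(Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException;

    /** Returns the identifier used for deduplication, e.g. the correlation id or the message id. */
    String extractUniqueId(Envelope envelope, AMQP.BasicProperties properties, byte[] body);
}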


[jira] [Created] (FLINK-17471) Move LICENSE and NOTICE files to root directory of python distribution

2020-04-29 Thread Yu Li (Jira)
Yu Li created FLINK-17471:
-

 Summary: Move LICENSE and NOTICE files to root directory of python 
distribution
 Key: FLINK-17471
 URL: https://issues.apache.org/jira/browse/FLINK-17471
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.10.0, 1.9.3, 1.11.0
Reporter: Yu Li
 Fix For: 1.10.1, 1.11.0, 1.9.4


This was observed and proposed by Robert during the 1.10.1 RC1 check:
{noformat}
Another question that I had while checking the release was the
"apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
distribution.
It does not contain a LICENSE and NOTICE file at the root level (which is
okay [1] for binary releases), but in the "pyflink/" directory. There is
also a "deps/" directory, which contains a full distribution of Flink,
without any license files.
I believe it would be a little bit nicer to have the LICENSE and NOTICE
file in the root directory (if the python wheels format permits) to make
sure it is obvious that all binary release contents are covered by these
files.
{noformat}

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-10-1-release-candidate-1-tp40724p40910.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-web] XBaith commented on a change in pull request #245: [FLINK-13678] Translate "Code Style - Preamble" page into Chinese

2020-04-29 Thread GitBox


XBaith commented on a change in pull request #245:
URL: https://github.com/apache/flink-web/pull/245#discussion_r417749335



##
File path: contributing/code-style-and-quality-preamble.zh.md
##
@@ -1,25 +1,25 @@
 ---
-title:  "Apache Flink Code Style and Quality Guide — Preamble"
+title:  "Apache Flink 代码样式与质量指南 — 序言"
 ---
 
 {% include code-style-navbar.zh.md %}
 
 
-This is an attempt to capture the code and quality standard that we want to 
maintain.
+本文旨在确立我们要维护的代码样式与质量标准。
 
-A code contribution (or any piece of code) can be evaluated in various ways: 
One set of properties is whether the code is correct and efficient. This 
requires solving the _logical or algorithmic problem_ correctly and well.
+评估代码贡献(或任何代码片段)有多种方式:一组指标是代码是否正确和高效。这需要正确地解决逻辑或算法问题。
 
-Another set of properties is whether the code follows an intuitive design and 
architecture, whether it is well structured with right separation of concerns, 
and whether the code is easily understandable and makes its assumptions 
explicit. That set of properties requires solving the _software engineering 
problem_ well. A good solution implies that the code is easily testable, 
maintainable also by other people than the original authors (because it is 
harder to accidentally break), and efficient to evolve.
+另一组指标是代码的设计和架构是否直观、结构是否良好、关注点是否正确、代码是否易于理解以及假设是否明确。这需要很好地解决软件工程问题。好的解决方案意味着代码容易测试,可以由原作者之外的其他人维护(代码不容易被意外破坏),并且可持续优化。
 
-While the first set of properties has rather objective approval criteria, the 
second set of properties is much harder to assess, but is of high importance 
for an open source project like Apache Flink. To make the code base inviting to 
many contributors, to make contributions easy to understand for developers that 
did not write the original code, and to make the code robust in the face of 
many contributions, well engineered code is crucial.[^1] For well engineered 
code, it is easier to keep it correct and fast over time.
+第一组指标具有比较客观的评价标准,第二组指标较难于评估,然而对于 Apache Flink 
这样的开源项目,第二组指标更加重要。为了能够邀请更多的贡献者,为了使非原始开发人员容易上手参与贡献,为了使大量贡献者协作开发的代码保持健壮,对代码进行精心地设计至关重要。[^1]
 随着时间的推移,精心设计的代码更容易保持正确和高效。
 
 
-This is of course not a full guide on how to write well engineered code. There 
is a world of big books that try to capture that. This guide is meant as a 
checklist of best practices, patterns, anti-patterns, and common mistakes that 
we observed in the context of developing Flink.
+本文当然不是代码设计的完全指南。有海量的书籍研究和讨论相关课题。本指南旨在作为一份清单,列举出我们在开发 Flink 
过程中所观察到的最佳实践、模式、反模式和常见错误。
 
-A big part of high-quality open source contributions is about helping the 
reviewer to understand the contribution and double-check the implications, so 
an important part of this guide is about how to structure a pull request for 
review.
+高质量开源贡献的很大一部分是帮助审阅者理解贡献的内容进而对内容进行细致地检查,因此本指南的一个重要部分是如何构建便于代码审查的拉取请求。

Review comment:
   I think it doesn't need to be translated; it should be treated as a GitHub-specific term.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-web] klion26 commented on pull request #245: [FLINK-13678] Translate "Code Style - Preamble" page into Chinese

2020-04-29 Thread GitBox


klion26 commented on pull request #245:
URL: https://github.com/apache/flink-web/pull/245#issuecomment-621605899


   It seems the original author's account has been deleted. Maybe someone else can
take over this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-web] klion26 commented on a change in pull request #245: [FLINK-13678] Translate "Code Style - Preamble" page into Chinese

2020-04-29 Thread GitBox


klion26 commented on a change in pull request #245:
URL: https://github.com/apache/flink-web/pull/245#discussion_r417746431



##
File path: contributing/code-style-and-quality-preamble.zh.md
##
@@ -1,25 +1,25 @@
 ---
-title:  "Apache Flink Code Style and Quality Guide — Preamble"
+title:  "Apache Flink 代码样式与质量指南 — 序言"
 ---
 
 {% include code-style-navbar.zh.md %}
 
 
-This is an attempt to capture the code and quality standard that we want to 
maintain.
+本文旨在确立我们要维护的代码样式与质量标准。
 
-A code contribution (or any piece of code) can be evaluated in various ways: 
One set of properties is whether the code is correct and efficient. This 
requires solving the _logical or algorithmic problem_ correctly and well.
+评估代码贡献(或任何代码片段)有多种方式:一组指标是代码是否正确和高效。这需要正确地解决逻辑或算法问题。
 
-Another set of properties is whether the code follows an intuitive design and 
architecture, whether it is well structured with right separation of concerns, 
and whether the code is easily understandable and makes its assumptions 
explicit. That set of properties requires solving the _software engineering 
problem_ well. A good solution implies that the code is easily testable, 
maintainable also by other people than the original authors (because it is 
harder to accidentally break), and efficient to evolve.
+另一组指标是代码的设计和架构是否直观、结构是否良好、关注点是否正确、代码是否易于理解以及假设是否明确。这需要很好地解决软件工程问题。好的解决方案意味着代码容易测试,可以由原作者之外的其他人维护(代码不容易被意外破坏),并且可持续优化。
 
-While the first set of properties has rather objective approval criteria, the 
second set of properties is much harder to assess, but is of high importance 
for an open source project like Apache Flink. To make the code base inviting to 
many contributors, to make contributions easy to understand for developers that 
did not write the original code, and to make the code robust in the face of 
many contributions, well engineered code is crucial.[^1] For well engineered 
code, it is easier to keep it correct and fast over time.
+第一组指标具有比较客观的评价标准,第二组指标较难于评估,然而对于 Apache Flink 
这样的开源项目,第二组指标更加重要。为了能够邀请更多的贡献者,为了使非原始开发人员容易上手参与贡献,为了使大量贡献者协作开发的代码保持健壮,对代码进行精心地设计至关重要。[^1]
 随着时间的推移,精心设计的代码更容易保持正确和高效。
 
 
-This is of course not a full guide on how to write well engineered code. There 
is a world of big books that try to capture that. This guide is meant as a 
checklist of best practices, patterns, anti-patterns, and common mistakes that 
we observed in the context of developing Flink.
+本文当然不是代码设计的完全指南。有海量的书籍研究和讨论相关课题。本指南旨在作为一份清单,列举出我们在开发 Flink 
过程中所观察到的最佳实践、模式、反模式和常见错误。
 
-A big part of high-quality open source contributions is about helping the 
reviewer to understand the contribution and double-check the implications, so 
an important part of this guide is about how to structure a pull request for 
review.
+高质量开源贡献的很大一部分是帮助审阅者理解贡献的内容进而对内容进行细致地检查,因此本指南的一个重要部分是如何构建便于代码审查的拉取请求。

Review comment:
   Translating “pull request” as “拉取” feels a bit odd; is there a better translation for this?

##
File path: contributing/code-style-and-quality-preamble.zh.md
##
@@ -1,25 +1,25 @@
 ---
-title:  "Apache Flink Code Style and Quality Guide — Preamble"
+title:  "Apache Flink 代码样式与质量指南 — 序言"
 ---
 
 {% include code-style-navbar.zh.md %}
 
 
-This is an attempt to capture the code and quality standard that we want to 
maintain.
+本文旨在确立我们要维护的代码样式与质量标准。
 
-A code contribution (or any piece of code) can be evaluated in various ways: 
One set of properties is whether the code is correct and efficient. This 
requires solving the _logical or algorithmic problem_ correctly and well.
+评估代码贡献(或任何代码片段)有多种方式:一组指标是代码是否正确和高效。这需要正确地解决逻辑或算法问题。
 
-Another set of properties is whether the code follows an intuitive design and 
architecture, whether it is well structured with right separation of concerns, 
and whether the code is easily understandable and makes its assumptions 
explicit. That set of properties requires solving the _software engineering 
problem_ well. A good solution implies that the code is easily testable, 
maintainable also by other people than the original authors (because it is 
harder to accidentally break), and efficient to evolve.
+另一组指标是代码的设计和架构是否直观、结构是否良好、关注点是否正确、代码是否易于理解以及假设是否明确。这需要很好地解决软件工程问题。好的解决方案意味着代码容易测试,可以由原作者之外的其他人维护(代码不容易被意外破坏),并且可持续优化。

Review comment:
   `关注点是否正确` reads a bit oddly here.

##
File path: contributing/code-style-and-quality-preamble.zh.md
##
@@ -1,25 +1,25 @@
 ---
-title:  "Apache Flink Code Style and Quality Guide — Preamble"
+title:  "Apache Flink 代码样式与质量指南 — 序言"
 ---
 
 {% include code-style-navbar.zh.md %}
 
 
-This is an attempt to capture the code and quality standard that we want to 
maintain.
+本文旨在确立我们要维护的代码样式与质量标准。
 
-A code contribution (or any piece of code) can be evaluated in various ways: 
One set of properties is whether the code is correct and efficient. This 
requires solving the _logical or algorithmic problem_ correctly and well.
+评估代码贡献(或任何代码片段)有多种方式:一组指标是代码是否正确和高效。这需要正确地解决逻辑或算法问题。
 
-Another set of properties is whether the code follows an intuitive design and 
architecture, whether it is well structured 

Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Yu Li
Thanks for all the effort spent checking the licenses.

The vote is hereby canceled; we will prepare the next RC after the license
issues are resolved.

Best Regards,
Yu


On Wed, 29 Apr 2020 at 23:29, Jark Wu  wrote:

> Looks like the ES NOTICE problem is a long-standing one, because the
> ES6 SQL connector NOTICE also misses these dependencies.
>
> Best,
> Jark
>
> On Wed, 29 Apr 2020 at 17:26, Robert Metzger  wrote:
>
> > Thanks for taking a look Chesnay. Then let me officially cancel the
> > release:
> >
> > -1 (binding)
> >
> >
> > Another question that I had while checking the release was the
> > "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
> > distribution.
> > It does not contain a LICENSE and NOTICE file at the root level (which is
> > okay [1] for binary releases), but in the "pyflink/" directory. There is
> > also a "deps/" directory, which contains a full distribution of Flink,
> > without any license files.
> > I believe it would be a little bit nicer to have the LICENSE and NOTICE
> > file in the root directory (if the python wheels format permits) to make
> > sure it is obvious that all binary release contents are covered by these
> > files.
> >
> >
> > [1]
> > http://www.apache.org/legal/release-policy.html#licensing-documentation
> >
> >
> >
> >
> > On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
> > wrote:
> >
> > > Thanks a lot for creating a release candidate for 1.10.1!
> > >
> > > +1 from my side
> > >
> > > checked
> > > - md5/gpg, ok
> > > - source does not contain any binaries, ok
> > > - pom points to the same version 1.10.1, ok
> > > - README file does not contain anything unexpected, ok
> > > - maven clean package -DskipTests, ok
> > > - maven clean verify, encounter a test timeout exception, but I think
> it
> > > does not block the RC(have created an issue[1] to track it),
> > > - run demos on a stand-alone cluster, ok
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-17458
> > > Best,
> > > Congxian
> > >
> > >
> > > Robert Metzger wrote on Wed, Apr 29, 2020 at 2:54 PM:
> > >
> > > > Thanks a lot for creating a release candidate for 1.10.1!
> > > >
> > > > I'm not sure, but I think I found a potential issue in the release while
> > > > checking dependency changes on the ElasticSearch7 connector:
> > > > https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > > >
> > > > In this change, "com.carrotsearch:hppc" has been added to the shaded jar
> > > > (https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar),
> > > > without including proper mention of that dependency in "META-INF/NOTICE".
> > > >
> > > >
> > > > My checking notes:
> > > >
> > > > - checked the diff for dependency changes:
> > > >   https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> > > >   (w/o release commit:
> > > >   https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734)
> > > >   - flink-connector-hive sets the derby version for test-scoped dependencies:
> > > >     https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
> > > >     - no NOTICE file found, but this module does not forward binaries.
> > > >   - kafka 0.10 minor version upgrade:
> > > >     https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
> > > >     - NOTICE change found
> > > >   - ES7 changes shading:
> > > >     https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > > >     - problem found
> > > >   - Influxdb version change:
> > > >     https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
> > > >     - NOTICE change found
> > > >
> > > >
> > > >
> > > > On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Please review and vote on the release candidate #1 for version
> > 1.10.1,
> > > as
> > > > > follows:
> > > > > [ ] +1, Approve the release
> > > > > [ ] -1, Do not approve the release (please provide specific
> comments)
> > > > >
> > > > >
> > > > > The complete staging area is available for your review, which
> > includes:
> > > > > * JIRA release notes [1],
> > > > > * the official Apache source release and binary convenience
> releases
> > to
> > > > be
> > > > > deployed to dist.apache.org [2], which are signed with the key
> with
> > > > > fingerprint 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jingsong Li
Sorry for the mistake.

I propose:

connector: 'filesystem'
path: '...'
format: 'json'
format.option:
  option1: '...'
  option2: '...'
  option3: '...'

And I think in most cases users just need to configure the 'format' key, so we
should make it convenient for them. There is no big problem in making the
format options more complex.

And for Kafka key and value, we can:

connector: 'kafka'
key.format: 'json'
key.format.option:
  option1: '...'
  option2: '...'
value.format: 'json'
value.format.option:
  option1: '...'
  option2: '...'
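
To make the flattening concrete, here is a small, self-contained Java sketch (not Flink
code; the helper and types are made up for illustration) that turns such a nested block
into the dotted keys a flattened catalog representation would store:

import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenOptions {

    /** Flattens nested maps into dotted keys, e.g. option1 under format.option becomes format.option.option1. */
    static Map<String, String> flatten(String prefix, Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : nested.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                flat.putAll(flatten(key, child));
            } else {
                flat.put(key, e.getValue().toString());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> formatOptions = new LinkedHashMap<>();
        formatOptions.put("option1", "...");
        formatOptions.put("option2", "...");

        Map<String, Object> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("format", "json");               // plain scalar
        options.put("format.option", formatOptions); // nested block

        // Prints: connector = filesystem, format = json,
        //         format.option.option1 = ..., format.option.option2 = ...
        flatten("", options).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}

In this layout 'format' stays a plain scalar and only 'format.option' carries nested
keys, so the two do not collide in a strict YAML/JSON view.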

Best,
Jingsong Lee

On Thu, Apr 30, 2020 at 10:16 AM Jingsong Li  wrote:

> Thanks Timo for starting the discussion.
>
> I am +1 for "format: 'json'".
> Take a look at Dawid's yaml case:
>
> connector: 'filesystem'
> path: '...'
> format: 'json'
> format:
>   option1: '...'
>   option2: '...'
>   option3: '...'
>
> Does this work?
> According to my understanding, the 'format' key is an attribute of the
> connector, which can be configured separately outside. In the 'format'
> block, the entries are attributes of the format.
> So this json-style block can only contain the properties excluding the
> format itself.
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:
>
>> Thanks Timo for starting the discussion.
>>
>> Generally I like the idea to keep the config align with a standard like
>> json/yaml.
>>
>> From the user's perspective, I don't use table configs from a config file
>> like yaml or json for now,
>> And it's ok to change it to yaml like style. Actually we didn't know that
>> this could be a yaml like
>> configuration hierarchy. If it has a hierarchy, we maybe consider that in
>> the future to load the
>> config from a yaml/json file.
>>
>> Regarding the name,
>> 'format.kind' looks fine to me. However there is another name from the
>> top of my head:
>> 'format.name', WDYT?
>>
>> Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:
>>
>>> Hi all,
>>>
>>> I also wanted to share my opinion.
>>>
>>> When talking about a ConfigOption hierarchy we use for configuring Flink
>>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>>> compatible style. Those options are primarily read from a file and thus
>>> should at least try to follow common practices for nested formats if we
>>> ever decide to switch to one.
>>>
>>> Here the question is about the properties we use in SQL statements. The
>>> origin/destination of these usually will be external catalog, usually in a
>>> flattened(key/value) representation so I agree it is not as important as in
>>> the aforementioned case. Nevertheless having a yaml based catalog or being
>>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>>> appealing. At the same time cost of being able to have a nice
>>> yaml/hocon/json representation is just adding a single suffix to a
>>> single(at most 2 key + value) property. The question is between `format` =
>>> `json` vs `format.kind` = `json`. That said, I'd be slightly in favor of
>>> doing it.
>>>
>>> Just to have a full picture. Both cases can be represented in yaml, but
>>> the difference is significant:
>>> format: 'json'
>>> format.option: 'value'
>>>
>>> vs
>>>
>>> format:
>>>   kind: 'json'
>>>   option: 'value'
>>>
>>> Best,
>>> Dawid
>>>
>>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>>
>>> Personally I don't have any preference here. Compliance with a standard
>>> YAML parser is probably more important.
>>>
>>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>>
 From a user's perspective, I prefer the shorter one "format=json",
 because
 it's more concise and straightforward. The "kind" is redundant for
 users.
 Is there a real case that requires representing the configuration in JSON
 style?
 As far as I can see, I don't see such requirement, and everything works
 fine by now.

 So I'm in favor of "format=json". But if the community insist to follow
 code style on this, I'm also fine with the longer one.

 Btw, I also CC user mailing list to listen more user's feedback.
 Because I
 think this is relative to usability.

 Best,
 Jark

 On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler 
 wrote:

 >  > Therefore, should we advocate instead:
 >  >
 >  > 'format.kind' = 'json',
 >  > 'format.fail-on-missing-field' = 'false'
 >
 > Yes. That's pretty much it.
 >
 > This is reasonably important to nail down as with such violations I
 > believe we could not actually switch to a standard YAML parser.
 >
 > On 29/04/2020 16:05, Timo Walther wrote:
 > > Hi everyone,
 > >
 > > discussions around ConfigOption seem to be very popular recently.
 So I
 > > would also like to get some opinions on a different topic.
 > >
 > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
 > > agreed on the following DDL syntax:
 > >
 > > CREATE TABLE fs_table (
 > >  ...
 > > ) WITH (

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Kurt Young
IIUC, FLIP-122 already delegates the responsibility for designing and parsing
connector properties to connector developers.
So frankly speaking, no matter which style we choose, there is no strong
guarantee for either of these. It's also possible
that developers choose a totally different way to express properties,
such as:

'format' = 'csv',
'csv.allow-comments' = 'true',
'csv.ignore-parse-errors' = 'true'

which also seems quite straightforward and easy to use. So my opinion
would be: since there is no guarantee that developers
choose "format" as the common prefix of all format-related properties, there
is not much value in extending 'format' to 'format.kind'.
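
As a rough illustration of that point (plain Java, no Flink APIs involved): whichever
prefix a connector developer settles on, picking up the format options is just a
string-prefix scan over the flat WITH properties, so nothing forces 'format.' to be
that prefix. The helper below is a made-up example, not connector code:

import java.util.HashMap;
import java.util.Map;

public class FormatOptionPrefixes {

    /** Collects all options under the given prefix and strips it, e.g. "csv.allow-comments" -> "allow-comments". */
    static Map<String, String> optionsUnder(String prefix, Map<String, String> tableOptions) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> with = new HashMap<>();
        with.put("format", "csv");
        with.put("csv.allow-comments", "true");
        with.put("csv.ignore-parse-errors", "true");

        // This (hypothetical) connector decided that its format options live under "<format id>.".
        String formatId = with.get("format");
        System.out.println(optionsUnder(formatId + ".", with));
        // e.g. {allow-comments=true, ignore-parse-errors=true} (map order unspecified)
    }
}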


Best,
Kurt


On Thu, Apr 30, 2020 at 10:17 AM Jingsong Li  wrote:

> Thanks Timo for starting the discussion.
>
> I am +1 for "format: 'json'".
> Take a look at Dawid's yaml case:
>
> connector: 'filesystem'
> path: '...'
> format: 'json'
> format:
>   option1: '...'
>   option2: '...'
>   option3: '...'
>
> Does this work?
> According to my understanding, the 'format' key is an attribute of the
> connector, which can be configured separately outside. In the 'format'
> block, the entries are attributes of the format.
> So this json-style block can only contain the properties excluding the
> format itself.
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:
>
>> Thanks Timo for starting the discussion.
>>
>> Generally I like the idea to keep the config align with a standard like
>> json/yaml.
>>
>> From the user's perspective, I don't use table configs from a config file
>> like yaml or json for now,
>> And it's ok to change it to yaml like style. Actually we didn't know that
>> this could be a yaml like
>> configuration hierarchy. If it has a hierarchy, we maybe consider that in
>> the future to load the
>> config from a yaml/json file.
>>
>> Regarding the name,
>> 'format.kind' looks fine to me. However there is another name from the
>> top of my head:
>> 'format.name', WDYT?
>>
>> Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:
>>
>>> Hi all,
>>>
>>> I also wanted to share my opinion.
>>>
>>> When talking about a ConfigOption hierarchy we use for configuring Flink
>>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>>> compatible style. Those options are primarily read from a file and thus
>>> should at least try to follow common practices for nested formats if we
>>> ever decide to switch to one.
>>>
>>> Here the question is about the properties we use in SQL statements. The
>>> origin/destination of these usually will be external catalog, usually in a
>>> flattened(key/value) representation so I agree it is not as important as in
>>> the aforementioned case. Nevertheless having a yaml based catalog or being
>>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>>> appealing. At the same time cost of being able to have a nice
>>> yaml/hocon/json representation is just adding a single suffix to a
>>> single(at most 2 key + value) property. The question is between `format` =
>>> `json` vs `format.kind` = `json`. That said I'd be slighty in favor of
>>> doing it.
>>>
>>> Just to have a full picture. Both cases can be represented in yaml, but
>>> the difference is significant:
>>> format: 'json'
>>> format.option: 'value'
>>>
>>> vs
>>>
>>> format:
>>>   kind: 'json'
>>>   option: 'value'
>>>
>>> Best,
>>> Dawid
>>>
>>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>>
>>> Personally I don't have any preference here. Compliance with a standard
>>> YAML parser is probably more important.
>>>
>>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>>
 From a user's perspective, I prefer the shorter one "format=json",
 because
 it's more concise and straightforward. The "kind" is redundant for
 users.
 Is there a real case that requires representing the configuration in JSON
 style?
 As far as I can see, I don't see such requirement, and everything works
 fine by now.

 So I'm in favor of "format=json". But if the community insist to follow
 code style on this, I'm also fine with the longer one.

 Btw, I also CC user mailing list to listen more user's feedback.
 Because I
 think this is relative to usability.

 Best,
 Jark

 On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler 
 wrote:

 >  > Therefore, should we advocate instead:
 >  >
 >  > 'format.kind' = 'json',
 >  > 'format.fail-on-missing-field' = 'false'
 >
 > Yes. That's pretty much it.
 >
 > This is reasonably important to nail down as with such violations I
 > believe we could not actually switch to a standard YAML parser.
 >
 > On 29/04/2020 16:05, Timo Walther wrote:
 > > Hi everyone,
 > >
 > > discussions around ConfigOption seem to be very popular recently.
 So I
 > > would also like to get some opinions on a different topic.
 > >
 > > How do we represent hierarchies in ConfigOption? In 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Forward Xu
Here I have a small doubt. At present, our json format only supports the
conventional json encoding. If we need to implement json with bson, json with
avro, etc., how should we express that?
Would we need something like the following:

'format.name' = 'json',

'format.json.fail-on-missing-field' = 'false'


'format.name' = 'bson',

'format.bson.fail-on-missing-field' = 'false'


Best,

Forward

Benchao Li wrote on Thu, Apr 30, 2020 at 9:58 AM:

> Thanks Timo for starting the discussion.
>
> Generally I like the idea to keep the config align with a standard like
> json/yaml.
>
> From the user's perspective, I don't use table configs from a config file
> like yaml or json for now,
> And it's ok to change it to yaml like style. Actually we didn't know that
> this could be a yaml like
> configuration hierarchy. If it has a hierarchy, we maybe consider that in
> the future to load the
> config from a yaml/json file.
>
> Regarding the name,
> 'format.kind' looks fine to me. However there is another name from the top
> of my head:
> 'format.name', WDYT?
>
> Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:
>
> > Hi all,
> >
> > I also wanted to share my opinion.
> >
> > When talking about a ConfigOption hierarchy we use for configuring Flink
> > cluster I would be a strong advocate for keeping a yaml/hocon/json/...
> > compatible style. Those options are primarily read from a file and thus
> > should at least try to follow common practices for nested formats if we
> > ever decide to switch to one.
> >
> > Here the question is about the properties we use in SQL statements. The
> > origin/destination of these usually will be external catalog, usually in
> a
> > flattened(key/value) representation so I agree it is not as important as
> in
> > the aforementioned case. Nevertheless having a yaml based catalog or
> being
> > able to have e.g. yaml based snapshots of a catalog in my opinion is
> > appealing. At the same time cost of being able to have a nice
> > yaml/hocon/json representation is just adding a single suffix to a
> > single(at most 2 key + value) property. The question is between `format`
> =
> > `json` vs `format.kind` = `json`. That said, I'd be slightly in favor of
> > doing it.
> >
> > Just to have a full picture. Both cases can be represented in yaml, but
> > the difference is significant:
> > format: 'json'
> > format.option: 'value'
> >
> > vs
> >
> > format:
> >   kind: 'json'
> >   option: 'value'
> >
> > Best,
> > Dawid
> >
> > On 29/04/2020 17:13, Flavio Pompermaier wrote:
> >
> > Personally I don't have any preference here. Compliance with a standard
> > YAML parser is probably more important.
> >
> > On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
> >
> >> From a user's perspective, I prefer the shorter one "format=json",
> because
> >> it's more concise and straightforward. The "kind" is redundant for
> users.
> >> Is there a real case that requires representing the configuration in JSON
> >> style?
> >> As far as I can see, I don't see such requirement, and everything works
> >> fine by now.
> >>
> >> So I'm in favor of "format=json". But if the community insist to follow
> >> code style on this, I'm also fine with the longer one.
> >>
> >> Btw, I also CC user mailing list to listen more user's feedback.
> Because I
> >> think this is relative to usability.
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler 
> >> wrote:
> >>
> >> >  > Therefore, should we advocate instead:
> >> >  >
> >> >  > 'format.kind' = 'json',
> >> >  > 'format.fail-on-missing-field' = 'false'
> >> >
> >> > Yes. That's pretty much it.
> >> >
> >> > This is reasonably important to nail down as with such violations I
> >> > believe we could not actually switch to a standard YAML parser.
> >> >
> >> > On 29/04/2020 16:05, Timo Walther wrote:
> >> > > Hi everyone,
> >> > >
> >> > > discussions around ConfigOption seem to be very popular recently.
> So I
> >> > > would also like to get some opinions on a different topic.
> >> > >
> >> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> >> > > agreed on the following DDL syntax:
> >> > >
> >> > > CREATE TABLE fs_table (
> >> > >  ...
> >> > > ) WITH (
> >> > >  'connector' = 'filesystem',
> >> > >  'path' = 'file:///path/to/whatever',
> >> > >  'format' = 'csv',
> >> > >  'format.allow-comments' = 'true',
> >> > >  'format.ignore-parse-errors' = 'true'
> >> > > );
> >> > >
> >> > > Of course this is slightly different from regular Flink core
> >> > > configuration but a connector still needs to be configured based on
> >> > > these options.
> >> > >
> >> > > However, I think this FLIP violates our code style guidelines
> because
> >> > >
> >> > > 'format' = 'json',
> >> > > 'format.fail-on-missing-field' = 'false'
> >> > >
> >> > > is an invalid hierarchy. `format` cannot be a string and a top-level
> >> > > object at the same time.
> >> > >
> >> > > We have similar problems in our runtime configuration:
> >> > >
> >> > > state.backend=
> >> > > 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jingsong Li
Thanks Timo for starting the discussion.

I am +1 for "format: 'json'".
Take a look at Dawid's yaml case:

connector: 'filesystem'
path: '...'
format: 'json'
format:
  option1: '...'
  option2: '...'
  option3: '...'

Does this work?
According to my understanding, the 'format' key is an attribute of the
connector, which can be configured separately outside. In the 'format' block,
the entries are attributes of the format.
So this json-style block can only contain the properties excluding the format
itself.

Best,
Jingsong Lee

On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:

> Thanks Timo for starting the discussion.
>
> Generally I like the idea to keep the config align with a standard like
> json/yaml.
>
> From the user's perspective, I don't use table configs from a config file
> like yaml or json for now,
> And it's ok to change it to yaml like style. Actually we didn't know that
> this could be a yaml like
> configuration hierarchy. If it has a hierarchy, we maybe consider that in
> the future to load the
> config from a yaml/json file.
>
> Regarding the name,
> 'format.kind' looks fine to me. However there is another name from the top
> of my head:
> 'format.name', WDYT?
>
> Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:
>
>> Hi all,
>>
>> I also wanted to share my opinion.
>>
>> When talking about a ConfigOption hierarchy we use for configuring Flink
>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>> compatible style. Those options are primarily read from a file and thus
>> should at least try to follow common practices for nested formats if we
>> ever decide to switch to one.
>>
>> Here the question is about the properties we use in SQL statements. The
>> origin/destination of these usually will be external catalog, usually in a
>> flattened(key/value) representation so I agree it is not as important as in
>> the aforementioned case. Nevertheless having a yaml based catalog or being
>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>> appealing. At the same time cost of being able to have a nice
>> yaml/hocon/json representation is just adding a single suffix to a
>> single(at most 2 key + value) property. The question is between `format` =
>> `json` vs `format.kind` = `json`. That said, I'd be slightly in favor of
>> doing it.
>>
>> Just to have a full picture. Both cases can be represented in yaml, but
>> the difference is significant:
>> format: 'json'
>> format.option: 'value'
>>
>> vs
>>
>> format:
>>   kind: 'json'
>>   option: 'value'
>>
>> Best,
>> Dawid
>>
>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>
>> Personally I don't have any preference here. Compliance with a standard
>> YAML parser is probably more important.
>>
>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>
>>> From a user's perspective, I prefer the shorter one "format=json",
>>> because
>>> it's more concise and straightforward. The "kind" is redundant for users.
>>> Is there a real case that requires representing the configuration in JSON
>>> style?
>>> As far as I can see, I don't see such requirement, and everything works
>>> fine by now.
>>>
>>> So I'm in favor of "format=json". But if the community insist to follow
>>> code style on this, I'm also fine with the longer one.
>>>
>>> Btw, I also CC user mailing list to listen more user's feedback. Because
>>> I
>>> think this is relative to usability.
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler 
>>> wrote:
>>>
>>> >  > Therefore, should we advocate instead:
>>> >  >
>>> >  > 'format.kind' = 'json',
>>> >  > 'format.fail-on-missing-field' = 'false'
>>> >
>>> > Yes. That's pretty much it.
>>> >
>>> > This is reasonably important to nail down as with such violations I
>>> > believe we could not actually switch to a standard YAML parser.
>>> >
>>> > On 29/04/2020 16:05, Timo Walther wrote:
>>> > > Hi everyone,
>>> > >
>>> > > discussions around ConfigOption seem to be very popular recently. So
>>> I
>>> > > would also like to get some opinions on a different topic.
>>> > >
>>> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
>>> > > agreed on the following DDL syntax:
>>> > >
>>> > > CREATE TABLE fs_table (
>>> > >  ...
>>> > > ) WITH (
>>> > >  'connector' = 'filesystem',
>>> > >  'path' = 'file:///path/to/whatever',
>>> > >  'format' = 'csv',
>>> > >  'format.allow-comments' = 'true',
>>> > >  'format.ignore-parse-errors' = 'true'
>>> > > );
>>> > >
>>> > > Of course this is slightly different from regular Flink core
>>> > > configuration but a connector still needs to be configured based on
>>> > > these options.
>>> > >
>>> > > However, I think this FLIP violates our code style guidelines because
>>> > >
>>> > > 'format' = 'json',
>>> > > 'format.fail-on-missing-field' = 'false'
>>> > >
>>> > > is an invalid hierarchy. `format` cannot be a string and a top-level
>>> > > object at the same time.
>>> > >
>>> > > We have similar problems in our runtime configuration:
>>> > >
>>> > > 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Benchao Li
Thanks Timo for starting the discussion.

Generally I like the idea of keeping the config aligned with a standard like
json/yaml.

From the user's perspective, I don't load table configs from a config file
like yaml or json for now,
and it's ok to change it to a yaml-like style. Actually, we didn't know that
this could be a yaml-like
configuration hierarchy. If it has a hierarchy, we may consider loading the
config from a yaml/json file in the future.

Regarding the name,
'format.kind' looks fine to me. However, there is another name off the top
of my head:
'format.name', WDYT?

Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:

> Hi all,
>
> I also wanted to share my opinion.
>
> When talking about a ConfigOption hierarchy we use for configuring Flink
> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
> compatible style. Those options are primarily read from a file and thus
> should at least try to follow common practices for nested formats if we
> ever decide to switch to one.
>
> Here the question is about the properties we use in SQL statements. The
> origin/destination of these usually will be external catalog, usually in a
> flattened(key/value) representation so I agree it is not as important as in
> the aforementioned case. Nevertheless having a yaml based catalog or being
> able to have e.g. yaml based snapshots of a catalog in my opinion is
> appealing. At the same time cost of being able to have a nice
> yaml/hocon/json representation is just adding a single suffix to a
> single(at most 2 key + value) property. The question is between `format` =
> `json` vs `format.kind` = `json`. That said, I'd be slightly in favor of
> doing it.
>
> Just to have a full picture. Both cases can be represented in yaml, but
> the difference is significant:
> format: 'json'
> format.option: 'value'
>
> vs
>
> format:
>   kind: 'json'
>   option: 'value'
>
> Best,
> Dawid
>
> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>
> Personally I don't have any preference here. Compliance with a standard YAML
> parser is probably more important.
>
> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>
>> From a user's perspective, I prefer the shorter one "format=json", because
>> it's more concise and straightforward. The "kind" is redundant for users.
>> Is there a real case that requires representing the configuration in JSON
>> style?
>> As far as I can see, I don't see such requirement, and everything works
>> fine by now.
>>
>> So I'm in favor of "format=json". But if the community insist to follow
>> code style on this, I'm also fine with the longer one.
>>
>> Btw, I also CC user mailing list to listen more user's feedback. Because I
>> think this is relative to usability.
>>
>> Best,
>> Jark
>>
>> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler 
>> wrote:
>>
>> >  > Therefore, should we advocate instead:
>> >  >
>> >  > 'format.kind' = 'json',
>> >  > 'format.fail-on-missing-field' = 'false'
>> >
>> > Yes. That's pretty much it.
>> >
>> > This is reasonably important to nail down as with such violations I
>> > believe we could not actually switch to a standard YAML parser.
>> >
>> > On 29/04/2020 16:05, Timo Walther wrote:
>> > > Hi everyone,
>> > >
>> > > discussions around ConfigOption seem to be very popular recently. So I
>> > > would also like to get some opinions on a different topic.
>> > >
>> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
>> > > agreed on the following DDL syntax:
>> > >
>> > > CREATE TABLE fs_table (
>> > >  ...
>> > > ) WITH (
>> > >  'connector' = 'filesystem',
>> > >  'path' = 'file:///path/to/whatever',
>> > >  'format' = 'csv',
>> > >  'format.allow-comments' = 'true',
>> > >  'format.ignore-parse-errors' = 'true'
>> > > );
>> > >
>> > > Of course this is slightly different from regular Flink core
>> > > configuration but a connector still needs to be configured based on
>> > > these options.
>> > >
>> > > However, I think this FLIP violates our code style guidelines because
>> > >
>> > > 'format' = 'json',
>> > > 'format.fail-on-missing-field' = 'false'
>> > >
>> > > is an invalid hierarchy. `format` cannot be a string and a top-level
>> > > object at the same time.
>> > >
>> > > We have similar problems in our runtime configuration:
>> > >
>> > > state.backend=
>> > > state.backend.incremental=
>> > > restart-strategy=
>> > > restart-strategy.fixed-delay.delay=
>> > > high-availability=
>> > > high-availability.cluster-id=
>> > >
>> > > The code style guide states "Think of the configuration as nested
>> > > objects (JSON style)". So such hierarchies cannot be represented in a
>> > > nested JSON style.
>> > >
>> > > Therefore, should we advocate instead:
>> > >
>> > > 'format.kind' = 'json',
>> > > 'format.fail-on-missing-field' = 'false'
>> > >
>> > > What do you think?
>> > >
>> > > Thanks,
>> > > Timo
>> > >
>> > > [1]
>> > >
>> >
>> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
>> > >
>> >
>> >
>
>

-- 

[GitHub] [flink-shaded] piyushnarang commented on pull request #85: [FLINK-16955] Bump Zookeeper 3.4.X to 3.4.14

2020-04-29 Thread GitBox


piyushnarang commented on pull request #85:
URL: https://github.com/apache/flink-shaded/pull/85#issuecomment-621507875


   cc @zentol - I tried using this version on Flink and I hit the issues that 
were captured in https://issues.apache.org/jira/browse/FLINK-11259
   We need to make a minor tweak to the `SecureTestEnvironment.prepare(..)` to 
get around this - 
https://github.com/piyushnarang/flink/commit/fd6cc0311d0ef20606ba4566a54315a186889304



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: Integration of DataSketches into Flink

2020-04-29 Thread leerho
Seth,
Thanks for the enthusiastic reply.

However, I have some questions ... and concerns :)

1) Create a page on the flink packages website.


I looked at this website and it raises a number of red flags for me:

   - There are no instructions anywhere on the site on how to add a listing.
   - The "Login with Github" raises security concerns and without any
   explanation:
  - Why would I want or need to authorize this site to have "access to
  my email account"!  Whoa!
  - This site has registered fewer than 100 GitHub users.  That is a
  very small number. It seems a lot of GitHub users have the same concerns
  that I have.
   - The packages listed are "not endorsed by Apache Flink project or
   Ververica.  This site is not affiliated with or released by Apache Flink".
   There is no verification of licensing.
   - In other words, this site carries zero or even negative weight.  Why
   would I want to add a listing for our very high quality and properly
   licensed Apache DataSketches product alongside other listings that are
   possibly junk?


2) Implement Type Information for DataSketches


In terms of serialization and deserialization, the sketches in our library
have their own serialization: to and from a byte array, which is also
language independent across Java, C++ and Python.  How to transport bytes
from one system to another is system dependent and external to the
DataSketches library.  Some systems use Base64, or ProtoBuf, or Kryo, or
Kafka, or whatever.  As long as we can deserialize (or wrap) the same byte
array that was serialized we are fine.
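
For reference, that byte-array round trip looks roughly as follows for a Theta sketch
(exact class and method names follow the org.apache.datasketches artifacts and may
differ slightly between releases):

import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaRoundTrip {
    public static void main(String[] args) {
        UpdateSketch sketch = UpdateSketch.builder().build();
        for (long i = 0; i < 100_000; i++) {
            sketch.update(i); // feed raw items; the sketch keeps a fixed-size summary
        }

        // Compact, language-independent byte layout produced by the sketch itself.
        byte[] bytes = sketch.compact().toByteArray();

        // The transport (Kafka, Kryo, Base64, ...) only has to move these bytes around;
        // the receiver wraps the very same array back into a read-only sketch.
        Sketch restored = Sketch.wrap(Memory.wrap(bytes));
        System.out.println(restored.getEstimate()); // close to 100000
    }
}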

If you are asking for metadata about a specific blob of bytes, such as
which sketch created the blob of bytes, we can perhaps do that, but the
documentation is not clear about how much metadata is really required,
because our library does not need it.  So we could use some help here in
defining what is really required.  Be aware that metadata also increases
the storage for an object, and we have worked very hard to keep the stored
size of our sketches very small, because that is one of the key advantages
of using sketches.  This is also why we don't use Java serialization, it is
way too heavy!

3) Implementing Sketch UDFs


Thanks for the references, but this was getting way too deep into the weeds
for me right now.  I would suggest we start simple and then build these
UDF's later, as they seem optional, if I understand your comments correctly.

I would suggest we set up a video call with a couple of your key developers
that could steer us quickly through the options.

Please be aware that we are *extremely* resource-limited; Flink is at least
10 times our size, so we could use some help in getting started. Ideally,
someone in your community who is interested in seeing DataSketches
integrated into Flink would work with us on making it happen.

I am looking forward to working with Flink to make this happen.

Cheers,

Lee.


On Mon, Apr 27, 2020 at 2:15 PM Seth Wiesman  wrote:

> One more point I forgot to mention.
>
> Flink SQL supports Hive UDF's[1]. I haven't tested it, but the datasketch
> hive package should just work out of the box.
>
> Seth
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_functions.html
>
> On Mon, Apr 27, 2020 at 2:27 PM Seth Wiesman  wrote:
>
> > Hi Lee,
> >
> > I really like this project, I used it with Flink a few years ago when it
> > was still Yahoo DataSketches. The projects clearly complement each other.
> > As Arvid mentioned, the Flink community is trying to foster an ecosystem
> > larger than what is in the main Flink repository. The reason is that the
> > project has grown to such a scale that it cannot reasonably maintain
> > everything. To encourage that sort of growth, Flink is extensively
> > pluggable which means that components do not need to live within the main
> > repository to be treated first-class.
> >
> > I'd like to outline somethings the DataSketch community could do to
> > integrate with Flink.
> >
> > 1) Create a page on the flink packages website.
> >
> > The flink community hosts a website called flink packages to increase the
> > visibility of ecosystem projects with the flink user base[1].
> Datasketches
> > are usable from Flink today so I'd encourage you to create a page right
> > away.
> >
> > 2) Implement TypeInformation for DataSketches
> >
> > TypeInformation is Flink's internal type system and is used as a factory
> > for creating serializing for different types. These serializers are what
> > Flink uses when shuffling data around the cluster and when storing
> records
> > in state backends as state. Providing type information instances for the
> > different sketch types, which would just be wrappers around existing
> > serializers in the data sketch codebase. This should be relatively
> > straightforward. There is no DataStream aggregation API in the way you
> are
> > describing so this is the *only* step 

doing demultiplexing using Apache flink

2020-04-29 Thread dhurandar S
Hi,

We have a use case where we have to demultiplex an incoming stream into
multiple output streams.

We read from one Kafka topic and, as output, we generate multiple Kafka
topics. The logic for generating each new Kafka topic is different and not
known beforehand. Users of the system keep adding new logic, and the system
then needs to generate the data in the new topic with that logic applied
to the incoming stream.

The input to the system would be logic code or a SQL statement plus a
destination topic or S3 location. The system should be able to read this
configuration and emit the results, hopefully at runtime.

Any guidance on whether this is possible in Flink, and some pointers on how
this can be achieved, would be appreciated.

regards,
Dhurandar
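
One possible building block here (a sketch on my part, not something from this thread,
assuming Flink's universal Kafka connector): a KafkaSerializationSchema can pick the
target topic per record, so records that already carry their destination can be fanned
out to many topics from a single sink. The "topic|payload" record format below is just
a placeholder:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DynamicTopicSink {

    /** Routes each record to the topic named inside the record itself. */
    static class RoutingSchema implements KafkaSerializationSchema<String> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            String[] parts = element.split("\\|", 2); // "topic|payload"
            return new ProducerRecord<>(parts[0], parts[1].getBytes(StandardCharsets.UTF_8));
        }
    }

    static void attachSink(DataStream<String> routedRecords, Properties kafkaProps) {
        routedRecords.addSink(new FlinkKafkaProducer<>(
                "fallback-topic",            // default topic required by the constructor
                new RoutingSchema(),
                kafkaProps,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
    }
}

Adding brand-new logic or SQL at runtime is a separate concern (it usually means
deploying an updated job), but the per-record topic choice itself does not require one
sink per topic.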



[GitHub] [flink-shaded] piyushnarang opened a new pull request #85: [FLINK-16955] Bump Zookeeper 3.4.X to 3.4.14

2020-04-29 Thread GitBox


piyushnarang opened a new pull request #85:
URL: https://github.com/apache/flink-shaded/pull/85


   Follow up from https://github.com/apache/flink/pull/11938 to commit in the 
right project. 
   Picking up the updated zk dependency allows us to get around an issue in the 
`StaticHostProvider` in the current zk where if 1 of n configured zk hosts is 
unreachable the Flink job manager is not able to start up. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-17470) Flink task executor process permanently hangs on `flink-daemon.sh stop`, deletes PID file

2020-04-29 Thread Hunter Herman (Jira)
Hunter Herman created FLINK-17470:
-

 Summary: Flink task executor process permanently hangs on 
`flink-daemon.sh stop`, deletes PID file
 Key: FLINK-17470
 URL: https://issues.apache.org/jira/browse/FLINK-17470
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.10.0
 Environment:  
{code:java}
$ uname -a
Linux hostname.local 3.10.0-1062.9.1.el7.x86_64 #1 SMP Fri Dec 6 15:49:49 UTC 
2019 x86_64 x86_64 x86_64 GNU/Linux
$ lsb_release -a
LSB Version::core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description:CentOS Linux release 7.7.1908 (Core)
Release:7.7.1908
Codename:   Core
{code}

Flink version 1.10
 
Reporter: Hunter Herman
 Attachments: flink_jstack.log, flink_mixed_jstack.log

Hi Flink team!

We've attempted to upgrade our Flink 1.9 cluster to 1.10, but are experiencing 
reproducible instability on shutdown. Specifically, it appears that the `kill` 
issued in the `stop` case of flink-daemon.sh causes the task executor 
process to hang permanently: the process seems to be stuck in 
`org.apache.flink.runtime.util.JvmShutdownSafeguard$DelayedTerminator.run`, 
in a `Thread.sleep()` call. I think this is bizarre behavior. Also note that 
every thread in the process is BLOCKED on a `pthread_cond_wait` call. Is this 
an OS-level issue? Banging my head on a wall here. See the attached stack traces 
for details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17469) Support override of DEFAULT_JOB_NAME with system property for StreamExecutionEnvironment

2020-04-29 Thread John Lonergan (Jira)
John Lonergan created FLINK-17469:
-

 Summary: Support override of DEFAULT_JOB_NAME with system property 
for StreamExecutionEnvironment
 Key: FLINK-17469
 URL: https://issues.apache.org/jira/browse/FLINK-17469
 Project: Flink
  Issue Type: New Feature
  Components: API / Core
Affects Versions: 1.10.0
Reporter: John Lonergan


We want to be able to provide the job name using a standard system property 
"job.name".

We can of course write application code in each job to achieve this by passing 
the system property value ourselves to the execute method.
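
For illustration, the per-job workaround looks like this today (the pipeline and the
fallback name are placeholders, not part of this proposal):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobNameFromSystemProperty {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print(); // placeholder pipeline

        // Every job currently has to do this lookup by hand; the proposal moves it
        // into StreamExecutionEnvironment itself.
        env.execute(System.getProperty("job.name", "my-default-job-name"));
    }
}
{code}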

However, there already exists a default job name in
StreamExecutionEnvironment.DEFAULT_JOB_NAME.

Our proposed change is to add a method to StreamExecutionEnvironment...

{code:java}
String getDefaultJobName() {
  return System.getProperty("job.name", StreamExecutionEnvironment.DEFAULT_JOB_NAME);
}
{code}


.. and call that method rather than directly accessing 
StreamExecutionEnvironment.DEFAULT_JOB_NAME 

This change is backwards compatible.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)



Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

2020-04-29 Thread Aljoscha Krettek
Regarding the WatermarkGenerator (WG) interface itself. The proposal is 
basically to turn emitting into a "flatMap", we give the 
WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can 
decide whether to output a watermark or not and can also mark the output 
as idle. Changing the interface to return a Watermark (as the previous 
watermark assigner interface did) would not allow that flexibility.
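
To make that concrete, a bounded-out-of-orderness generator written against the 
proposed interface could look roughly like the sketch below (written against the 
WatermarkGenerator/WatermarkOutput/Watermark types proposed in the FLIP, not any 
released API; the event type and the 3s bound are placeholders):

public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3_000; // ms, placeholder bound
    private long maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // Track progress only; a punctuated-style generator could call
        // output.emitWatermark(...) right here instead.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // The generator decides what to emit; the caller/framework guards against regression.
        output.emitWatermark(new Watermark(maxSeenTimestamp - maxOutOfOrderness - 1));
    }
}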


Regarding checkpointing the watermark and keeping track of the minimum 
watermark, this would be the responsibility of the framework (or the 
KafkaConsumer in the current implementation). The user-supplied WG does 
not need to make sure the watermark doesn't regress.


Regarding making the WG a "rich function", I can see the potential 
benefit but I also see a lot of pitfalls. For example, how should the 
watermark state be handled in the case of scale-in? It could be made to 
work in the Kafka case by attaching the state to the partition state 
that we keep, but then we have potential backwards compatibility 
problems also for the WM state. Does the WG usually need to keep the 
state or might it be enough if the state is transient, i.e. if you have 
a restart the WG would lose its histogram but it would rebuild it 
quickly and you would get back to the same steady state as before.


Best,
Aljoscha

On 27.04.20 12:12, David Anderson wrote:

Overall I like this proposal; thanks for bringing it forward, Aljoscha.

I also like the idea of making the Watermark generator a rich function --
this should make it more straightforward to implement smarter watermark
generators. Eg, one that uses state to keep statistics about the actual
out-of-orderness, and uses those statistics to implement a variable delay.

David

On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas  wrote:


Hi Aljoscha,

Thanks for opening the discussion!

I have two comments on the FLIP:
1) we could add lifecycle methods to the Generator, i.e. open()/
close(), probably with a Context as argument: I have not fully thought
this through but I think that this is more aligned with the rest of
our rich functions. In addition, it will allow, for example, to
initialize the Watermark value, if we decide to checkpoint the
watermark (see [1]) (I also do not know if Table/SQL needs to do
anything in the open()).
2) aligned with the above, and with the case where we want to
checkpoint the watermark in mind, I am wondering about how we could
implement this in the future. In the FLIP, it is proposed to expose
the WatermarkOutput in the methods of the WatermarkGenerator. Given
that there is the implicit contract that watermarks are
non-decreasing, the WatermarkOutput#emitWatermark() will have (I
assume) a check that will compare the last emitted WM against the
provided one, and emit it only if it is >=. If not, then we risk
having users shoot themselves in the foot if they accidentally
forget the check. Given that the WatermarkGenerator and its caller do
not know if the watermark was finally emitted or not (the
WatermarkOutput#emitWatermark returns void), who will be responsible
for checkpointing the WM?

Given this, why not having the methods as:

public interface WatermarkGenerator<T> {

    Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);

    Watermark onPeriodicEmit(WatermarkOutput output);
}

and the caller will be the one enforcing any invariants, such as
non-decreasing watermarks. In this way, the caller can checkpoint
anything that is needed as it will have complete knowledge as to if
the WM was emitted or not.

What do you think?

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-5601

On Tue, Apr 21, 2020 at 2:25 PM Timo Walther  wrote:


Thanks for the proposal Aljoscha. This is a very useful unification. We
have considered this FLIP already in the interfaces for FLIP-95 [1] and
look forward to updating to the new unified watermark generators once
FLIP-126 has been accepted.

Regards,
Timo

[1] https://github.com/apache/flink/pull/11692

On 20.04.20 18:10, Aljoscha Krettek wrote:

Hi Everyone!

We would like to start a discussion on "FLIP-126: Unify (and separate)
Watermark Assigners" [1]. This work was started by Stephan in an
experimental branch. I expanded on that work to provide a PoC for the
changes proposed in this FLIP: [2].

Currently, we have two different flavours of Watermark
Assigners: AssignerWithPunctuatedWatermarks
and AssignerWithPeriodicWatermarks. Both of them extend
from TimestampAssigner. This means that sources that want to support
watermark assignment/extraction in the source need to support two
separate interfaces, and we have two operator implementations for the
different flavours. Also, this makes features such as generic support
for idleness detection more complicated to implement because we again
have to support two types of watermark assigners.

In this FLIP we propose two things:

Unify the Watermark Assigners into one Interface WatermarkGenerator
Separate this new 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Dawid Wysakowicz
Hi all,

I also wanted to share my opinion.

When talking about the ConfigOption hierarchy we use for configuring a Flink
cluster, I would be a strong advocate for keeping a yaml/hocon/json/...
compatible style. Those options are primarily read from a file and thus
should at least try to follow common practices for nested formats if we
ever decide to switch to one.

Here the question is about the properties we use in SQL statements. The
origin/destination of these will usually be an external catalog, usually in
a flattened (key/value) representation, so I agree it is not as important
as in the aforementioned case. Nevertheless, having a yaml-based catalog,
or being able to have e.g. yaml-based snapshots of a catalog, is appealing
in my opinion. At the same time, the cost of being able to have a nice
yaml/hocon/json representation is just adding a single suffix to a
single (at most 2 keys + value) property. The question is between `format`
= `json` vs `format.kind` = `json`. That said, I'd be slightly in favor of
doing it.

Just to have a full picture. Both cases can be represented in yaml, but
the difference is significant:

format: 'json'
format.option: 'value'

vs

format:
  kind: 'json'
  option: 'value'


Best,
Dawid

On 29/04/2020 17:13, Flavio Pompermaier wrote:
> Personally I don't have any preference here.  Compliance wih standard
> YAML parser is probably more important
>
> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  > wrote:
>
> From a user's perspective, I prefer the shorter one "format=json",
> because
> it's more concise and straightforward. The "kind" is redundant for
> users.
> Is there a real case requires to represent the configuration in
> JSON style?
> As far as I can see, I don't see such requirement, and everything
> works
> fine by now.
>
> So I'm in favor of "format=json". But if the community insist to
> follow
> code style on this, I'm also fine with the longer one.
>
> Btw, I also CC user mailing list to listen more user's feedback.
> Because I
> think this is relative to usability.
>
> Best,
> Jark
>
> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  > wrote:
>
> >  > Therefore, should we advocate instead:
> >  >
> >  > 'format.kind' = 'json',
> >  > 'format.fail-on-missing-field' = 'false'
> >
> > Yes. That's pretty much it.
> >
> > This is reasonable important to nail down as with such violations I
> > believe we could not actually switch to a standard YAML parser.
> >
> > On 29/04/2020 16:05, Timo Walther wrote:
> > > Hi everyone,
> > >
> > > discussions around ConfigOption seem to be very popular
> recently. So I
> > > would also like to get some opinions on a different topic.
> > >
> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> > > agreed on the following DDL syntax:
> > >
> > > CREATE TABLE fs_table (
> > >  ...
> > > ) WITH (
> > >  'connector' = 'filesystem',
> > >  'path' = 'file:///path/to/whatever',
> > >  'format' = 'csv',
> > >  'format.allow-comments' = 'true',
> > >  'format.ignore-parse-errors' = 'true'
> > > );
> > >
> > > Of course this is slightly different from regular Flink core
> > > configuration but a connector still needs to be configured
> based on
> > > these options.
> > >
> > > However, I think this FLIP violates our code style guidelines
> because
> > >
> > > 'format' = 'json',
> > > 'format.fail-on-missing-field' = 'false'
> > >
> > > is an invalid hierarchy. `format` cannot be a string and a
> top-level
> > > object at the same time.
> > >
> > > We have similar problems in our runtime configuration:
> > >
> > > state.backend=
> > > state.backend.incremental=
> > > restart-strategy=
> > > restart-strategy.fixed-delay.delay=
> > > high-availability=
> > > high-availability.cluster-id=
> > >
> > > The code style guide states "Think of the configuration as nested
> > > objects (JSON style)". So such hierarchies cannot be
> represented in a
> > > nested JSON style.
> > >
> > > Therefore, should we advocate instead:
> > >
> > > 'format.kind' = 'json',
> > > 'format.fail-on-missing-field' = 'false'
> > >
> > > What do you think?
> > >
> > > Thanks,
> > > Timo
> > >
> > > [1]
> > >
> >
> 
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> > >
> >
> >
>


signature.asc
Description: OpenPGP digital signature


[jira] [Created] (FLINK-17468) Provide more detailed metrics why asynchronous part of checkpoint is taking long time

2020-04-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-17468:
--

 Summary: Provide more detailed metrics why asynchronous part of 
checkpoint is taking long time
 Key: FLINK-17468
 URL: https://issues.apache.org/jira/browse/FLINK-17468
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing, Runtime / Metrics, Runtime / 
State Backends
Affects Versions: 1.10.0
Reporter: Piotr Nowojski


As [reported by 
users|https://lists.apache.org/thread.html/r0833452796ca7d1c9d5e35c110089c95cfdadee9d81884a13613a4ce%40%3Cuser.flink.apache.org%3E]
 it's not obvious why the asynchronous part of a checkpoint is taking so long.

Maybe we could provide some more detailed metrics/UI/logs about uploading 
files, materializing meta data, or things that are happening during the 
asynchronous checkpoint process?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17467) Implement aligned savepoint in UC mode

2020-04-29 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-17467:
-

 Summary: Implement aligned savepoint in UC mode
 Key: FLINK-17467
 URL: https://issues.apache.org/jira/browse/FLINK-17467
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.11.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17466) toRetractStream doesn't work correctly with Pojo conversion class

2020-04-29 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-17466:
--

 Summary: toRetractStream doesn't work correctly with Pojo 
conversion class
 Key: FLINK-17466
 URL: https://issues.apache.org/jira/browse/FLINK-17466
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Gyula Fora


The toRetractStream(table, Pojo.class) does not map the query columns properly 
to the pojo fields.

This either leads to exceptions due to type incompatibility or simply incorrect 
results.

It can be simply reproduced with the following test code:
{code:java}
@Test
public void testRetract() throws Exception {
 EnvironmentSettings settings = EnvironmentSettings
 .newInstance()
 .useBlinkPlanner()
 .inStreamingMode()
 .build();

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 StreamTableEnvironment tableEnv = StreamTableEnvironment
 .create(StreamExecutionEnvironment.getExecutionEnvironment(), settings);

 tableEnv.createTemporaryView("person", env.fromElements(new Person()));
 tableEnv.toRetractStream(tableEnv.sqlQuery("select name, age from person"), Person.class).print();
 tableEnv.execute("Test");

}

public static class Person {
 public String name = "bob";
 public int age = 1;
}{code}
Runtime Error:
{noformat}
java.lang.ClassCastException: org.apache.flink.table.dataformat.BinaryString 
cannot be cast to java.lang.Integer{noformat}
Changing the query to "select age,name from person" in this case would resolve 
the problem but it also highlights the possible underlying issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Jark Wu
Looks like the ES NOTICE problem is a long-standing one, because the
ES6 sql connector NOTICE is also missing these dependencies.

Best,
Jark

On Wed, 29 Apr 2020 at 17:26, Robert Metzger  wrote:

> Thanks for taking a look Chesnay. Then let me officially cancel the
> release:
>
> -1 (binding)
>
>
> Another question that I had while checking the release was the
> "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
> distribution.
> It does not contain a LICENSE and NOTICE file at the root level (which is
> okay [1] for binary releases), but in the "pyflink/" directory. There is
> also a "deps/" directory, which contains a full distribution of Flink,
> without any license files.
> I believe it would be a little bit nicer to have the LICENSE and NOTICE
> file in the root directory (if the python wheels format permits) to make
> sure it is obvious that all binary release contents are covered by these
> files.
>
>
> [1]
> http://www.apache.org/legal/release-policy.html#licensing-documentation
>
>
>
>
> On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
> wrote:
>
> > Thanks a lot for creating a release candidate for 1.10.1!
> >
> > +1 from my side
> >
> > checked
> > - md5/gpg, ok
> > - source does not contain any binaries, ok
> > - pom points to the same version 1.10.1, ok
> > - README file does not contain anything unexpected, ok
> > - maven clean package -DskipTests, ok
> > - maven clean verify, encounter a test timeout exception, but I think it
> > does not block the RC(have created an issue[1] to track it),
> > - run demos on a stand-alone cluster, ok
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17458
> > Best,
> > Congxian
> >
> >
> > On Wed, Apr 29, 2020 at 2:54 PM, Robert Metzger wrote:
> >
> > > Thanks a lot for creating a release candidate for 1.10.1!
> > >
> > > I'm not sure, but I think found a potential issue in the release while
> > > checking dependency changes on the ElasticSearch7 connector:
> > >
> > >
> >
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > >
> > > In this change, "com.carrotsearch:hppc" has been added to the shaded
> jar
> > (
> > >
> > >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
> > > ),
> > > without including proper mention of that dependency in
> "META-INF/NOTICE".
> > >
> > >
> > > My checking notes:
> > >
> > > - checked the diff for dependency changes:
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> > > (w/o
> > > <
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1(w/o
> > >
> > > release commit:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
> > > )
> > >   - flink-connector-hive sets the derby version for test-scoped
> > > dependencies:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
> > >  - no NOTICE file found, but this module does not forward binaries.
> > >   - kafka 0.10 minor version upgrade:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
> > >   - NOTICE change found
> > >- ES7 changes shading:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > >  - problem found
> > >   - Influxdb version change
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
> > >  - NOTICE change found
> > >
> > >
> > >
> > > On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Please review and vote on the release candidate #1 for version
> 1.10.1,
> > as
> > > > follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > >
> > > > The complete staging area is available for your review, which
> includes:
> > > > * JIRA release notes [1],
> > > > * the official Apache source release and binary convenience releases
> to
> > > be
> > > > deployed to dist.apache.org [2], which are signed with the key with
> > > > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > * source code tag "release-1.10.1-rc1" [5],
> > > > * website pull request listing the new release and adding
> announcement
> > > blog
> > > > post [6].
> > > >
> > > > The vote will be open for at least 72 hours. It is adopted by
> majority
> > > > approval, with at least 3 PMC affirmative votes.
> > > >
> > > > Thanks,
> > > > Yu
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> 

[jira] [Created] (FLINK-17465) Update Chinese user documentation for job manager memory model

2020-04-29 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-17465:
---

 Summary: Update Chinese user documentation for job manager memory 
model
 Key: FLINK-17465
 URL: https://issues.apache.org/jira/browse/FLINK-17465
 Project: Flink
  Issue Type: Sub-task
Reporter: Andrey Zagrebin
 Fix For: 1.11.0


This is a follow-up for FLINK-16946.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Flavio Pompermaier
Personally I don't have any preference here. Compliance with a standard YAML
parser is probably more important.

On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:

> From a user's perspective, I prefer the shorter one "format=json", because
> it's more concise and straightforward. The "kind" is redundant for users.
> Is there a real case requires to represent the configuration in JSON style?
> As far as I can see, I don't see such requirement, and everything works
> fine by now.
>
> So I'm in favor of "format=json". But if the community insist to follow
> code style on this, I'm also fine with the longer one.
>
> Btw, I also CC user mailing list to listen more user's feedback. Because I
> think this is relative to usability.
>
> Best,
> Jark
>
> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  wrote:
>
> >  > Therefore, should we advocate instead:
> >  >
> >  > 'format.kind' = 'json',
> >  > 'format.fail-on-missing-field' = 'false'
> >
> > Yes. That's pretty much it.
> >
> > This is reasonable important to nail down as with such violations I
> > believe we could not actually switch to a standard YAML parser.
> >
> > On 29/04/2020 16:05, Timo Walther wrote:
> > > Hi everyone,
> > >
> > > discussions around ConfigOption seem to be very popular recently. So I
> > > would also like to get some opinions on a different topic.
> > >
> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> > > agreed on the following DDL syntax:
> > >
> > > CREATE TABLE fs_table (
> > >  ...
> > > ) WITH (
> > >  'connector' = 'filesystem',
> > >  'path' = 'file:///path/to/whatever',
> > >  'format' = 'csv',
> > >  'format.allow-comments' = 'true',
> > >  'format.ignore-parse-errors' = 'true'
> > > );
> > >
> > > Of course this is slightly different from regular Flink core
> > > configuration but a connector still needs to be configured based on
> > > these options.
> > >
> > > However, I think this FLIP violates our code style guidelines because
> > >
> > > 'format' = 'json',
> > > 'format.fail-on-missing-field' = 'false'
> > >
> > > is an invalid hierarchy. `format` cannot be a string and a top-level
> > > object at the same time.
> > >
> > > We have similar problems in our runtime configuration:
> > >
> > > state.backend=
> > > state.backend.incremental=
> > > restart-strategy=
> > > restart-strategy.fixed-delay.delay=
> > > high-availability=
> > > high-availability.cluster-id=
> > >
> > > The code style guide states "Think of the configuration as nested
> > > objects (JSON style)". So such hierarchies cannot be represented in a
> > > nested JSON style.
> > >
> > > Therefore, should we advocate instead:
> > >
> > > 'format.kind' = 'json',
> > > 'format.fail-on-missing-field' = 'false'
> > >
> > > What do you think?
> > >
> > > Thanks,
> > > Timo
> > >
> > > [1]
> > >
> >
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> > >
> >
> >


Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jark Wu
From a user's perspective, I prefer the shorter one "format=json", because
it's more concise and straightforward. The "kind" is redundant for users.
Is there a real case that requires representing the configuration in JSON style?
As far as I can see, I don't see such a requirement, and everything works
fine by now.

So I'm in favor of "format=json". But if the community insists on following
the code style on this, I'm also fine with the longer one.

Btw, I also CC the user mailing list to gather more user feedback, because I
think this is relevant to usability.

Best,
Jark

On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  wrote:

>  > Therefore, should we advocate instead:
>  >
>  > 'format.kind' = 'json',
>  > 'format.fail-on-missing-field' = 'false'
>
> Yes. That's pretty much it.
>
> This is reasonable important to nail down as with such violations I
> believe we could not actually switch to a standard YAML parser.
>
> On 29/04/2020 16:05, Timo Walther wrote:
> > Hi everyone,
> >
> > discussions around ConfigOption seem to be very popular recently. So I
> > would also like to get some opinions on a different topic.
> >
> > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> > agreed on the following DDL syntax:
> >
> > CREATE TABLE fs_table (
> >  ...
> > ) WITH (
> >  'connector' = 'filesystem',
> >  'path' = 'file:///path/to/whatever',
> >  'format' = 'csv',
> >  'format.allow-comments' = 'true',
> >  'format.ignore-parse-errors' = 'true'
> > );
> >
> > Of course this is slightly different from regular Flink core
> > configuration but a connector still needs to be configured based on
> > these options.
> >
> > However, I think this FLIP violates our code style guidelines because
> >
> > 'format' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > is an invalid hierarchy. `format` cannot be a string and a top-level
> > object at the same time.
> >
> > We have similar problems in our runtime configuration:
> >
> > state.backend=
> > state.backend.incremental=
> > restart-strategy=
> > restart-strategy.fixed-delay.delay=
> > high-availability=
> > high-availability.cluster-id=
> >
> > The code style guide states "Think of the configuration as nested
> > objects (JSON style)". So such hierarchies cannot be represented in a
> > nested JSON style.
> >
> > Therefore, should we advocate instead:
> >
> > 'format.kind' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > What do you think?
> >
> > Thanks,
> > Timo
> >
> > [1]
> >
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> >
>
>


[jira] [Created] (FLINK-17464) Standalone HA Cluster crash with non-recoverable cluster state - need to wipe cluster to recover service

2020-04-29 Thread John Lonergan (Jira)
John Lonergan created FLINK-17464:
-

 Summary: Standalone HA Cluster crash with non-recoverable cluster 
state - need to wipe cluster to recover service
 Key: FLINK-17464
 URL: https://issues.apache.org/jira/browse/FLINK-17464
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: John Lonergan


When recovering job graphs after a failover of the JobManager, or after a 
restart of the cluster, the HA Cluster can get into a state where it cannot be 
restarted, and the only resolution we have identified is to destroy the 
ZooKeeper job graph store.

This happens when any job graph that is being recovered throws an exception 
during recovery on the master. 

Whilst we encountered this issue on a sink that extends "InitialiseOnMaster", 
we believe the vulnerability is generic in nature and the unrecoverable 
problems encountered will occur if the application code throws any exception 
for any reason during recovery on the main line. 

These application exceptions propagate up to the JobManager ClusterEntryPoint 
class at which point the JM leader does a system.exit. If there are remaining 
JobManagers then they will also follow leader election and also encounter the 
same sequence of events. Ultimately all JM's exit and then all TM's fail also. 

The entire cluster is destroyed.

Because these events happen during job graph recovery, merely attempting a 
restart of the cluster will fail, leaving the only option as destroying the job 
graph state. 

If one is running a shared cluster with many jobs then this is effectively a 
DoS and results in prolonged downtime, as code or data changes are necessary to 
work around the issue.

--

Of course if the same issue were to occur during job submission using the CLI 
then we do not see the cluster crashing or the cluster being corrupted.

Our feeling is that the job graph recovery process ought to behave in a similar 
fashion to the job submission processes.

If a job submission fails then the job is recorded as failed and there is no 
further impact on the cluster. However, if job recovery fails then the entire 
cluster is taken down, and may as we have seen, become inoperable.

We feel that a failure to restore a single job graph ought merely to result in 
the job being recorded as failed. It should not result in a cluster-wide impact.

We do not understand the logic of the design in this space. However, if the 
existing logic was for the benefit of single-job clusters, then this is a poor 
result for multi-job clusters, in which case we ought to be able to configure a 
cluster for "multi-job mode" so that job graph recovery is "sandboxed" and 
doesn't take out the entire cluster.


---

It is easy to demonstrate the problem using the built in Flink streaming Word 
Count example.
In order for this to work you configure the job to write a single output file 
and also write this to HDFS not to a local disk. 
You will note that the class FileOutputFormat extends InitializeOnMaster and 
the initializeGlobal() function executes only when the file is on HDFS, not on 
local disk.
When this function runs it will generate an exception if the output already 
exists.
Therefore to demonstrate the issues do the following:

- configure the job to write a single file to HDFS
- configure the job to read a large file so that the job takes some time to 
execute and we have time to complete the next few steps before the job 
finishes.
- run the job on a HA cluster with two JM nodes
- wait for the job to start and the output file to be created
- kill the leader JM before the job has finished 
- observe JM failover occurring ... 
- recovery during failover will NOT succeed because the recovery of the Word 
Count job will fail due to the presence of the output file
- observe all JM's and TM's ultimately terminating

Once the cluster has outright failed then try and restart it.

During restart the cluster will detect the presence of job graphs in Zk and 
attempt to restore them. This, however, is doomed due to the same vulnerability 
that causes the global outage above.

---

For operability Flink needs a mod such that the job graph recovery process is 
entirely sandboxed and failure of a given job during job graph recovery ought 
to result merely in a failed job and not a failed cluster.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Chesnay Schepler

> Therefore, should we advocate instead:
>
> 'format.kind' = 'json',
> 'format.fail-on-missing-field' = 'false'

Yes. That's pretty much it.

This is reasonably important to nail down, as with such violations I 
believe we could not actually switch to a standard YAML parser.


On 29/04/2020 16:05, Timo Walther wrote:

Hi everyone,

discussions around ConfigOption seem to be very popular recently. So I 
would also like to get some opinions on a different topic.


How do we represent hierarchies in ConfigOption? In FLIP-122, we 
agreed on the following DDL syntax:


CREATE TABLE fs_table (
 ...
) WITH (
 'connector' = 'filesystem',
 'path' = 'file:///path/to/whatever',
 'format' = 'csv',
 'format.allow-comments' = 'true',
 'format.ignore-parse-errors' = 'true'
);

Of course this is slightly different from regular Flink core 
configuration but a connector still needs to be configured based on 
these options.


However, I think this FLIP violates our code style guidelines because

'format' = 'json',
'format.fail-on-missing-field' = 'false'

is an invalid hierarchy. `format` cannot be a string and a top-level 
object at the same time.


We have similar problems in our runtime configuration:

state.backend=
state.backend.incremental=
restart-strategy=
restart-strategy.fixed-delay.delay=
high-availability=
high-availability.cluster-id=

The code style guide states "Think of the configuration as nested 
objects (JSON style)". So such hierarchies cannot be represented in a 
nested JSON style.


Therefore, should we advocate instead:

'format.kind' = 'json',
'format.fail-on-missing-field' = 'false'

What do you think?

Thanks,
Timo

[1] 
https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes






[DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Timo Walther

Hi everyone,

discussions around ConfigOption seem to be very popular recently. So I 
would also like to get some opinions on a different topic.


How do we represent hierarchies in ConfigOption? In FLIP-122, we agreed 
on the following DDL syntax:


CREATE TABLE fs_table (
 ...
) WITH (
 'connector' = 'filesystem',
 'path' = 'file:///path/to/whatever',
 'format' = 'csv',
 'format.allow-comments' = 'true',
 'format.ignore-parse-errors' = 'true'
);

Of course this is slightly different from regular Flink core 
configuration but a connector still needs to be configured based on 
these options.


However, I think this FLIP violates our code style guidelines because

'format' = 'json',
'format.fail-on-missing-field' = 'false'

is an invalid hierarchy. `format` cannot be a string and a top-level 
object at the same time.


We have similar problems in our runtime configuration:

state.backend=
state.backend.incremental=
restart-strategy=
restart-strategy.fixed-delay.delay=
high-availability=
high-availability.cluster-id=

The code style guide states "Think of the configuration as nested 
objects (JSON style)". So such hierarchies cannot be represented in a 
nested JSON style.


Therefore, should we advocate instead:

'format.kind' = 'json',
'format.fail-on-missing-field' = 'false'

What do you think?

Thanks,
Timo

[1] 
https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes


[jira] [Created] (FLINK-17463) BlobCacheCleanupTest.testPermanentBlobCleanup:133->verifyJobCleanup:432 » FileAlreadyExists

2020-04-29 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-17463:
--

 Summary: 
BlobCacheCleanupTest.testPermanentBlobCleanup:133->verifyJobCleanup:432 » 
FileAlreadyExists
 Key: FLINK-17463
 URL: https://issues.apache.org/jira/browse/FLINK-17463
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination, Tests
Affects Versions: 1.11.0
Reporter: Robert Metzger


CI run: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=317=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=4ed44b66-cdd6-5dcf-5f6a-88b07dda665d

{code}
[ERROR] Tests run: 5, Failures: 0, Errors: 1, Skipped: 1, Time elapsed: 2.73 s 
<<< FAILURE! - in org.apache.flink.runtime.blob.BlobCacheCleanupTest
[ERROR] 
testPermanentBlobCleanup(org.apache.flink.runtime.blob.BlobCacheCleanupTest)  
Time elapsed: 2.028 s  <<< ERROR!
java.nio.file.FileAlreadyExistsException: 
/tmp/junit7984674749832216773/junit1629420330972938723/blobStore-296d1a51-8917-4db1-a920-5d4e17e6fa36/job_3bafac5425979b4fe2fa2c7726f8dd5b
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
at java.nio.file.Files.createDirectory(Files.java:674)
at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
at java.nio.file.Files.createDirectories(Files.java:727)
at 
org.apache.flink.runtime.blob.BlobUtils.getStorageLocation(BlobUtils.java:196)
at 
org.apache.flink.runtime.blob.PermanentBlobCache.getStorageLocation(PermanentBlobCache.java:222)
at 
org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFilesExist(BlobServerCleanupTest.java:213)
at 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.verifyJobCleanup(BlobCacheCleanupTest.java:432)
at 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.testPermanentBlobCleanup(BlobCacheCleanupTest.java:133)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Test errors

2020-04-29 Thread Chesnay Schepler

Could you give us the entire error?
What OS are you working on?

On 29/04/2020 15:00, Manish G wrote:

Even without any code change, I see following test errors:

[ERROR] Errors:
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testCloseWithNoData:99
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverAfterMultiplePersistsState:189->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverFromIntermWithoutAdditionalState:181->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverWithEmptyState:165->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverWithState:173->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem

Can these be ignored?

Manish





Test errors

2020-04-29 Thread Manish G
Even without any code change, I see following test errors:

[ERROR] Errors:
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testCloseWithNoData:99
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverAfterMultiplePersistsState:189->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverFromIntermWithoutAdditionalState:181->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverWithEmptyState:165->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem
[ERROR]
LocalFileSystemRecoverableWriterTest>AbstractRecoverableWriterTest.testRecoverWithState:173->AbstractRecoverableWriterTest.testResumeAfterMultiplePersist:247
» FileSystem

Can these be ignored?

Manish


Re: [DISCUSS][FLIP-108] Problems regarding the class loader and dependency

2020-04-29 Thread Yang Wang
I am also in favor of option 3, since the Flink FileSystem has a very
similar implementation via the plugin mechanism. It has a map "FS_FACTORIES"
to store the plugin-loaded specific FileSystems (e.g. S3, AzureFS, OSS, etc.)
and provides some common interfaces.


Best,
Yang

On Wed, Apr 29, 2020 at 3:54 PM, Yangze Guo wrote:

> For your convenience, I modified the Tokenizer in "WordCount"[1] case
> to show how UDF leverages GPU info and how we found that problem.
>
> [1]
> https://github.com/KarmaGYZ/flink/blob/7c5596e43f6d14c65063ab0917f3c0d4bc0211ed/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
>
> Best,
> Yangze Guo
>
> On Wed, Apr 29, 2020 at 3:25 PM Xintong Song 
> wrote:
> >
> > >
> > > Will she ask for some properties and then pass them to another
> component?
> >
> > Yes. Take GPU as an example, the property needed is "GPU index", and the
> > index will be used to tell the OS which GPU should be used for the
> > computing workload.
> >
> >
> > > Where does this component come from?
> >
> > The component could be either the UDF/operator itself, or some AI
> libraries
> > used by the operator. For 1.11, we do not have plan for introducing new
> GPU
> > aware operators in Flink. So all the usages of the GPU resources should
> > come from UDF. Please correct me if I am wrong, @Becket.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Apr 29, 2020 at 3:14 PM Till Rohrmann 
> wrote:
> >
> > > Thanks for bringing this up Yangze and Xintong. I see the problem.
> Help me
> > > to understand how the ExternalResourceInfo is intended to be used by
> the
> > > user. Will she ask for some properties and then pass them to another
> > > component? Where does this component come from?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Apr 29, 2020 at 9:05 AM Xintong Song 
> > > wrote:
> > >
> > > > Thanks for kicking off this discussion, Yangze.
> > > >
> > > > First, let me try to explain a bit more about this problem. Since we
> > > > decided to make the `ExternalResourceDriver` a plugin whose
> > > implementation
> > > > could be provided by user, we think it makes sense to leverage
> Flink’s
> > > > plugin mechanism and load the drivers in separated class loaders to
> avoid
> > > > potential risk of dependency conflicts. However, that means
> > > > `RuntimeContext` and user codes do not naturally have access to
> classes
> > > > defined in the plugin. In the current design,
> > > > `RuntimeContext#getExternalResourceInfos` takes the concrete
> > > > `ExternalResourceInfo` implementation class as an argument. This will
> > > cause
> > > > problem when user codes try to pass in the argument, and when
> > > > `RuntimeContext` tries to do the type check/cast.
> > > >
> > > >
> > > > To my understanding, the root problem is probably that we should not
> > > depend
> > > > on a specific implementation of the `ExternalResourceInfo` interface
> from
> > > > outside the plugin (user codes & runtime context). To that end,
> > > regardless
> > > > the detailed interface design, I'm in favor of the direction of the
> 3rd
> > > > approach. I think it makes sense to add some general
> information/property
> > > > accessing interfaces in `ExternalResourceInfo` (e.g., a key-value
> > > property
> > > > map), so that in most cases users do not need to cast the
> > > > `ExternalResourceInfo` into concrete subclasses.
> > > >
> > > >
> > > > Regarding the detailed interface design, I'm not sure about using
> > > > `Properties`. I think the information contained in a
> > > `ExternalResourceInfo`
> > > > can be considered as a unmodifiable map. So maybe something like the
> > > > following?
> > > >
> > > >
> > > > public interface ExternalResourceInfo {
> > > >     String getProperty(String key);
> > > >     Map<String, String> getProperties();
> > > > }
> > > >
> > > >
> > > > WDYT?
> > > >
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Wed, Apr 29, 2020 at 2:40 PM Yangze Guo 
> wrote:
> > > >
> > > > > Hi, there:
> > > > >
> > > > > The "FLIP-108: Add GPU support in Flink"[1] is now working in
> > > > > progress. However, we met a problem with
> > > > > "RuntimeContext#getExternalResourceInfos" if we want to leverage
> the
> > > > > Plugin[2] mechanism in Flink.
> > > > > The interface is:
> > > > > public interface RuntimeContext {
> > > > >     /**
> > > > >      * Get the specific external resource information by the resourceName.
> > > > >      */
> > > > >     <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType);
> > > > > }
> > > > > The problem is that the mainClassLoader does not recognize the
> > > > > subclasses of ExternalResourceInfo. Those ExternalResourceInfo is
> > > > > located in ExternalResourceDriver jar and has been isolated from
> > > > > mainClassLoader by PluginManager. So, ClassNotFoundException will be
> > > > > thrown out.
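
For illustration, a small sketch of how user code could consume such a key-value style
ExternalResourceInfo without referencing any plugin class. The interface shape and the "index"
property key are assumptions from this thread, and the RuntimeContext lookup itself is omitted
because its exact signature is what is being discussed:

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Assumed plugin-agnostic shape (key-value style) proposed above.
interface ExternalResourceInfo {
    String getProperty(String key);
    Map<String, String> getProperties();
}

// User code only reads generic properties, so it never needs the concrete
// driver-provided class that lives in the plugin class loader.
final class GpuSelector {

    // Picks the first advertised GPU index, if any ("index" is an assumed key).
    static Optional<String> firstGpuIndex(Set<? extends ExternalResourceInfo> gpuInfos) {
        return gpuInfos.stream()
                .map(info -> info.getProperty("index"))
                .filter(Objects::nonNull)
                .findFirst();
    }
}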

[jira] [Created] (FLINK-17462) Support CSV serialization and deseriazation schema for RowData type

2020-04-29 Thread Jark Wu (Jira)
Jark Wu created FLINK-17462:
---

 Summary: Support CSV serialization and deseriazation schema for 
RowData type
 Key: FLINK-17462
 URL: https://issues.apache.org/jira/browse/FLINK-17462
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


Add support {{CsvRowDataDeserializationSchema}} and 
{{CsvRowDataSerializationSchema}} for the new data structure {{RowData}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17461) Support JSON serialization and deseriazation schema for RowData type

2020-04-29 Thread Jark Wu (Jira)
Jark Wu created FLINK-17461:
---

 Summary: Support JSON serialization and deseriazation schema for 
RowData type
 Key: FLINK-17461
 URL: https://issues.apache.org/jira/browse/FLINK-17461
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


Add support {{JsonRowDataDeserializationSchema}} and 
{{JsonRowDataSerializationSchema}} for the new data structure {{RowData}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17460) Create sql-jars for parquet and orc

2020-04-29 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-17460:


 Summary: Create sql-jars for parquet and orc
 Key: FLINK-17460
 URL: https://issues.apache.org/jira/browse/FLINK-17460
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jingsong Lee
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17459) JDBCAppendTableSink does not support flush by flushIntervalMills

2020-04-29 Thread ranqiqiang (Jira)
ranqiqiang created FLINK-17459:
--

 Summary: JDBCAppendTableSink does not support flush by 
flushIntervalMills
 Key: FLINK-17459
 URL: https://issues.apache.org/jira/browse/FLINK-17459
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: 1.10.0
Reporter: ranqiqiang


JDBCAppendTableSink only supports flushing via "JDBCAppendTableSinkBuilder#batchSize"; it does 
not support a time-based flush like "JDBCUpsertTableSink#flushIntervalMills".

If batchSize=5000 and my data has 5000*N+1 rows, then the last record is never flushed and 
cannot be read downstream!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-84 Feedback Summary

2020-04-29 Thread Aljoscha Krettek

+1 I like the general idea of printing the results as a table.

On the specifics I don't know enough, but Fabian's suggestion seems to 
make sense to me.


Aljoscha

On 29.04.20 10:56, Fabian Hueske wrote:

Hi Godfrey,

Thanks for starting this discussion!

In my mind, WATERMARK is a property (or constraint) of a field, just like
PRIMARY KEY.
Take this example from MySQL:

mysql> CREATE TABLE people (id INT NOT NULL, name VARCHAR(128) NOT NULL,
age INT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.06 sec)

mysql> describe people;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id    | int          | NO   | PRI | NULL    |       |
| name  | varchar(128) | NO   |     | NULL    |       |
| age   | int          | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)

Here, PRIMARY KEY is marked in the Key column of the id field.
We could do the same for watermarks by adding a Watermark column.

Best, Fabian


On Wed, Apr 29, 2020 at 10:43 AM, godfrey he wrote:


Hi everyone,

I would like to bring up a discussion about the result type of the describe
statement, which is introduced in FLIP-84 [1].
In the previous version, we defined the result type of the `describe` statement as a
single column, as follows:

Statement    | Result Schema                   | Result Value                     | Result Kind          | Examples
-------------+---------------------------------+----------------------------------+----------------------+--------------------
DESCRIBE xx  | field name: result              | describe the detail of an object | SUCCESS_WITH_CONTENT | DESCRIBE table_name
             | field type: VARCHAR(n)          | (single row)                     |                      |
             | (n is the max length of values) |                                  |                      |

for "describe table_name", the result value is the `toString` value of
`TableSchema`, which is an unstructured data.
It's hard to for user to use this info.

for example:

TableSchema schema = TableSchema.builder()
.field("f0", DataTypes.BIGINT())
.field("f1", DataTypes.ROW(
   DataTypes.FIELD("q1", DataTypes.STRING()),
   DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3
.field("f2", DataTypes.STRING())
.field("f3", DataTypes.BIGINT(), "f0 + 1")
.watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
.build();

its `toString` value is:
root
  |-- f0: BIGINT
  |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
  |-- f2: STRING
  |-- f3: BIGINT AS f0 + 1
  |-- WATERMARK FOR f1.q2 AS now()

For Hive, MySQL, etc., the describe result is in table form, including field
names and field types, which is more familiar to users.
TableSchema [2] has watermark expressions and computed columns, and we should also
put them into the table:
for a computed column, which is column level, we add a new column named `expr`;
for a watermark expression, which is table level, we add a special row named
`WATERMARK` to represent it.

The result for the above example would then look like:

name      | type                                | expr
----------+-------------------------------------+----------------
f0        | BIGINT                              | (NULL)
f1        | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | (NULL)
f2        | STRING                              | (NULL)
f3        | BIGINT                              | f0 + 1
WATERMARK | (NULL)                              | f1.q2 AS now()

Now there is a PR, FLINK-17112 [3], to implement the DESCRIBE statement.

What do you think about this update?
Any feedback is welcome~

Best,
Godfrey

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2]

https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
[3] https://github.com/apache/flink/pull/11892


On Mon, Apr 6, 2020 at 10:38 PM, godfrey he wrote:


Hi Timo,

Sorry for the late reply, and thanks for your correction.
I missed DQL for job submission scenario.
I'll fix the document right away.

Best,
Godfrey

On Fri, Apr 3, 2020 at 9:53 PM, Timo Walther wrote:


Hi Godfrey,

I'm sorry to jump in again but I still need to clarify some things
around TableResult.

The FLIP says:
"For DML, this method returns TableResult until the job is submitted.
For other statements, TableResult is returned until the execution is
finished."

I thought we agreed on making every execution async? This also means
returning a TableResult for DQLs even though the execution is not done
yet. People need access to the JobClient also for batch jobs in order to
cancel long lasting queries. If people want to wait for the completion
they can hook into JobClient or collect().

Can we rephrase this part to:

The FLIP says:
"For DML and DQL, this method returns TableResult once the job has been
submitted. For DDL and DCL statements, TableResult is returned once the
operation has finished."

Regards,
Timo
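
For illustration, a sketch of the asynchronous usage pattern discussed here; executeSql,
TableResult#getJobClient and collect() are taken from the FLIP discussion and may change
before they are released:

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

final class AsyncQueryExample {

    // Returns as soon as the job is submitted; completion is observed via the
    // JobClient (e.g. for cancelling long-running batch queries) or by
    // iterating the collected rows, which blocks until results arrive.
    static void runQuery(TableEnvironment tableEnv) throws Exception {
        TableResult result = tableEnv.executeSql(
                "SELECT word, COUNT(*) AS cnt FROM words GROUP BY word");

        result.getJobClient().ifPresent(jobClient ->
                System.out.println("submitted job " + jobClient.getJobID()));

        try (CloseableIterator<Row> it = result.collect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}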


On 02.04.20 05:27, godfrey he wrote:

Hi Aljoscha, Dawid, Timo,

Thanks so much for the detailed explanation.
Agree with you that the multiline story is not complete now, and we can
keep discussing.
I will add current discussions and conclusions to the FLIP.

Best,
Godfrey



On Wed, Apr 1, 2020 at 11:27 PM, Timo Walther wrote:


Hi Godfrey,

first of all, I agree with Dawid. The multiline story is not completed
by this FLIP. It just verifies the big picture.

1. "control the execution logic through the proposed method if they

know

what 

Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Robert Metzger
Thanks for taking a look Chesnay. Then let me officially cancel the release:

-1 (binding)


Another question that I had while checking the release was the
"apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
distribution.
It does not contain a LICENSE and NOTICE file at the root level (which is
okay [1] for binary releases), but in the "pyflink/" directory. There is
also a "deps/" directory, which contains a full distribution of Flink,
without any license files.
I believe it would be a little bit nicer to have the LICENSE and NOTICE
file in the root directory (if the python wheels format permits) to make
sure it is obvious that all binary release contents are covered by these
files.


[1] http://www.apache.org/legal/release-policy.html#licensing-documentation




On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
wrote:

> Thanks a lot for creating a release candidate for 1.10.1!
>
> +1 from my side
>
> checked
> - md5/gpg, ok
> - source does not contain any binaries, ok
> - pom points to the same version 1.10.1, ok
> - README file does not contain anything unexpected, ok
> - maven clean package -DskipTests, ok
> - maven clean verify, encounter a test timeout exception, but I think it
> does not block the RC(have created an issue[1] to track it),
> - run demos on a stand-alone cluster, ok
>
> [1] https://issues.apache.org/jira/browse/FLINK-17458
> Best,
> Congxian
>
>
> On Wed, Apr 29, 2020 at 2:54 PM, Robert Metzger wrote:
>
> > Thanks a lot for creating a release candidate for 1.10.1!
> >
> > I'm not sure, but I think found a potential issue in the release while
> > checking dependency changes on the ElasticSearch7 connector:
> >
> >
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
> >
> > In this change, "com.carrotsearch:hppc" has been added to the shaded jar
> (
> >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
> > ),
> > without including proper mention of that dependency in "META-INF/NOTICE".
> >
> >
> > My checking notes:
> >
> > - checked the diff for dependency changes:
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> > (w/o
> > <
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1(w/o
> >
> > release commit:
> >
> >
> https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
> > )
> >   - flink-connector-hive sets the derby version for test-scoped
> > dependencies:
> >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
> >  - no NOTICE file found, but this module does not forward binaries.
> >   - kafka 0.10 minor version upgrade:
> >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
> >   - NOTICE change found
> >- ES7 changes shading:
> >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
> >  - problem found
> >   - Influxdb version change
> >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
> >  - NOTICE change found
> >
> >
> >
> > On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:
> >
> > > Hi everyone,
> > >
> > > Please review and vote on the release candidate #1 for version 1.10.1,
> as
> > > follows:
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source release and binary convenience releases to
> > be
> > > deployed to dist.apache.org [2], which are signed with the key with
> > > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > * source code tag "release-1.10.1-rc1" [5],
> > > * website pull request listing the new release and adding announcement
> > blog
> > > post [6].
> > >
> > > The vote will be open for at least 72 hours. It is adopted by majority
> > > approval, with at least 3 PMC affirmative votes.
> > >
> > > Thanks,
> > > Yu
> > >
> > > [1]
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891
> > > <
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891
> > > >
> > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc1/
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1362/
> > > [5]
> > >
> > >
> >
> https://github.com/apache/flink/commit/84b74cc0e21981bf6feceb74b48d7a9d3e215dc5
> > 

Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Congxian Qiu
Thanks a lot for creating a release candidate for 1.10.1!

+1 from my side

checked
- md5/gpg, ok
- source does not contain any binaries, ok
- pom points to the same version 1.10.1, ok
- README file does not contain anything unexpected, ok
- maven clean package -DskipTests, ok
- maven clean verify, encountered a test timeout exception, but I think it
does not block the RC (I have created an issue [1] to track it),
- run demos on a stand-alone cluster, ok

[1] https://issues.apache.org/jira/browse/FLINK-17458
Best,
Congxian


On Wed, Apr 29, 2020 at 2:54 PM, Robert Metzger wrote:

> Thanks a lot for creating a release candidate for 1.10.1!
>
> I'm not sure, but I think found a potential issue in the release while
> checking dependency changes on the ElasticSearch7 connector:
>
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
>
> In this change, "com.carrotsearch:hppc" has been added to the shaded jar (
>
> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
> ),
> without including proper mention of that dependency in "META-INF/NOTICE".
>
>
> My checking notes:
>
> - checked the diff for dependency changes:
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> (w/o
> 
> release commit:
>
> https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
> )
>   - flink-connector-hive sets the derby version for test-scoped
> dependencies:
>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
>  - no NOTICE file found, but this module does not forward binaries.
>   - kafka 0.10 minor version upgrade:
>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
>   - NOTICE change found
>- ES7 changes shading:
>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
>  - problem found
>   - Influxdb version change
>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
>  - NOTICE change found
>
>
>
> On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:
>
> > Hi everyone,
> >
> > Please review and vote on the release candidate #1 for version 1.10.1, as
> > follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> > The complete staging area is available for your review, which includes:
> > * JIRA release notes [1],
> > * the official Apache source release and binary convenience releases to
> be
> > deployed to dist.apache.org [2], which are signed with the key with
> > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag "release-1.10.1-rc1" [5],
> > * website pull request listing the new release and adding announcement
> blog
> > post [6].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Yu
> >
> > [1]
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891
> > <
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891
> > >
> > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc1/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1362/
> > [5]
> >
> >
> https://github.com/apache/flink/commit/84b74cc0e21981bf6feceb74b48d7a9d3e215dc5
> > [6] https://github.com/apache/flink-web/pull/330
> >
>


Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Chesnay Schepler

Yes, this is a reason to cancel the RC.

Looking at the ES commit, there may also be other dependencies missing, 
like mustache and elasticsearch-geo.



On 29/04/2020 08:54, Robert Metzger wrote:

Thanks a lot for creating a release candidate for 1.10.1!

I'm not sure, but I think found a potential issue in the release while
checking dependency changes on the ElasticSearch7 connector:
https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38

In this change, "com.carrotsearch:hppc" has been added to the shaded jar (
https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar),
without including proper mention of that dependency in "META-INF/NOTICE".


My checking notes:

- checked the diff for dependency changes:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
(w/o
release commit:
https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
)
   - flink-connector-hive sets the derby version for test-scoped
dependencies:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
  - no NOTICE file found, but this module does not forward binaries.
   - kafka 0.10 minor version upgrade:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
   - NOTICE change found
- ES7 changes shading:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
  - problem found
   - Influxdb version change
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
  - NOTICE change found



On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:


Hi everyone,

Please review and vote on the release candidate #1 for version 1.10.1, as
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.10.1-rc1" [5],
* website pull request listing the new release and adding announcement blog
post [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Yu

[1]

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4]
https://repository.apache.org/content/repositories/orgapacheflink-1362/
[5]

https://github.com/apache/flink/commit/84b74cc0e21981bf6feceb74b48d7a9d3e215dc5
[6] https://github.com/apache/flink-web/pull/330





Re: [DISCUSS] Intermediary releases of the flink-docker images

2020-04-29 Thread Chesnay Schepler
We will not advertise snapshot artifacts to users. This is simply not 
allowed by ASF rules.


On 29/04/2020 04:08, Yang Wang wrote:

Thanks for starting this discussion.

In general, I am also in favor of making the docker image release partly
decoupled from the Flink release. However, I think it should only happen
when we want to make some changes that are independent from a Flink
major release (e.g. JDK 11, changing the base image, installing more tools,
upgrading the docker entrypoint, etc.). If the Flink release branch code has
changed, we should not build it into a new docker image. Instead, if we could
provide a daily updated snapshot docker image, it would be very helpful for
the users who want to have a taste of new features or benefit from an urgent
hotfix.


Best,
Yang

On Tue, Apr 28, 2020 at 11:07 PM Ismaël Mejía  wrote:


To me this means that the docker images should be released at the same
time the other artifacts are released. This also includes snapshot
releases. So the build of the docker images should be an integral part of
the build.

This is already the case since the last release; what this thread is
about is the intermediary releases that correspond to an existing upstream
release. Let's call those the 1.10.0-dockerN. Let's not forget that
our released docker images are not fully immutable artifacts, because
upstream docker official images maintainers may (and regularly do)
update them, for example to fix security issues on the parent images or
to catch new versions of Java or the base OS, in our case openjdk and
debian.

These releases may be required to fix issues or include new features
that are independent of Flink releases (we can also ignore this and stay
tied to the Flink release cycle to simplify the process), but
remember that the upstream docker images maintainers may 'force us' to
do updates (this rarely happens, but when it happens it is something we
cannot ignore).


I do wonder though whether the Dockerfiles really are binary releases.

The Dockerfiles are not binary releases, but they are the base of the docker
image releases. Let's not forget that since we want the images to be an
official docker-images release
(https://github.com/docker-library/official-images) we have to align
with their processes too.

I suppose the wisest thing is to align with the ASF vote/release process and
have it produce a new PR for an image in the official-images repo
(which would be the docker equivalent of publishing to central). There
are probably some minor issues that we might find on the way; so far
the only one I can think of is how we would identify intermediary
docker releases. But apart from this I am +1 on a 'formal' release process,
even if I find it a bit heavy: we are an ASF project so we should align
with its rules.

Also this goes inline with the existing proposal by Chesnay on “Moving
docker development into versioned branches”

https://lists.apache.org/thread.html/rbb2569d24b35a40adb01b7abfe62fc1ada0408539403ef826491d0ee%40%3Cdev.flink.apache.org%3E

Any other opinions?

If we reach consensus maybe we should do an initial extra release and
VOTE to test the process. I would like to volunteer to do this and
document the process (we could for example do this to include the Java
11 version), but well let’s wait for other opinions first.

Ismaël

ps.  I should probably be accompanied by a committer at some steps
because I could lack some permissions to do the release.

On Tue, Apr 28, 2020 at 1:17 PM Chesnay Schepler 
wrote:

I agree with Niels that if a Flink release is made it should be
accompanied by the Dockerfiles for that version, and that both should be
released at once.

This makes sense for testing purposes alone, and I view Docker as just
another distribution channel like Maven central.
The reverse isn't necessarily true though, as in we can release an
updated version of the Dockerfiles without an accompanying Flink release.
This is just for convenience so we can fix issues on the Docker side
faster.

This is a fair compromise imo.

I do wonder though whether the Dockerfiles really are binary releases.
Aren't they more of an instruction set to assemble one?
Given that we actively point other parties to files in our repo
(which I suppose by definition is the source), then it seems
questionable to categorize this as a binary release.

On 28/04/2020 09:30, Aljoscha Krettek wrote:

Hi Niels,

I think Robert was referring to the fact that Apache considers only
the source release to be "the release", everything else is called
convenience release.

Best,
Aljoscha

On 27.04.20 19:43, Niels Basjes wrote:

Hi,

In my opinion the docker images are essentially simply differently
packed
binary releases.

This becomes more true when in the future deploying a Flink application to
kubernetes simply pulls the correct binary from a docker hub. Because of
these kinds of use cases I disagree with Robert that these are just
convenience releases.

To me this means that the docker images should be released at the
same time
the other artifacts are released. This also 

Re: [DISCUSS] FLIP-84 Feedback Summary

2020-04-29 Thread Fabian Hueske
Hi Godfrey,

Thanks for starting this discussion!

In my mind, WATERMARK is a property (or constraint) of a field, just like
PRIMARY KEY.
Take this example from MySQL:

mysql> CREATE TABLE people (id INT NOT NULL, name VARCHAR(128) NOT NULL,
age INT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.06 sec)

mysql> describe people;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id    | int          | NO   | PRI | NULL    |       |
| name  | varchar(128) | NO   |     | NULL    |       |
| age   | int          | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)

Here, PRIMARY KEY is marked in the Key column of the id field.
We could do the same for watermarks by adding a Watermark column.
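
To make that concrete, a purely illustrative mock-up (not a proposed final
format) for the TableSchema example discussed in this thread could look like
the following; the computed-column expression is left out here to keep the
focus on the Watermark column:

+-------+-------------------------------------+----------------+
| Field | Type                                | Watermark      |
+-------+-------------------------------------+----------------+
| f0    | BIGINT                              |                |
| f1    | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | f1.q2 AS now() |
| f2    | STRING                              |                |
| f3    | BIGINT                              |                |
+-------+-------------------------------------+----------------+

One detail such a format would need to settle is how to reference nested
fields like f1.q2 in a per-field column.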

Best, Fabian


On Wed, Apr 29, 2020 at 10:43 AM godfrey he  wrote:

> Hi everyone,
>
> I would like to bring up a discussion about the result type of describe
> statement,
> which is introduced in FLIP-84[1].
> In previous version, we define the result type of `describe` statement is a
> single column as following
>
> Statement:      DESCRIBE xx
> Result Schema:  field name: result; field type: VARCHAR(n) (n is the max length of values)
> Result Value:   describe the detail of an object (single row)
> Result Kind:    SUCCESS_WITH_CONTENT
> Examples:       DESCRIBE table_name
>
> for "describe table_name", the result value is the `toString` value of
> `TableSchema`, which is an unstructured data.
> It's hard to for user to use this info.
>
> for example:
>
> TableSchema schema = TableSchema.builder()
>.field("f0", DataTypes.BIGINT())
>.field("f1", DataTypes.ROW(
>   DataTypes.FIELD("q1", DataTypes.STRING()),
>   DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3
>.field("f2", DataTypes.STRING())
>.field("f3", DataTypes.BIGINT(), "f0 + 1")
>.watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
>.build();
>
> its `toString` value is:
> root
>  |-- f0: BIGINT
>  |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
>  |-- f2: STRING
>  |-- f3: BIGINT AS f0 + 1
>  |-- WATERMARK FOR f1.q2 AS now()
>
> For hive, MySQL, etc., the describe result is table form including field
> names and field types.
> which is more familiar with users.
> TableSchema[2] has watermark expression and compute column, we should also
> put them into the table:
> for compute column, it's a column level, we add a new column named `expr`.
>  for watermark expression, it's a table level, we add a special row named
> `WATERMARK` to represent it.
>
> The result will look like the following for the above example:
>
> +-----------+-------------------------------------+----------------+
> | name      | type                                | expr           |
> +-----------+-------------------------------------+----------------+
> | f0        | BIGINT                              | (NULL)         |
> | f1        | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | (NULL)         |
> | f2        | STRING                              | (NULL)         |
> | f3        | BIGINT                              | f0 + 1         |
> | WATERMARK | (NULL)                              | f1.q2 AS now() |
> +-----------+-------------------------------------+----------------+
>
> now there is a pr FLINK-17112 [3] to implement DESCRIBE statement.
>
> What do you think about this update?
> Any feedback are welcome~
>
> Best,
> Godfrey
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> [2]
>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
> [3] https://github.com/apache/flink/pull/11892
>
>
> On Mon, Apr 6, 2020 at 10:38 PM godfrey he  wrote:
>
> > Hi Timo,
> >
> > Sorry for the late reply, and thanks for your correction.
> > I missed DQL for job submission scenario.
> > I'll fix the document right away.
> >
> > Best,
> > Godfrey
> >
> > On Fri, Apr 3, 2020 at 9:53 PM Timo Walther  wrote:
> >
> >> Hi Godfrey,
> >>
> >> I'm sorry to jump in again but I still need to clarify some things
> >> around TableResult.
> >>
> >> The FLIP says:
> >> "For DML, this method returns TableResult until the job is submitted.
> >> For other statements, TableResult is returned until the execution is
> >> finished."
> >>
> >> I thought we agreed on making every execution async? This also means
> >> returning a TableResult for DQLs even though the execution is not done
> >> yet. People need access to the JobClient also for batch jobs in order to
> >> cancel long lasting queries. If people want to wait for the completion
> >> they can hook into JobClient or collect().
> >>
> >> Can we rephrase this part to:
> >>
> >> The FLIP says:
> >> "For DML and DQL, this method returns TableResult once the job has been
> >> submitted. For DDL and DCL statements, TableResult is returned once the
> >> operation has finished."
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 02.04.20 05:27, godfrey he wrote:
> >> > Hi Aljoscha, Dawid, Timo,
> >> >
> >> > Thanks so much for the detailed explanation.
> >> > Agree with you that the multiline story is not completed now, and we
> can
> >> > keep discussion.
> >> > I will add current discussions and conclusions to the FLIP.
> >> >
> >> > Best,
> >> > Godfrey
> >> >
> >> >
> >> >
> >> > Timo Walther  

Re: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

2020-04-29 Thread 刘大龙



> -----Original Message-----
> From: "Jark Wu" 
> Sent: 2020-04-29 14:09:44 (Wednesday)
> To: dev , "Yu Li" , myas...@live.com
> Cc: azagre...@apache.org
> Subject: Re: The use of state ttl incremental cleanup strategy in sql 
> deduplication resulting in significant performance degradation
> 
> Hi lsyldliu,
> 
> Thanks for investigating this.
> 
> First of all, if you are using mini-batch deduplication, it doesn't support
> state ttl in 1.9. That's why the tps looks the same with 1.11 disable state
> ttl.
> We just introduce state ttl for mini-batch deduplication recently.
> 
> Regarding to the performance regression, it looks very surprise to me. The
> performance is reduced by 19x when StateTtlConfig is enabled in 1.11.
> I don't have much experience of the underlying of StateTtlConfig. So I loop
> in @Yu Li  @YunTang in CC who may have more insights on
> this.
> 
> For more information, we use the following StateTtlConfig [1] in blink
> planner:
> 
> StateTtlConfig
>   .newBuilder(Time.milliseconds(retentionTime))
>   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>   .build();
> 
> 
> Best,
> Jark
> 
> 
> [1]:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27
> 
> 
> 
> 
> 
> On Wed, 29 Apr 2020 at 11:53, 刘大龙  wrote:
> 
> > Hi, all!
> >
> > At flink master branch, we have supported state ttl  for sql mini-batch
> > deduplication using incremental cleanup strategy on heap backend, refer to
> > FLINK-16581. Because I want to test the performance of this feature, so I
> > compile master branch code and deploy the jar to production
> > environment,then run three types of tests, respectively:
> >
> >
> >
> >
> > flink 1.9.0 release version enable state ttl
> > flink 1.11-snapshot version disable state ttl
> > flink 1.11-snapshot version enable state ttl
> >
> >
> >
> >
> > The test query sql as follows:
> >
> > select order_date,
> > sum(price * amount - goods_all_fav_amt - virtual_money_amt +
> > goods_carriage_amt) as saleP,
> > sum(amount) as saleN,
> > count(distinct parent_sn) as orderN,
> > count(distinct user_id) as cusN
> >from(
> > select order_date, user_id,
> > order_type, order_status, terminal, last_update_time,
> > goods_all_fav_amt,
> > goods_carriage_amt, virtual_money_amt, price, amount,
> > order_quality, quality_goods_cnt, acture_goods_amt
> > from (select *, row_number() over(partition by order_id,
> > order_goods_id order by proctime desc) as rownum from dm_trd_order_goods)
> > where rownum=1
> > and (order_type in (1,2,3,4,5) or order_status = 70)
> > and terminal = 'shop' and price > 0)
> > group by order_date
> >
> >
> > At runtime, this query will generate two operators which include
> > Deduplication and GroupAgg. In the test, the configuration is same,
> > parallelism is 20, set kafka consumer from the earliest, and disable
> > mini-batch function, The test results as follows:
> >
> > flink 1.9.0 enable state ttl:this test lasted 44m, flink receive 1374w
> > records, average tps at 5200/s, Flink UI picture link back pressure,
> > checkpoint
> > flink 1.11-snapshot version disable state ttl:this test lasted 28m, flink
> > receive 883w records, average tps at 5200/s, Flink UI picture link back
> > pressure, checkpoint
> > flink 1.11-snapshot version enable state ttl:this test lasted 1h 43m,
> > flink only receive 168w records because of deduplication operator serious
> > back pressure, average tps at 270/s, moreover, checkpoint always fail
> > because of deduplication operator serious back pressure, Flink UI picture
> > link back pressure, checkpoint
> >
> > Deduplication state clean up implement in flink 1.9.0 use timer, but
> > 1.11-snapshot version use StateTtlConfig, this is the main difference.
> > Comparing the three tests comprehensively, we can see that if disable state
> > ttl in 1.11-snapshot the performance is the same with 1.9.0 enable state
> > ttl. However, if enable state ttl in 1.11-snapshot, performance down is
> > nearly 20 times, so I think incremental cleanup strategy cause this
> > problem, what do you think about it? @azagrebin, @jark.
> >
> > Thanks.
> >
> > lsyldliu
> >
> > Zhejiang University, College of Control Science and Engineering, CSC


--
刘大龙

Zhejiang University, Department of Control Science and Engineering, Institute of Intelligent Systems and Control, Industrial Control New Building 217
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel: 18867547281
Hi Jark,
I use non-mini-batch deduplication and group agg for the tests. The
non-mini-batch deduplication state ttl implementation has been refactored to
use StateTtlConfig instead of a timer in the current 1.11 master branch (that
PR is my work), and I am also surprised by the 19x performance drop.
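
For reference, this is roughly how the incremental cleanup strategy is
switched on through the StateTtlConfig builder (a minimal sketch for
illustration only; the retention time and cleanup size below are made up and
are not the values used by the planner):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.minutes(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    // incremental cleanup on the heap backend: check up to 5 entries per
    // state access, without an extra cleanup pass per processed record
    .cleanupIncrementally(5, false)
    .build();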

Re: [DISCUSS] FLIP-84 Feedback Summary

2020-04-29 Thread godfrey he
Hi everyone,

I would like to bring up a discussion about the result type of describe
statement,
which is introduced in FLIP-84[1].
In the previous version, we defined the result type of the `describe` statement
as a single column, as follows:

Statement:      DESCRIBE xx
Result Schema:  field name: result; field type: VARCHAR(n) (n is the max length of values)
Result Value:   describe the detail of an object (single row)
Result Kind:    SUCCESS_WITH_CONTENT
Examples:       DESCRIBE table_name

for "describe table_name", the result value is the `toString` value of
`TableSchema`, which is an unstructured data.
It's hard to for user to use this info.

for example:

TableSchema schema = TableSchema.builder()
   .field("f0", DataTypes.BIGINT())
   .field("f1", DataTypes.ROW(
  DataTypes.FIELD("q1", DataTypes.STRING()),
  DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3
   .field("f2", DataTypes.STRING())
   .field("f3", DataTypes.BIGINT(), "f0 + 1")
   .watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
   .build();

its `toString` value is:
root
 |-- f0: BIGINT
 |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
 |-- f2: STRING
 |-- f3: BIGINT AS f0 + 1
 |-- WATERMARK FOR f1.q2 AS now()

For Hive, MySQL, etc., the describe result is in table form, including field
names and field types, which is more familiar to users.
TableSchema[2] has watermark expressions and computed columns; we should also
put them into the table:
for a computed column, which is column level, we add a new column named `expr`;
for a watermark expression, which is table level, we add a special row named
`WATERMARK` to represent it.

The result will look like the following for the above example:

+-----------+-------------------------------------+----------------+
| name      | type                                | expr           |
+-----------+-------------------------------------+----------------+
| f0        | BIGINT                              | (NULL)         |
| f1        | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | (NULL)         |
| f2        | STRING                              | (NULL)         |
| f3        | BIGINT                              | f0 + 1         |
| WATERMARK | (NULL)                              | f1.q2 AS now() |
+-----------+-------------------------------------+----------------+

Now there is a PR, FLINK-17112 [3], to implement the DESCRIBE statement.

What do you think about this update?
Any feedback is welcome~

Best,
Godfrey

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
[3] https://github.com/apache/flink/pull/11892
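
As a usage illustration (a sketch only, based on the executeSql/TableResult
API proposed in FLIP-84; `my_table` and the surrounding setup are
hypothetical), a structured result would let users consume the DESCRIBE
output row by row instead of parsing a single string:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
TableResult result = tEnv.executeSql("DESCRIBE my_table");
// with the schema proposed above, every row would expose name / type / expr
result.collect().forEachRemaining(System.out::println);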


On Mon, Apr 6, 2020 at 10:38 PM godfrey he  wrote:

> Hi Timo,
>
> Sorry for the late reply, and thanks for your correction.
> I missed DQL for job submission scenario.
> I'll fix the document right away.
>
> Best,
> Godfrey
>
> On Fri, Apr 3, 2020 at 9:53 PM Timo Walther  wrote:
>
>> Hi Godfrey,
>>
>> I'm sorry to jump in again but I still need to clarify some things
>> around TableResult.
>>
>> The FLIP says:
>> "For DML, this method returns TableResult until the job is submitted.
>> For other statements, TableResult is returned until the execution is
>> finished."
>>
>> I thought we agreed on making every execution async? This also means
>> returning a TableResult for DQLs even though the execution is not done
>> yet. People need access to the JobClient also for batch jobs in order to
>> cancel long lasting queries. If people want to wait for the completion
>> they can hook into JobClient or collect().
>>
>> Can we rephrase this part to:
>>
>> The FLIP says:
>> "For DML and DQL, this method returns TableResult once the job has been
>> submitted. For DDL and DCL statements, TableResult is returned once the
>> operation has finished."
>>
>> Regards,
>> Timo
>>
>>
>> On 02.04.20 05:27, godfrey he wrote:
>> > Hi Aljoscha, Dawid, Timo,
>> >
>> > Thanks so much for the detailed explanation.
>> > Agree with you that the multiline story is not completed now, and we can
>> > keep discussion.
>> > I will add current discussions and conclusions to the FLIP.
>> >
>> > Best,
>> > Godfrey
>> >
>> >
>> >
> >> > On Wed, Apr 1, 2020 at 11:27 PM Timo Walther  wrote:
>> >
>> >> Hi Godfrey,
>> >>
>> >> first of all, I agree with Dawid. The multiline story is not completed
>> >> by this FLIP. It just verifies the big picture.
>> >>
>> >> 1. "control the execution logic through the proposed method if they
>> know
>> >> what the statements are"
>> >>
>> >> This is a good point that also Fabian raised in the linked google doc.
>> I
>> >> could also imagine to return a more complicated POJO when calling
>> >> `executeMultiSql()`.
>> >>
>> >> The POJO would include some `getSqlProperties()` such that a platform
>> >> gets insights into the query before executing. We could also trigger
>> the
>> >> execution more explicitly instead of hiding it behind an iterator.
>> >>
>> >> 2. "there are some special commands introduced in SQL client"
>> >>
>> >> For platforms and SQL Client specific commands, we could offer a hook
>> to
>> >> the parser or a fallback parser in case the regular table environment
>> >> parser cannot deal with the statement.
>> >>
>> >> However, all of that is future work and can be discussed in a separate
>> >> FLIP.
>> >>
>> >> 3. +1 for the `Iterator` instead of `Iterable`.
>> >>
>> >> 4. "we should convert the checked exception to unchecked exception"
>> >>
>> >> Yes, I 

[jira] [Created] (FLINK-17458) TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers

2020-04-29 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-17458:
-

 Summary: 
TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers
 Key: FLINK-17458
 URL: https://issues.apache.org/jira/browse/FLINK-17458
 Project: Flink
  Issue Type: Test
  Components: Tests
Affects Versions: 1.10.0
Reporter: Congxian Qiu(klion26)


When verifying the RC of release-1.10.1, I found that 
`TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers` will sometimes 
fail because of a timeout. 

I ran this test locally in IDEA and found the following exception (locally I 
only encountered it 2 times in 1000 runs):
{code:java}
java.lang.InterruptedExceptionjava.lang.InterruptedException at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
 at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212) at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222) at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227) at 
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:190) at 
akka.event.LoggingBus$class.akka$event$LoggingBus$$addLogger(Logging.scala:182) 
at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:117) 
at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:116) 
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) at 
scala.util.Try$.apply(Try.scala:192) at scala.util.Success.map(Try.scala:237) 
at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:116) at 
akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:113) at 
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682) at 
akka.event.LoggingBus$class.startDefaultLoggers(Logging.scala:113) at 
akka.event.EventStream.startDefaultLoggers(EventStream.scala:22) at 
akka.actor.LocalActorRefProvider.init(ActorRefProvider.scala:662) at 
akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:874) at 
akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:870) at 
akka.actor.ActorSystemImpl._start(ActorSystem.scala:870) at 
akka.actor.ActorSystemImpl.start(ActorSystem.scala:891) at 
akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96) at 
akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70) at 
akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55) at 
org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125) 
at 
org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:113) 
at 
org.apache.flink.runtime.akka.AkkaUtils$.createLocalActorSystem(AkkaUtils.scala:68)
 at 
org.apache.flink.runtime.akka.AkkaUtils.createLocalActorSystem(AkkaUtils.scala) 
at 
org.apache.flink.runtime.rpc.TestingRpcService.(TestingRpcService.java:74)
 at 
org.apache.flink.runtime.rpc.TestingRpcService.(TestingRpcService.java:67)
 at 
org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment$Builder.build(TaskSubmissionTestEnvironment.java:349)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest.testFailingScheduleOrUpdateConsumers(TaskExecutorSubmissionTest.java:544)
 at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.lang.Thread.run(Thread.java:748)
org.junit.runners.model.TestTimedOutException: test timed out after 1 
milliseconds
 at sun.misc.Unsafe.park(Native Method) at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at 

Re: [DISCUSS] FLIP-84 Feedback Summary

2020-04-29 Thread godfrey he
Hi everyone,

I would like to bring up a discussion about the result type of describe
statement.
In the previous version, we defined the result type of the describe statement
as a single column, as follows:

Statement:      DESCRIBE xx
Result Schema:  field name: result; field type: VARCHAR(n) (n is the max length of values)
Result Value:   describe the detail of an object (single row)
Result Kind:    SUCCESS_WITH_CONTENT
Examples:       DESCRIBE table_name

for "describe table_name", the result value is the toString value of
TableSchema, which is an unstructured data,
It's hard to for user to use this info.

for example:

TableSchema schema = TableSchema.builder()
   .field("f0", DataTypes.BIGINT())
   .field("f1", DataTypes.ROW(
  DataTypes.FIELD("q1", DataTypes.STRING()),
  DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3
   .field("f2", DataTypes.STRING())
   .field("f3", DataTypes.BIGINT(), "f0 + 1")
   .watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
   .build();

its `toString` value is:
root
 |-- f0: BIGINT
 |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
 |-- f2: STRING
 |-- f3: BIGINT AS f0 + 1
 |-- WATERMARK FOR f1.q2 AS now()

For Hive, MySQL, etc., the describe results are in table form, which is more
familiar to users.
TableSchema has watermark expressions and computed columns; we should put them
into the table:
for a computed column, which is column level, we add a new column named `expr`;
for a watermark expression, which is not column level, we add a special row
named `WATERMARK` to represent it.
The result will look like:

+-----------+--------------+------------+
| name      | type         | expr       |
+-----------+--------------+------------+
| f0        | INT          | (NULL)     |
| f1        | INT          | a + 1      |
| f2        | TIMESTAMP(3) | (NULL)     |
| WATERMARK | (NULL)       | c AS now() |
+-----------+--------------+------------+







FLINK-17112




On Mon, Apr 6, 2020 at 10:38 PM godfrey he  wrote:

> Hi Timo,
>
> Sorry for the late reply, and thanks for your correction.
> I missed DQL for job submission scenario.
> I'll fix the document right away.
>
> Best,
> Godfrey
>
> On Fri, Apr 3, 2020 at 9:53 PM Timo Walther  wrote:
>
>> Hi Godfrey,
>>
>> I'm sorry to jump in again but I still need to clarify some things
>> around TableResult.
>>
>> The FLIP says:
>> "For DML, this method returns TableResult until the job is submitted.
>> For other statements, TableResult is returned until the execution is
>> finished."
>>
>> I thought we agreed on making every execution async? This also means
>> returning a TableResult for DQLs even though the execution is not done
>> yet. People need access to the JobClient also for batch jobs in order to
>> cancel long lasting queries. If people want to wait for the completion
>> they can hook into JobClient or collect().
>>
>> Can we rephrase this part to:
>>
>> The FLIP says:
>> "For DML and DQL, this method returns TableResult once the job has been
>> submitted. For DDL and DCL statements, TableResult is returned once the
>> operation has finished."
>>
>> Regards,
>> Timo
>>
>>
>> On 02.04.20 05:27, godfrey he wrote:
>> > Hi Aljoscha, Dawid, Timo,
>> >
>> > Thanks so much for the detailed explanation.
>> > Agree with you that the multiline story is not completed now, and we can
>> > keep discussion.
>> > I will add current discussions and conclusions to the FLIP.
>> >
>> > Best,
>> > Godfrey
>> >
>> >
>> >
> >> > On Wed, Apr 1, 2020 at 11:27 PM Timo Walther  wrote:
>> >
>> >> Hi Godfrey,
>> >>
>> >> first of all, I agree with Dawid. The multiline story is not completed
>> >> by this FLIP. It just verifies the big picture.
>> >>
>> >> 1. "control the execution logic through the proposed method if they
>> know
>> >> what the statements are"
>> >>
>> >> This is a good point that also Fabian raised in the linked google doc.
>> I
>> >> could also imagine to return a more complicated POJO when calling
>> >> `executeMultiSql()`.
>> >>
>> >> The POJO would include some `getSqlProperties()` such that a platform
>> >> gets insights into the query before executing. We could also trigger
>> the
>> >> execution more explicitly instead of hiding it behind an iterator.
>> >>
>> >> 2. "there are some special commands introduced in SQL client"
>> >>
>> >> For platforms and SQL Client specific commands, we could offer a hook
>> to
>> >> the parser or a fallback parser in case the regular table environment
>> >> parser cannot deal with the statement.
>> >>
>> >> However, all of that is future work and can be discussed in a separate
>> >> FLIP.
>> >>
>> >> 3. +1 for the `Iterator` instead of `Iterable`.
>> >>
>> >> 4. "we should convert the checked exception to unchecked exception"
>> >>
>> >> Yes, I meant using a runtime exception instead of a checked exception.
>> >> There was no consensus on putting the exception into the `TableResult`.
>> >>
>> >> Regards,
>> >> Timo
>> >>
>> >> On 01.04.20 15:35, Dawid Wysakowicz wrote:
>> >>> When considering the multi-line support I think it is helpful to 

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

2020-04-29 Thread Xuannan Su
Hi folks,

FLIP-36 has been updated according to the discussion with Becket. In the
meantime, any comments are very welcome.

If there are no further comments, I would like to start the voting
thread by tomorrow.
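
For readers following along, here is a minimal usage sketch of the API shape
currently described in the FLIP (the CachedTable type, cache() and
invalidateCache() are the proposed names and may still change; the table and
column names are made up):

// Job 1: compute an expensive intermediate result and cache it
Table preprocessed = tEnv.from("orders").filter("price > 0");
CachedTable cached = preprocessed.cache();

// Job 2: should read the cached intermediate result; if it is gone
// (e.g. after a TM failure), the original DAG is re-submitted instead
Table daily = cached.groupBy("order_date").select("order_date, price.sum as total");

// explicitly drop the cached intermediate result when it is no longer needed
cached.invalidateCache();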

Thanks,
Xuannan


On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su  wrote:

> Hi Becket,
>
> You are right. It makes sense to treat retry of job 2 as an ordinary job.
> And the config does introduce some unnecessary confusion. Thank you for you
> comment. I will update the FLIP.
>
> Best,
> Xuannan
>
> On Sat, Apr 25, 2020 at 7:44 AM Becket Qin  wrote:
>
>> Hi Xuannan,
>>
>> If a user submits Job 1 and it generates a cached intermediate result, and
>> later on the user submits Job 2, which should ideally use the intermediate
>> result, then if Job 2 fails due to the missing intermediate result, Job 2
>> should be retried with its full DAG. After that, when Job 2 runs, it will
>> also re-generate the cache. However, once Job 2 has fallen back to the
>> original DAG, should it just be treated as an ordinary job that follows the
>> recovery strategy? Having a separate configuration seems a little
>> confusing. In other words, re-generating the cache is just a byproduct of
>> running the full DAG of Job 2, not the main purpose. It is just like
>> when Job 1 runs to generate the cache: it does not have a separate retry
>> config to make sure the cache is generated. If it fails, it just fails like
>> an ordinary job.
>>
>> What do you think?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su  wrote:
>>
>> > Hi Becket,
>> >
>> > The intermediate result will indeed be automatically re-generated by
>> > resubmitting the original DAG. And that job could fail as well. In that
>> > case, we need to decide if we should resubmit the original DAG to
>> > re-generate the intermediate result or give up and throw an exception to
>> > the user. And the config is to indicate how many resubmits should happen
>> > before giving up.
>> >
>> > Thanks,
>> > Xuannan
>> >
>> > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin 
>> wrote:
>> >
>> > > Hi Xuannan,
>> > >
>> > > >  I am not entirely sure if I understand the cases you mentioned. The
>> > > > users can use the cached table object returned by the .cache() method
>> > > > in another job and it should read the intermediate result. The
>> > > > intermediate result can be gone in the following three cases: 1. the
>> > > > user explicitly calls the invalidateCache() method 2. the
>> > > > TableEnvironment is closed 3. a failure happens on the TM. When that
>> > > > happens, the intermediate result will not be available unless it is
>> > > > re-generated.
>> > >
>> > >
>> > > What confused me was why we need to have a *cache.retries.max* config?
>> > > Shouldn't the missing intermediate result always be automatically
>> > > re-generated if it is gone?
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su 
>> > wrote:
>> > >
>> > > > Hi Becket,
>> > > >
>> > > > Thanks for the comments.
>> > > >
>> > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin 
>> > wrote:
>> > > >
>> > > > > Hi Xuannan,
>> > > > >
>> > > > > Thanks for picking up the FLIP. It looks good to me overall. Some
>> > quick
>> > > > > comments / questions below:
>> > > > >
>> > > > > 1. Do we also need changes in the Java API?
>> > > > >
>> > > >
>> > > > Yes, the changes to the public interfaces of Table and
>> > > > TableEnvironment should be made in the Java API.
>> > > >
>> > > >
>> > > > > 2. What are the cases that users may want to retry reading the
>> > > > intermediate
>> > > > > result? It seems that once the intermediate result has gone, it
>> will
>> > > not
>> > > > be
>> > > > > available later without being generated again, right?
>> > > > >
>> > > >
>> > > >  I am not entirely sure if I understand the cases you mentioned. The
>> > > > users can use the cached table object returned by the .cache() method
>> > > > in another job and it should read the intermediate result. The
>> > > > intermediate result can be gone in the following three cases: 1. the
>> > > > user explicitly calls the invalidateCache() method 2. the
>> > > > TableEnvironment is closed 3. a failure happens on the TM. When that
>> > > > happens, the intermediate result will not be available unless it is
>> > > > re-generated.
>> > > >
>> > > > 3. In the "semantic of cache() method" section, the description "The
>> > > > > semantic of the *cache() *method is a little different depending
>> on
>> > > > whether
>> > > > > auto caching is enabled or not." seems not explained.
>> > > > >
>> > > >
>> > > > This line is actually outdated and should be removed, as we are not
>> > > adding
>> > > > the auto caching functionality in this FLIP. Auto caching will be
>> added
>> > > in
>> > > > the future, and the semantic of cache() when auto caching is enabled
>> > will
>> > > > be discussed in 

Re: [DISCUSS][FLIP-108] Problems regarding the class loader and dependency

2020-04-29 Thread Yangze Guo
For your convenience, I modified the Tokenizer in the "WordCount"[1] example
to show how a UDF leverages GPU info and how we found this problem.

[1] 
https://github.com/KarmaGYZ/flink/blob/7c5596e43f6d14c65063ab0917f3c0d4bc0211ed/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
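
In short, the UDF-side pattern looks roughly like the sketch below (the
getExternalResourceInfos signature follows the third approach discussed in
this thread, and the "gpu" / "index" keys are provisional placeholders whose
contract would be defined by the driver provider):

// inside a RichFunction such as the modified Tokenizer
// (needs java.util.Set and org.apache.flink.configuration.Configuration)
@Override
public void open(Configuration parameters) {
    // ask the runtime for the external resources allocated under the name "gpu"
    Set<ExternalResourceInfo> gpuInfos =
            getRuntimeContext().getExternalResourceInfos("gpu");
    for (ExternalResourceInfo info : gpuInfos) {
        // hypothetical property key; the driver defines which keys exist
        String gpuIndex = info.getProperty("index");
        // hand the index over to the library / OS call running the GPU workload
    }
}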

Best,
Yangze Guo

On Wed, Apr 29, 2020 at 3:25 PM Xintong Song  wrote:
>
> >
> > Will she ask for some properties and then pass them to another component?
>
> Yes. Take GPU as an example, the property needed is "GPU index", and the
> index will be used to tell the OS which GPU should be used for the
> computing workload.
>
>
> > Where does this component come from?
>
> The component could be either the UDF/operator itself, or some AI libraries
> used by the operator. For 1.11, we do not have plan for introducing new GPU
> aware operators in Flink. So all the usages of the GPU resources should
> come from UDF. Please correct me if I am wrong, @Becket.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Apr 29, 2020 at 3:14 PM Till Rohrmann  wrote:
>
> > Thanks for bringing this up Yangze and Xintong. I see the problem. Help me
> > to understand how the ExternalResourceInfo is intended to be used by the
> > user. Will she ask for some properties and then pass them to another
> > component? Where does this component come from?
> >
> > Cheers,
> > Till
> >
> > On Wed, Apr 29, 2020 at 9:05 AM Xintong Song 
> > wrote:
> >
> > > Thanks for kicking off this discussion, Yangze.
> > >
> > > First, let me try to explain a bit more about this problem. Since we
> > > decided to make the `ExternalResourceDriver` a plugin whose
> > implementation
> > > could be provided by user, we think it makes sense to leverage Flink’s
> > > plugin mechanism and load the drivers in separated class loaders to avoid
> > > potential risk of dependency conflicts. However, that means
> > > `RuntimeContext` and user codes do not naturally have access to classes
> > > defined in the plugin. In the current design,
> > > `RuntimeContext#getExternalResourceInfos` takes the concrete
> > > `ExternalResourceInfo` implementation class as an argument. This will
> > cause
> > > problem when user codes try to pass in the argument, and when
> > > `RuntimeContext` tries to do the type check/cast.
> > >
> > >
> > > To my understanding, the root problem is probably that we should not
> > depend
> > > on a specific implementation of the `ExternalResourceInfo` interface from
> > > outside the plugin (user codes & runtime context). To that end,
> > regardless
> > > the detailed interface design, I'm in favor of the direction of the 3rd
> > > approach. I think it makes sense to add some general information/property
> > > accessing interfaces in `ExternalResourceInfo` (e.g., a key-value
> > property
> > > map), so that in most cases users do not need to cast the
> > > `ExternalResourceInfo` into concrete subclasses.
> > >
> > >
> > > Regarding the detailed interface design, I'm not sure about using
> > > `Properties`. I think the information contained in a
> > `ExternalResourceInfo`
> > > can be considered as a unmodifiable map. So maybe something like the
> > > following?
> > >
> > >
> > > public interface ExternalResourceInfo {
> > > > String getProperty(String key);
> > > > Map getProperties();
> > > > }
> > >
> > >
> > > WDYT?
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Wed, Apr 29, 2020 at 2:40 PM Yangze Guo  wrote:
> > >
> > > > Hi, there:
> > > >
> > > > The "FLIP-108: Add GPU support in Flink"[1] is now working in
> > > > progress. However, we met a problem with
> > > > "RuntimeContext#getExternalResourceInfos" if we want to leverage the
> > > > Plugin[2] mechanism in Flink.
> > > > The interface is:
> > > > The problem is now:
> > > > public interface RuntimeContext {
> > > > /**
> > > >  * Get the specific external resource information by the
> > > resourceName.
> > > >  */
> > > >  Set
> > > > getExternalResourceInfos(String resourceName, Class
> > > > externalResourceType);
> > > > }
> > > > The problem is that the mainClassLoader does not recognize the
> > > > subclasses of ExternalResourceInfo. Those ExternalResourceInfo classes are
> > > > located in the ExternalResourceDriver jar and have been isolated from the
> > > > mainClassLoader by the PluginManager. So, a ClassNotFoundException will be
> > > > thrown.
> > > >
> > > > The solution could be:
> > > >
> > > > - Not leveraging the plugin mechanism. Just load drivers to
> > > > mainClassLoader. The drawback is that user needs to handle the
> > > > dependency conflict.
> > > >
> > > > - Force user to build two separate jars. One for the
> > > > ExternalResourceDriver, the other for the ExternalResourceInfo. The
> > > > jar including ExternalResourceInfo class should be added to “/lib”
> > > > dir. This approach probably makes sense but might annoy user.
> > > >
> > > > - Change the RuntimeContext#getExternalResourceInfos, let 

[jira] [Created] (FLINK-17457) Manage Hive metadata via Flink DDL

2020-04-29 Thread Rui Li (Jira)
Rui Li created FLINK-17457:
--

 Summary: Manage Hive metadata via Flink DDL
 Key: FLINK-17457
 URL: https://issues.apache.org/jira/browse/FLINK-17457
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Rui Li


To implement hive dialect, we encode lots of DDL semantics as properties. We 
should decide whether/how to allow users to directly use these properties in 
Flink DDL to achieve the same semantics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17456) Update hive connector tests to execute DDL & DML via TableEnvironment

2020-04-29 Thread Rui Li (Jira)
Rui Li created FLINK-17456:
--

 Summary: Update hive connector tests to execute DDL & DML via 
TableEnvironment
 Key: FLINK-17456
 URL: https://issues.apache.org/jira/browse/FLINK-17456
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Rui Li
 Fix For: 1.11.0


In hive connector tests, currently we delegate DDLs and some DMLs to the hive 
shell. With hive parser in place, we should be able to execute those DDLs and 
DMLs on Flink side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS][FLIP-108] Problems regarding the class loader and dependency

2020-04-29 Thread Xintong Song
>
> Will she ask for some properties and then pass them to another component?

Yes. Take GPU as an example, the property needed is "GPU index", and the
index will be used to tell the OS which GPU should be used for the
computing workload.


> Where does this component come from?

The component could be either the UDF/operator itself, or some AI libraries
used by the operator. For 1.11, we do not have plans for introducing new
GPU-aware operators in Flink. So all the usages of the GPU resources should
come from UDFs. Please correct me if I am wrong, @Becket.

Thank you~

Xintong Song



On Wed, Apr 29, 2020 at 3:14 PM Till Rohrmann  wrote:

> Thanks for bringing this up Yangze and Xintong. I see the problem. Help me
> to understand how the ExternalResourceInfo is intended to be used by the
> user. Will she ask for some properties and then pass them to another
> component? Where does this component come from?
>
> Cheers,
> Till
>
> On Wed, Apr 29, 2020 at 9:05 AM Xintong Song 
> wrote:
>
> > Thanks for kicking off this discussion, Yangze.
> >
> > First, let me try to explain a bit more about this problem. Since we
> > decided to make the `ExternalResourceDriver` a plugin whose
> implementation
> > could be provided by user, we think it makes sense to leverage Flink’s
> > plugin mechanism and load the drivers in separated class loaders to avoid
> > potential risk of dependency conflicts. However, that means
> > `RuntimeContext` and user codes do not naturally have access to classes
> > defined in the plugin. In the current design,
> > `RuntimeContext#getExternalResourceInfos` takes the concrete
> > `ExternalResourceInfo` implementation class as an argument. This will
> cause
> > problem when user codes try to pass in the argument, and when
> > `RuntimeContext` tries to do the type check/cast.
> >
> >
> > To my understanding, the root problem is probably that we should not
> depend
> > on a specific implementation of the `ExternalResourceInfo` interface from
> > outside the plugin (user codes & runtime context). To that end,
> regardless
> > the detailed interface design, I'm in favor of the direction of the 3rd
> > approach. I think it makes sense to add some general information/property
> > accessing interfaces in `ExternalResourceInfo` (e.g., a key-value
> property
> > map), so that in most cases users do not need to cast the
> > `ExternalResourceInfo` into concrete subclasses.
> >
> >
> > Regarding the detailed interface design, I'm not sure about using
> > `Properties`. I think the information contained in a
> `ExternalResourceInfo`
> > can be considered as a unmodifiable map. So maybe something like the
> > following?
> >
> >
> > public interface ExternalResourceInfo {
> > > String getProperty(String key);
> > > Map getProperties();
> > > }
> >
> >
> > WDYT?
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Apr 29, 2020 at 2:40 PM Yangze Guo  wrote:
> >
> > > Hi, there:
> > >
> > > The "FLIP-108: Add GPU support in Flink"[1] is now working in
> > > progress. However, we met a problem with
> > > "RuntimeContext#getExternalResourceInfos" if we want to leverage the
> > > Plugin[2] mechanism in Flink.
> > > The interface is:
> > > The problem is now:
> > > public interface RuntimeContext {
> > > /**
> > >  * Get the specific external resource information by the
> > resourceName.
> > >  */
> > >  Set
> > > getExternalResourceInfos(String resourceName, Class
> > > externalResourceType);
> > > }
> > > The problem is that the mainClassLoader does not recognize the
> > > subclasses of ExternalResourceInfo. Those ExternalResourceInfo classes are
> > > located in the ExternalResourceDriver jar and have been isolated from the
> > > mainClassLoader by the PluginManager. So, a ClassNotFoundException will be
> > > thrown.
> > >
> > > The solution could be:
> > >
> > > - Not leveraging the plugin mechanism. Just load drivers to
> > > mainClassLoader. The drawback is that user needs to handle the
> > > dependency conflict.
> > >
> > > - Force user to build two separate jars. One for the
> > > ExternalResourceDriver, the other for the ExternalResourceInfo. The
> > > jar including ExternalResourceInfo class should be added to “/lib”
> > > dir. This approach probably makes sense but might annoy user.
> > >
> > > - Change the RuntimeContext#getExternalResourceInfos, let it return
> > > ExternalResourceInfo and add something like “Properties getInfo()” to
> > > ExternalResourceInfo interface. The contract for resolving the return
> > > value would be specified by the driver provider and user. The Flink
> > > core does not need to be aware of the concrete implementation:
> > > public interface RuntimeContext {
> > > /**
> > >  * Get the specific external resource information by the
> > resourceName.
> > >  */
> > > Set getExternalResourceInfos(String
> > > resourceName);
> > > }
> > > public interface ExternalResourceInfo {
> > > Properties getInfo();
> > > }
> > >
> > > From my side, I 

Re: [DISCUSS][FLIP-108] Problems regarding the class loader and dependency

2020-04-29 Thread Till Rohrmann
Thanks for bringing this up Yangze and Xintong. I see the problem. Help me
to understand how the ExternalResourceInfo is intended to be used by the
user. Will she ask for some properties and then pass them to another
component? Where does this component come from?

Cheers,
Till

On Wed, Apr 29, 2020 at 9:05 AM Xintong Song  wrote:

> Thanks for kicking off this discussion, Yangze.
>
> First, let me try to explain a bit more about this problem. Since we
> decided to make the `ExternalResourceDriver` a plugin whose implementation
> could be provided by user, we think it makes sense to leverage Flink’s
> plugin mechanism and load the drivers in separated class loaders to avoid
> potential risk of dependency conflicts. However, that means
> `RuntimeContext` and user codes do not naturally have access to classes
> defined in the plugin. In the current design,
> `RuntimeContext#getExternalResourceInfos` takes the concrete
> `ExternalResourceInfo` implementation class as an argument. This will cause
> problem when user codes try to pass in the argument, and when
> `RuntimeContext` tries to do the type check/cast.
>
>
> To my understanding, the root problem is probably that we should not depend
> on a specific implementation of the `ExternalResourceInfo` interface from
> outside the plugin (user codes & runtime context). To that end, regardless
> the detailed interface design, I'm in favor of the direction of the 3rd
> approach. I think it makes sense to add some general information/property
> accessing interfaces in `ExternalResourceInfo` (e.g., a key-value property
> map), so that in most cases users do not need to cast the
> `ExternalResourceInfo` into concrete subclasses.
>
>
> Regarding the detailed interface design, I'm not sure about using
> `Properties`. I think the information contained in a `ExternalResourceInfo`
> can be considered as a unmodifiable map. So maybe something like the
> following?
>
>
> public interface ExternalResourceInfo {
> > String getProperty(String key);
> > Map getProperties();
> > }
>
>
> WDYT?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Apr 29, 2020 at 2:40 PM Yangze Guo  wrote:
>
> > Hi, there:
> >
> > The "FLIP-108: Add GPU support in Flink"[1] is now working in
> > progress. However, we met a problem with
> > "RuntimeContext#getExternalResourceInfos" if we want to leverage the
> > Plugin[2] mechanism in Flink.
> > The interface is:
> > public interface RuntimeContext {
> > /**
> >  * Get the specific external resource information by the
> resourceName.
> >  */
> >  Set
> > getExternalResourceInfos(String resourceName, Class
> > externalResourceType);
> > }
> > The problem is that the mainClassLoader does not recognize the
> > subclasses of ExternalResourceInfo. Those ExternalResourceInfo classes are
> > located in the ExternalResourceDriver jar and have been isolated from the
> > mainClassLoader by the PluginManager. So, a ClassNotFoundException will be
> > thrown.
> >
> > The solution could be:
> >
> > - Not leveraging the plugin mechanism. Just load drivers to
> > mainClassLoader. The drawback is that user needs to handle the
> > dependency conflict.
> >
> > - Force user to build two separate jars. One for the
> > ExternalResourceDriver, the other for the ExternalResourceInfo. The
> > jar including ExternalResourceInfo class should be added to “/lib”
> > dir. This approach probably makes sense but might annoy user.
> >
> > - Change the RuntimeContext#getExternalResourceInfos, let it return
> > ExternalResourceInfo and add something like “Properties getInfo()” to
> > ExternalResourceInfo interface. The contract for resolving the return
> > value would be specified by the driver provider and user. The Flink
> > core does not need to be aware of the concrete implementation:
> > public interface RuntimeContext {
> > /**
> >  * Get the specific external resource information by the
> resourceName.
> >  */
> > Set getExternalResourceInfos(String
> > resourceName);
> > }
> > public interface ExternalResourceInfo {
> > Properties getInfo();
> > }
> >
> > From my side, I prefer the third approach:
> > - Regarding usability, it frees user from handling dependency or
> > packaging two jars.
> > - It decouples the Flink's mainClassLoader from the concrete
> > implementation of ExternalResourceInfo.
> >
> > Looking forward to your feedback.
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-108%3A+Add+GPU+support+in+Flink
> > [2]
> > https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html
> >
> >
> > Best,
> > Yangze Guo
> >
>


Re: [DISCUSS][FLIP-108] Problems regarding the class loader and dependency

2020-04-29 Thread Xintong Song
Thanks for kicking off this discussion, Yangze.

First, let me try to explain a bit more about this problem. Since we
decided to make the `ExternalResourceDriver` a plugin whose implementation
could be provided by the user, we think it makes sense to leverage Flink’s
plugin mechanism and load the drivers in separate class loaders to avoid
the potential risk of dependency conflicts. However, that means
`RuntimeContext` and user code do not naturally have access to classes
defined in the plugin. In the current design,
`RuntimeContext#getExternalResourceInfos` takes the concrete
`ExternalResourceInfo` implementation class as an argument. This will cause
problems when user code tries to pass in the argument, and when
`RuntimeContext` tries to do the type check/cast.


To my understanding, the root problem is probably that we should not depend
on a specific implementation of the `ExternalResourceInfo` interface from
outside the plugin (user codes & runtime context). To that end, regardless
the detailed interface design, I'm in favor of the direction of the 3rd
approach. I think it makes sense to add some general information/property
accessing interfaces in `ExternalResourceInfo` (e.g., a key-value property
map), so that in most cases users do not need to cast the
`ExternalResourceInfo` into concrete subclasses.


Regarding the detailed interface design, I'm not sure about using
`Properties`. I think the information contained in an `ExternalResourceInfo`
can be considered an unmodifiable map. So maybe something like the
following?


public interface ExternalResourceInfo {
> String getProperty(String key);
> Map getProperties();
> }
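
For illustration, a hypothetical plugin-side implementation of such an
interface could be as simple as the following (the class name, the "index"
key and the Map<String, String> signature are assumptions for the sketch; the
real driver would fill the map from its own discovery logic):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class GPUInfo implements ExternalResourceInfo {

    private final Map<String, String> properties;

    public GPUInfo(String gpuIndex) {
        Map<String, String> props = new HashMap<>();
        props.put("index", gpuIndex);
        this.properties = Collections.unmodifiableMap(props);
    }

    @Override
    public String getProperty(String key) {
        return properties.get(key);
    }

    @Override
    public Map<String, String> getProperties() {
        return properties;
    }
}

Since user code would only ever see the ExternalResourceInfo interface, the
concrete class can stay inside the plugin class loader.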


WDYT?


Thank you~

Xintong Song



On Wed, Apr 29, 2020 at 2:40 PM Yangze Guo  wrote:

> Hi, there:
>
> The "FLIP-108: Add GPU support in Flink"[1] is now working in
> progress. However, we met a problem with
> "RuntimeContext#getExternalResourceInfos" if we want to leverage the
> Plugin[2] mechanism in Flink.
> The interface is:
> public interface RuntimeContext {
> /**
>  * Get the specific external resource information by the resourceName.
>  */
>  Set
> getExternalResourceInfos(String resourceName, Class
> externalResourceType);
> }
> The problem is that the mainClassLoader does not recognize the
> subclasses of ExternalResourceInfo. Those ExternalResourceInfo classes are
> located in the ExternalResourceDriver jar and have been isolated from the
> mainClassLoader by the PluginManager. So, a ClassNotFoundException will be
> thrown.
>
> The solution could be:
>
> - Not leveraging the plugin mechanism. Just load drivers to
> mainClassLoader. The drawback is that user needs to handle the
> dependency conflict.
>
> - Force user to build two separate jars. One for the
> ExternalResourceDriver, the other for the ExternalResourceInfo. The
> jar including ExternalResourceInfo class should be added to “/lib”
> dir. This approach probably makes sense but might annoy user.
>
> - Change RuntimeContext#getExternalResourceInfos to return the
> ExternalResourceInfo interface directly, and add something like
> “Properties getInfo()” to the ExternalResourceInfo interface. The
> contract for resolving the returned values would be specified by the
> driver provider and the user. The Flink core does not need to be aware
> of the concrete implementation:
>
> public interface RuntimeContext {
>     /**
>      * Get the specific external resource information by the resourceName.
>      */
>     Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
> }
>
> public interface ExternalResourceInfo {
>     Properties getInfo();
> }
>
> From my side, I prefer the third approach:
> - Regarding usability, it frees users from handling dependency
> conflicts or packaging two jars.
> - It decouples Flink's mainClassLoader from the concrete
> implementation of ExternalResourceInfo.
>
> Looking forward to your feedback.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-108%3A+Add+GPU+support+in+Flink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html
>
>
> Best,
> Yangze Guo
>


Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Robert Metzger
Thanks a lot for creating a release candidate for 1.10.1!

I'm not sure, but I think I found a potential issue in the release while
checking dependency changes in the Elasticsearch 7 connector:
https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38

In this change, "com.carrotsearch:hppc" has been added to the shaded jar (
https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar),
without including proper mention of that dependency in "META-INF/NOTICE".
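
If it helps, the usual fix is to list the newly bundled artifact in the
"bundles the following dependencies" section of that jar's META-INF/NOTICE,
roughly like the line below (I have not verified the exact bundled version,
so it is only a placeholder here; hppc is Apache License 2.0 as far as I
know):

- com.carrotsearch:hppc:<bundled version>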


My checking notes:

- checked the diff for dependency changes:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
(w/o
release commit:
https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
)
  - flink-connector-hive sets the derby version for test-scoped
dependencies:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
 - no NOTICE file found, but this module does not forward binaries.
  - kafka 0.10 minor version upgrade:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
  - NOTICE change found
   - ES7 changes shading:
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
 - problem found
  - Influxdb version change
https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
 - NOTICE change found



On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #1 for version 1.10.1, as
> follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.10.1-rc1" [5],
> * website pull request listing the new release and adding announcement blog
> post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Yu
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1362/
> [5]
>
> https://github.com/apache/flink/commit/84b74cc0e21981bf6feceb74b48d7a9d3e215dc5
> [6] https://github.com/apache/flink-web/pull/330
>


[DISCUSS][FLIP-108] Problems regarding the class loader and dependency

2020-04-29 Thread Yangze Guo
Hi, there:

The "FLIP-108: Add GPU support in Flink"[1] is now working in
progress. However, we met a problem with
"RuntimeContext#getExternalResourceInfos" if we want to leverage the
Plugin[2] mechanism in Flink.
The interface is:
public interface RuntimeContext {
    /**
     * Get the specific external resource information by the resourceName.
     */
    <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(
            String resourceName, Class<T> externalResourceType);
}
The problem is that the mainClassLoader does not recognize the
subclasses of ExternalResourceInfo. Those ExternalResourceInfo classes
are located in the ExternalResourceDriver jar and are isolated from the
mainClassLoader by the PluginManager. So, a ClassNotFoundException will
be thrown.

The possible solutions are:

- Not leveraging the plugin mechanism. Just load drivers into the
mainClassLoader. The drawback is that users need to handle dependency
conflicts themselves.

- Force users to build two separate jars: one for the
ExternalResourceDriver, the other for the ExternalResourceInfo. The
jar including the ExternalResourceInfo class should be added to the
“/lib” dir. This approach probably makes sense but might annoy users.

- Change RuntimeContext#getExternalResourceInfos to return the
ExternalResourceInfo interface directly, and add something like
“Properties getInfo()” to the ExternalResourceInfo interface. The
contract for resolving the returned values would be specified by the
driver provider and the user. The Flink core does not need to be aware
of the concrete implementation:

public interface RuntimeContext {
    /**
     * Get the specific external resource information by the resourceName.
     */
    Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
}

public interface ExternalResourceInfo {
    Properties getInfo();
}
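
To make the contract concrete, a GPU driver could ship something like the
following in its plugin jar (only a sketch to illustrate the idea; the
GPUInfo name and the "index" key are hypothetical, not part of the FLIP):

import java.util.Properties;

public class GPUInfo implements ExternalResourceInfo {
    private final Properties properties = new Properties();

    public GPUInfo(String index) {
        // The driver decides which keys it exposes; the driver's
        // documentation and the user code agree on them, while Flink
        // core only sees the Properties object.
        properties.setProperty("index", index);
    }

    @Override
    public Properties getInfo() {
        return properties;
    }
}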

From my side, I prefer the third approach:
- Regarding usability, it frees users from handling dependency
conflicts or packaging two jars.
- It decouples Flink's mainClassLoader from the concrete
implementation of ExternalResourceInfo.

Looking forward to your feedback.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-108%3A+Add+GPU+support+in+Flink
[2] https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html


Best,
Yangze Guo


[jira] [Created] (FLINK-17455) Move FileSystemFormatFactory to table common

2020-04-29 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-17455:


 Summary: Move FileSystemFormatFactory to table common
 Key: FLINK-17455
 URL: https://issues.apache.org/jira/browse/FLINK-17455
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

2020-04-29 Thread Jark Wu
Hi lsyldliu,

Thanks for investigating this.

First of all, if you are using mini-batch deduplication, it doesn't support
state ttl in 1.9. That's why the tps looks the same as 1.11 with state ttl
disabled.
We only introduced state ttl for mini-batch deduplication recently.

Regarding the performance regression, it looks very surprising to me. The
performance is reduced by 19x when StateTtlConfig is enabled in 1.11.
I don't have much experience with the internals of StateTtlConfig, so I loop
in @Yu Li @YunTang in CC, who may have more insights on this.

For more information, we use the following StateTtlConfig [1] in the blink
planner:

StateTtlConfig
  .newBuilder(Time.milliseconds(retentionTime))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build();
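
The incremental cleanup strategy discussed in this thread maps to the
builder's incremental cleanup option for heap state backends. Conceptually
it looks like the sketch below; whether the SQL runtime sets it explicitly
or relies on the default background cleanup, and the exact cleanup size,
are assumptions on my side (5 mirrors the documented default):

StateTtlConfig
  .newBuilder(Time.milliseconds(retentionTime))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  // check up to 5 state entries per state access on the heap backend
  .cleanupIncrementally(5, false)
  .build();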


Best,
Jark


[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27





On Wed, 29 Apr 2020 at 11:53, 刘大龙  wrote:

> Hi, all!
>
> On the Flink master branch, we have supported state ttl for sql mini-batch
> deduplication using the incremental cleanup strategy on the heap backend,
> refer to FLINK-16581. Because I want to test the performance of this
> feature, I compiled the master branch code, deployed the jar to a
> production environment, and then ran three types of tests, respectively:
>
>
>
>
> flink 1.9.0 release version with state ttl enabled
> flink 1.11-snapshot version with state ttl disabled
> flink 1.11-snapshot version with state ttl enabled
>
>
>
>
> The test query sql is as follows:
>
> select order_date,
>        sum(price * amount - goods_all_fav_amt - virtual_money_amt +
>            goods_carriage_amt) as saleP,
>        sum(amount) as saleN,
>        count(distinct parent_sn) as orderN,
>        count(distinct user_id) as cusN
> from (
>     select order_date, user_id,
>            order_type, order_status, terminal, last_update_time,
>            goods_all_fav_amt, goods_carriage_amt, virtual_money_amt,
>            price, amount, order_quality, quality_goods_cnt,
>            acture_goods_amt
>     from (
>         select *, row_number() over (partition by order_id, order_goods_id
>                                      order by proctime desc) as rownum
>         from dm_trd_order_goods
>     )
>     where rownum = 1
>       and (order_type in (1,2,3,4,5) or order_status = 70)
>       and terminal = 'shop' and price > 0
> )
> group by order_date
>
>
> At runtime, this query will generate two operators: Deduplication and
> GroupAgg. In the tests, the configuration is the same: parallelism is 20,
> the Kafka consumer starts from the earliest offset, and the mini-batch
> function is disabled. The test results are as follows:
>
> flink 1.9.0 with state ttl enabled: this test lasted 44m, Flink received
> 1374w (13.74 million) records, average tps at 5200/s, Flink UI picture
> links: back pressure, checkpoint
> flink 1.11-snapshot with state ttl disabled: this test lasted 28m, Flink
> received 883w (8.83 million) records, average tps at 5200/s, Flink UI
> picture links: back pressure, checkpoint
> flink 1.11-snapshot with state ttl enabled: this test lasted 1h 43m,
> Flink only received 168w (1.68 million) records because of serious back
> pressure at the deduplication operator, average tps at 270/s; moreover,
> checkpoints always fail because of that back pressure, Flink UI picture
> links: back pressure, checkpoint
>
> Deduplication state cleanup in flink 1.9.0 is implemented with timers, but
> the 1.11-snapshot version uses StateTtlConfig; this is the main difference.
> Comparing the three tests comprehensively, we can see that with state ttl
> disabled in 1.11-snapshot the performance is the same as 1.9.0 with state
> ttl enabled. However, with state ttl enabled in 1.11-snapshot, performance
> drops nearly 20 times, so I think the incremental cleanup strategy causes
> this problem. What do you think about it? @azagrebin, @jark.
>
> Thanks.
>
> lsyldliu
>
> Zhejiang University, College of Control Science and engineer, CSC