[jira] [Created] (FLINK-19686) Cast question for date | time

2020-10-16 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-19686:
-

 Summary: Cast question for date | time 
 Key: FLINK-19686
 URL: https://issues.apache.org/jira/browse/FLINK-19686
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Reporter: hehuiyuan


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: [VOTE] Add Translation Specification for Stateful Functions

2020-10-16 Thread Shawn Huang
Hi, Congxian.
Thanks for your contribution.
I noticed that in the wiki, the link format is still like "{{ site.baseurl
}}/.../xxx.html", but the "{% link %}" tag is now recommended [1].
Should we update this in the wiki? It seems some translators
haven't noticed the change.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Reminder-Prefer-link-tag-in-documentation-td42362.html

Best,
Shawn Huang


Congxian Qiu wrote on Fri, Oct 16, 2020 at 11:10 PM:

> FYI, I've added the Specification for Stateful Functions to the existing
> wiki[1]
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications
>
> Best,
> Congxian
>
>
> Congxian Qiu wrote on Thu, Oct 15, 2020 at 8:33 PM:
>
> > Hi all
> > Thanks, everyone, for voting.
> >
> > The voting time for "Add Translation Specification for Stateful
> > Functions" has passed; I'm closing the vote now.
> >
> > There were 7 votes, 4 of which are binding:
> >- Yu Li (binding)
> >- Jark Wu (binding)
> >- Xintong Song (binding)
> >- Smile
> >- Dian Fu (binding)
> >- Hailong Wang
> >- Shawn Huang
> >
> > There were no -1 votes.
> >
> > Thus, changes have been accepted. I'll update the wiki accordingly.
> >
> > Best,
> > Congxian
> >
> >
> > Shawn Huang wrote on Tue, Oct 13, 2020 at 3:19 PM:
> >
> >> +1
> >>
> >> Best,
> >> Shawn Huang
> >>
> >>
> >> hailongwang <18868816...@163.com> wrote on Mon, Oct 12, 2020 at 11:21 PM:
> >>
> >> > +1
> >> > Best,
> >> > Hailong Wang
> >> > At 2020-10-12 17:00:34, "Xintong Song"  wrote:
> >> > >+1
> >> > >
> >> > >Thank you~
> >> > >
> >> > >Xintong Song
> >> > >
> >> > >
> >> > >
> >> > >On Mon, Oct 12, 2020 at 5:59 PM Jark Wu  wrote:
> >> > >
> >> > >> +1
> >> > >>
> >> > >> On Mon, 12 Oct 2020 at 17:14, Yu Li  wrote:
> >> > >>
> >> > >> > +1
> >> > >> >
> >> > >> > Best Regards,
> >> > >> > Yu
> >> > >> >
> >> > >> >
> >> > >> > On Mon, 12 Oct 2020 at 14:41, Congxian Qiu <qcx978132...@gmail.com> wrote:
> >> > >> >
> >> > >> > > I would like to start a voting thread for adding a translation
> >> > >> > > specification for Stateful Functions, on which we’ve reached consensus in [1].
> >> > >> > >
> >> > >> > >
> >> > >> > > This voting will be open for a minimum of 3 days, till 3:00 pm UTC, Oct 15.
> >> > >> > >
> >> > >> > >
> >> > >> > > [1]
> >> > >> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-Translation-Specification-for-Stateful-Functions-td45531.html
> >> > >> > >
> >> > >> > >
> >> > >> > > Best,
> >> > >> > > Congxian
> >> > >> > >
> >> > >> >
> >> > >>
> >> >
> >>
> >
>


Re: IllegalStateException from incompatible state type - better exception type?

2020-10-16 Thread Kim, Hwanju
Thanks Gordon.

If its semantics are not (yet) clearly defined, I think we can make them a bit
more clear.
Incrementally, I think any exception that's more specific than a general Java
exception (e.g., IllegalStateException) would at least improve the information,
whether or not there is a clearly settled guideline for specific exceptions
like StateMigrationException.

Thanks,
Hwanju

On 10/7/20, 11:53 PM, "Tzu-Li (Gordon) Tai"  wrote:

Hi,

Thanks for the discussions so far. I agree that the use of
StateMigrationException and its semantics are rather undefined and
inconsistent at the moment.

On Thu, Oct 8, 2020 at 7:38 AM Kim, Hwanju  wrote:

> Hi Till,
>
> Thanks for the opinion and insight. I think your concern is valid, as a
> nontrivial number of issues would still be somewhat subtle to confidently
> classify as framework or user error. Serialization-related issues are one of
> them, as even a user-triggered issue leads to exceptions in many places in
> the framework without a clear boundary. The approach so far has been based
> not on theory and completeness, but more on data and likelihood.
> When it's confusing, we err on the side of saying it's a framework issue.
> But what you pointed out about StateMigrationException from
> migrateSerializedValue could be one example that is ambiguous. The general
> idea about such ambiguous issues is: if it's a one-off hiccup, the
> misclassification effect is low; if it's constant (e.g., the filesystem not
> working well), its impact is high, but then there should be more exceptions
> thrown from many places, not just from a single point of state migration,
> and we expect other dominant exceptions to be reported as framework issues.
>
> If I understand it correctly:
> 1) From a semantic perspective, a state type mismatch (e.g.,
> aggregating vs. reducing) can be thrown as StateMigrationException, as it
> clearly comes from a user with incompatible state.
> 2) For subtle StateMigrationException throws (e.g., during migration rather
> than from the compatibility check), we can refine this by catching
> lower-level exceptions in a finer-grained manner (possibly using a different
> exception).
>

This observation is accurate. To summarize the discussions so far, we can
categorize all failures on restore related to state compatibility into two
categories:
1) incompatible state serializers
(TypeSerializerSchemaCompatibility#isIncompatible()) or types (ListState vs
MapState), and
2) if all compatibility checks pass and migration is required, subtle
filesystem hiccups / corrupted checkpoint files / incorrect serializer
implementations can still surface during the process of a migration.

I'm wondering if we should instead include a different exception type for
1).

For 1), a new exception type named IncompatibleStateDescriptorException
(TBD) seems like a better name, as in that case, the state can never be
migrated and is always a user failure.
The name immediately implies that user-provided properties for state
access, e.g. serializers / state type, are incompatible with checkpointed
data.

For 2), we can continue to use StateMigrationException to wrap lower-level
exceptions. The intent here isn't to classify whether the error was
user-caused or a framework issue, but to simply relate the exception to the
process of state migration.

I agree that these can be handled separately, by starting off first with
differentiating the above scenarios with different exception types /
clearing the use of StateMigrationException.

Cheers,
Gordon
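
To make the naming proposal concrete, here is a minimal sketch of such an
exception type (the name is explicitly TBD in the thread; this is not committed
Flink API, and the base class is only an assumption for illustration):

{code:java}
/**
 * Hedged sketch of the exception proposed above. The name is TBD in the
 * thread and nothing here is committed Flink API; extending
 * IllegalStateException is only an illustrative assumption. Thrown when
 * user-provided properties for state access can never match the checkpointed
 * data, e.g. ListState requested where MapState was written, or a serializer
 * reported as incompatible.
 */
public class IncompatibleStateDescriptorException extends IllegalStateException {
    public IncompatibleStateDescriptorException(String message) {
        super(message);
    }
}
{code}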


>
> I think the two issues are independent, so either can be handled
> separately as long as the semantics of StateMigrationException are clear. Out
> of the two, 1) seems straightforward, so if agreed, I can create an issue
> to work on it. Nonetheless, I would like to hear more of Gordon's thoughts.
>
> Thanks,
> Hwanju
>
> On 10/7/20, 1:39 AM, "Till Rohrmann"  wrote:
>
>
> Hi Hwanju,
>
> thanks for starting this discussion. I pretty much like the idea of being
> able to distinguish between user code faults and framework faults. This also
> helps in deciding on the recovery action to take. Hence, I would be +1 for
> using certain exceptions consistently in order to indicate the origin of a
> fault where possible. The big challenge I see is to do it

[jira] [Created] (FLINK-19685) When using the HBase-Connector lookupFunction, queried `DATE`,`TIME`,`TIMESTAMP` data is 8 hours less

2020-10-16 Thread CaoZhen (Jira)
CaoZhen created FLINK-19685:
---

 Summary: When using the HBase-Connector lookupFunction, queried 
`DATE`,`TIME`,`TIMESTAMP` data is 8 hours less
 Key: FLINK-19685
 URL: https://issues.apache.org/jira/browse/FLINK-19685
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Reporter: CaoZhen


From the code:

HBaseSerde#createFieldDecoder
{code:java}
case DATE:
case INTERVAL_YEAR_MONTH:
    return Bytes::toInt;
case TIME_WITHOUT_TIME_ZONE:
    ..
    return Bytes::toInt;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
    ..
    return createTimestampDecoder();


private static FieldDecoder createTimestampDecoder() {
    return value -> {
        // TODO: support higher precision
        long milliseconds = Bytes.toLong(value);
        return TimestampData.fromEpochMillis(milliseconds);
    };
}
{code}
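
For illustration, a minimal standalone sketch (plain Java, not Flink code) of
the suspected cause: the same epoch-millis value renders 8 hours apart in UTC
and UTC+8, so a decoder that ignores the session time zone shows values that
are 8 hours off.

{code:java}
import java.time.Instant;
import java.time.ZoneId;

public class EpochMillisZoneDemo {
    public static void main(String[] args) {
        long millis = 1_600_000_000_000L; // an arbitrary epoch-millis value
        // Rendered in UTC: 2020-09-13T12:26:40Z[UTC]
        System.out.println(Instant.ofEpochMilli(millis).atZone(ZoneId.of("UTC")));
        // Rendered in UTC+8: 2020-09-13T20:26:40+08:00[Asia/Shanghai]
        System.out.println(Instant.ofEpochMilli(millis).atZone(ZoneId.of("Asia/Shanghai")));
    }
}
{code}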


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for debian based Flink docker image

2020-10-16 Thread Till Rohrmann
+1 for making it configurable in the same Docker image.

Cheers,
Till

On Fri, Oct 16, 2020 at 12:56 PM Chesnay Schepler  wrote:

> If it is possible to support both allocators in a single image then we
> should definitely go with that option.
>
> On 10/16/2020 12:21 PM, Yun Tang wrote:
> > Thanks for Yang's suggestion. I think this could be a better choice.
> > We could install jemalloc and only enable it in LD_PRELOAD when the user
> > passes a specific configuration to docker-entrypoint.sh.
> > By doing so, we could avoid creating additional docker image tags and also
> > offer the ability to reduce the memory fragmentation problem.
> >
> > Does anyone else have other ideas?
> >
> > Best
> > Yun Tang
> > 
> > From: Yang Wang 
> > Sent: Thursday, October 15, 2020 14:59
> > To: dev 
> > Subject: Re: [DISCUSS][docker] Adopt Jemalloc as default memory
> allocator for debian based Flink docker image
> >
> > Thanks Yun Tang for starting this discussion.
> >
> > I think this is very important when deploying Flink in a container
> > environment in production. I just have a quick question: could we have
> > both memory allocators (e.g., glibc, jemalloc) in the official Flink
> > image and enable a specific one by setting an ENV variable?
> >
> > Best,
> > Yang
> >
> > Yu Li wrote on Wed, Oct 14, 2020 at 12:23 PM:
> >
> >> Thanks for debugging and resolving the issue and driving the discussion
> >> Yun!
> >>
> >> For the given solutions, I prefer option 1 (supply another Dockerfile
> using
> >> jemalloc as default memory allocator) because of the below reasons:
> >>
> >> 1. It's hard to say jemalloc is always better than ptmalloc (glibc
> malloc),
> >> or else glibc should have already adopted it as the default memory
> >> allocator. And as indicated here [1], in some cases jemalloc will
> >> consume as much as twice the memory of glibc.
> >>
> >> 2. All existing Flink docker images use glibc, if we change the default
> >> memory allocator to jemalloc and only supply one series of images, we
> will
> >> leave those having better performance with glibc no other choices but
> >> staying with old images. In other words, there's a risk of introducing new
> >> problems while fixing an existing one if choosing option 2.
> >>
> >> And there is a third option, considering the effort of maintaining more
> >> images: if the memory leak issue is not widely observed, we could
> >> document the steps for building a Dockerfile with jemalloc as the default
> >> allocator so users could build it when needed. This leaves the burden to
> >> our users, so for me it's not the best option.
> >>
> >> Best Regards,
> >> Yu
> >>
> >> [1] https://stackoverflow.com/a/33993215
> >>
> >> On Tue, 13 Oct 2020 at 15:34, Yun Tang  wrote:
> >>
> >>> Hi all
> >>>
> >>> Users report a serious memory leak when submitting jobs
> >>> continuously in session mode on k8s (please refer to FLINK-18712 [1]),
> >>> and I reproduced it and found that it is caused by memory fragmentation of
> >>> glibc [2][3]. There are two solutions to fix this:
> >>>
> >>>    *   A quick but not very clean solution: limit the memory pool of glibc
> >>> by setting MALLOC_ARENA_MAX to 2
> >>>
> >>>    *   A more general solution: rebuild the image to install
> >>> libjemalloc-dev and add libjemalloc.so to LD_PRELOAD
> >>>
> >>> The reporter adopted the 2nd solution to fix this issue eventually. Thus,
> >>> I began to think about whether we should change our Dockerfile to adopt
> >>> jemalloc as the default memory allocator [4].
> >>> From my point of view, we have two choices:
> >>>
> >>>    1.  Introduce another Dockerfile using jemalloc as the default memory
> >>> allocator, which means Flink needs two new image tags to build the docker
> >>> image with jemalloc while the default image still uses glibc.
> >>>    2.  Set the default memory allocator to jemalloc in our existing
> >>> Dockerfiles, which means Flink offers docker images with jemalloc by default.
> >>> I prefer the 2nd option as our company already uses jemalloc as the default
> >>> memory allocator for the JDK in our production environment, due to warnings
> >>> from the OS team about glibc's memory fragmentation.
> >>> Moreover, I found several open source projects adopting jemalloc as the
> >>> default memory allocator within their images to resolve the memory
> >>> fragmentation problem, e.g. fluent-bit [5], home-assistant [6].
> >>>
> >>> What do you guys think of this issue?
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-18712
> >>> [2]
> >>> https://www.gnu.org/software/libc/manual/html_mono/libc.html#Freeing-after-Malloc
> >>> [3] https://sourceware.org/bugzilla/show_bug.cgi?id=15321
> >>> [4] https://issues.apache.org/jira/browse/FLINK-19125
> >>> [5]
> >>> https://docs.fluentbit.io/manual/v/1.0/installation/docker#why-there-is-no-fluent-bit-docker-image-based-on-alpine-linux
> >>> [6] https://github.com/home-assistant/core/pull/33237
> >>>
> >>>
> >>> Best
> >>> Yun Tang
> >>>
>
>


[jira] [Created] (FLINK-19684) The JDBC connector's 'lookup.max-retries' option implementation differs from its documented meaning

2020-10-16 Thread CaoZhen (Jira)
CaoZhen created FLINK-19684:
---

 Summary: The JDBC connector's 'lookup.max-retries' option 
implementation differs from its documented meaning
 Key: FLINK-19684
 URL: https://issues.apache.org/jira/browse/FLINK-19684
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Reporter: CaoZhen


The code for the 'lookup.max-retries' option:
{code:java}
for (int retry = 1; retry <= maxRetryTimes; retry++) {
  statement.clearParameters();
  .
}  
{code}
From the code, if this option is set to 0, the JDBC query will not be executed at all.

From the documentation [1], the option means "the max retry times if lookup database failed":
when set to 0, there should be one query, but no retry.

So the code for the 'lookup.max-retries' option should be:
{code:java}
for (int retry = 0; retry <= maxRetryTimes; retry++) {
  statement.clearParameters();
  .
}  
{code}

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-max-retries
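
To make the proposed semantics concrete, a tiny standalone sketch (illustrative
only, not the connector code) of the attempt counting under the corrected loop:

{code:java}
public class RetryCountSketch {
    // Counts query attempts for a given maxRetryTimes under the proposed loop.
    static int attempts(int maxRetryTimes) {
        int attempts = 0;
        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            attempts++; // one query execution per iteration
        }
        return attempts; // always maxRetryTimes + 1
    }

    public static void main(String[] args) {
        System.out.println(attempts(0)); // 1: one query, no retry
        System.out.println(attempts(3)); // 4: one query plus three retries
    }
}
{code}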



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: [VOTE] Add Translation Specification for Stateful Functions

2020-10-16 Thread Congxian Qiu
FYI, I've added the Specification for Stateful Functions to the existing
wiki[1]

[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications

Best,
Congxian


Congxian Qiu wrote on Thu, Oct 15, 2020 at 8:33 PM:

> Hi all
> Thanks, everyone, for voting.
>
> The voting time for "Add Translation Specification for Stateful
> Functions" has passed; I'm closing the vote now.
>
> There were 7 votes, 4 of which are binding:
>- Yu Li (binding)
>- Jark Wu (binding)
>- Xintong Song (binding)
>- Smile
>- Dian Fu (binding)
>- Hailong Wang
>- Shawn Huang
>
> There were no -1 votes.
>
> Thus, changes have been accepted. I'll update the wiki accordingly.
>
> Best,
> Congxian
>
>
> Shawn Huang wrote on Tue, Oct 13, 2020 at 3:19 PM:
>
>> +1
>>
>> Best,
>> Shawn Huang
>>
>>
>> hailongwang <18868816...@163.com> wrote on Mon, Oct 12, 2020 at 11:21 PM:
>>
>> > +1
>> > Best,
>> > Hailong Wang
>> > At 2020-10-12 17:00:34, "Xintong Song"  wrote:
>> > >+1
>> > >
>> > >Thank you~
>> > >
>> > >Xintong Song
>> > >
>> > >
>> > >
>> > >On Mon, Oct 12, 2020 at 5:59 PM Jark Wu  wrote:
>> > >
>> > >> +1
>> > >>
>> > >> On Mon, 12 Oct 2020 at 17:14, Yu Li  wrote:
>> > >>
>> > >> > +1
>> > >> >
>> > >> > Best Regards,
>> > >> > Yu
>> > >> >
>> > >> >
>> > >> > On Mon, 12 Oct 2020 at 14:41, Congxian Qiu wrote:
>> > >> >
>> > >> > > I would like to start a voting thread for adding a translation
>> > >> > > specification for Stateful Functions, on which we’ve reached consensus in [1].
>> > >> > >
>> > >> > >
>> > >> > > This voting will be open for a minimum of 3 days, till 3:00 pm UTC, Oct 15.
>> > >> > >
>> > >> > >
>> > >> > > [1]
>> > >> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-Translation-Specification-for-Stateful-Functions-td45531.html
>> > >> > >
>> > >> > >
>> > >> > > Best,
>> > >> > > Congxian
>> > >> > >
>> > >> >
>> > >>
>> >
>>
>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-16 Thread Chesnay Schepler
@Seth: Earlier in this discussion it was said that the BucketingSink 
would not be usable in 1.12.


On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what all the
problems were. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason for keeping the
BucketingSink, apart from the fact that we unfortunately do not have a
migration plan. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I can't
exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 

wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown Handling -- and
in fact, the StreamingFileSink is mentioned in that FLIP as a motivating
use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46. Thanks
for the reminder, we should close FLIP-46 now with an explanatory
message to avoid confusion.


--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng





Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-16 Thread Seth Wiesman
+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:

> @Arvid Heise I also do not remember exactly what all the
> problems were. The fact that we added some more bulk formats to the
> streaming file sink definitely reduced the non-supported features. In
> addition, the latest discussion I found on the topic was [1] and the
> conclusion of that discussion seems to be to remove it.
>
> Currently, I cannot find any obvious reason for keeping the
> BucketingSink, apart from the fact that we unfortunately do not have a
> migration plan. This is why I posted this to dev@ and user@.
>
> Cheers,
> Kostas
>
> [1]
> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
>
> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> >
> > I remember this conversation popping up a few times already and I'm in
> > general a big fan of removing BucketingSink.
> >
> > However, until now there were a few features lacking in StreamingFileSink
> > that are present in BucketingSink and that are being actively used (I can't
> > exactly remember them now, but I can look it up if everyone else is also
> > suffering from bad memory). Did we manage to add them in the meantime? If
> > not, then it feels rushed to remove it at this point.
> >
> > On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
> wrote:
> >
> > > @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> > > to migrate from the BucketingSink to the StreamingFileSink. It may be
> > > possible but it will require some effort because the logic would be
> > > "read the old state, commit it, and start fresh with the
> > > StreamingFileSink."
> > >
> > > On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> > > wrote:
> > > >
> > > > On 13.10.20 14:01, David Anderson wrote:
> > > > > I thought this was waiting on FLIP-46 -- Graceful Shutdown
> Handling --
> > > and
> > > > > in fact, the StreamingFileSink is mentioned in that FLIP as a
> > > motivating
> > > > > use case.
> > > >
> > > > Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.
> Thanks
> > > > for the reminder, we should close FLIP-46 now with an explanatory
> > > > message to avoid confusion.
> > >
> >
> >
> > --
> >
> > Arvid Heise | Senior Java Developer
> >
> > 
> >
> > Follow us @VervericaData
> >
> > --
> >
> > Join Flink Forward  - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> > (Toni) Cheng
>


[jira] [Created] (FLINK-19683) Actively timeout aligned checkpoints on the output

2020-10-16 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-19683:
--

 Summary: Actively timeout aligned checkpoints on the output
 Key: FLINK-19683
 URL: https://issues.apache.org/jira/browse/FLINK-19683
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.12.0
Reporter: Piotr Nowojski


After enqueuing an aligned checkpoint barrier on the output, we could register a 
timeout to check whether it was sent downstream within some threshold. If not, we 
can convert it to an unaligned checkpoint.

Note, this will significantly complicate how the actual checkpoint is executed. 
Namely, currently the logic inside `AsyncCheckpointRunnable` is executed as soon 
as the checkpoint is triggered. With the timeout on the outputs, we can not 
complete the `AsyncCheckpointRunnable` until we know whether the timeout happened 
or not. We would need to register some listener/CompletableFuture tracking whether 
all of the checkpoint barriers were sent down the stream, and the aligned 
checkpoint can only complete if those futures complete before the timeout. 
Otherwise, if the timeout happens, we would need to convert the aligned checkpoint 
on the outputs to an unaligned one.
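
A hedged sketch of the listener/future idea (illustrative names, not actual
Flink internals; `orTimeout` requires Java 9+):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OutputBarrierTimeoutSketch {
    // Completed by the output side once the barrier of this checkpoint has
    // been sent down every output channel.
    private final CompletableFuture<Void> allBarriersSent = new CompletableFuture<>();

    void onCheckpointTriggered(long checkpointId, long timeoutMs) {
        allBarriersSent
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .whenComplete((ignored, error) -> {
                    if (error instanceof TimeoutException) {
                        // Timeout fired first: convert outputs to unaligned.
                        System.out.println("checkpoint " + checkpointId + ": convert to unaligned");
                    } else if (error == null) {
                        // All barriers were sent in time: complete as aligned.
                        System.out.println("checkpoint " + checkpointId + ": complete as aligned");
                    }
                });
    }

    void onAllBarriersSent() {
        allBarriersSent.complete(null);
    }
}
{code}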



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19682) Actively timeout checkpoint barriers on the inputs

2020-10-16 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-19682:
--

 Summary: Actively timeout checkpoint barriers on the inputs
 Key: FLINK-19682
 URL: https://issues.apache.org/jira/browse/FLINK-19682
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Piotr Nowojski


After receiving the first checkpoint barrier announcement, we should register some 
kind of processing-time timeout to switch to an unaligned checkpoint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19681) Passively timeout alignment on the inputs

2020-10-16 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-19681:
--

 Summary: Passively timeout alignment on the inputs
 Key: FLINK-19681
 URL: https://issues.apache.org/jira/browse/FLINK-19681
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.12.0


CheckpointBarrierHandler will be aware of the aligned checkpoint barrier 
announcement messages. Based on that, it can implement a basic timeout: when an 
arriving aligned checkpoint barrier (or its announcement) has exceeded the maximum 
allowed `checkpointStartDelay`, it should switch to an unaligned checkpoint.

For the time being, this could be done without registering any timeout action, but 
with a passive/static check when processing a checkpoint barrier/barrier 
announcement. If `checkpointStartDelay` is below the threshold, do nothing. If 
above, switch to an unaligned checkpoint.
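
A minimal sketch of this passive/static check (illustrative names, not Flink
internals):

{code:java}
public class PassiveAlignmentTimeoutSketch {
    // Returns true if the in-progress checkpoint should switch from aligned
    // to unaligned, based on how long the barrier has already been delayed.
    static boolean shouldSwitchToUnaligned(long barrierCreationTs, long nowTs, long maxStartDelayMs) {
        long checkpointStartDelay = nowTs - barrierCreationTs;
        return checkpointStartDelay > maxStartDelayMs;
    }

    public static void main(String[] args) {
        // A barrier created 12s ago with a 10s threshold -> switch to unaligned.
        System.out.println(shouldSwitchToUnaligned(0L, 12_000L, 10_000L)); // true
    }
}
{code}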



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19680) Add CheckpointBarrierAnnouncement priority message

2020-10-16 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-19680:
--

 Summary: Add CheckpointBarrierAnnouncement priority message
 Key: FLINK-19680
 URL: https://issues.apache.org/jira/browse/FLINK-19680
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.12.0


When an aligned checkpoint barrier arrives in an input channel, we should enqueue 
a priority event at the head of the queue, announcing the arrival of the 
checkpoint barrier.

Such an announcement event should be propagated to the `CheckpointBarrierHandler`. 
Based on that, `CheckpointBarrierHandler` will be able to implement the logic of 
timing out from an aligned checkpoint to an unaligned checkpoint.
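
As a toy model (plain Java collections, not Flink's actual buffer queue
classes), enqueuing the announcement as a priority event at the head of a
channel's queue could look like this:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

public class PriorityAnnouncementSketch {
    public static void main(String[] args) {
        Deque<String> channelQueue = new ArrayDeque<>();
        channelQueue.addLast("data-1");
        channelQueue.addLast("data-2");
        channelQueue.addLast("checkpoint-barrier"); // barrier queued behind data

        // Announce the barrier with priority: it overtakes the buffered data,
        // so the CheckpointBarrierHandler can start its timeout logic early.
        channelQueue.addFirst("barrier-announcement");

        // [barrier-announcement, data-1, data-2, checkpoint-barrier]
        System.out.println(channelQueue);
    }
}
{code}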



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19679) Deduplicate code between CheckpointBarrierUnaligner and CheckpointBarrierAligner

2020-10-16 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-19679:
--

 Summary: Deduplicate code between CheckpointBarrierUnaligner and 
CheckpointBarrierAligner
 Key: FLINK-19679
 URL: https://issues.apache.org/jira/browse/FLINK-19679
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.12.0


The code of those two classes should be deduplicated, especially taking into 
account that we want to support timing out aligned checkpoints into unaligned 
ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19678) tableEnv.getConfig.setNullCheck(false) seems to break group by

2020-10-16 Thread Ion Alberdi (Jira)
Ion Alberdi created FLINK-19678:
---

 Summary: tableEnv.getConfig.setNullCheck(false) seems to break 
group by
 Key: FLINK-19678
 URL: https://issues.apache.org/jira/browse/FLINK-19678
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.2
Reporter: Ion Alberdi


The program written at
[https://gist.github.com/yetanotherion/d007fa113d97411226eaea4f20cd4c2d]

produces unexpected stack traces when the line
*// triggerBug…*

is uncommented (some lines of the stack trace are written in the gist).

Otherwise, it correctly outputs:
{code:java}
+-+
|           c |
+-+
|           1 |
|           2 |
+-+
{code}
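
For context, a minimal sketch of toggling the configuration in question,
assuming a setup similar to the gist (the exact query there may differ):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NullCheckToggleSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // The reported failure appears once null checking is disabled:
        tableEnv.getConfig().setNullCheck(false);

        tableEnv.executeSql(
                "SELECT c, COUNT(*) FROM (VALUES (1), (2), (2)) AS T(c) GROUP BY c")
                .print();
    }
}
{code}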



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19677) TaskManager takes abnormally long time to register with JobManager on Kubernetes

2020-10-16 Thread Weike Dong (Jira)
Weike Dong created FLINK-19677:
--

 Summary: TaskManager takes abnormally long time to register with 
JobManager on Kubernetes
 Key: FLINK-19677
 URL: https://issues.apache.org/jira/browse/FLINK-19677
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Reporter: Weike Dong


During the registration process of a TaskManager, the JobManager creates a 
_TaskManagerLocation_ instance, which tries to get the hostname of the TaskManager 
via a reverse DNS lookup.

However, this always fails in a Kubernetes environment, because for pods that are 
not exposed by Services, their IPs cannot be resolved to domains by coredns, and 
_InetAddress#getCanonicalHostName()_ takes ~5 seconds to return, blocking the 
whole registration process.

Therefore, Flink should provide a configuration parameter to turn off the reverse 
DNS lookup. Also, even when the hostname is actually needed, the lookup could be 
done lazily to avoid blocking the registration of other TaskManagers.
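
A minimal standalone sketch (plain Java, not Flink code) that makes the
blocking observable; the pod IP below is hypothetical:

{code:java}
import java.net.InetAddress;

public class ReverseDnsProbe {
    public static void main(String[] args) throws Exception {
        // Hypothetical pod IP with no reverse DNS record.
        InetAddress addr = InetAddress.getByName(args.length > 0 ? args[0] : "10.244.1.23");
        long start = System.nanoTime();
        // Blocks until the reverse lookup resolves or times out (~5 seconds
        // for pods that are not exposed by a Service).
        String host = addr.getCanonicalHostName();
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println("reverse lookup of " + addr.getHostAddress()
                + " -> " + host + " took " + ms + " ms");
    }
}
{code}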



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for debian based Flink docker image

2020-10-16 Thread Chesnay Schepler
If it is possible to support both allocators in a single image then we 
should definitely go with that option.


On 10/16/2020 12:21 PM, Yun Tang wrote:

Thanks for Yang's suggestion. I think this could be a better choice.
We could install jemalloc and only enable it in LD_PRELOAD when the user passes
a specific configuration to docker-entrypoint.sh.
By doing so, we could avoid creating additional docker image tags and also offer
the ability to reduce the memory fragmentation problem.

Does anyone else have other ideas?

Best
Yun Tang

From: Yang Wang 
Sent: Thursday, October 15, 2020 14:59
To: dev 
Subject: Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for 
debian based Flink docker image

Thanks Yun Tang for starting this discussion.

I think this is very important when deploying Flink in a container environment
in production. I just have a quick question: could we have both memory
allocators (e.g., glibc, jemalloc) in the official Flink image and enable a
specific one by setting an ENV variable?

Best,
Yang

Yu Li wrote on Wed, Oct 14, 2020 at 12:23 PM:


Thanks for debugging and resolving the issue and driving the discussion
Yun!

For the given solutions, I prefer option 1 (supply another Dockerfile using
jemalloc as default memory allocator) because of the below reasons:

1. It's hard to say jemalloc is always better than ptmalloc (glibc malloc),
or else glibc should have already adopted it as the default memory
allocator. And as indicated here [1], in some cases jemalloc will
consume as much as twice the memory of glibc.

2. All existing Flink docker images use glibc, if we change the default
memory allocator to jemalloc and only supply one series of images, we will
leave those having better performance with glibc no other choices but
staying with old images. In other words, there's a risk of introducing new
problems while fixing an existing one if choosing option 2.

And there is a third option, considering the effort of maintaining more
images: if the memory leak issue is not widely observed, we could
document the steps for building a Dockerfile with jemalloc as the default
allocator so users could build it when needed. This leaves the burden to
our users, so for me it's not the best option.

Best Regards,
Yu

[1] https://stackoverflow.com/a/33993215

On Tue, 13 Oct 2020 at 15:34, Yun Tang  wrote:


Hi all

Users report a serious memory leak when submitting jobs
continuously in session mode on k8s (please refer to FLINK-18712 [1]),
and I reproduced it and found that it is caused by memory fragmentation of
glibc [2][3]. There are two solutions to fix this:

   *   A quick but not very clean solution: limit the memory pool of glibc
by setting MALLOC_ARENA_MAX to 2

   *   A more general solution: rebuild the image to install
libjemalloc-dev and add libjemalloc.so to LD_PRELOAD

The reporter adopted the 2nd solution to fix this issue eventually. Thus,
I began to think about whether we should change our Dockerfile to adopt
jemalloc as the default memory allocator [4].
From my point of view, we have two choices:

   1.  Introduce another Dockerfile using jemalloc as the default memory
allocator, which means Flink needs two new image tags to build the docker
image with jemalloc while the default image still uses glibc.
   2.  Set the default memory allocator to jemalloc in our existing
Dockerfiles, which means Flink offers docker images with jemalloc by default.

I prefer the 2nd option as our company already uses jemalloc as the default
memory allocator for the JDK in our production environment, due to warnings
from the OS team about glibc's memory fragmentation.
Moreover, I found several open source projects adopting jemalloc as the
default memory allocator within their images to resolve the memory
fragmentation problem, e.g. fluent-bit [5], home-assistant [6].

What do you guys think of this issue?

[1] https://issues.apache.org/jira/browse/FLINK-18712
[2]


https://www.gnu.org/software/libc/manual/html_mono/libc.html#Freeing-after-Malloc

[3] https://sourceware.org/bugzilla/show_bug.cgi?id=15321
[4] https://issues.apache.org/jira/browse/FLINK-19125
[5]


https://docs.fluentbit.io/manual/v/1.0/installation/docker#why-there-is-no-fluent-bit-docker-image-based-on-alpine-linux

[6] https://github.com/home-assistant/core/pull/33237


Best
Yun Tang





[jira] [Created] (FLINK-19676) Java package name error in docs

2020-10-16 Thread leiqiang (Jira)
leiqiang created FLINK-19676:


 Summary: Java package name error in docs
 Key: FLINK-19676
 URL: https://issues.apache.org/jira/browse/FLINK-19676
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: leiqiang


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for debian based Flink docker image

2020-10-16 Thread Yun Tang
Thanks for Yang's suggestion. I think this could be a better choice.
We could install jemalloc and only enable it in LD_PRELOAD when the user passes
a specific configuration to docker-entrypoint.sh.
By doing so, we could avoid creating additional docker image tags and also offer
the ability to reduce the memory fragmentation problem.

Does anyone else have other ideas?

Best
Yun Tang

From: Yang Wang 
Sent: Thursday, October 15, 2020 14:59
To: dev 
Subject: Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for 
debian based Flink docker image

Thanks Yun Tang for starting this discussion.

I think this is very important when deploying Flink in a container environment
in production. I just have a quick question: could we have both memory
allocators (e.g., glibc, jemalloc) in the official Flink image and enable a
specific one by setting an ENV variable?

Best,
Yang

Yu Li wrote on Wed, Oct 14, 2020 at 12:23 PM:

> Thanks for debugging and resolving the issue and driving the discussion
> Yun!
>
> For the given solutions, I prefer option 1 (supply another Dockerfile using
> jemalloc as default memory allocator) because of the below reasons:
>
> 1. It's hard to say jemalloc is always better than ptmalloc (glibc malloc),
> or else glibc should have already adopted it as the default memory
> allocator. And as indicated here [1], in some cases jemalloc will
> consume as much as twice the memory of glibc.
>
> 2. All existing Flink docker images use glibc, if we change the default
> memory allocator to jemalloc and only supply one series of images, we will
> leave those having better performance with glibc no other choices but
> staying with old images. In other words, there's a risk of introducing new
> problems while fixing an existing one if choosing option 2.
>
> And there is a third option, considering the effort of maintaining more
> images: if the memory leak issue is not widely observed, we could
> document the steps for building a Dockerfile with jemalloc as the default
> allocator so users could build it when needed. This leaves the burden to
> our users, so for me it's not the best option.
>
> Best Regards,
> Yu
>
> [1] https://stackoverflow.com/a/33993215
>
> On Tue, 13 Oct 2020 at 15:34, Yun Tang  wrote:
>
> > Hi all
> >
> > Users report a serious memory leak when submitting jobs
> > continuously in session mode on k8s (please refer to FLINK-18712 [1]),
> > and I reproduced it and found that it is caused by memory fragmentation of
> > glibc [2][3]. There are two solutions to fix this:
> >
> >   *   A quick but not very clean solution: limit the memory pool of glibc
> > by setting MALLOC_ARENA_MAX to 2
> >
> >   *   A more general solution: rebuild the image to install
> > libjemalloc-dev and add libjemalloc.so to LD_PRELOAD
> >
> > The reporter adopted the 2nd solution to fix this issue eventually. Thus,
> > I began to think about whether we should change our Dockerfile to adopt
> > jemalloc as the default memory allocator [4].
> > From my point of view, we have two choices:
> >
> >   1.  Introduce another Dockerfile using jemalloc as the default memory
> > allocator, which means Flink needs two new image tags to build the docker
> > image with jemalloc while the default image still uses glibc.
> >   2.  Set the default memory allocator to jemalloc in our existing
> > Dockerfiles, which means Flink offers docker images with jemalloc by default.
> >
> > I prefer the 2nd option as our company already uses jemalloc as the default
> > memory allocator for the JDK in our production environment, due to warnings
> > from the OS team about glibc's memory fragmentation.
> > Moreover, I found several open source projects adopting jemalloc as the
> > default memory allocator within their images to resolve the memory
> > fragmentation problem, e.g. fluent-bit [5], home-assistant [6].
> >
> > What do you guys think of this issue?
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18712
> > [2]
> >
> https://www.gnu.org/software/libc/manual/html_mono/libc.html#Freeing-after-Malloc
> > [3] https://sourceware.org/bugzilla/show_bug.cgi?id=15321
> > [4] https://issues.apache.org/jira/browse/FLINK-19125
> > [5]
> >
> https://docs.fluentbit.io/manual/v/1.0/installation/docker#why-there-is-no-fluent-bit-docker-image-based-on-alpine-linux
> > [6] https://github.com/home-assistant/core/pull/33237
> >
> >
> > Best
> > Yun Tang
> >
>


[jira] [Created] (FLINK-19675) The plan is incorrect when Calc contains WHERE clause, composite fields access and Python UDF at the same time

2020-10-16 Thread Dian Fu (Jira)
Dian Fu created FLINK-19675:
---

 Summary: The plan is incorrect when Calc contains WHERE clause, 
composite fields access and Python UDF at the same time 
 Key: FLINK-19675
 URL: https://issues.apache.org/jira/browse/FLINK-19675
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.11.0, 1.10.1
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.10.3, 1.11.3


For the following job:
{code}
SELECT a, pyFunc1(b, d._1) FROM MyTable WHERE a + 1 > 0
{code}

The plan is as following:
{code}
FlinkLogicalCalc(select=[a, pyFunc1(b, f0) AS EXPR$1])
+- FlinkLogicalCalc(select=[a, b, d._1 AS f0])
 +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
{code}
It is incorrect, as the WHERE condition is missing.
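
For comparison, a plan that retains the filter would look roughly like the
following (hand-written illustration, not generated optimizer output):

{code}
FlinkLogicalCalc(select=[a, pyFunc1(b, f0) AS EXPR$1])
+- FlinkLogicalCalc(select=[a, b, d._1 AS f0], where=[>(+(a, 1), 0)])
   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
{code}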



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19674) Translate "Docker" of "Clusters & Deployment" page into Chinese

2020-10-16 Thread Shubin Ruan (Jira)
Shubin Ruan created FLINK-19674:
---

 Summary: Translate "Docker" of "Clusters & Deployment" page into 
Chinese
 Key: FLINK-19674
 URL: https://issues.apache.org/jira/browse/FLINK-19674
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation
Reporter: Shubin Ruan


The page url is 
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/docker.html]

The markdown file is located in {{flink/docs/ops/deployment/docker.zh.md}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19673) Translate "Standalone Cluster" of "Clusters & Deployment" page into Chinese

2020-10-16 Thread Xiao Huang (Jira)
Xiao Huang created FLINK-19673:
--

 Summary: Translate "Standalone Cluster" of "Clusters & Deployment" 
page into Chinese
 Key: FLINK-19673
 URL: https://issues.apache.org/jira/browse/FLINK-19673
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Reporter: Xiao Huang


The page url is 
[https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/cluster_setup.html]

The markdown file is located in 
{{flink/docs/ops/deployment/cluster_setup.zh.md}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19672) Merge kafka-connector-base into flink-connector-kafka

2020-10-16 Thread Timo Walther (Jira)
Timo Walther created FLINK-19672:


 Summary: Merge kafka-connector-base into flink-connector-kafka
 Key: FLINK-19672
 URL: https://issues.apache.org/jira/browse/FLINK-19672
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Timo Walther
Assignee: Timo Walther


Nowadays, we only offer one unified Kafka connector, so a base module is not 
required anymore. The base module also uses Kafka 0.10 at the moment. We should 
merge those two modules into one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19671) Update EditorConfig file to be useful

2020-10-16 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-19671:


 Summary: Update EditorConfig file to be useful
 Key: FLINK-19671
 URL: https://issues.apache.org/jira/browse/FLINK-19671
 Project: Flink
  Issue Type: Improvement
Reporter: Aljoscha Krettek


We should update our {{.editorconfig}} file to format Java code according to a 
style that passes our checkstyle rules and also applies our import ordering. 
This will greatly simplify development because developers can just "re-format 
code" in IntelliJ and be done with it.

See the ML discussion for more background: 
https://lists.apache.org/thread.html/r481bb622410718cd454d251b194c7c1ad358e5d861fdacc9a4be3b5b%40%3Cdev.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-16 Thread Till Rohrmann
Thanks for sharing the preliminary numbers with us Yingjie. The numbers
look quite impressive :-)

Cheers,
Till

On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao  wrote:

> Hi Till,
>
> Thanks for your reply and comments.
>
> You are right, the proposed sort-merge based shuffle is an extension of the
> existing blocking shuffle and does not change any default behavior of
> Flink.
>
> As for the performance, according to our previous experience, sort-merge
> based implementation can reduce the shuffle time by 30% to even 90%
> compared to hash-based implementation. My PoC implementation without any
> further optimization can already reduce the shuffle time by over 10% on SSD
> and by over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
>
> After switching to the sort-merge based blocking shuffle, some of our users'
> jobs can scale up to over 2 parallelism, though this needs some JM and RM side
> optimization. I haven't ever tried to find where the upper bound is, but I
> guess several tens of thousands should be able to meet
> the needs of most users.
>
> Best,
> Yingjie
>
> Till Rohrmann wrote on Thu, Oct 15, 2020 at 3:57 PM:
>
> > Hi Yingjie,
> >
> > thanks for proposing the sort-merge based blocking shuffle. I like the
> > proposal and it does not seem to change the internals of Flink. Instead it
> > is an extension of existing interfaces, which makes it a
> > non-invasive addition.
> >
> > Do you have any numbers comparing the performance of the sort-merge based
> > shuffle against the hash-based shuffle? To what parallelism can you scale
> > up when using the sort-merge based shuffle?
> >
> > Cheers,
> > Till
> >
> > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao 
> > wrote:
> >
> > > Hi devs,
> > >
> > > Currently, Flink adopts a hash-style blocking shuffle implementation which
> > > writes data sent to different reducer tasks into separate files
> > > concurrently. Compared to the sort-merge based approach, which writes the
> > > data together into a single file and merges those small files into bigger
> > > ones, the hash-based approach has several weak points when it comes to
> > > running large scale batch jobs:
> > >
> > >    1. *Stability*: For high parallelism (tens of thousands) batch jobs, the
> > >    current hash-based blocking shuffle implementation writes too many files
> > >    concurrently, which puts high pressure on the file system, for example,
> > >    maintenance of too many file metas, or exhaustion of inodes or file
> > >    descriptors. All of these are potential stability issues. The sort-merge
> > >    based blocking shuffle doesn't have this problem because for one result
> > >    partition, only one file is written at a time.
> > >    2. *Performance*: Large amounts of small shuffle files and random IO can
> > >    influence shuffle performance a lot, especially for HDD (for SSD,
> > >    sequential read is also important because of read-ahead and caching). For
> > >    batch jobs processing massive data, a small amount of data per
> > >    subpartition is common because of the high parallelism. Besides, data skew
> > >    is another cause of small subpartition files. By merging the data of all
> > >    subpartitions together in one file, more sequential reads can be achieved.
> > >    3. *Resource*: For the current hash-based implementation, each
> > >    subpartition needs at least one buffer. For large scale batch shuffles,
> > >    the memory consumption can be huge. For example, we need at least 320M of
> > >    network memory per result partition if parallelism is set to 1 and,
> > >    because of the huge network consumption, it is hard to configure the
> > >    network memory for large scale batch jobs, and sometimes the parallelism
> > >    can not be increased just because of insufficient network memory, which
> > >    leads to a bad user experience.
> > >
> > > To improve Flink's capability of running large scale batch jobs, we would
> > > like to introduce sort-merge based blocking shuffle to Flink [1]. Any
> > > feedback is appreciated.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> > >
> > > Best,
> > > Yingjie
> > >
> >
>
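
To illustrate the stability argument from the quoted list with a toy
calculation (not from the thread itself): a hash-style blocking shuffle writes
one file per (producer, consumer) pair, while a sort-merge shuffle writes one
merged file per producer.

{code:java}
public class ShuffleFileCountSketch {
    public static void main(String[] args) {
        long parallelism = 10_000L; // "tens of thousands" from the discussion
        long hashStyleFiles = parallelism * parallelism; // 100,000,000 files
        long sortMergeFiles = parallelism;               // 10,000 files
        System.out.println("hash-style files: " + hashStyleFiles
                + ", sort-merge files: " + sortMergeFiles);
    }
}
{code}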


[jira] [Created] (FLINK-19670) Create view error

2020-10-16 Thread simenliuxing (Jira)
simenliuxing created FLINK-19670:


 Summary: Create view error
 Key: FLINK-19670
 URL: https://issues.apache.org/jira/browse/FLINK-19670
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.1
Reporter: simenliuxing
 Fix For: 1.11.2


When I run a SQL task with Flink 1.11.1 and the Blink planner, the following 
syntax error appears. The SQL is as follows:
{code:java}
CREATE TABLE orders (
  order_id INT,
  product_id INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

create table products(
  pro_id  INT ,
  product_name STRING,
  PRIMARY  KEY (pro_id) NOT ENFORCED
) WITH  (
  'connector'='jdbc',
  'url'='jdbc:mysql://localhost:3306/test',
  'username'='root',
  'password'='root',
  'table-name'='result4'
);

CREATE TABLE orders_info (
  order_id INT,
  pro_id INT,
  product_name STRING
) WITH (
'connector' = 'print'
);

create view orders_view
as
SELECT
order_id,
pro_id,
product_name
FROM orders
LEFT JOIN products FOR SYSTEM_TIME AS OF orders.proctime
ON orders.product_id = products.pro_id;

INSERT INTO orders_info SELECT * FROM orders_view;
{code}
The error is as follows:
{code:java}
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "FOR" 
at line 3, column 73.
Was expecting one of:
 
"EXCEPT" ...
"FETCH" ...
"GROUP" ...
"HAVING" ...
"INTERSECT" ...
"LIMIT" ...
"OFFSET" ...
"ON" ...
"ORDER" ...
"MINUS" ...
"TABLESAMPLE" ...
"UNION" ...
"USING" ...
"WHERE" ...
"WINDOW" ...
"(" ...
"NATURAL" ...
"JOIN" ...
"INNER" ...
"LEFT" ...
"RIGHT" ...
"FULL" ...
"CROSS" ...
"," ...
"OUTER" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3801)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
at 
org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
... 25 more
{code}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-146: Improve new TableSource and TableSink interfaces

2020-10-16 Thread Leonard Xu
+1

Best,
Leonard

> On Oct 16, 2020, at 11:01, Jark Wu wrote:
> 
> +1
> 
> On Fri, 16 Oct 2020 at 10:27, admin <17626017...@163.com> wrote:
> 
>> +1
>> 
>>> On Oct 16, 2020, at 10:05 AM, Danny Chan wrote:
>>> 
>>> +1, nice job !
>>> 
>>> Best,
>>> Danny Chan
>>> On Oct 15, 2020, at 8:08 PM +0800, Jingsong Li wrote:
 Hi all,
 
 I would like to start the vote for FLIP-146 [1], which was discussed and
 reached consensus in the discussion thread [2]. The vote will be open
 until Oct 20th (72h, excluding weekends), unless there is an objection or not
 enough votes.
 
 [1]
 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
 
 [2]
 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-146-Improve-new-TableSource-and-TableSink-interfaces-td45161.html
 
 Best,
 Jingsong Lee
>> 
>>