Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-08 Thread Jing Zhang
Hi Timo,
Thanks a lot for driving this discussion.
I believe it could solve many of the problems we are suffering in upgrades.

I only have a small complaint about the following point.

> For simplification of the design, we assume that upgrades use a step size
of a single minor version. We don't guarantee skipping minor versions (e.g.
1.11 to
1.14).

In our internal production environment, we catch up with the community's
latest stable release only about once a year, because upgrading to a new
version is a time-consuming process.
So we might skip 1~3 minor versions when we upgrade to the latest version.
This probably happens at other companies too.
Could we guarantee that FLIP-190 works as long as we skip fewer minor
versions than some specified threshold?
Then we would know which version is a good target when preparing an upgrade.

Best,
Jing Zhang

godfrey he wrote on Wed, Dec 8, 2021, at 22:16:

> Hi Timo,
>
> Thanks for the explanation, it's much clearer now.
>
> One thing I want to confirm about `supportedPlanFormat`
> and `supportedSavepointFormat`:
> `supportedPlanFormat` supports multiple versions,
> while `supportedSavepointFormat` supports only one version?
> A JSON plan can be deserialized by multiple versions
> because default values will be set for new fields.
> In theory, a Savepoint can be restored by more than one version
> of the operators even if a state layout is changed,
> such as deleting a whole state and starting the job with
> `allowNonRestoredState`=true.
> I think this is a corner case, and it's hard to understand compared
> to `supportedPlanFormat` supporting multiple versions.
> So, for most cases, when the state layout is changed, the savepoint is
> incompatible,
> and `supportedSavepointFormat` and the version need to be changed.
>
> I think we need a detailed explanation of the annotation change story in
> the Javadoc of the `ExecNodeMetadata` class for all developers
> (esp. those unfamiliar with this part).
>
> Best,
> Godfrey
>
> Timo Walther wrote on Wed, Dec 8, 2021, at 4:57 PM:
> >
> > Hi Wenlong,
> >
> > thanks for the feedback. Great that we reached consensus here. I will
> > update the entire document with my previous example shortly.
> >
> >  > if we don't update the version when plan format changes, we can't
> > find that the plan cannot be deserialized in 1.15
> >
> > This should not be a problem as the entire plan file has a version as
> > well. We should not allow reading a 1.16 plan in 1.15. We can throw a
> > helpful exception early.
> >
> > Reading a 1.15 plan in 1.16 is possible until we drop the old
> > `supportedPlanFormat` from one of the used ExecNodes. Afterwards all
> > `supportedPlanFormat` values of ExecNodes must be equal to or higher than
> > the plan version.
> >
> > Regards,
> > Timo
> >
> > On 08.12.21 03:07, wenlong.lwl wrote:
> > > Hi, Timo,  +1 for multi metadata.
> > >
> > > The compatible change I mean in the last email is the slight state
> change
> > > example you gave, so we have got consensus on this actually, IMO.
> > >
> > > Another question based on the example you gave:
> > > In the example "JSON node gets an additional property in 1.16", if we
> don't
> > > update the version when plan format changes, we can't find that the
> plan
> > > cannot be deserialized in 1.15, although the savepoint state is
> > > compatible.
> > > The error message may be not so friendly if we just throw
> deserialization
> > > failure.
> > >
> > > On Tue, 7 Dec 2021 at 16:49, Timo Walther  wrote:
> > >
> > >> Hi Wenlong,
> > >>
> > >>   > First,  we add a newStateLayout because of some improvement in
> state, in
> > >>   > order to keep compatibility we may still keep the old state for
> the
> > >> first
> > >>   > version. We need to update the version, so that we can generate a
> new
> > >>   > version plan for the new job and keep the exec node compatible
> with
> > >> the old
> > >>   > version plan.
> > >>
> > >> The problem that I see here for contributors is that the actual update
> > >> of a version is more complicated than just updating an integer value.
> It
> > >> means copying a lot of ExecNode code for a change that happens locally
> > >> in an operator. Let's assume multiple ExecNodes use a similar
> operator.
> > >> Why do we need to update all ExecNode versions, if the operator itself
> > >> can deal with the incompatibility? The ExecNode version is meant for
> > >> topology changes or fundamental state changes.
> > >>
> > >> If we don't find consensus on this topic, I would at least vote for
> > >> supporting multiple annotations for an ExecNode class. This way we
> don't
> > >> need to copy code but only add two ExecNode annotations with different
> > >> ExecNode versions.
> > >>
> > >>   > Maybe we can add support for this case :
> > >>   > when an exec node is changed in 1.16, but is compatible with 1.15,
> > >>   > we can use the node of 1.16 to deserialize the plan of 1.15.
> > >>
> > >> If the ExecNode is compatible, there is no reason to increase the
> > >> ExecNode version.
> > >>
> > >>
> > 

Re: [DISCUSS] Releasing Flink 1.14.1

2021-12-08 Thread Caizhi Weng
Hi devs!

Sorry for the interruption, but I just found an issue [1] (which I think
is a blocking one) in every Flink version, including Flink 1.14.1.

For Flink < 1.15, this issue causes incorrect results when users cast two
strings to numerics and compare the numerics.

I'm planning a quick fix today or tomorrow.

[1] https://issues.apache.org/jira/browse/FLINK-25227

Zhu Zhu wrote on Thu, Dec 9, 2021, at 10:48:

> update: backport of FLINK-19142 is done
>
> Thanks,
> Zhu
>
> Zhu Zhu wrote on Wed, Dec 8, 2021, at 19:35:
>
> > Hi Martijn,
> >
> > I'd like to backport the fix of FLINK-19142 to 1.14.1.
> > The backport is in progress.
> > Will update it here when it is done.
> >
> > Thanks,
> > Zhu
> >
> > Jingsong Li wrote on Wed, Dec 8, 2021, at 10:33:
> >
> >> Hi Martijn,
> >>
> >> We just created a cherry-pick pull-request for
> >> https://issues.apache.org/jira/browse/FLINK-20370
> >> We could finish it as soon as possible.
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Fri, Dec 3, 2021 at 10:25 PM Fabian Paul  wrote:
> >> >
> >> > I just opened a PR for
> >> > https://issues.apache.org/jira/browse/FLINK-25126 I'll expect to
> merge
> >> > it sometime next week.
> >> >
> >> > Best,
> >> > Fabian
> >> >
> >> > On Fri, Dec 3, 2021 at 10:49 AM Martijn Visser
> >> wrote:
> >> > >
> >> > > Hi all,
> >> > >
> >> > > Just a status update on the open blockers for 1.14.1:
> >> > > * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey
> >> constraint is lost with multiple sources join in SQL -> I believe most
> >> review comments have been fixed and it's just the final review remarks
> >> before it's ready.
> >> > > * https://issues.apache.org/jira/browse/FLINK-23946 - Application
> >> mode fails fatally when being shut down -> @David Morávek can you
> provide
> >> an update?
> >> > > * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader
> >> leak with ThreadLocals on the JM when submitting a job through the REST
> API
> >> -> I think this is just pending on a merge to master and then creating a
> >> backport?
> >> > > * https://issues.apache.org/jira/browse/FLINK-25126 - Kafka
> >> connector tries to commit aborted transaction in batch mode -> This is a
> >> new blocker. @fp...@apache.org can you give an update?
> >> > > * https://issues.apache.org/jira/browse/FLINK-25132 - KafkaSource
> >> cannot work with object-reusing DeserializationSchema -> There's a PR
> >> that's being reviewed and then needs a backport.
> >> > >
> >> > > It would be great if we can finish all these blockers next week to
> >> start a release. Do the assignees think that's realistic?
> >> > >
> >> > > Best regards,
> >> > >
> >> > > Martijn
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Thu, 2 Dec 2021 at 14:25, Marios Trivyzas 
> >> wrote:
> >> > >>
> >> > >>  https://issues.apache.org/jira/browse/FLINK-22113 will be merged
> >> today (most probably)
> >> > >>
> >> > >> On Mon, Nov 29, 2021 at 10:15 AM Martijn Visser <
> >> mart...@ververica.com> wrote:
> >> > >>>
> >> > >>> Thanks all for the updates! To summarize, these are open tickets
> >> that are considered blockers for Flink 1.14.1:
> >> > >>>
> >> > >>> * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey
> >> constraint is lost with multiple sources join in SQL -> @Marios Trivyzas
> >> can you give an estimate when you expect this to be resolved?
> >> > >>> * https://issues.apache.org/jira/browse/FLINK-23946 - Application
> >> mode fails fatally when being shut down -> A patch is being prepared.
> >> @David Morávek do you have an estimate when this patch will be there?
> >> > >>> * https://issues.apache.org/jira/browse/FLINK-24596 - Bugs in
> >> sink.buffer-flush before upsert-kafka -> @fp...@apache.org has provided
> >> a PR, so I suspect it would take a couple of days before this
> is
> >> merged.
> >> > >>> * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader
> >> leak with ThreadLocals on the JM when submitting a job through the REST
> API
> >> -> @Chesnay Schepler has provided a PR, so I suspect it would also just
> >> take a couple of days before this is merged.
> >> > >>>
> >> > >>> Is there anyone who can help me with creating the actual release
> >> when these tickets are resolved?
> >> > >>>
> >> > >>> Best regards,
> >> > >>>
> >> > >>> Martijn
> >> > >>>
> >> > >>>
> >> > >>> On Fri, 26 Nov 2021 at 12:08, Chesnay Schepler <
> ches...@apache.org>
> >> wrote:
> >> > 
> >> >  FLINK-25022: I will open a PR later today, and it should be easy
> to
> >> >  backport.
> >> >  FLINK-25027: Unlikely to make it for 1.14.1; I also wouldn't
> >> consider it
> >> >  a blocker
> >> > 
> >> >  On 24/11/2021 19:40, Martijn Visser wrote:
> >> >  > Hi all,
> >> >  >
> >> >  > I would like to start a discussion on releasing Flink 1.14.1.
> >> Flink 1.14
> >> >  > was released on the 29th of September [1] and so far 107 issues
> >> have been
> >> >  > resolved, including multiple blockers and critical priorities
> >> 

[jira] [Created] (FLINK-25227) Comparing the equality of the same (boxed) numeric values returns false

2021-12-08 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-25227:
---

 Summary: Comparing the equality of the same (boxed) numeric values 
returns false
 Key: FLINK-25227
 URL: https://issues.apache.org/jira/browse/FLINK-25227
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Caizhi Weng
 Fix For: 1.15.0, 1.14.1, 1.13.4


Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.

{code:scala}
@Test
def myTest(): Unit = {
  val data = Seq(
Row.of(
  java.lang.Integer.valueOf(1000),
  java.lang.Integer.valueOf(2000),
  java.lang.Integer.valueOf(1000),
  java.lang.Integer.valueOf(2000))
  )

  tEnv.executeSql(
s"""
   |create table T (
   |  a int,
   |  b int,
   |  c int,
   |  d int
   |) with (
   |  'connector' = 'values',
   |  'bounded' = 'true',
   |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
   |)
   |""".stripMargin)

  tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
}
{code}

The result is false, which is obviously incorrect.
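
For context, this is the classic Java boxed-comparison pitfall (a minimal standalone sketch, not Flink code):

{code:java}
// Boxed Integers outside the JVM's [-128, 127] cache are distinct objects,
// so comparing them by reference returns false even for equal values.
Integer a = Integer.valueOf(1000);
Integer b = Integer.valueOf(1000);
System.out.println(a == b);      // false: two different boxes
System.out.println(a.equals(b)); // true: value comparison
{code}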

This is caused by the generated java code:
{code:java}
public class StreamExecCalc$8 extends 
org.apache.flink.table.runtime.operators.TableStreamOperator
implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator {

private final Object[] references;
org.apache.flink.table.data.BoxedWrapperRowData out =
new org.apache.flink.table.data.BoxedWrapperRowData(1);
private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
outElement =
new 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

public StreamExecCalc$8(
Object[] references,
org.apache.flink.streaming.runtime.tasks.StreamTask task,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output output,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService)
throws Exception {
this.references = references;

this.setup(task, config, output);
if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) 
this)
.setProcessingTimeService(processingTimeService);
}
}

@Override
public void open() throws Exception {
super.open();
}

@Override
public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element)
throws Exception {
org.apache.flink.table.data.RowData in1 =
(org.apache.flink.table.data.RowData) element.getValue();

int field$0;
boolean isNull$0;
int field$1;
boolean isNull$1;
int field$3;
boolean isNull$3;
int field$4;
boolean isNull$4;
boolean isNull$6;
boolean result$7;

isNull$3 = in1.isNullAt(2);
field$3 = -1;
if (!isNull$3) {
field$3 = in1.getInt(2);
}
isNull$0 = in1.isNullAt(0);
field$0 = -1;
if (!isNull$0) {
field$0 = in1.getInt(0);
}
isNull$1 = in1.isNullAt(1);
field$1 = -1;
if (!isNull$1) {
field$1 = in1.getInt(1);
}
isNull$4 = in1.isNullAt(3);
field$4 = -1;
if (!isNull$4) {
field$4 = in1.getInt(3);
}

out.setRowKind(in1.getRowKind());

java.lang.Integer result$2 = field$0;
boolean nullTerm$2 = false;

if (!nullTerm$2) {
java.lang.Integer cur$2 = field$0;
if (isNull$0) {
nullTerm$2 = true;
} else {
int compareResult = result$2.compareTo(cur$2);
if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
result$2 = cur$2;
}
}
}

if (!nullTerm$2) {
java.lang.Integer cur$2 = field$1;
if (isNull$1) {
nullTerm$2 = true;
} else {
int compareResult = result$2.compareTo(cur$2);
if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
result$2 = cur$2;
}
}
}

if (nullTerm$2) {
result$2 = null;
}

java.lang.Integer result$5 = field$3;
boolean nullTerm$5 = false;

if (!nullTerm$5) {
java.lang.Integer cur$5 = field$3;
if (isNull$3) {
nullTerm$5 = true;
} else {
int compareResult = result$5.compareTo(cur$5);
if ...
{code}

[jira] [Created] (FLINK-25226) Add documentation about the AdaptiveBatchScheduler

2021-12-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25226:
---

 Summary: Add documentation about the AdaptiveBatchScheduler
 Key: FLINK-25226
 URL: https://issues.apache.org/jira/browse/FLINK-25226
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Zhu Zhu
 Fix For: 1.15.0


Documentation is needed to explain to users how to enable the 
AdaptiveBatchScheduler and how to configure it properly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25225) Add e2e TPCDS tests to run against the AdaptiveBatchScheduler

2021-12-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25225:
---

 Summary: Add e2e TPCDS tests to run against the 
AdaptiveBatchScheduler
 Key: FLINK-25225
 URL: https://issues.apache.org/jira/browse/FLINK-25225
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.15.0


To automatically and continuously verify the AdaptiveBatchScheduler, we should 
add a new e2e test which runs TPCDS against the AdaptiveBatchScheduler.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Releasing Flink 1.14.1

2021-12-08 Thread Zhu Zhu
update: backport of FLINK-19142 is done

Thanks,
Zhu

Zhu Zhu wrote on Wed, Dec 8, 2021, at 19:35:

> Hi Martijn,
>
> I'd like to backport the fix of FLINK-19142 to 1.14.1.
> The backport is in progress.
> Will update it here when it is done.
>
> Thanks,
> Zhu
>
> Jingsong Li wrote on Wed, Dec 8, 2021, at 10:33:
>
>> Hi Martijn,
>>
>> We just created a cherry-pick pull-request for
>> https://issues.apache.org/jira/browse/FLINK-20370
>> We could finish it as soon as possible.
>>
>> Best,
>> Jingsong
>>
>> On Fri, Dec 3, 2021 at 10:25 PM Fabian Paul  wrote:
>> >
>> > I just opened a PR for
>> > https://issues.apache.org/jira/browse/FLINK-25126 I'll expect to merge
>> > it sometime next week.
>> >
>> > Best,
>> > Fabian
>> >
>> > On Fri, Dec 3, 2021 at 10:49 AM Martijn Visser 
>> wrote:
>> > >
>> > > Hi all,
>> > >
>> > > Just a status update on the open blockers for 1.14.1:
>> > > * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey
>> constraint is lost with multiple sources join in SQL -> I believe most
>> review comments have been fixed and it's just the final review remarks
>> before it's ready.
>> > > * https://issues.apache.org/jira/browse/FLINK-23946 - Application
>> mode fails fatally when being shut down -> @David Morávek can you provide
>> an update?
>> > > * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader
>> leak with ThreadLocals on the JM when submitting a job through the REST API
>> -> I think this is just pending on a merge to master and then creating a
>> backport?
>> > > * https://issues.apache.org/jira/browse/FLINK-25126 - Kafka
>> connector tries to commit aborted transaction in batch mode -> This is a
>> new blocker. @fp...@apache.org can you give an update?
>> > > * https://issues.apache.org/jira/browse/FLINK-25132 - KafkaSource
>> cannot work with object-reusing DeserializationSchema -> There's a PR
>> that's being reviewed and then needs a backport.
>> > >
>> > > It would be great if we can finish all these blockers next week to
>> start a release. Do the assignees think that's realistic?
>> > >
>> > > Best regards,
>> > >
>> > > Martijn
>> > >
>> > >
>> > >
>> > >
>> > > On Thu, 2 Dec 2021 at 14:25, Marios Trivyzas 
>> wrote:
>> > >>
>> > >>  https://issues.apache.org/jira/browse/FLINK-22113 will be merged
>> today (most probably)
>> > >>
>> > >> On Mon, Nov 29, 2021 at 10:15 AM Martijn Visser <
>> mart...@ververica.com> wrote:
>> > >>>
>> > >>> Thanks all for the updates! To summarize, these are open tickets
>> that are considered blockers for Flink 1.14.1:
>> > >>>
>> > >>> * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey
>> constraint is lost with multiple sources join in SQL -> @Marios Trivyzas
>> can you give an estimate when you expect this to be resolved?
>> > >>> * https://issues.apache.org/jira/browse/FLINK-23946 - Application
>> mode fails fatally when being shut down -> A patch is being prepared.
>> @David Morávek do you have an estimate when this patch will be there?
>> > >>> * https://issues.apache.org/jira/browse/FLINK-24596 - Bugs in
>> sink.buffer-flush before upsert-kafka -> @fp...@apache.org has provided
> a PR, so I suspect it would take a couple of days before this is
>> merged.
>> > >>> * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader
>> leak with ThreadLocals on the JM when submitting a job through the REST API
>> -> @Chesnay Schepler has provided a PR, so I suspect it would also just
>> take a couple of days before this is merged.
>> > >>>
>> > >>> Is there anyone who can help me with creating the actual release
> when these tickets are resolved?
>> > >>>
>> > >>> Best regards,
>> > >>>
>> > >>> Martijn
>> > >>>
>> > >>>
>> > >>> On Fri, 26 Nov 2021 at 12:08, Chesnay Schepler 
>> wrote:
>> > 
>> >  FLINK-25022: I will open a PR later today, and it should be easy to
>> >  backport.
>> >  FLINK-25027: Unlikely to make it for 1.14.1; I also wouldn't
>> consider it
>> >  a blocker
>> > 
>> >  On 24/11/2021 19:40, Martijn Visser wrote:
>> >  > Hi all,
>> >  >
>> >  > I would like to start a discussion on releasing Flink 1.14.1.
>> Flink 1.14
>> >  > was released on the 29th of September [1] and so far 107 issues
>> have been
>> >  > resolved, including multiple blockers and critical priorities
>> [2].
>> >  >
>> >  > There are currently 169 open tickets which contain a fixVersion
>> for 1.14.1
>> >  > [3]. I'm including the ones that are currently marked as
>> critical or a
>> >  > blocker to verify if these should be included in Flink 1.14.1.
>> It would be
>> >  > great if those that are assigned or working on one or more of
>> these tickets
>> >  > can give an update on its status.
>> >  >
>> >  > * https://issues.apache.org/jira/browse/FLINK-24543 - Zookeeper
>> connection
>> >  > issue causes inconsistent state in Flink -> I think this depends
>> on the
>> >  > outcome of dropping Zookeeper 3.4 as was proposed on the Dev
>> mailing 

[jira] [Created] (FLINK-25224) Bump the hadoop version up

2021-12-08 Thread Jira
刘方奇 created FLINK-25224:
---

 Summary: Bump the hadoop version up
 Key: FLINK-25224
 URL: https://issues.apache.org/jira/browse/FLINK-25224
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Reporter: 刘方奇


Flink currently builds against Hadoop 2.4.1, but the Flink runtime classpath 
usually contains a higher Hadoop version. This means that when we want to use a 
feature from a higher Hadoop version, we have to go through reflection, which is 
hard to maintain.
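
For illustration, calling an API that only exists in newer Hadoop versions (e.g. 
{{FileSystem#setStoragePolicy}}, added in Hadoop 2.6) currently has to look 
roughly like this sketch, because the direct call does not compile against 2.4.1:

{code:java}
import java.lang.reflect.Method;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StoragePolicyHelper {
    // Hypothetical helper: fs.setStoragePolicy(path, policy) cannot be called
    // directly when compiling against Hadoop 2.4.1, so the method is looked up
    // and invoked reflectively at runtime.
    static void setStoragePolicy(FileSystem fs, Path path, String policy)
            throws Exception {
        Method m = fs.getClass().getMethod("setStoragePolicy", Path.class, String.class);
        m.invoke(fs, path, policy);
    }
}
{code}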



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[DISCUSS][FLINK-24427] Hide Scala from table planner

2021-12-08 Thread Francesco Guardiani
Hi all,
In case you haven't seen it, last week I published this document in the issue
comments to explain how we're proceeding to hide Scala from the table planner:
https://docs.google.com/document/d/12yDUCnvcwU2mODBKTHQ1xhfOq1ujYUrXltiN_rbhT34/edit?usp=sharing

There is a section I added yesterday that is particularly relevant,
because it explains the impact on the distribution. I strongly encourage
people to look at it.

Once we perform all the changes, I'm gonna announce them on the user
mailing list as well, together with the package name changes already
brought in by #17897  to
flink-parquet and flink-orc.

Thanks,
FG



Re: [DISCUSS] Strong read-after-write consistency of Flink FileSystems

2021-12-08 Thread David Morávek
Hi Martijn,

I simply wasn't aware of that one :) It seems to provide the guarantees
that we need [1].

> Of course, Azure Storage is built on a platform grounded in strong
> consistency guaranteeing that writes are made durable before acknowledging
> success to the client. This is critically important for big data workloads
> where the output from one task is often the input to the next job. This
> greatly simplifies development of big data applications since they do not
> have to work around issues that surface with weaker consistency models such
> as eventual consistency.
>

I'm not able to find the guarantees for MapR FS, but since it was designed
as an HDFS replacement back in the day, I'd expect it to provide the same
guarantees, as MapReduce heavily relies on this. I've seen that
you've already started a deprecation thread.

[1]
https://azure.microsoft.com/en-us/blog/a-closer-look-at-azure-data-lake-storage-gen2/

D.

On Wed, Dec 8, 2021 at 4:34 PM Martijn Visser  wrote:

> Hi David,
>
> Just to be sure, since you've already included Azure Blob Storage, but did
> you deliberately skip Azure Data Lake Store Gen2? That's currently
> supported and also used by Flink users [1]. There's also MapR FS, but I
> doubt if that is still used.
>
> Best regards,
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/overview/
>
> On Mon, 6 Dec 2021 at 12:28, David Morávek  wrote:
>
>> Hi Everyone,
>>
>> as outlined in FLIP-194 discussion [1], for the future directions of
>> Flink HA services, I'd like to verify my thoughts around guarantees of the
>> distributed filesystems used with Flink.
>>
>> Currently some of the services (*JobGraphStore*,
>> *CompletedCheckpointStore*) are implemented using a combination of
>> strongly consistent Metadata storage (ZooKeeper, K8s CM) and the actual
>> FileSystem. Reasoning behind this dates back to days, when S3 was an
>> eventually consistent FileSystem and we needed a strongly consistent view
>> of the data.
>>
>> I did some research, and my feeling is that all the major FileSystems
>> that Flink supports already provide strong read-after-write consistency,
>> which would be sufficient to decrease a complexity of the current HA
>> implementations.
>>
>> FileSystems that I've checked and that seem to support strong
>> read-after-write consistency:
>> - S3
>> - GCS
>> - Azure Blob Storage
>> - Aliyun OSS
>> - HDFS
>> - Minio
>>
>> Are you aware of other FileSystems that are used with Flink? Do they
>> support the consistency that is required for starting new initiatives
>> towards simpler / less error-prone HA services? Are you aware of any
>> problems with the above mentioned FileSystems that I might have missed?
>>
>> I'm also bringing this up to user@f.a.o, to make sure we don't miss any
>> FileSystems.
>>
>> [1] https://lists.apache.org/thread/wlzv02jqtq221kb8dnm82v4xj8tomd94
>>
>> Best,
>> D.
>>
>


Re: [VOTE] Deprecate Java 8 support

2021-12-08 Thread Thomas Weise
+1 (binding)

On Wed, Dec 8, 2021 at 2:20 AM Till Rohrmann  wrote:
>
> +1 (binding)
>
> Cheers,
> Till
>
> On Tue, Dec 7, 2021 at 12:35 PM Matthias Pohl 
> wrote:
>
> > Thanks for pushing this Chesnay!
> > +1 (binding)
> >
> > On Mon, Dec 6, 2021 at 9:44 PM Martijn Visser 
> > wrote:
> >
> > > +1 (non-binding)
> > >
> > > On Mon, Dec 6, 2021 at 19:58, Ingo Bürk wrote:
> > >
> > > > Before more people let me know, let me update my vote to
> > > >
> > > > +1 (binding)
> > > >
> > > >
> > > > (In all seriousness, thanks for the reminders!)
> > > >
> > > > On Mon, Dec 6, 2021, 16:54 Ingo Bürk  wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > >
> > > > > Ingo
> > > > >
> > > > > On Mon, Dec 6, 2021 at 4:44 PM Chesnay Schepler 
> > > > > wrote:
> > > > >
> > > > >> Hello,
> > > > >>
> > > > >> after recent discussions on the dev
> > > > >> 
> > > and
> > > > >> user <
> > > https://lists.apache.org/thread/2jnj5xrokphxhokpo5w3p6w7z0pkl9gx>
> > > > >> mailing list to deprecate Java 8 support, with a general consensus
> > in
> > > > >> favor of it, I would now like to do a formal vote.
> > > > >>
> > > > >> The deprecation would entail a notification to our users to
> > encourage
> > > > >> migrating to Java 11, and various efforts on our side to prepare a
> > > > >> migration to Java 11, like updating some e2e tests to actually run
> > on
> > > > >> Java 11, performance benchmarking, etc.
> > > > >>
> > > > >> There is no set date for the removal of Java 8 support.
> > > > >>
> > > > >> We'll use the usual minimum 72h vote duration, with committers
> > having
> > > > >> binding votes.
> > > > >>
> > > > >>
> > > >
> >


[DISCUSS] Deprecate MapR FS

2021-12-08 Thread Martijn Visser
Hi all,

Flink supports multiple file systems [1], including MapR FS. MapR as a
company has not existed since 2019; the technology and intellectual
property were sold to Hewlett Packard.

I don't think that there's anyone who's using MapR anymore and therefore I
think it would be good to deprecate this for Flink 1.15 and then remove it
in Flink 1.16. Removing this from Flink will slightly shrink the codebase
and CI runtime.

I'm also cross posting this to the User mailing list, in case there's still
anyone who's using MapR.

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/


[jira] [Created] (FLINK-25223) ElasticsearchWriterITCase fails on AZP

2021-12-08 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25223:
-

 Summary: ElasticsearchWriterITCase fails on AZP
 Key: FLINK-25223
 URL: https://issues.apache.org/jira/browse/FLINK-25223
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.15.0
Reporter: Till Rohrmann
 Fix For: 1.15.0


The {{ElasticsearchWriterITCase}} fails on AZP because

{code}
2021-12-08T13:56:59.5449851Z Dec 08 13:56:59 [ERROR] 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriterITCase  Time 
elapsed: 171.046 s  <<< ERROR!
2021-12-08T13:56:59.5450680Z Dec 08 13:56:59 
org.testcontainers.containers.ContainerLaunchException: Container startup failed
2021-12-08T13:56:59.5451652Z Dec 08 13:56:59at 
org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
2021-12-08T13:56:59.5452677Z Dec 08 13:56:59at 
org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317)
2021-12-08T13:56:59.5453637Z Dec 08 13:56:59at 
org.testcontainers.junit.jupiter.TestcontainersExtension$StoreAdapter.start(TestcontainersExtension.java:242)
2021-12-08T13:56:59.5454757Z Dec 08 13:56:59at 
org.testcontainers.junit.jupiter.TestcontainersExtension$StoreAdapter.access$200(TestcontainersExtension.java:229)
2021-12-08T13:56:59.5455946Z Dec 08 13:56:59at 
org.testcontainers.junit.jupiter.TestcontainersExtension.lambda$null$1(TestcontainersExtension.java:59)
2021-12-08T13:56:59.5457322Z Dec 08 13:56:59at 
org.junit.jupiter.engine.execution.ExtensionValuesStore.lambda$getOrComputeIfAbsent$4(ExtensionValuesStore.java:86)
2021-12-08T13:56:59.5458571Z Dec 08 13:56:59at 
org.junit.jupiter.engine.execution.ExtensionValuesStore$MemoizingSupplier.computeValue(ExtensionValuesStore.java:223)
2021-12-08T13:56:59.5459771Z Dec 08 13:56:59at 
org.junit.jupiter.engine.execution.ExtensionValuesStore$MemoizingSupplier.get(ExtensionValuesStore.java:211)
2021-12-08T13:56:59.5460693Z Dec 08 13:56:59at 
org.junit.jupiter.engine.execution.ExtensionValuesStore$StoredValue.evaluate(ExtensionValuesStore.java:191)
2021-12-08T13:56:59.5461437Z Dec 08 13:56:59at 
org.junit.jupiter.engine.execution.ExtensionValuesStore$StoredValue.access$100(ExtensionValuesStore.java:171)
2021-12-08T13:56:59.5462198Z Dec 08 13:56:59at 
org.junit.jupiter.engine.execution.ExtensionValuesStore.getOrComputeIfAbsent(ExtensionValuesStore.java:89)
2021-12-08T13:56:59.5467999Z Dec 08 13:56:59at 
org.junit.jupiter.engine.execution.NamespaceAwareStore.getOrComputeIfAbsent(NamespaceAwareStore.java:53)
2021-12-08T13:56:59.5468791Z Dec 08 13:56:59at 
org.testcontainers.junit.jupiter.TestcontainersExtension.lambda$beforeAll$2(TestcontainersExtension.java:59)
2021-12-08T13:56:59.5469436Z Dec 08 13:56:59at 
java.util.ArrayList.forEach(ArrayList.java:1259)
2021-12-08T13:56:59.5470058Z Dec 08 13:56:59at 
org.testcontainers.junit.jupiter.TestcontainersExtension.beforeAll(TestcontainersExtension.java:59)
2021-12-08T13:56:59.5470846Z Dec 08 13:56:59at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllCallbacks$10(ClassBasedTestDescriptor.java:381)
2021-12-08T13:56:59.5471641Z Dec 08 13:56:59at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2021-12-08T13:56:59.5472403Z Dec 08 13:56:59at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllCallbacks(ClassBasedTestDescriptor.java:381)
2021-12-08T13:56:59.5473190Z Dec 08 13:56:59at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:205)
2021-12-08T13:56:59.5474001Z Dec 08 13:56:59at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:80)
2021-12-08T13:56:59.5474759Z Dec 08 13:56:59at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148)
2021-12-08T13:56:59.5475833Z Dec 08 13:56:59at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2021-12-08T13:56:59.5476739Z Dec 08 13:56:59at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
2021-12-08T13:56:59.5477520Z Dec 08 13:56:59at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
2021-12-08T13:56:59.5478227Z Dec 08 13:56:59at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
2021-12-08T13:56:59.5479190Z Dec 08 13:56:59at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2021-12-08T13:56:59.5479936Z Dec 08 13:56:59    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
...
{code}

Re: [DISCUSS] Strong read-after-write consistency of Flink FileSystems

2021-12-08 Thread Martijn Visser
Hi David,

Just to be sure, since you've already included Azure Blob Storage, but did
you deliberately skip Azure Data Lake Store Gen2? That's currently
supported and also used by Flink users [1]. There's also MapR FS, but I
doubt if that is still used.

Best regards,

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/overview/

On Mon, 6 Dec 2021 at 12:28, David Morávek  wrote:

> Hi Everyone,
>
> as outlined in FLIP-194 discussion [1], for the future directions of Flink
> HA services, I'd like to verify my thoughts around guarantees of the
> distributed filesystems used with Flink.
>
> Currently some of the services (*JobGraphStore*,
> *CompletedCheckpointStore*) are implemented using a combination of
> strongly consistent Metadata storage (ZooKeeper, K8s CM) and the actual
> FileSystem. The reasoning behind this dates back to the days when S3 was an
> eventually consistent FileSystem and we needed a strongly consistent view
> of the data.
>
> I did some research, and my feeling is that all the major FileSystems that
> Flink supports already provide strong read-after-write consistency, which
> would be sufficient to decrease the complexity of the current HA
> implementations.
>
> FileSystems that I've checked and that seem to support strong
> read-after-write consistency:
> - S3
> - GCS
> - Azure Blob Storage
> - Aliyun OSS
> - HDFS
> - Minio
>
> Are you aware of other FileSystems that are used with Flink? Do they
> support the consistency that is required for starting new initiatives
> towards simpler / less error-prone HA services? Are you aware of any
> problems with the above mentioned FileSystems that I might have missed?
>
> I'm also bringing this up to user@f.a.o, to make sure we don't miss any
> FileSystems.
>
> [1] https://lists.apache.org/thread/wlzv02jqtq221kb8dnm82v4xj8tomd94
>
> Best,
> D.
>


FileSink in Apache Flink not generating logs in output folder

2021-12-08 Thread Dhingra, Kajal
I am new to Flink and doing a POC with it, using it to read data from a Kafka 
topic and store it in files on a server. I am using FileSink to store the 
files; it creates the directory structure by date and hour, but no log files 
are getting created.

When I run the program, it creates the directory structure as below, but the 
log files are not stored there.


/flink/testlogs/2021-12-08--07

/flink/testlogs/2021-12-08--06

I want a new log file to be written every 15 minutes. Below is the code.


DataStream<String> kafkaTopicData = env.addSource(
        new FlinkKafkaConsumer<String>("MyTopic", new SimpleStringSchema(), p));

OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

DataStream<Tuple6<String, String, String, String, String, Integer>> newStream =
        kafkaTopicData.map(new LogParser());

final FileSink<Tuple6<String, String, String, String, String, Integer>> sink =
        FileSink.forRowFormat(new Path("/flink/testlogs"),
                new SimpleStringEncoder<Tuple6<String, String, String, String, String, Integer>>("UTF-8"))
            .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
            .withOutputFileConfig(config)
            .build();

newStream.sinkTo(sink);

env.execute("DataReader");



LogParser returns Tuple6.
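
One common cause of this symptom (assuming checkpointing is not enabled in the
job above): FileSink only finalizes its in-progress part files on successful
checkpoints, so without checkpointing the part files never become visible.
A minimal sketch:

env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1)); // FileSink commits part files on checkpoint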





Regards,




Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk

2021-12-08 Thread godfrey he
Congratulations, Ingo!

Best,
Godfrey

Roman Khachatryan wrote on Mon, Dec 6, 2021, at 6:07 PM:
>
> Congratulations, Ingo!
>
> Regards,
> Roman
>
>
> On Mon, Dec 6, 2021 at 11:05 AM Yang Wang  wrote:
> >
> > Congratulations, Ingo!
> >
> > Best,
> > Yang
> >
> > Sergey Nuyanzin wrote on Mon, Dec 6, 2021, at 3:35 PM:
> >
> > > Congratulations, Ingo!
> > >
> > > On Mon, Dec 6, 2021 at 7:32 AM Leonard Xu  wrote:
> > >
> > > > Congratulations, Ingo! Well Deserved.
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > > > On Dec 3, 2021, at 11:24 PM, Ingo Bürk wrote:
> > > > >
> > > > > Thank you everyone for the warm welcome!
> > > > >
> > > > >
> > > > > Best
> > > > > Ingo
> > > > >
> > > > > On Fri, Dec 3, 2021 at 11:47 AM Ryan Skraba
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > >> Congratulations Ingo!
> > > > >>
> > > > >> On Fri, Dec 3, 2021 at 8:17 AM Yun Tang  wrote:
> > > > >>
> > > > >>> Congratulations, Ingo!
> > > > >>>
> > > > >>> Best
> > > > >>> Yun Tang
> > > > >>> 
> > > > >>> From: Yuepeng Pan 
> > > > >>> Sent: Friday, December 3, 2021 14:14
> > > > >>> To: dev@flink.apache.org 
> > > > >>> Cc: Ingo Bürk 
> > > > >>> Subject: Re:Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> Congratulations, Ingo!
> > > > >>>
> > > > >>>
> > > > >>> Best,
> > > > >>> Yuepeng Pan
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> At 2021-12-03 13:47:38, "Yun Gao" 
> > > > wrote:
> > > >  Congratulations Ingo!
> > > > 
> > > >  Best,
> > > >  Yun
> > > > 
> > > > 
> > > >  --
> > > >  From:刘建刚 
> > > >  Send Time:2021 Dec. 3 (Fri.) 11:52
> > > >  To:dev 
> > > >  Cc:"Ingo Bürk" 
> > > >  Subject:Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk
> > > > 
> > > >  Congratulations!
> > > > 
> > > >  Best,
> > > >  Liu Jiangang
> > > > 
> > > > > Till Rohrmann wrote on Thu, Dec 2, 2021, at 11:24 PM:
> > > > 
> > > > > Hi everyone,
> > > > >
> > > > > On behalf of the PMC, I'm very happy to announce Ingo Bürk as a 
> > > > > new
> > > > >>> Flink
> > > > > committer.
> > > > >
> > > > > Ingo has started contributing to Flink since the beginning of this
> > > > >>> year. He
> > > > > worked mostly on SQL components. He has authored many PRs and
> > > helped
> > > > >>> review
> > > > > a lot of other PRs in this area. He actively reported issues and
> > > > >> helped
> > > > >>> our
> > > > > users on the MLs. His most notable contributions were Support SQL
> > > > 2016
> > > > >>> JSON
> > > > > functions in Flink SQL (FLIP-90), Register sources/sinks in Table
> > > API
> > > > > (FLIP-129) and various other contributions in the SQL area.
> > > Moreover,
> > > > >>> he is
> > > > > one of the few people in our community who actually understands
> > > > >> Flink's
> > > > > frontend.
> > > > >
> > > > > Please join me in congratulating Ingo for becoming a Flink
> > > committer!
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> > > --
> > > Best regards,
> > > Sergey
> > >


Re: [ANNOUNCE] New Apache Flink Committer - Matthias Pohl

2021-12-08 Thread godfrey he
Congratulations, Matthias!

Best,
Godfrey

Roman Khachatryan wrote on Mon, Dec 6, 2021, at 6:07 PM:
>
> Congratulations, Matthias!
>
> Regards,
> Roman
>
>
> On Mon, Dec 6, 2021 at 11:04 AM Yang Wang  wrote:
> >
> > Congratulations, Matthias!
> >
> > Best,
> > Yang
> >
> > Sergey Nuyanzin wrote on Mon, Dec 6, 2021, at 3:35 PM:
> >
> > > Congratulations, Matthias!
> > >
> > > On Mon, Dec 6, 2021 at 7:33 AM Leonard Xu  wrote:
> > >
> > > > Congratulations Matthias!
> > > >
> > > > Best,
> > > > Leonard
> > > > > On Dec 3, 2021, at 11:23 PM, Matthias Pohl wrote:
> > > > >
> > > > > Thank you! I'm looking forward to continue working with you.
> > > > >
> > > > > On Fri, Dec 3, 2021 at 7:29 AM Jingsong Li 
> > > > wrote:
> > > > >
> > > > >> Congratulations, Matthias!
> > > > >>
> > > > >> On Fri, Dec 3, 2021 at 2:13 PM Yuepeng Pan  wrote:
> > > > >>>
> > > > >>> Congratulations Matthias!
> > > > >>>
> > > > >>> Best,Yuepeng Pan.
> > > > >>> On 2021-12-03 13:47:20, "Yun Gao" wrote:
> > > >  Congratulations Matthias!
> > > > 
> > > >  Best,
> > > >  Yun
> > > > 
> > > > 
> > > >  --
> > > >  From:Jing Zhang 
> > > >  Send Time:2021 Dec. 3 (Fri.) 13:45
> > > >  To:dev 
> > > >  Cc:Matthias Pohl 
> > > >  Subject:Re: [ANNOUNCE] New Apache Flink Committer - Matthias Pohl
> > > > 
> > > >  Congratulations, Matthias!
> > > > 
> > > >  刘建刚 wrote on Fri, Dec 3, 2021, at 11:51:
> > > > 
> > > > > Congratulations!
> > > > >
> > > > > Best,
> > > > > Liu Jiangang
> > > > >
> > > > > Till Rohrmann wrote on Thu, Dec 2, 2021, at 11:28 PM:
> > > > >
> > > > >> Hi everyone,
> > > > >>
> > > > >> On behalf of the PMC, I'm very happy to announce Matthias Pohl as
> > > a
> > > > >> new
> > > > >> Flink committer.
> > > > >>
> > > > >> Matthias has worked on Flink since August last year. He helped
> > > > >> review a
> > > > > ton
> > > > >> of PRs. He worked on a variety of things but most notably the
> > > > >> tracking
> > > > > and
> > > > >> reporting of concurrent exceptions, fixing HA bugs and 
> > > > >> deprecating
> > > > >> and
> > > > >> removing our Mesos support. He actively reports issues helping
> > > > >> Flink to
> > > > >> improve and he is actively engaged in Flink's MLs.
> > > > >>
> > > > >> Please join me in congratulating Matthias for becoming a Flink
> > > > >> committer!
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Best, Jingsong Lee
> > > >
> > > >
> > >
> > > --
> > > Best regards,
> > > Sergey
> > >


Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-08 Thread godfrey he
Hi Timo,

Thanks for the explanation, it's much clearer now.

One thing I want to confirm about `supportedPlanFormat`
and `supportedSavepointFormat`:
`supportedPlanFormat` supports multiple versions,
while `supportedSavepointFormat` supports only one version?
A JSON plan can be deserialized by multiple versions
because default values will be set for new fields.
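
As a minimal sketch of that mechanism with Jackson-style deserialization
(the class below is a hypothetical stand-in, not Flink's real serializers):

public static class NodeConfig {
    @JsonProperty("some-prop")
    public int someProp;

    @JsonProperty("some-flag")       // property added in 1.16
    public boolean someFlag = false; // default keeps 1.15 plans readable
}

// a 1.15 plan without "some-flag" still deserializes; the default applies:
NodeConfig node = new ObjectMapper()
        .readValue("{\"some-prop\": 42}", NodeConfig.class);
// node.someFlag == false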
In theory, a Savepoint can be restored by more than one version
of the operators even if a state layout is changed,
such as deleting a whole state and starting the job with
`allowNonRestoredState`=true.
I think this is a corner case, and it's hard to understand compared
to `supportedPlanFormat` supporting multiple versions.
So, for most cases, when the state layout is changed, the savepoint is
incompatible,
and `supportedSavepointFormat` and the version need to be changed.

I think we need a detailed explanation of the annotation change story in
the Javadoc of the `ExecNodeMetadata` class for all developers
(esp. those unfamiliar with this part).

Best,
Godfrey

Timo Walther wrote on Wed, Dec 8, 2021, at 4:57 PM:
>
> Hi Wenlong,
>
> thanks for the feedback. Great that we reached consensus here. I will
> update the entire document with my previous example shortly.
>
>  > if we don't update the version when plan format changes, we can't
> find that the plan cannot be deserialized in 1.15
>
> This should not be a problem as the entire plan file has a version as
> well. We should not allow reading a 1.16 plan in 1.15. We can throw a
> helpful exception early.
>
> Reading a 1.15 plan in 1.16 is possible until we drop the old
> `supportedPlanFormat` from one of the used ExecNodes. Afterwards all
> `supportedPlanFormat` values of ExecNodes must be equal to or higher than
> the plan version.
>
> Regards,
> Timo
>
> On 08.12.21 03:07, wenlong.lwl wrote:
> > Hi, Timo,  +1 for multi metadata.
> >
> > The compatible change I mean in the last email is the slight state change
> > example you gave, so we have got consensus on this actually, IMO.
> >
> > Another question based on the example you gave:
> > In the example "JSON node gets an additional property in 1.16", if we don't
> > update the version when plan format changes, we can't find that the plan
> > cannot be deserialized in 1.15, although the savepoint state is
> > compatible.
> > The error message may be not so friendly if we just throw deserialization
> > failure.
> >
> > On Tue, 7 Dec 2021 at 16:49, Timo Walther  wrote:
> >
> >> Hi Wenlong,
> >>
> >>   > First,  we add a newStateLayout because of some improvement in state, 
> >> in
> >>   > order to keep compatibility we may still keep the old state for the
> >> first
> >>   > version. We need to update the version, so that we can generate a new
> >>   > version plan for the new job and keep the exec node compatible with
> >> the old
> >>   > version plan.
> >>
> >> The problem that I see here for contributors is that the actual update
> >> of a version is more complicated than just updating an integer value. It
> >> means copying a lot of ExecNode code for a change that happens locally
> >> in an operator. Let's assume multiple ExecNodes use a similar operator.
> >> Why do we need to update all ExecNode versions, if the operator itself
> >> can deal with the incompatibility? The ExecNode version is meant for
> >> topology changes or fundamental state changes.
> >>
> >> If we don't find consensus on this topic, I would at least vote for
> >> supporting multiple annotations for an ExecNode class. This way we don't
> >> need to copy code but only add two ExecNode annotations with different
> >> ExecNode versions.
> >>
> >>   > Maybe we can add support for this case :
> >>   > when an exec node is changed in 1.16, but is compatible with 1.15,
> >>   > we can use the node of 1.16 to deserialize the plan of 1.15.
> >>
> >> If the ExecNode is compatible, there is no reason to increase the
> >> ExecNode version.
> >>
> >>
> >>
> >> I tried to come up with a reworked solution to make all parties happy:
> >>
> >> 1. Let's assume the following annotations:
> >>
> >> supportedPlanFormat = [1.15]
> >>
> >> supportedSavepointFormat = 1.15
> >>
> >> we drop `added` as it is equal to `supportedSavepointFormat`
> >>
> >> 2. Multiple annotations over ExecNodes are possible:
> >>
> >> // operator state changes
> >>
> >> // initial introduction in 1.15
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.15)
> >>
> >> // state layout changed slightly in 1.16
> >> // - operator migration is possible
> >> // - operator supports state of both versions and will perform operator
> >> state migration
> >> // - new plans will get new ExecNode version
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.15)
> >> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.16)
> >>
> >> // we force a plan migration in 1.17
> >> // - we assume that all operator states have 

[jira] [Created] (FLINK-25222) Remove NetworkFailureProxy used for Kafka connector tests

2021-12-08 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25222:
---

 Summary: Remove NetworkFailureProxy used for Kafka connector tests
 Key: FLINK-25222
 URL: https://issues.apache.org/jira/browse/FLINK-25222
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Test Infrastructure
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


Recently, the number of Kafka connector tests that either hit a timeout due to 
blocked networking or receive corrupted network responses has increased 
significantly.

We think this is caused by our custom network failure implementation. Since all 
the affected tests are for the legacy FlinkKafkaProducer or FlinkKafkaConsumer, 
to which we will not add more features, we want to safely remove them to 
increase the overall stability.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25221) Allow global table options for all table connectors

2021-12-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-25221:


 Summary: Allow global table options for all table connectors
 Key: FLINK-25221
 URL: https://issues.apache.org/jira/browse/FLINK-25221
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Timo Walther


Some options (e.g. `sink.parallelism`) should be defined globally for all 
connectors. This would avoid implementing the same option for each connector 
individually.
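
For example, {{sink.parallelism}} currently has to be implemented by each 
connector that wants it (the filesystem connector does, for instance). A sketch:

{code:java}
// Sketch: today this DDL only works because the filesystem connector
// implements 'sink.parallelism' itself; a global option would make it
// available to every connector.
tableEnv.executeSql(
    "CREATE TABLE csv_sink (id INT) WITH (\n"
        + "  'connector' = 'filesystem',\n"
        + "  'path' = '/tmp/out',\n"
        + "  'format' = 'csv',\n"
        + "  'sink.parallelism' = '4'\n"
        + ")");
{code}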



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Releasing Flink 1.14.1

2021-12-08 Thread Zhu Zhu
Hi Martijn,

I'd like to backport the fix of FLINK-19142 to 1.14.1.
The backport is in progress.
Will update it here when it is done.

Thanks,
Zhu

Jingsong Li wrote on Wed, Dec 8, 2021, at 10:33:

> Hi Martijn,
>
> We just created a cherry-pick pull-request for
> https://issues.apache.org/jira/browse/FLINK-20370
> We could finish it as soon as possible.
>
> Best,
> Jingsong
>
> On Fri, Dec 3, 2021 at 10:25 PM Fabian Paul  wrote:
> >
> > I just opened a PR for
> > https://issues.apache.org/jira/browse/FLINK-25126 I'll expect to merge
> > it sometime next week.
> >
> > Best,
> > Fabian
> >
> > On Fri, Dec 3, 2021 at 10:49 AM Martijn Visser 
> wrote:
> > >
> > > Hi all,
> > >
> > > Just a status update on the open blockers for 1.14.1:
> > > * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey
> constraint is lost with multiple sources join in SQL -> I believe most
> review comments have been fixed and it's just the final review remarks
> before it's ready.
> > > * https://issues.apache.org/jira/browse/FLINK-23946 - Application
> mode fails fatally when being shut down -> @David Morávek can you provide
> an update?
> > > * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader
> leak with ThreadLocals on the JM when submitting a job through the REST API
> -> I think this is just pending on a merge to master and then creating a
> backport?
> > > * https://issues.apache.org/jira/browse/FLINK-25126 - Kafka connector
> tries to commit aborted transaction in batch mode -> This is a new blocker.
> @fp...@apache.org can you give an update?
> > > * https://issues.apache.org/jira/browse/FLINK-25132 - KafkaSource
> cannot work with object-reusing DeserializationSchema -> There's a PR
> that's being reviewed and then needs a backport.
> > >
> > > It would be great if we can finish all these blockers next week to
> start a release. Do the assignees think that's realistic?
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > >
> > >
> > >
> > > On Thu, 2 Dec 2021 at 14:25, Marios Trivyzas  wrote:
> > >>
> > >>  https://issues.apache.org/jira/browse/FLINK-22113 will be merged
> today (most probably)
> > >>
> > >> On Mon, Nov 29, 2021 at 10:15 AM Martijn Visser <
> mart...@ververica.com> wrote:
> > >>>
> > >>> Thanks all for the updates! To summarize, these are open tickets
> that are considered blockers for Flink 1.14.1:
> > >>>
> > >>> * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey
> constraint is lost with multiple sources join in SQL -> @Marios Trivyzas
> can you give an estimate when you expect this to be resolved?
> > >>> * https://issues.apache.org/jira/browse/FLINK-23946 - Application
> mode fails fatally when being shut down -> A patch is being prepared.
> @David Morávek do you have an estimate when this patch will be there?
> > >>> * https://issues.apache.org/jira/browse/FLINK-24596 - Bugs in
> sink.buffer-flush before upsert-kafka -> @fp...@apache.org has provided a
> PR, so I suspect it would take a couple of days before this is
> merged.
> > >>> * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader
> leak with ThreadLocals on the JM when submitting a job through the REST API
> -> @Chesnay Schepler has provided a PR, so I suspect it would also just
> take a couple of days before this is merged.
> > >>>
> > >>> Is there anyone who can help me with creating the actual release
> when these tickets are resolved?
> > >>>
> > >>> Best regards,
> > >>>
> > >>> Martijn
> > >>>
> > >>>
> > >>> On Fri, 26 Nov 2021 at 12:08, Chesnay Schepler 
> wrote:
> > 
> >  FLINK-25022: I will open a PR later today, and it should be easy to
> >  backport.
> >  FLINK-25027: Unlikely to make it for 1.14.1; I also wouldn't
> consider it
> >  a blocker
> > 
> >  On 24/11/2021 19:40, Martijn Visser wrote:
> >  > Hi all,
> >  >
> >  > I would like to start a discussion on releasing Flink 1.14.1.
> Flink 1.14
> >  > was released on the 29th of September [1] and so far 107 issues
> have been
> >  > resolved, including multiple blockers and critical priorities [2].
> >  >
> >  > There are currently 169 open tickets which contain a fixVersion
> for 1.14.1
> >  > [3]. I'm including the ones that are currently marked as critical
> or a
> >  > blocker to verify if these should be included in Flink 1.14.1. It
> would be
> >  > great if those that are assigned or working on one or more of
> these tickets
> >  > can give an update on its status.
> >  >
> >  > * https://issues.apache.org/jira/browse/FLINK-24543 - Zookeeper
> connection
> >  > issue causes inconsistent state in Flink -> I think this depends
> on the
> >  > outcome of dropping Zookeeper 3.4 as was proposed on the Dev
> mailing list
> >  > * https://issues.apache.org/jira/browse/FLINK-25027 - Allow GC
> of a
> >  > finished job's JobMaster before the slot timeout is reached
> >  > * https://issues.apache.org/jira/browse/FLINK-25022 -
> 

[jira] [Created] (FLINK-25220) Writing an architectural rule for all IT cases w.r.t. MiniCluster

2021-12-08 Thread Jing Ge (Jira)
Jing Ge created FLINK-25220:
---

 Summary: Writing an architectural rule for all IT cases w.r.t. 
MiniCluster
 Key: FLINK-25220
 URL: https://issues.apache.org/jira/browse/FLINK-25220
 Project: Flink
  Issue Type: Improvement
Reporter: Jing Ge


Write an architectural rule that verifies that all IT cases (within 
o.a.f.table.*?) have either (see the sketch below):
 # a public, static member of type MiniClusterWithClientResource annotated with 
ClassRule, or
 # a public, non-static member of type MiniClusterWithClientResource annotated 
with Rule.
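
A sketch of how variant 1 could look with ArchUnit (rule name and scoping are 
illustrative, not final):

{code:java}
import com.tngtech.archunit.core.domain.JavaModifier;
import com.tngtech.archunit.lang.ArchRule;

import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.fields;

public class ITCaseRules {
    // Every MiniClusterWithClientResource field in an ITCase must be a
    // public, static member annotated with @ClassRule (variant 1); the
    // @Rule variant would drop STATIC and check for @Rule instead.
    public static final ArchRule MINI_CLUSTER_AS_CLASS_RULE =
            fields()
                    .that().haveRawType(MiniClusterWithClientResource.class)
                    .and().areDeclaredInClassesThat().haveSimpleNameEndingWith("ITCase")
                    .should().haveModifier(JavaModifier.PUBLIC)
                    .andShould().haveModifier(JavaModifier.STATIC)
                    .andShould().beAnnotatedWith(ClassRule.class);
}
{code}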



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Deprecate Java 8 support

2021-12-08 Thread Till Rohrmann
+1 (binding)

Cheers,
Till

On Tue, Dec 7, 2021 at 12:35 PM Matthias Pohl 
wrote:

> Thanks for pushing this Chesnay!
> +1 (binding)
>
> On Mon, Dec 6, 2021 at 9:44 PM Martijn Visser 
> wrote:
>
> > +1 (non-binding)
> >
> > On Mon, Dec 6, 2021 at 19:58, Ingo Bürk wrote:
> >
> > > Before more people let me know, let me update my vote to
> > >
> > > +1 (binding)
> > >
> > >
> > > (In all seriousness, thanks for the reminders!)
> > >
> > > On Mon, Dec 6, 2021, 16:54 Ingo Bürk  wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > >
> > > > Ingo
> > > >
> > > > On Mon, Dec 6, 2021 at 4:44 PM Chesnay Schepler 
> > > > wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> after recent discussions on the dev
> > > >> 
> > and
> > > >> user <
> > https://lists.apache.org/thread/2jnj5xrokphxhokpo5w3p6w7z0pkl9gx>
> > > >> mailing list to deprecate Java 8 support, with a general consensus
> in
> > > >> favor of it, I would now like to do a formal vote.
> > > >>
> > > >> The deprecation would entail a notification to our users to
> encourage
> > > >> migrating to Java 11, and various efforts on our side to prepare a
> > > >> migration to Java 11, like updating some e2e tests to actually run
> on
> > > >> Java 11, performance benchmarking, etc.
> > > >>
> > > >> There is no set date for the removal of Java 8 support.
> > > >>
> > > >> We'll use the usual minimum 72h vote duration, with committers
> having
> > > >> binding votes.
> > > >>
> > > >>
> > >
>


[jira] [Created] (FLINK-25219) load configuration from flink-conf.yaml, some of my parameters were modified

2021-12-08 Thread jackie (Jira)
jackie created FLINK-25219:
--

 Summary: load configuration from flink-conf.yaml, some of my 
parameters were modified
 Key: FLINK-25219
 URL: https://issues.apache.org/jira/browse/FLINK-25219
 Project: Flink
  Issue Type: Improvement
Reporter: jackie


Flink uses the following method to parse flink-conf.yaml.

When there are '#' characters in my parameter values, the values get truncated.

For example:

flink-conf.yaml  :  s3.secret-key=abc#123

In the end I will get 's3.secret-key=abc'.

Why not use a better way to process flink-conf.yaml, like SnakeYAML?
{code:java}
String[] comments = line.split("#", 2);
String conf = comments[0].trim();

// 2. get key and value
if (conf.length() > 0) {
String[] kv = conf.split(": ", 2); {code}
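
For comparison, a YAML parser only treats {{#}} as starting a comment when it is 
preceded by whitespace (or at the start of a line), so a value like {{abc#123}} 
survives. A small sketch with SnakeYAML:

{code:java}
import java.util.Map;

import org.yaml.snakeyaml.Yaml;

public class YamlParseDemo {
    public static void main(String[] args) {
        // '#' preceded by a non-space character is not a comment in YAML
        Map<String, Object> conf = new Yaml().load("s3.secret-key: abc#123");
        System.out.println(conf.get("s3.secret-key")); // prints abc#123
    }
}
{code}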



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-08 Thread Timo Walther

Hi Wenlong,

thanks for the feedback. Great that we reached consensus here. I will 
update the entire document with my previous example shortly.


> if we don't update the version when plan format changes, we can't 
find that the plan cannot be deserialized in 1.15


This should not be a problem as the entire plan file has a version as 
well. We should not allow reading a 1.16 plan in 1.15. We can throw a 
helpful exception early.


Reading a 1.15 plan in 1.16 is possible until we drop the old 
`supportedPlanFormat` from one of the used ExecNodes. Afterwards all 
`supportedPlanFormat` values of ExecNodes must be equal to or higher than the 
plan version.
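
As a sketch of that early check (the field and helper names here are 
illustrative, not the actual planner code):

JsonNode plan = new ObjectMapper().readTree(planJson);
String planVersion = plan.get("flinkVersion").asText(); // e.g. "1.16"
if (isNewerThanCurrentVersion(planVersion)) { // hypothetical helper
    throw new TableException(
        "The plan was compiled with Flink " + planVersion
            + " and cannot be restored by this Flink version.");
}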


Regards,
Timo

On 08.12.21 03:07, wenlong.lwl wrote:

Hi, Timo,  +1 for multi metadata.

The compatible change I mean in the last email is the slight state change
example you gave, so we have got consensus on this actually, IMO.

Another question based on the example you gave:
In the example "JSON node gets an additional property in 1.16", if we don't
update the version when plan format changes, we can't find that the plan
cannot be deserialized in 1.15, although the savepoint state is
compatible.
The error message may be not so friendly if we just throw deserialization
failure.

On Tue, 7 Dec 2021 at 16:49, Timo Walther  wrote:


Hi Wenlong,

  > First,  we add a newStateLayout because of some improvement in state, in
  > order to keep compatibility we may still keep the old state for the
first
  > version. We need to update the version, so that we can generate a new
  > version plan for the new job and keep the exec node compatible with
the old
  > version plan.

The problem that I see here for contributors is that the actual update
of a version is more complicated than just updating an integer value. It
means copying a lot of ExecNode code for a change that happens locally
in an operator. Let's assume multiple ExecNodes use a similar operator.
Why do we need to update all ExecNode versions, if the operator itself
can deal with the incompatibility? The ExecNode version is meant for
topology changes or fundamental state changes.

If we don't find consensus on this topic, I would at least vote for
supporting multiple annotations for an ExecNode class. This way we don't
need to copy code but only add two ExecNode annotations with different
ExecNode versions.

  > Maybe we can add support for this case :
  > when an exec node is changed in 1.16, but is compatible with 1.15,
  > we can use the node of 1.16 to deserialize the plan of 1.15.

If the ExecNode is compatible, there is no reason to increase the
ExecNode version.



I tried to come up with a reworked solution to make all parties happy:

1. Let's assume the following annotations:

supportedPlanFormat = [1.15]

supportedSavepointFormat = 1.15

we drop `added` as it is equal to `supportedSavepointFormat`

2. Multiple annotations over ExecNodes are possible:

// operator state changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)

// state layout changed slightly in 1.16
// - operator migration is possible
// - operator supports state of both versions and will perform operator
state migration
// - new plans will get new ExecNode version
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
supportedSavepointFormat=1.16)

// we force a plan migration in 1.17
// - we assume that all operator states have been migrated in the
previous version
// - we can safely replace the old version `1` with `2` and only keep
the new savepoint format
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
supportedSavepointFormat=1.16)


// plan changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)

// JSON node gets an additional property in 1.16
// e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false}
// - ExecNode version does not change
// - ExecNode version only changes when topology or state is affected
// - we support both JSON plan formats, the old and the newest one
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16],
supportedSavepointFormat=1.15)

// we force a plan migration in 1.17
// - now we only support 1.16 plan format
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16,
supportedSavepointFormat=1.15)


// topology change

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)

// complete new class structure in 1.16 annotated with
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
supportedSavepointFormat=1.16)
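
Expressed as Java, the multi-annotation variant could look like the
following sketch (the annotation shape is hypothetical and just mirrors
the attributes above):

import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

@Retention(RetentionPolicy.RUNTIME)
@Repeatable(MultipleExecNodeMetadata.class)
@interface ExecNodeMetadata {
    String name();
    int version();
    String[] supportedPlanFormat();
    String supportedSavepointFormat();
}

@Retention(RetentionPolicy.RUNTIME)
@interface MultipleExecNodeMetadata {
    ExecNodeMetadata[] value();
}

// state layout changed slightly in 1.16: both versions live on one class,
// so no ExecNode code has to be copied
@ExecNodeMetadata(name = "A", version = 1,
        supportedPlanFormat = {"1.15"}, supportedSavepointFormat = "1.15")
@ExecNodeMetadata(name = "A", version = 2,
        supportedPlanFormat = {"1.15"}, supportedSavepointFormat = "1.16")
class StreamExecNodeA { }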



What do you think?


Regards,
Timo









On 07.12.21 08:20, wenlong.lwl wrote:

Maybe we can add support for this case :
  when an exec node is changed in 1.16, but is compatible with

1.15,

we can use