Re: [VOTE] FLIP-132: Temporal Table DDL and Temporal Table Join

2020-08-24 Thread Kurt Young
+1, making concepts clear and understandable to all developers is
very important.
Thanks Leonard for driving this.

Best,
Kurt

On Tue, Aug 25, 2020 at 10:47 AM Rui Li  wrote:

> +1. Thanks Leonard for driving this.
>
> On Tue, Aug 25, 2020 at 10:10 AM Jark Wu  wrote:
>
> > Thanks Leonard!
> >
> > +1 to the FLIP.
> >
> > Best,
> > Jark
> >
> > On Tue, 25 Aug 2020 at 01:41, Fabian Hueske  wrote:
> >
> >> Leonard, Thanks for updating the FLIP!
> >>
> >> +1 to the current version.
> >>
> >> Thanks, Fabian
> >>
> >> On Mon, Aug 24, 2020 at 17:56, Leonard Xu <xbjt...@gmail.com> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I would like to start the vote for FLIP-132 [1], which has been
> >>> discussed and
> >>> reached a consensus in the discussion thread [2].
> >>>
> >>> The vote will be open until 27th August (72h), unless there is an
> >>> objection or not enough votes.
> >>>
> >>> Best,
> >>> Leonard
> >>> [1]
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join
> >>>
> >>> [2]
> >>>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html
> >>>
> >>>
> >>>
>
> --
> Best regards!
> Rui Li
>


[ANNOUNCE] Apache Flink 1.10.2 released

2020-08-24 Thread Zhu Zhu
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.2, which is the first bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/08/25/release-1.10.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


Re: Flink kubernetes autoscale

2020-08-24 Thread Yang Wang
Adding some additional information.

A. Refer to Yangze's answer.
B. If you are using the native K8s integration and enable ZooKeeper HA,
then whenever you want to upgrade, stop the Flink cluster
and start a new one with the same clusterId. It will recover from the latest
checkpoint.
If you are using the standalone mode, using an operator will help a lot
with the upgrade [1][2]. However, both of them will have
some downtime.
C. AFAIK, even when we have the reactive mode in the future, I am afraid
that Flink will not be able to identify which operator is the source of load
and rescale it individually. Since slot sharing is enabled by default, only the
max parallelism matters.
D. If you store the state on a DFS (e.g. HDFS, S3, GFS, etc.), it will be
fetched remotely when the job restarts, so large states will
consume more time. But I think you could try a StatefulSet
with PVs (i.e. persistent volumes) so that local recovery can work.



[1]. https://github.com/lyft/flinkk8soperator
[2]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator


Best,
Yang

Yangze Guo wrote on Mon, Aug 24, 2020 at 7:55 PM:

> Hi, Mazen
>
> AFAIK, we now have two K8s integration, native[1] and standalone[2]. I
> guess the native K8s integration is what you mean by active K8S
> integration.
>
> Regarding the reactive mode, I think it is still work in progress;
> you could refer to [3].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
> [3] https://issues.apache.org/jira/browse/FLINK-10407
>
> Best,
> Yangze Guo
>
> On Mon, Aug 24, 2020 at 6:20 PM Mazen 
> wrote:
> >
> > To the best of my knowledge, for Flink deployment on Kubernetes we have
> > two options as of now: (1) active K8S integration with a separate job
> > manager per job, and (2) reactive container mode with auto rescaling based
> > on some metrics. Could you please give me a hint on the below:
> >
> > A - Are the two integrations already integrated into recent Flink releases?
> > Any documentation on that?
> >
> > B - In all cases, is it necessary to kill and restart the job? This is a
> > concern for some critical use cases. Can a rolling upgrade be used to have
> > zero downtime while rescaling/upgrading?
> >
> > C - In such a rescale mechanism, does Kubernetes/Flink identify which stream
> > operator is the source of load/utilization and rescale it individually, or
> > is the rescaling done at the granularity of the whole job?
> >
> > D - For stateful operators/jobs, how is the state repartitioning and
> > assignment to new instances performed? Is this repartitioning/reassignment
> > time-consuming, especially for large states?
> >
> > Thank you.
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


Re: [DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-24 Thread Paul Lam
Hi Aljoscha,

I'm slightly leaning towards keeping the 0.10 connector, as Kafka 0.10 still
has a steady user base in my observation.

But if we drop the 0.10 connector, can we ensure that users would be able to
migrate smoothly to the 0.11/universal connector?

If I remember correctly, the universal connector is compatible with 0.10
brokers, but I want to double-check that.

Best,
Paul Lam

> On Aug 24, 2020, at 22:46, Aljoscha Krettek wrote:
> 
> Hi all,
> 
> this thought came up on FLINK-17260 [1] but I think it would be a good idea
> in general. The issue reminded us that Kafka didn't have an
> idempotent/fault-tolerant Producer before Kafka 0.11.0. By now we have had
> the "modern" Kafka connector, which roughly follows new Kafka releases, for a
> while, and this one supports Kafka cluster versions as far back as 0.10.2.0 (I
> believe).
> 
> What are your thoughts on removing support for older Kafka versions? And yes, 
> I know that we had multiple discussions like this in the past but I'm trying 
> to gauge the current sentiment.
> 
> I'm cross-posting to the user-ml since this is important for both users and 
> developers.
> 
> Best,
> Aljoscha
> 
> [1] https://issues.apache.org/jira/browse/FLINK-17260
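
For reference on Paul's migration question: a minimal sketch of moving a job
from the 0.10-specific consumer to the universal connector (the topic name,
broker address, and group id below are invented placeholders; the Maven
dependency changes from flink-connector-kafka-0.10 to flink-connector-kafka):

{code}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniversalConnectorMigration {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder

        // Before: new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props)
        // After: the universal consumer, which should also talk to 0.10.2+ brokers.
        env.addSource(new FlinkKafkaConsumer<>(
                        "events", new SimpleStringSchema(), props))
                .print();

        env.execute("universal-connector-migration");
    }
}
{code}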



Re: [VOTE] FLIP-132: Temporal Table DDL and Temporal Table Join

2020-08-24 Thread Rui Li
+1. Thanks Leonard for driving this.

On Tue, Aug 25, 2020 at 10:10 AM Jark Wu  wrote:

> Thanks Leonard!
>
> +1 to the FLIP.
>
> Best,
> Jark
>
> On Tue, 25 Aug 2020 at 01:41, Fabian Hueske  wrote:
>
>> Leonard, Thanks for updating the FLIP!
>>
>> +1 to the current version.
>>
>> Thanks, Fabian
>>
>> On Mon, Aug 24, 2020 at 17:56, Leonard Xu wrote:
>>
>>> Hi all,
>>>
>>> I would like to start the vote for FLIP-132 [1], which has been
>>> discussed and
>>> reached a consensus in the discussion thread [2].
>>>
>>> The vote will be open until 27th August (72h), unless there is an
>>> objection or not enough votes.
>>>
>>> Best,
>>> Leonard
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join
>>>
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html
>>>
>>>
>>>

-- 
Best regards!
Rui Li


Re: [VOTE] FLIP-132: Temporal Table DDL and Temporal Table Join

2020-08-24 Thread Jark Wu
Thanks Leonard!

+1 to the FLIP.

Best,
Jark

On Tue, 25 Aug 2020 at 01:41, Fabian Hueske  wrote:

> Leonard, Thanks for updating the FLIP!
>
> +1 to the current version.
>
> Thanks, Fabian
>
> On Mon, Aug 24, 2020 at 17:56, Leonard Xu wrote:
>
>> Hi all,
>>
>> I would like to start the vote for FLIP-132 [1], which has been discussed
>> and
>> reached a consensus in the discussion thread [2].
>>
>> The vote will be open until 27th August (72h), unless there is an
>> objection or not enough votes.
>>
>> Best,
>> Leonard
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join
>>
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html
>>
>>
>>


[jira] [Created] (FLINK-19041) Add dependency management for ConnectedStream in Python DataStream API.

2020-08-24 Thread Shuiqiang Chen (Jira)
Shuiqiang Chen created FLINK-19041:
--

 Summary: Add dependency management for ConnectedStream in Python 
DataStream API.
 Key: FLINK-19041
 URL: https://issues.apache.org/jira/browse/FLINK-19041
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Shuiqiang Chen
 Fix For: 1.12.0


There is a bug where we forget to set the merged configurations into the
DataStreamTwoInputPythonStatelessFunctionOperator when finally generating the
StreamGraph.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-132: Temporal Table DDL and Temporal Table Join

2020-08-24 Thread Fabian Hueske
Leonard, Thanks for updating the FLIP!

+1 to the current version.

Thanks, Fabian

On Mon, Aug 24, 2020 at 17:56, Leonard Xu wrote:

> Hi all,
>
> I would like to start the vote for FLIP-132 [1], which has been discussed
> and
> reached a consensus in the discussion thread [2].
>
> The vote will be open until 27th August (72h), unless there is an
> objection or not enough votes.
>
> Best,
> Leonard
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join
>
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html
>
>
>


Re: How jobmanager and task manager communicates with each other ?

2020-08-24 Thread sidhant gupta
++dev@flink.apache.org

On Mon, Aug 24, 2020, 7:31 PM sidhant gupta  wrote:

> Hi User
>
> How do the jobmanager and task manager communicate with each other? How do I
> set up a connection between a jobmanager and task manager running in
> different/the same EC2 instances? Is it HTTP or TCP? How does the service
> discovery work?
>
> Thanks
> Sidhant Gupta
>


[VOTE] FLIP-132: Temporal Table DDL and Temporal Table Join

2020-08-24 Thread Leonard Xu
Hi all,

I would like to start the vote for FLIP-132 [1], which has been discussed and
reached a consensus in the discussion thread [2].

The vote will be open until 27th August (72h), unless there is an objection or 
not enough votes.

Best,
Leonard
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html



[jira] [Created] (FLINK-19040) SourceOperator is not closing SourceReader

2020-08-24 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-19040:
--

 Summary: SourceOperator is not closing SourceReader
 Key: FLINK-19040
 URL: https://issues.apache.org/jira/browse/FLINK-19040
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.1, 1.12.0
Reporter: Piotr Nowojski
 Fix For: 1.12.0, 1.11.2


{{SourceOperator}} is creating {{SourceReader}} but {{SourceReader}} is never 
closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-132: Temporal Table DDL

2020-08-24 Thread Leonard Xu
Thanks Fabian, Rui and Jark for the nice discussion!

It seems everyone involved in this discussion has reached a consensus.

I will start another vote thread later.

Best,
Leonard

> On Aug 24, 2020, at 20:54, Fabian Hueske wrote:
> 
> Hi everyone,
> 
> Thanks for the good discussion!
> 
> I'm fine keeping the names "event-time temporal join" and "processing-time 
> temporal join".
> Also +1 for Rui's proposal using "versioned table" for versioned dynamic 
> table and "regular table" for regular dynamic table.
> 
> Thanks,
> Fabian
> 
> 
> 
> On Mon, Aug 24, 2020 at 04:24, Jark Wu wrote:
> I think we have to make some compromise here. Either updating the definition 
> of "temporal table", or extending the definition of "temporal join".
> I'm also fine with Rui's proposal that "temporal join" can also work with a 
> regular table.
> 
> Best,
> Jark
> 
> On Fri, 21 Aug 2020 at 23:49, Leonard Xu wrote:
> Thanks @Fabian @Jark @Rui for sharing your opinions.
> 
> For the small divergence about choosing a temporal join name or a temporal
> table name,
> I don't have a strong inclination.
> 
> Regarding choosing a different name for this join:
> I agree with Jark and Rui to keep the existing "event-time temporal join"
> and "processing-time temporal join".
> 
> Regarding naming the "temporal table without versions"/"latest-only temporal
> table":
> Although Flink has introduced the Temporal Table concept for users [1], I think
> users are more familiar with the dynamic table concept.
> If we don't want to introduce more temporal table concepts, Rui's proposal
> of using "versioned table" for a versioned dynamic table and "regular table" for
> a regular dynamic table is fine with me.
> 
> HDYT? @Fabian  @Jark
> 
> 
> Best
> Leonard 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
> 
>> On Aug 21, 2020, at 14:18, Rui Li wrote:
>> 
>> Hi guys,
>> 
>> Just my two cents.
>> 
>> I agree with Jark that we should use "event-time/processing-time temporal 
>> join" as the name for this join.
>> 
>> But I'm not sure about the definition of "temporal table".
>> 
>> Then the "temporal join" (i.e. FOR SYSTEM_TIME AS OF syntax) joins a
>> non-temporal table, which also sounds like a contradiction.
>> 
>> Why would we require that a "temporal join" only joins a "temporal table", if the
>> standard doesn't even define what a "temporal table" is?
>> 
>> What about defining the term "temporal table" to be "a table that is changing
>> over time", the same as the "dynamic table"?
>>  
>> I'd prefer not to introduce another term if it has the same meaning as 
>> "dynamic table".
>> 
>> I wonder whether it's possible to consider "temporal join" as a specific 
>> kind of join that may work with regular or versioned tables, instead
>> of a join that requires a specific kind of table.
>> 
>> On Fri, Aug 21, 2020 at 1:45 PM Jark Wu wrote:
>> Hi everyone,
>> 
>> Thank you for the great discussion @Leonard and @Fabian.
>> 
>> Regarding choosing a different name for this join:
>> 
>> From my point of view, I don't agree with introducing new grammar called
>> "lookup join" or "version join" or whatever, because:
>> 1. "lookup" is a physical behavior, not a logical concept.
>> 2. The SQL standard has proposed the temporal join, and it fits Flink well with
>> the event-time and processing-time attributes.
>> 3. We already have so many different join grammars, e.g. "regular join",
>> "interval join", "temporal join", and maybe "window join" in the future.
>> Having more join concepts may confuse users.
>> 
>> So I think the existing "event-time temporal join" and "processing-time
>> temporal join" work well and we should still use them.
>> 
>> Regarding the "temporal table without versions":
>> 
>> I agree there is a contradiction between "temporal table" and "temporal
>> table without versions" if we think a "temporal table" tracks the full history.
>> However, suppose we call a "temporal table without versions" a "regular table",
>> not a kind of "temporal table".
>> Then the "temporal join" (i.e. FOR SYSTEM_TIME AS OF syntax) joins a
>> non-temporal table, which also sounds like a contradiction.
>> 
>> I also notice that in SQL Server, the "system-versioned temporal table" is a 
>> combination of "Temporal Table" (current data) and "History Table" [1]. 
>> The "Temporal Table" is the current data of the database which is the same 
>> as our definition of "temporal table without version". The documentation 
>> says:
>> 
>> > This additional table is referred to as the history table, while the main 
>> > table that stores current (actual) row versions is referred to as the 
>> > current table or simply as the temporal table.
>> 
>> Besides, SQL:2011 doesn't define the meaning of "temporal table" [2], 
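
To make the naming question in this thread concrete: a hedged sketch of an
event-time temporal join against a versioned table, written as Java Table API
code issuing the proposed SQL (every table name, column, and connector option
below is invented; an "orders" table with an event-time attribute "order_time"
is assumed to exist, and the final FLIP-132 text is authoritative on syntax):

{code}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A versioned table: primary key + event-time attribute, so Flink can
        // keep a history of row versions per key.
        tEnv.executeSql(
                "CREATE TABLE currency_rates ("
                        + " currency STRING,"
                        + " rate DECIMAL(10, 4),"
                        + " update_time TIMESTAMP(3),"
                        + " WATERMARK FOR update_time AS update_time,"
                        + " PRIMARY KEY (currency) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'rates',"                              // placeholder
                        + " 'properties.bootstrap.servers' = 'broker:9092'," // placeholder
                        + " 'format' = 'json')");

        // Event-time temporal join: each order picks the rate version that
        // was valid at the order's own timestamp.
        tEnv.executeSql(
                "SELECT o.order_id, o.price * r.rate AS converted_price"
                        + " FROM orders AS o"
                        + " JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r"
                        + " ON o.currency = r.currency");
    }
}
{code}

A join against a regular (latest-only) table would use the same FOR
SYSTEM_TIME AS OF syntax but always see the latest row per key, which is
exactly the naming tension debated above.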

[jira] [Created] (FLINK-19039) Parallel Flink Kafka Consumers compete with each other

2020-08-24 Thread Ayrat Hudaygulov (Jira)
Ayrat Hudaygulov created FLINK-19039:


 Summary: Parallel Flink Kafka Consumers compete with each other
 Key: FLINK-19039
 URL: https://issues.apache.org/jira/browse/FLINK-19039
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.11.1
Reporter: Ayrat Hudaygulov


If I run multiple Flink instances with the same consumer group id, they will not
re-balance partitions with each other; rather, each instance takes all
partitions, effectively not working in parallel at all and multiplying the
amount of messages processed.

 

This is because FlinkKafkaConsumer has its own re-balancing mechanism for the
current parallelism level and then just calls:

`consumerTmp.assign(newPartitionAssignments);`

 

[https://github.com/apache/flink/blob/59714b9d6addb1dbf2171cab937a0e3fec52f2b1/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L422]

 

I suppose there has to be a way to fall back to the default Kafka re-balancing
mechanism that respects the consumer group id, but it's not present in Flink at
all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
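
A sketch of the kafka-clients API difference behind this report (topic,
partition, and broker names are placeholders): assign() pins partitions without
any group coordination, while only subscribe() engages the consumer-group
rebalance protocol.

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // What Flink's KafkaConsumerThread effectively does: manual assignment.
        // The group coordinator is bypassed, so two jobs with the same group.id
        // both read every assigned partition.
        try (KafkaConsumer<String, String> manual = new KafkaConsumer<>(props)) {
            manual.assign(Collections.singletonList(new TopicPartition("events", 0)));
        }

        // Group-managed subscription: partitions are rebalanced across all
        // consumers sharing the group.id.
        try (KafkaConsumer<String, String> managed = new KafkaConsumer<>(props)) {
            managed.subscribe(Collections.singletonList("events"));
        }
    }
}
{code}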


[DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-24 Thread Aljoscha Krettek

Hi all,

this thought came up on FLINK-17260 [1] but I think it would be a good
idea in general. The issue reminded us that Kafka didn't have an
idempotent/fault-tolerant Producer before Kafka 0.11.0. By now we have
had the "modern" Kafka connector, which roughly follows new Kafka releases,
for a while, and this one supports Kafka cluster versions as far back as
0.10.2.0 (I believe).


What are your thoughts on removing support for older Kafka versions? And 
yes, I know that we had multiple discussions like this in the past but 
I'm trying to gauge the current sentiment.


I'm cross-posting to the user-ml since this is important for both users 
and developers.


Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-17260


Re: [DISCUSS] FLIP-132: Temporal Table DDL

2020-08-24 Thread Fabian Hueske
Hi everyone,

Thanks for the good discussion!

I'm fine keeping the names "event-time temporal join" and "processing-time
temporal join".
Also +1 for Rui's proposal using "versioned table" for versioned dynamic
table and "regular table" for regular dynamic table.

Thanks,
Fabian



On Mon, Aug 24, 2020 at 04:24, Jark Wu wrote:

> I think we have to make some compromise here. Either updating the
> definition of "temporal table", or extending the definition of "temporal
> join".
> I'm also fine with Rui's proposal that "temporal join" can also work with
> a regular table.
>
> Best,
> Jark
>
> On Fri, 21 Aug 2020 at 23:49, Leonard Xu  wrote:
>
>> Thanks @Fabian @Jark @Rui for sharing your opinions.
>>
>> For the small divergence about choosing a temporal join name or a
>> temporal table name,
>> I don't have a strong inclination.
>>
>> *Regarding choosing a different name for this join:*
>> I agree with Jark and Rui to keep the existing "event-time temporal
>> join" and "processing-time temporal join".
>>
>> *Regarding naming the "temporal table without versions"/"latest-only
>> temporal table":*
>> Although Flink has introduced the Temporal Table concept for users [1], I think
>> users are more familiar with the dynamic table concept.
>> If we don't want to introduce more temporal table concepts, Rui's
>> proposal of using "versioned table" for a versioned dynamic table and "regular
>> table" for a regular dynamic table is fine with me.
>>
>> HDYT? @Fabian  @Jark
>>
>>
>> Best
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
>>
>>
>> On Aug 21, 2020, at 14:18, Rui Li wrote:
>>
>> Hi guys,
>>
>> Just my two cents.
>>
>> I agree with Jark that we should use "event-time/processing-time temporal
>> join" as the name for this join.
>>
>> But I'm not sure about the definition of "temporal table".
>>
>> Then the "temporal join" (i.e. FOR SYSTEM_TIME AS OF syntax) joins a
>>> non-temporal table, which also sounds like a contradiction.
>>>
>>
>> Why would we require that a "temporal join" only joins a "temporal table", if
>> the standard doesn't even define what a "temporal table" is?
>>
>> What about defining the term "temporal table" to be "a table that is
>>> changing over time", the same as the "dynamic table"?
>>>
>>
>> I'd prefer not to introduce another term if it has the same meaning as
>> "dynamic table".
>>
>> I wonder whether it's possible to consider "temporal join" as a specific
>> kind of join that may work with regular or versioned tables, instead
>> of a join that requires a specific kind of table.
>>
>> On Fri, Aug 21, 2020 at 1:45 PM Jark Wu  wrote:
>>
>>> Hi everyone,
>>>
>>> Thank you for the great discussion @Leonard and @Fabian.
>>>
>>> *Regarding choosing a different name for this join:*
>>>
>>> From my point of view, I don't agree with introducing new grammar called
>>> "lookup join" or "version join" or whatever, because:
>>> 1. "lookup" is a physical behavior, not a logical concept.
>>> 2. The SQL standard has proposed the temporal join, and it fits Flink well
>>> with the event-time and processing-time attributes.
>>> 3. We already have so many different join grammars, e.g. "regular join",
>>> "interval join", "temporal join", and maybe "window join" in the future.
>>> Having more join concepts may confuse users.
>>>
>>> So I think the existing "event-time temporal join" and "processing-time
>>> temporal join" work well and we should still use them.
>>>
>>> *Regarding the "temporal table without versions":*
>>>
>>> I agree there is a contradiction between "temporal table" and "temporal
>>> table without versions" if we think a "temporal table" tracks the full history.
>>> However, suppose we call a "temporal table without versions" a "regular
>>> table", not a kind of "temporal table".
>>> Then the "temporal join" (i.e. FOR SYSTEM_TIME AS OF syntax) joins a
>>> non-temporal table, which also sounds like a contradiction.
>>>
>>> I also notice that in SQL Server, the "system-versioned temporal table"
>>> is a combination of "Temporal Table" (current data) and "History Table"
>>> [1].
>>> The "Temporal Table" is the current data of the database which is the
>>> same as our definition of "temporal table without version". The
>>> documentation says:
>>>
>>> > *This additional table is referred to as the history table, while the
>>> main table that stores current (actual) row versions is referred to as the
>>> current table or simply as the temporal table. *
>>>
>>> Besides, SQL:2011 doesn't define the meaning of "temporal table" [2], so
>>> I think we can define the meaning ourselves.
>>>
>>> *> Interestingly, SQL:2011 manages to provide this (system-versioned
>>> table) support without actually defining or using the terms “temporal data”
>>> or “temporal table”.*
>>>
>>> What about defining the term "temporal table" to be "a table that is
>>> changing over time", the same as the "dynamic table"?
>>> A "versioned temporal table" is a special temporal table which tracks
>>> 

Re: [RESULT] [VOTE] Release 1.10.2, release candidate #2

2020-08-24 Thread Zhu Zhu
Hi everyone,

This is an update for release 1.10.2.
The release process is currently pending on the PR[1] to have flink docker
images published on Docker Hub.
After it is merged, we can shortly get Flink 1.10.2 released.

[1] https://github.com/docker-library/official-images/pull/8599

Thanks,
Zhu Zhu

Zhu Zhu wrote on Thu, Aug 20, 2020 at 9:47 PM:

> Hi everyone,
>
> I'm happy to announce that we have unanimously approved this release.
>
> There are 6 approving votes, 3 of which are binding:
> * Robert Metzger (binding)
> * Till Rohrmann (binding)
> * Ufuk Celebi (binding)
> * Jeff Zhang
> * Congxian Qiu
> * Yun Tang
>
> There are no disapproving votes.
>
> Thanks everyone!
>
> Thanks,
> Zhu Zhu
>


[jira] [Created] (FLINK-19038) It doesn't support to call Table.fetch() continuously

2020-08-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-19038:
---

 Summary: It doesn't support to call Table.fetch() continuously
 Key: FLINK-19038
 URL: https://issues.apache.org/jira/browse/FLINK-19038
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Dian Fu
 Fix For: 1.12.0


table.fetch(3).fetch(2) will fail with "FETCH is already defined."



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
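
A minimal reproduction sketch (the source table "orders" is hypothetical and
would need to be registered first):

{code}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FetchTwiceRepro {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        Table orders = tEnv.from("orders"); // assumes a registered table

        Table limited = orders.fetch(3);    // fine: limits the result to 3 rows
        Table broken = limited.fetch(2);    // fails: "FETCH is already defined."
    }
}
{code}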


Re: OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread Till Rohrmann
The heap dump did not show anything too suspicious. The only thing I
noticed is that there are 13 ChildFirstClassLoaders whereas there are only
6 Task instances in the heap dump. Are you running all 13 tasks on the same
TaskExecutor?

Cheers,
Till

On Mon, Aug 24, 2020 at 2:01 PM Till Rohrmann  wrote:

> What could also cause the problem is that the metaspace memory budget is
> configured too tightly. Here is a pointer to increasing the metaspace size
> [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_trouble.html#outofmemoryerror-metaspace
>
> Cheers,
> Till
>
> On Mon, Aug 24, 2020 at 1:49 PM Till Rohrmann 
> wrote:
>
>> Hi,
>>
>> could you share with us the Flink cluster logs? This would help answering
>> a lot of questions around your setup and the Flink version you are using.
>> Thanks a lot!
>>
>> Cheers,
>> Till
>>
>> On Mon, Aug 24, 2020 at 10:48 AM 耿延杰  wrote:
>>
>>> It still fails after every 12 tasks.
>>> And the exception stack of the failed tasks is different.
>>>
>>>
>>> For example, the most recent failed task's exception info:
>>> Caused by: java.lang.OutOfMemoryError: Metaspace
>>> at java.lang.ClassLoader.defineClass1(Native
>>> Method)
>>> at
>>> java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>>> at
>>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>> at
>>> java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>> at
>>> java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>> at
>>> java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>> at
>>> java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>> at
>>> java.security.AccessController.doPrivileged(Native Method)
>>> at
>>> java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>> at
>>> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
>>> at
>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>> at
>>> org.apache.http.impl.client.CloseableHttpClient.determineTarget(CloseableHttpClient.java:93)
>>> at
>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>> at
>>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
>>> at
>>> ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614)
>>> at
>>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117)
>>> at
>>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100)
>>> at
>>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95)
>>> at
>>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:90)
>>> at
>>> ru.yandex.clickhouse.ClickHouseConnectionImpl.initTimeZone(ClickHouseConnectionImpl.java:94)
>>> at
>>> ru.yandex.clickhouse.ClickHouseConnectionImpl.<init>
>>> at
>>> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:55)
>>> at
>>> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:47)
>>> at
>>> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:29)
>>> at
>>> java.sql.DriverManager.getConnection(DriverManager.java:664)
>>> at
>>> java.sql.DriverManager.getConnection(DriverManager.java:270)
>>> at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)
>>> at
>>> com.xxx.clickhouse.ClickHouseJDBCOutputFormat.open(ClickHouseJDBCOutputFormat.java:53)
>>> at
>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
>>>
>>> It is different from the exception info in the last email.
>>>
>>>
>>> So analysing the dump file is the key.
>>>
>>>
>>>
>>>
>>>
>>>
>>> -- Original Message --
>>> From: "耿延杰" <gyj199...@qq.com>
>>> Sent: Monday, August 24, 2020, 4:33 PM
>>> To: "dev"
>>> Subject: Re: OutOfMemoryError: Metaspace on Batch Task When Write into
>>> Clickhouse
>>>
>>>
>>>
>>> Additional info:
>>>
>>>
>>> The exception info in Flink Manager Page:
>>>
>>>
>>> Caused by: java.lang.OutOfMemoryError: Metaspace
>>>  at java.lang.ClassLoader.defineClass1(Native Method)
>>>  at
>>> java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>>>  at
>>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>>  at
>>> java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>  at
>>> java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>  at
>>> java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>  at
>>> java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>  at java.security.AccessController.doPrivileged(Native
>>> Method)
>>>  at
>>> 
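
One classic contributor to this pattern (an assumption, not a confirmed
diagnosis of this job): each submission loads the ClickHouse JDBC driver in a
fresh user-code classloader, and the driver registers itself with
java.sql.DriverManager, which lives outside that classloader and can pin it,
so metaspace grows with every run. A hedged cleanup sketch that an output
format could call from close():

{code}
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Enumeration;

public final class JdbcDriverCleanup {

    private JdbcDriverCleanup() {
    }

    /** Deregisters JDBC drivers that were loaded by the calling classloader. */
    public static void deregisterOwnDrivers() {
        ClassLoader userCodeLoader = JdbcDriverCleanup.class.getClassLoader();
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.getClass().getClassLoader() == userCodeLoader) {
                try {
                    DriverManager.deregisterDriver(driver);
                } catch (SQLException ignored) {
                    // best effort: leaking one driver is no worse than before
                }
            }
        }
    }
}
{code}

Whether this applies here can only be confirmed from the heap dump.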

Re: OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread Till Rohrmann
What could also cause the problem is that the metaspace memory budget is
configured too tightly. Here is a pointer to increasing the metaspace size
[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_trouble.html#outofmemoryerror-metaspace

Cheers,
Till

On Mon, Aug 24, 2020 at 1:49 PM Till Rohrmann  wrote:

> Hi,
>
> could you share with us the Flink cluster logs? This would help answering
> a lot of questions around your setup and the Flink version you are using.
> Thanks a lot!
>
> Cheers,
> Till
>
> On Mon, Aug 24, 2020 at 10:48 AM 耿延杰  wrote:
>
>> It still fails after every 12 tasks.
>> And the exception stack of the failed tasks is different.
>>
>>
>> For example, the most recent failed task's exception info:
>> Caused by: java.lang.OutOfMemoryError: Metaspace
>> at java.lang.ClassLoader.defineClass1(Native
>> Method)
>> at
>> java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>> at
>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>> at
>> java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>> at
>> java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>> at
>> java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>> at
>> java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>> at
>> java.security.AccessController.doPrivileged(Native Method)
>> at
>> java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
>> at
>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>> at
>> org.apache.http.impl.client.CloseableHttpClient.determineTarget(CloseableHttpClient.java:93)
>> at
>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>> at
>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
>> at
>> ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614)
>> at
>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117)
>> at
>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100)
>> at
>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95)
>> at
>> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:90)
>> at
>> ru.yandex.clickhouse.ClickHouseConnectionImpl.initTimeZone(ClickHouseConnectionImpl.java:94)
>> at
>> ru.yandex.clickhouse.ClickHouseConnectionImpl.<init>
>> at
>> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:55)
>> at
>> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:47)
>> at
>> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:29)
>> at
>> java.sql.DriverManager.getConnection(DriverManager.java:664)
>> at
>> java.sql.DriverManager.getConnection(DriverManager.java:270)
>> at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)
>> at
>> com.xxx.clickhouse.ClickHouseJDBCOutputFormat.open(ClickHouseJDBCOutputFormat.java:53)
>> at
>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
>> at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>>
>> It is different from the exception info in the last email.
>>
>>
>> So analysing the dump file is the key.
>>
>>
>>
>>
>>
>>
>> -- Original Message --
>> From: "耿延杰" <gyj199...@qq.com>
>> Sent: Monday, August 24, 2020, 4:33 PM
>> To: "dev"
>> Subject: Re: OutOfMemoryError: Metaspace on Batch Task When Write into
>> Clickhouse
>>
>>
>>
>> Additional info:
>>
>>
>> The exception info in Flink Manager Page:
>>
>>
>> Caused by: java.lang.OutOfMemoryError: Metaspace
>>  at java.lang.ClassLoader.defineClass1(Native Method)
>>  at
>> java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>>  at
>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>  at
>> java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>  at
>> java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>  at
>> java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>  at
>> java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>  at java.security.AccessController.doPrivileged(Native
>> Method)
>>  at
>> java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>  at
>> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
>>  at
>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>  at
>> org.apache.http.impl.client.CloseableHttpClient.determineTarget(CloseableHttpClient.java:93)
>>  at
>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>  at
>> 

Re: Flink kubernetes autoscale

2020-08-24 Thread Yangze Guo
Hi, Mazen

AFAIK, we now have two K8s integration, native[1] and standalone[2]. I
guess the native K8s integration is what you mean by active K8S
integration.

Regarding the reactive mode, I think it is still work in progress;
you could refer to [3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
[3] https://issues.apache.org/jira/browse/FLINK-10407

Best,
Yangze Guo

On Mon, Aug 24, 2020 at 6:20 PM Mazen  wrote:
>
> To the best of my knowledge, for Flink deployment on Kubernetes we have two
> options as of now: (1) active K8S integration with a separate job manager per
> job, and (2) reactive container mode with auto rescaling based on some metrics.
> Could you please give me a hint on the below:
>
> A - Are the two integrations already integrated into recent Flink releases?
> Any documentation on that?
>
> B - In all cases, is it necessary to kill and restart the job? This is a
> concern for some critical use cases. Can a rolling upgrade be used to have
> zero downtime while rescaling/upgrading?
>
> C - In such a rescale mechanism, does Kubernetes/Flink identify which stream
> operator is the source of load/utilization and rescale it individually, or
> is the rescaling done at the granularity of the whole job?
>
> D - For stateful operators/jobs, how is the state repartitioning and
> assignment to new instances performed? Is this repartitioning/reassignment
> time-consuming, especially for large states?
>
> Thank you.
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


Re: OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread Till Rohrmann
Hi,

could you share with us the Flink cluster logs? This would help answering a
lot of questions around your setup and the Flink version you are using.
Thanks a lot!

Cheers,
Till

On Mon, Aug 24, 2020 at 10:48 AM 耿延杰  wrote:

> It still fails after every 12 tasks.
> And the exception stack of the failed tasks is different.
>
>
> For example, the most recent failed task's exception info:
> Caused by: java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native
> Method)
> at
> java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at
> java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at
> java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at
> java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at
> java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at
> java.security.AccessController.doPrivileged(Native Method)
> at
> java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
> at
> java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at
> org.apache.http.impl.client.CloseableHttpClient.determineTarget(CloseableHttpClient.java:93)
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
> at
> ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614)
> at
> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117)
> at
> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100)
> at
> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95)
> at
> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:90)
> at
> ru.yandex.clickhouse.ClickHouseConnectionImpl.initTimeZone(ClickHouseConnectionImpl.java:94)
> at
> ru.yandex.clickhouse.ClickHouseConnectionImpl.<init>
> at
> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:55)
> at
> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:47)
> at
> ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:29)
> at
> java.sql.DriverManager.getConnection(DriverManager.java:664)
> at
> java.sql.DriverManager.getConnection(DriverManager.java:270)
> at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)
> at
> com.xxx.clickhouse.ClickHouseJDBCOutputFormat.open(ClickHouseJDBCOutputFormat.java:53)
> at
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> It is different from the exception info in the last email.
>
>
> So analysing the dump file is the key.
>
>
>
>
>
>
> -- Original Message --
> From: "耿延杰" <gyj199...@qq.com>
> Sent: Monday, August 24, 2020, 4:33 PM
> To: "dev"
> Subject: Re: OutOfMemoryError: Metaspace on Batch Task When Write into
> Clickhouse
>
>
>
> Additional info:
>
>
> The exception info in Flink Manager Page:
>
>
> Caused by: java.lang.OutOfMemoryError: Metaspace
>  at java.lang.ClassLoader.defineClass1(Native Method)
>  at
> java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>  at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>  at
> java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>  at
> java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>  at
> java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>  at
> java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>  at java.security.AccessController.doPrivileged(Native
> Method)
>  at
> java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>  at
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>  at
> org.apache.http.impl.client.CloseableHttpClient.determineTarget(CloseableHttpClient.java:93)
>  at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>  at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
>  at
> ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614)
>  at
> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117)
>  at
> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100)
>  at
> ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95)
>  at
> 

[jira] [Created] (FLINK-19037) Introduce proper IO executor in Dispatcher

2020-08-24 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-19037:
--

 Summary: Introduce proper IO executor in Dispatcher
 Key: FLINK-19037
 URL: https://issues.apache.org/jira/browse/FLINK-19037
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Robert Metzger


Currently, IO operations in the {{Dispatcher}} are scheduled on the 
{{rpcService.getExecutor()}}.

We should introduce a separate executor for IO operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
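
A generic illustration of the direction (all names are invented; this is not
the Dispatcher's actual code): blocking IO gets its own bounded pool instead
of sharing the RPC executor.

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IoExecutorSketch {

    public static void main(String[] args) {
        // Dedicated pool so slow disk/DFS access cannot starve RPC threads.
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

        CompletableFuture<String> recovered = CompletableFuture.supplyAsync(
                IoExecutorSketch::loadJobGraphFromStore, ioExecutor);

        System.out.println(recovered.join());
        ioExecutor.shutdown();
    }

    // Hypothetical stand-in for a blocking IO operation, e.g. reading a
    // persisted JobGraph from a store.
    private static String loadJobGraphFromStore() {
        return "job-graph";
    }
}
{code}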


Flink kubernetes autoscale

2020-08-24 Thread Mazen
To the best of my knowledge, for Flink deployment on Kubernetes we have two
options as of now: (1) active K8S integration with a separate job manager per
job, and (2) reactive container mode with auto rescaling based on some metrics.
Could you please give me a hint on the below:

A - Are the two integrations already integrated into recent Flink releases?
Any documentation on that?

B - In all cases, is it necessary to kill and restart the job? This is a
concern for some critical use cases. Can a rolling upgrade be used to have
zero downtime while rescaling/upgrading?

C - In such a rescale mechanism, does Kubernetes/Flink identify which stream
operator is the source of load/utilization and rescale it individually, or
is the rescaling done at the granularity of the whole job?

D - For stateful operators/jobs, how is the state repartitioning and
assignment to new instances performed? Is this repartitioning/reassignment
time-consuming, especially for large states?

Thank you.



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


[jira] [Created] (FLINK-19036) Translate page 'Application Profiling & Debugging' of 'Debugging & Monitoring' into Chinese

2020-08-24 Thread Roc Marshal (Jira)
Roc Marshal created FLINK-19036:
---

 Summary: Translate page 'Application Profiling & Debugging' of 
'Debugging & Monitoring' into Chinese
 Key: FLINK-19036
 URL: https://issues.apache.org/jira/browse/FLINK-19036
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation
Affects Versions: 1.11.1, 1.11.0
Reporter: Roc Marshal


The markdown file location: flink/docs/monitoring/application_profiling.zh.md
The page url is 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/application_profiling.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[VOTE] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Kostas Kloudas
Hi all,

After the discussion in [1], I would like to open a voting thread for
FLIP-134 [2] which discusses the semantics that the DataStream API
will expose when applied on a bounded input.

The vote will be open until 27th August (72h), unless there is an
objection or not enough votes.

Cheers,
Kostas

[1] 
https://lists.apache.org/thread.html/reb368f095ec13638b95cd5d885a0aa8e69af06d6e982a5f045f50022%40%3Cdev.flink.apache.org%3E
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522


[jira] [Created] (FLINK-19035) Remove deprecated DataStream#fold() method and all related classes

2020-08-24 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-19035:


 Summary: Remove deprecated DataStream#fold() method and all 
related classes
 Key: FLINK-19035
 URL: https://issues.apache.org/jira/browse/FLINK-19035
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Dawid Wysakowicz


We should also agree on whether we want to remove those in a 1.x version already
or wait for 2.0.

We should remove DataStream#fold and all related classes and methods such as 
FoldFunction, FoldingState, FoldingStateDescriptor ... 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
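
A hedged migration sketch for user code: the logic of a deprecated fold()
expressed as an AggregateFunction (the String input and the length-summing
logic are invented for illustration):

{code}
import org.apache.flink.api.common.functions.AggregateFunction;

/** Sums the lengths of incoming strings, replacing a FoldFunction. */
public class SumLengths implements AggregateFunction<String, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(String value, Long accumulator) {
        // this line is what the old FoldFunction body did
        return accumulator + value.length();
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

// Usage: stream.keyBy(...).window(...).aggregate(new SumLengths())
// instead of  .fold(0L, (acc, value) -> acc + value.length()).
{code}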


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Kostas Kloudas
Thanks a lot for the discussion!

I will open a voting thread shortly!

Kostas

On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas  wrote:
>
> Hi Guowei,
>
> Thanks for the insightful comment!
>
> I agree that this can be a limitation of the current runtime, but I
> think that this FLIP can go on as it discusses mainly the semantics
> that the DataStream API will expose when applied on bounded data.
> There will definitely be other FLIPs that will actually handle the
> runtime-related topics.
>
> But it is good to document them nevertheless so that we start soon
> ironing out the remaining rough edges.
>
> Cheers,
> Kostas
>
> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma  wrote:
> >
> > Hi, Klou
> >
> > Thanks for your proposal. It's a very good idea.
> > Just a little comment about the "Batch vs Streaming Scheduling": in the
> > AUTOMATIC execution mode, maybe we cannot pick the BATCH execution mode even
> > if all sources are bounded. For example, some applications would use the
> > `CheckpointListener`, which is not available in BATCH mode in the current
> > implementation.
> > So maybe we need more checks in the AUTOMATIC execution mode.
> >
> > Best,
> > Guowei
> >
> >
> > On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:
> >>
> >> Hi all,
> >>
> >> Thanks for the comments!
> >>
> >> @Dawid: "execution.mode" can be a nice alternative and from a quick
> >> look it is not used currently by any configuration option. I will
> >> update the FLIP accordingly.
> >>
> >> @David: Given that having the option to allow timers to fire at the
> >> end of the job is already in the FLIP, I will leave it as is and I
> >> will update the default policy to be "ignore processing time timers
> >> set by the user". This will allow existing dataStream programs to run
> >> on bounded inputs. This update will affect point 2 in the "Processing
> >> Time Support in Batch" section.
> >>
> >> If these changes cover your proposals, then I would like to start a
> >> voting thread tomorrow evening if this is ok with you.
> >>
> >> Please let me know until then.
> >>
> >> Kostas
> >>
> >> On Tue, Aug 18, 2020 at 3:54 PM David Anderson  
> >> wrote:
> >> >
> >> > Being able to optionally fire registered processing time timers at the 
> >> > end of a job would be interesting, and would help in (at least some of) 
> >> > the cases I have in mind. I don't have a better idea.
> >> >
> >> > David
> >> >
> >> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  
> >> > wrote:
> >> >>
> >> >> Hi Kurt and David,
> >> >>
> >> >> Thanks a lot for the insightful feedback!
> >> >>
> >> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
> >> >> agree with you that it requires a lot more work and careful thinking
> >> >> on the semantics. This FLIP was written under the assumption that if
> >> >> the user wants to have checkpoints on bounded input, he/she will have
> >> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
> >> >> can be handled as a separate topic in the future.
> >> >>
> >> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
> >> >> should be set to STREAMING. That is why the AUTOMATIC option sets
> >> >> scheduling to BATCH only if all the sources are bounded. I am not sure
> >> >> what are the plans there at the scheduling level, as one could imagine
> >> >> in the future that in mixed workloads, we schedule first all the
> >> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
> >> >> subgraph per application, which is going to be scheduled after all
> >> >> Bounded ones have finished. Essentially the bounded subgraphs will be
> >> >> used to bootstrap the unbounded one. But, I am not aware of any plans
> >> >> towards that direction.
> >> >>
> >> >>
> >> >> @David: The processing time timer handling is a topic that has also
> >> >> been discussed in the community in the past, and I do not remember any
> >> >> final conclusion unfortunately.
> >> >>
> >> >> In the current context and for bounded input, we chose to favor
> >> >> reproducibility of the result, as this is expected in batch processing
> >> >> where the whole input is available in advance. This is why this
> >> >> proposal suggests to not allow processing time timers. But I
> >> >> understand your argument that the user may want to be able to run the
> >> >> same pipeline on batch and streaming this is why we added the two
> >> >> options under future work, namely (from the FLIP):
> >> >>
> >> >> ```
> >> >> Future Work: In the future we may consider adding as options the 
> >> >> capability of:
> >> >> * firing all the registered processing time timers at the end of a job
> >> >> (at close()) or,
> >> >> * ignoring all the registered processing time timers at the end of a 
> >> >> job.
> >> >> ```
> >> >>
> >> >> Conceptually, we are essentially saying that batch
> >> >> execution is assumed to be instantaneous and refers to a single
> >> >> "point" in time, and any 
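
For orientation, a hedged sketch of how a user might select the mode discussed
in this thread. The option name "execution.mode" comes from the discussion
above; the programmatic API below is one plausible shape, not a confirmed
final interface:

{code}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedExecutionSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // BATCH forces batch scheduling; AUTOMATIC would pick BATCH only if
        // every source is bounded (subject to the extra checks Guowei raised).
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .print();

        env.execute("bounded-input-sketch");
    }
}
{code}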

[jira] [Created] (FLINK-19034) Remove deprecated StreamExecutionEnvironment#set/getNumberOfExecutionRetries

2020-08-24 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-19034:


 Summary: Remove deprecated 
StreamExecutionEnvironment#set/getNumberOfExecutionRetries
 Key: FLINK-19034
 URL: https://issues.apache.org/jira/browse/FLINK-19034
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.12.0


Remove deprecated 
{code}
StreamExecutionEnvironment#setNumberOfExecutionRetries/getNumberOfExecutionRetries
{code}

The corresponding settings in {{ExecutionConfig}} will be removed in a separate 
issue, as they are {{Public}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19033) Cleanups of DataStream API

2020-08-24 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-19033:


 Summary: Cleanups of DataStream API
 Key: FLINK-19033
 URL: https://issues.apache.org/jira/browse/FLINK-19033
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, API / DataStream
Reporter: Dawid Wysakowicz


An umbrella issue for clean ups in the DataStream API specific classes such as:
* DataStream
* StreamExecutionEnvironment
* ExecutionConfig
* ...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19032) Remove deprecated RuntimeContext#getAllAccumulators

2020-08-24 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-19032:


 Summary: Remove deprecated RuntimeContext#getAllAccumulators
 Key: FLINK-19032
 URL: https://issues.apache.org/jira/browse/FLINK-19032
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.12.0


We could remove the deprecated:
{code}
RuntimeContext#getAllAccumulators
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19031) Remove deprecated setStateBackend(AbstractStateBackend)

2020-08-24 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-19031:


 Summary: Remove deprecated setStateBackend(AbstractStateBackend)
 Key: FLINK-19031
 URL: https://issues.apache.org/jira/browse/FLINK-19031
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz


We can remove the deprecated method:

{code}
StreamExecutionEnvironment#setStateBackend(AbstractStateBackend backend) 
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
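
A migration note as a sketch: because the deprecated overload takes the more
specific AbstractStateBackend type, calls passing a concrete backend resolve
to it; an upcast selects the surviving StateBackend overload (the checkpoint
path below is a placeholder):

{code}
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The upcast makes overload resolution pick setStateBackend(StateBackend),
        // the method that stays. Once the AbstractStateBackend overload is
        // removed, the cast becomes unnecessary.
        env.setStateBackend(
                (StateBackend) new FsStateBackend("file:///tmp/flink-checkpoints"));
    }
}
{code}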


[DISCUSS] Introduce partitioning strategies to Table/SQL

2020-08-24 Thread Jingsong Li
Hi all,

## Motivation

FLIP-63 [1] introduced initial support for the PARTITIONED BY clause, to an
extent that lets us support Hive's partitioning.
But this partition definition is completely specific to Hive/file
systems. With the continuous development of the system, there are new
requirements:

- FLIP-107 [2] requirements: A common requirement is to create a custom
partitioning of the data. We should have a way to specify/compute target
partition/shard for Kinesis/Pravega/Pulsar. In those cases it would be the
only way to control partitioning.

- Apache Iceberg partitioning [3] requirements: Iceberg produces partition
values by taking a column value and optionally transforming it. Iceberg is
responsible for converting event_time into event_date, and keeps track of
the relationship.

So I think it is better to introduce partitioning strategies to Flink;
these partitioning strategies are similar to partitioning in traditional
databases like Oracle [4].

## Proposed Partitioning DDL

Hash Partitioning Tables:

CREATE TABLE kafka_table (
  id STRING,
  name STRING,
  `date` DATE ... )
PARTITIONED BY (HASH(id, name))

Explicit Partitioning Tables (Introduced in FLIP-63):

CREATE TABLE fs_table (
  name STRING,
  `date` DATE ... )
PARTITIONED BY (`date`)

(Can we remove the brackets when there is only a single layer partition? =>
"PARTITIONED BY HASH(id, name)" and "PARTITIONED BY date" )

Composite Partitioning Tables:

CREATE TABLE fs_table (
  name STRING,
  `date` DATE
   ... )
PARTITIONED BY (year(`date`), month(`date`), day(`date`))

Composite Explicit Partitioning Tables (Introduced in FLIP-63):

CREATE TABLE fs_table (
  name STRING,
  `date` DATE,
  y STRING,
  m STRING,
  d STRING,
   ... )
PARTITIONED BY (y, m, d)

## Rejected Alternatives

Composite Partitioning Tables DDL like Oracle:

CREATE TABLE invoices (
  invoice_no    NUMBER        NOT NULL,
  invoice_date  DATE          NOT NULL,
  comments      VARCHAR2(500))
PARTITION BY RANGE (invoice_date)
SUBPARTITION BY HASH (invoice_no)
SUBPARTITIONS 8 (
  PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001', 'DD/MM/YYYY')),
  PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001', 'DD/MM/YYYY')),
  PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001', 'DD/MM/YYYY')),
  PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002', 'DD/MM/YYYY')));

- First, multi-level partitioning is a common thing in big data systems.
- Second, the "SUBPARTITION" syntax is not only more complex, but also
completely different from big data systems such as Hive. Big data systems
need to specify less partition information than traditional ones, so it is
more natural to write all partition expressions in one bracket.

## Other Interface changes

It can be imagined that this change will involve many Catalog/Table-related
interfaces, and it will be necessary to replace the previous
`List<String> partitionKeys` with partitioning strategies.

What do you think?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
[3]http://iceberg.apache.org/partitioning/
[4]https://oracle-base.com/articles/8i/partitioned-tables-and-indexes

Best,
Jingsong
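
Purely to illustrate the last point, a hypothetical shape such an abstraction
could take (every name below is invented; the proposal deliberately leaves the
API open):

{code}
import java.io.Serializable;
import java.util.List;

/** Hypothetical replacement for the flat List<String> partitionKeys. */
public interface PartitionStrategy extends Serializable {

    /** The source columns this strategy reads, e.g. [date] for year(date). */
    List<String> sourceColumns();

    /** A display form, e.g. "HASH(id, name)", "date", or "year(date)". */
    String asSummaryString();
}
{code}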


Re: OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread 耿延杰
It still fails after every 12 tasks.
And the exception stack of the failed tasks is different.


For example, the most recent failed task's exception info:
Caused by: java.lang.OutOfMemoryError: Metaspace
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at org.apache.http.impl.client.CloseableHttpClient.determineTarget(CloseableHttpClient.java:93)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:90)
    at ru.yandex.clickhouse.ClickHouseConnectionImpl.initTimeZone(ClickHouseConnectionImpl.java:94)
    at ru.yandex.clickhouse.ClickHouseConnectionImpl.

Re: OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread 耿延杰
Additional info:


The exception info shown on the Flink manager page:


Caused by: java.lang.OutOfMemoryError: Metaspace
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at org.apache.http.impl.client.CloseableHttpClient.determineTarget(CloseableHttpClient.java:93)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:614)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:117)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:100)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:95)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:90)
    at ru.yandex.clickhouse.ClickHouseConnectionImpl.initTimeZone(ClickHouseConnectionImpl.java:94)
    at ru.yandex.clickhouse.ClickHouseConnectionImpl.

Re: OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread Chesnay Schepler

Which Flink version are you using?

On 24/08/2020 10:20, 耿延杰 wrote:

Hi,

I hit an "OutOfMemoryError: Metaspace" on a batch task when writing into
Clickhouse.

The attached *.java file is my task code.

And I find that, after running 12 tasks, the 13th task fails, and the
exception is always "OutOfMemoryError: Metaspace". See "task-failed.png".

I configured -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/to/hprofFile
and dumped the hprof file.
I analysed this hprof file and found that the error may not be caused
by my user code.
So I came here to ask for your help, to confirm whether the memory leak
could be caused by Flink.

Attached file "java_pid29294.hprof" is the dump file.
Thanks.

*Oversized attachment sent from QQ Mail*

java_pid29294.hprof
(81.44M, expires 2020-09-23 16:05)
Download page:
http://mail.qq.com/cgi-bin/ftnExs_download?t=exs_ftn_download=71666339b499dbc2277cdd0e1364011709520651525e14545b010349075a0a504e01570607155c020200015550080d55570e3577335258100266450d570a00545a0d1b0c434a56006304=9fc95d38





Re: OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread Till Rohrmann
Hi,

thanks for reaching out to the community. Could you share a few more
details about the cluster setup (session cluster, per-job cluster
deployment), Flink version and maybe also share the logs with us? Sharing
your user code and the libraries you are using can also be helpful in
figuring out what is going wrong.

Cheers,
Till

On Mon, Aug 24, 2020 at 10:22 AM 耿延杰  wrote:

> Hi,
>
> I hit an "OutOfMemoryError: Metaspace" on a batch task when writing into
> Clickhouse.
> The attached *.java file is my task code.
>
> And I find that, after running 12 tasks, the 13th task fails, and the
> exception is always "OutOfMemoryError: Metaspace". See "task-failed.png".
>
> I configured -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/to/hprofFile
> and dumped the hprof file.
> I analysed this hprof file and found that the error may not be caused by my
> user code.
> So I came here to ask for your help, to confirm whether the memory leak
> could be caused by Flink.
>
> Attached file "java_pid29294.hprof" is the dump file.
>
> Thanks.
>
>
>
> --
> *Oversized attachment sent from QQ Mail*
>
> java_pid29294.hprof
> (81.44M, expires 2020-09-23 16:05)
> Download page:
> http://mail.qq.com/cgi-bin/ftnExs_download?t=exs_ftn_download=71666339b499dbc2277cdd0e1364011709520651525e14545b010349075a0a504e01570607155c020200015550080d55570e3577335258100266450d570a00545a0d1b0c434a56006304=9fc95d38
>


OutOfMemoryError: Metaspace on Batch Task When Write into Clickhouse

2020-08-24 Thread 耿延杰
Hi,


I catch "OutOfMemoryError: Metaspace" on Batch Task When Write into 
Clickhouse.
Attached *.java file is my task code.

And I find that, after running 12 tasks, the 13th task will be failed. And the 
exception always is "OutOfMemoryError: Metaspace". see "task-failed.png"


I conf-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/to/hprofFile
and dump the hprof file.
I analyse this hprof file. And find this error occurs may not caused by my 
user-code.
So I came here ask for your help. To confirm whether the memory leak should be 
caused by Flink.


Attached file  "java_pid29294.hprof" is the dump file.


Thanks.







*Oversized attachment sent from QQ Mail*

java_pid29294.hprof (81.44M, expires 2020-09-23 16:05)
Download page: http://mail.qq.com/cgi-bin/ftnExs_download?t=exs_ftn_download=71666339b499dbc2277cdd0e1364011709520651525e14545b010349075a0a504e01570607155c020200015550080d55570e3577335258100266450d570a00545a0d1b0c434a56006304=9fc95d38

Next Stateful Functions Release

2020-08-24 Thread Igal Shilman
Hi Flink devs,

We have a few upcoming / implemented features for Stateful Functions on the
radar, and would like to give a heads up on what to expect for the next
release:

1. Upgrade support for Flink 1.11.x. [FLINK-18812]
2. Fine grained control on remote state configuration, such as state TTL.
[FLINK-17954]
3. New state construct for dynamic state registration [FLINK-18316]
4. Add a DataStream API to StateFun [FLINK-19001]
5. Support async handlers for the Python SDK [FLINK-18518]
6. Add more metrics around async operations and backpressure [FLINK-19020]
7. Out-of-box support for common storage systems in flink-statefun Docker
image [FLINK-19019]

With these we think the project will be in a good spot for the next release.
What do you think about aiming at 10.9.2020 for a feature freeze for
StateFun 2.2?

Kind regards,
Igal.


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Kostas Kloudas
Hi Guowei,

Thanks for the insightful comment!

I agree that this can be a limitation of the current runtime, but I
think that this FLIP can go on, as it mainly discusses the semantics
that the DataStream API will expose when applied to bounded data.
There will definitely be other FLIPs that will actually handle the
runtime-related topics.

But it is good to document them nevertheless, so that we can soon start
ironing out the remaining rough edges.
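
To make the semantic question concrete, here is a small, hedged sketch of the
kind of program whose behavior on bounded input the FLIP has to pin down. The
pipeline is ordinary DataStream code; the open question is what happens to the
processing-time timer when the bounded input is exhausted before the timer
fires (ignore it, or fire it at end-of-input). The proposed "execution.mode"
switch is deliberately not shown, since its final shape is exactly what this
thread is deciding:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BoundedTimerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c") // bounded input
           .keyBy(s -> s)
           .process(new KeyedProcessFunction<String, String, String>() {
               @Override
               public void processElement(String value, Context ctx,
                                          Collector<String> out) {
                   // Registers a timer 10s into the processing-time future.
                   // The job may finish before it fires; whether it is then
                   // ignored or fired at end-of-input is the semantics the
                   // FLIP has to define.
                   ctx.timerService().registerProcessingTimeTimer(
                           ctx.timerService().currentProcessingTime() + 10_000L);
               }

               @Override
               public void onTimer(long timestamp, OnTimerContext ctx,
                                   Collector<String> out) {
                   out.collect("timer fired at " + timestamp);
               }
           })
           .print();

        env.execute("bounded-input timer semantics");
    }
}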

Cheers,
Kostas

On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma  wrote:
>
> Hi, Klou
>
> Thanks for your proposal. It's a very good idea.
> Just a little comment about the "Batch vs Streaming Scheduling": in the 
> AUTOMATIC execution mode, maybe we could not pick the BATCH execution mode even if 
> all sources are bounded. For example, some applications would use the 
> `CheckpointListener`, which is not available in BATCH mode in the current 
> implementation.
> So maybe we need more checks in the AUTOMATIC execution mode.
>
> Best,
> Guowei
>
>
> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> Thanks for the comments!
>>
>> @Dawid: "execution.mode" can be a nice alternative and from a quick
>> look it is not used currently by any configuration option. I will
>> update the FLIP accordingly.
>>
>> @David: Given that having the option to allow timers to fire at the
>> end of the job is already in the FLIP, I will leave it as is and I
>> will update the default policy to be "ignore processing time timers
>> set by the user". This will allow existing dataStream programs to run
>> on bounded inputs. This update will affect point 2 in the "Processing
>> Time Support in Batch" section.
>>
>> If these changes cover your proposals, then I would like to start a
>> voting thread tomorrow evening if this is ok with you.
>>
>> Please let me know until then.
>>
>> Kostas
>>
>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson  wrote:
>> >
>> > Being able to optionally fire registered processing time timers at the end 
>> > of a job would be interesting, and would help in (at least some of) the 
>> > cases I have in mind. I don't have a better idea.
>> >
>> > David
>> >
>> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  wrote:
>> >>
>> >> Hi Kurt and David,
>> >>
>> >> Thanks a lot for the insightful feedback!
>> >>
>> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
>> >> agree with you that it requires a lot more work and careful thinking
>> >> on the semantics. This FLIP was written under the assumption that if
>> >> the user wants to have checkpoints on bounded input, he/she will have
>> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
>> >> can be handled as a separate topic in the future.
>> >>
>> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
>> >> should be set to STREAMING. That is why the AUTOMATIC option sets
>> >> scheduling to BATCH only if all the sources are bounded. I am not sure
>> >> what are the plans there at the scheduling level, as one could imagine
>> >> in the future that in mixed workloads, we schedule first all the
>> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>> >> subgraph per application, which is going to be scheduled after all
>> >> Bounded ones have finished. Essentially the bounded subgraphs will be
>> >> used to bootstrap the unbounded one. But, I am not aware of any plans
>> >> towards that direction.
>> >>
>> >>
>> >> @David: The processing time timer handling is a topic that has also
>> >> been discussed in the community in the past, and I do not remember any
>> >> final conclusion unfortunately.
>> >>
>> >> In the current context and for bounded input, we chose to favor
>> >> reproducibility of the result, as this is expected in batch processing
>> >> where the whole input is available in advance. This is why this
>> >> proposal suggests to not allow processing time timers. But I
>> >> understand your argument that the user may want to be able to run the
>> >> same pipeline on batch and streaming this is why we added the two
>> >> options under future work, namely (from the FLIP):
>> >>
>> >> ```
>> >> Future Work: In the future we may consider adding as options the 
>> >> capability of:
>> >> * firing all the registered processing time timers at the end of a job
>> >> (at close()) or,
>> >> * ignoring all the registered processing time timers at the end of a job.
>> >> ```
>> >>
>> >> Conceptually, we are essentially saying that batch execution is
>> >> assumed to be instantaneous and refers to a single
>> >> "point" in time and any processing-time timers for the future may fire
>> >> at the end of execution or be ignored (but not throw an exception). I
>> >> could also see ignoring the timers in batch as the default, if this
>> >> makes more sense.
>> >>
>> >> By the way, do you have any usecases in mind that will help us better
>> >> shape our processing time timer handling?
>> >>
>> >> Kostas
>> >>
>> >> On 

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Guowei Ma
Hi, Klou

Thanks for your proposal. It's a very good idea.
Just a little comment about the "Batch vs Streaming Scheduling": in the
AUTOMATIC execution mode, maybe we could not pick the BATCH execution mode even
if all sources are bounded. For example, some applications would use the
`CheckpointListener`, which is not available in BATCH mode in the current
implementation.
So maybe we need more checks in the AUTOMATIC execution mode.
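
To make the concern concrete, here is a hedged sketch (the class name and the
commit logic are purely illustrative) of the kind of user function that would
silently stop working if AUTOMATIC picked BATCH for it: a sink that stages
records and only commits them when notifyCheckpointComplete() is called, which
never happens in a run without checkpoints.

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class CommitOnCheckpointSink extends RichSinkFunction<String>
        implements CheckpointListener {

    private final List<String> pending = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        // Stage the record; it only becomes visible on checkpoint complete.
        pending.add(value);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Commit everything staged so far. Without checkpoints (as in the
        // current BATCH execution) this callback is never invoked, so the
        // staged records would never be committed.
        pending.clear(); // stand-in for the real commit
    }
}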

Best,
Guowei


On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:

> Hi all,
>
> Thanks for the comments!
>
> @Dawid: "execution.mode" can be a nice alternative and from a quick
> look it is not used currently by any configuration option. I will
> update the FLIP accordingly.
>
> @David: Given that having the option to allow timers to fire at the
> end of the job is already in the FLIP, I will leave it as is and I
> will update the default policy to be "ignore processing time timers
> set by the user". This will allow existing dataStream programs to run
> on bounded inputs. This update will affect point 2 in the "Processing
> Time Support in Batch" section.
>
> If these changes cover your proposals, then I would like to start a
> voting thread tomorrow evening if this is ok with you.
>
> Please let me know until then.
>
> Kostas
>
> On Tue, Aug 18, 2020 at 3:54 PM David Anderson 
> wrote:
> >
> > Being able to optionally fire registered processing time timers at the
> end of a job would be interesting, and would help in (at least some of) the
> cases I have in mind. I don't have a better idea.
> >
> > David
> >
> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas 
> wrote:
> >>
> >> Hi Kurt and David,
> >>
> >> Thanks a lot for the insightful feedback!
> >>
> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
> >> agree with you that it requires a lot more work and careful thinking
> >> on the semantics. This FLIP was written under the assumption that if
> >> the user wants to have checkpoints on bounded input, he/she will have
> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
> >> can be handled as a separate topic in the future.
> >>
> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
> >> should be set to STREAMING. That is why the AUTOMATIC option sets
> >> scheduling to BATCH only if all the sources are bounded. I am not sure
> >> what are the plans there at the scheduling level, as one could imagine
> >> in the future that in mixed workloads, we schedule first all the
> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
> >> subgraph per application, which is going to be scheduled after all
> >> Bounded ones have finished. Essentially the bounded subgraphs will be
> >> used to bootstrap the unbounded one. But, I am not aware of any plans
> >> towards that direction.
> >>
> >>
> >> @David: The processing time timer handling is a topic that has also
> >> been discussed in the community in the past, and I do not remember any
> >> final conclusion unfortunately.
> >>
> >> In the current context and for bounded input, we chose to favor
> >> reproducibility of the result, as this is expected in batch processing
> >> where the whole input is available in advance. This is why this
> >> proposal suggests to not allow processing time timers. But I
> >> understand your argument that the user may want to be able to run the
> >> same pipeline on batch and streaming this is why we added the two
> >> options under future work, namely (from the FLIP):
> >>
> >> ```
> >> Future Work: In the future we may consider adding as options the
> capability of:
> >> * firing all the registered processing time timers at the end of a job
> >> (at close()) or,
> >> * ignoring all the registered processing time timers at the end of a
> job.
> >> ```
> >>
> >> Conceptually, we are essentially saying that batch execution is
> >> assumed to be instantaneous and refers to a single
> >> "point" in time and any processing-time timers for the future may fire
> >> at the end of execution or be ignored (but not throw an exception). I
> >> could also see ignoring the timers in batch as the default, if this
> >> makes more sense.
> >>
> >> By the way, do you have any usecases in mind that will help us better
> >> shape our processing time timer handling?
> >>
> >> Kostas
> >>
> >> On Mon, Aug 17, 2020 at 2:52 PM David Anderson 
> wrote:
> >> >
> >> > Kostas,
> >> >
> >> > I'm pleased to see some concrete details in this FLIP.
> >> >
> >> > I wonder if the current proposal goes far enough in the direction of
> recognizing the need some users may have for "batch" and "bounded
> streaming" to be treated differently. If I've understood it correctly, the
> section on scheduling allows me to choose STREAMING scheduling even if I
> have bounded sources. I like that approach, because it recognizes that even
> though I have bounded inputs, I don't necessarily want batch processing
> semantics. I think it makes sense to 

[DISCUSS] FLIP-137: Support Pandas UDAF in PyFlink

2020-08-24 Thread Xingbo Huang
Hi everyone,

I would like to start a discussion thread on "Support Pandas UDAF in
PyFlink"

Pandas UDFs have been supported since Flink 1.11 (FLIP-97 [1]). They solve the
high serialization/deserialization overhead of Python UDFs and make it
convenient to leverage popular Python libraries such as Pandas, Numpy,
etc. Since Pandas UDFs have so many advantages, we want to support Pandas
UDAFs to extend their usage.

Dian Fu and I have discussed this offline and have drafted FLIP-137 [2]. It
includes the following items:
  - Support Pandas UDAF in Batch Group Aggregation
  - Support Pandas UDAF in Batch Group Window Aggregation
  - Support Pandas UDAF in Batch Over Window Aggregation
  - Support Pandas UDAF in Stream Group Window Aggregation
  - Support Pandas UDAF in Stream Bounded Over Window Aggregation


Looking forward to your feedback!

Best,
Xingbo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-137%3A+Support+Pandas+UDAF+in+PyFlink