[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2018-11-14 Thread chunpinghe (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687577#comment-16687577
 ] 

chunpinghe commented on FLINK-10884:


what's your solution?

YARN will check the physical memory used by a container by default; you can 
disable this check by setting yarn.nodemanager.pmem-check-enabled to false. In 
your example, if your container uses too much off-heap memory (direct memory, 
or JNI malloc) and the total memory exceeds 3g, then the container will be 
killed anyhow.

So, if your container is always killed by the nodemanager, you should check 
whether the total memory you provided for it is insufficient or whether your 
code has a memory leak (mainly a native memory leak).

 

 

> Flink on yarn  TM container will be killed by nodemanager because of  the 
> exceeded  physical memory.
> 
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Core
>Affects Versions: 1.6.2
> Environment: version  : 1.6.2 
> module : flink on yarn
> centos  jdk1.8
> hadoop 2.7
>Reporter: wgcn
>Priority: Major
>  Labels: yarn
>
> TM container will be killed by nodemanager because of the exceeded physical 
> memory. I found in the launch context for launching the TM container that 
> "container memory = heap memory + offHeapSizeMB" at the class 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, 
> from line 160 to 166. I set a safety margin for the whole memory the container 
> uses. For example, if the container limit is 3g memory, the sum "heap memory + 
> offHeapSizeMB" is set to 2.4g to prevent the container from being killed. Do we 
> have a ready-made solution, or can I commit my solution?
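A minimal sketch of the safety-margin idea described above (illustrative only; 
the class name, method names, and the 20% ratio are assumptions, not the actual 
ContaineredTaskManagerParameters code). Note that Flink already exposes a 
related cutoff through the containerized.heap-cutoff-ratio and 
containerized.heap-cutoff-min options.

```
/**
 * Illustrative sketch only: reserve a fraction of the container memory as a
 * safety margin for off-heap/native allocations and size the heap from the
 * remainder. All names and the 20% ratio are assumptions for illustration.
 */
public final class ContainerMemoryBudget {

	private static final double SAFETY_MARGIN_RATIO = 0.2; // assumed margin

	private ContainerMemoryBudget() {}

	/** Heap size (MB) to configure for a given container limit and off-heap size. */
	public static long heapSizeMB(long containerLimitMB, long offHeapSizeMB) {
		long cutoffMB = (long) (containerLimitMB * SAFETY_MARGIN_RATIO);
		long heapMB = containerLimitMB - cutoffMB - offHeapSizeMB;
		if (heapMB <= 0) {
			throw new IllegalArgumentException("container limit too small");
		}
		return heapMB;
	}

	public static void main(String[] args) {
		// e.g. a 3g container with 200 MB of configured off-heap memory
		System.out.println(heapSizeMB(3 * 1024, 200) + " MB heap"); // prints "2258 MB heap"
	}
}
```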



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10891) Upgrade Kafka client version to 2.0.1

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687555#comment-16687555
 ] 

ASF GitHub Bot commented on FLINK-10891:


yanghua commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 
2.0.1
URL: https://github.com/apache/flink/pull/7101#issuecomment-438935809
 
 
   @twalthr This PR is also blocked by FLINK-10624, which contains the Kafka 
server version (2.0.0).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Upgrade Kafka client version to 2.0.1
> -
>
> Key: FLINK-10891
> URL: https://issues.apache.org/jira/browse/FLINK-10891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Since the modern kafka connector only keeps track of the latest version of 
> the kafka client, and Kafka 2.0.1 has been released, we should upgrade the 
> version of the kafka client maven dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1

2018-11-14 Thread GitBox
yanghua commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 
2.0.1
URL: https://github.com/apache/flink/pull/7101#issuecomment-438935809
 
 
   @twalthr This PR is also blocked by FLINK-10624, which contains the Kafka 
server version (2.0.0).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687554#comment-16687554
 ] 

ASF GitHub Bot commented on FLINK-10872:


yanghua commented on issue #7100: [FLINK-10872] Extend SQL client end-to-end to 
test KafkaTableSink for kafka connector 0.11
URL: https://github.com/apache/flink/pull/7100#issuecomment-43893
 
 
   @twalthr I cherry-picked your refactoring for FLINK-10624 and squashed it so 
that I can work on this issue in parallel; I kept it as a single separate 
commit. In essence, though, this is still blocked by FLINK-10624.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
> 
>
> Key: FLINK-10872
> URL: https://issues.apache.org/jira/browse/FLINK-10872
> Project: Flink
>  Issue Type: Test
>  Components: E2E Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10872:
---
Labels: pull-request-available  (was: )

> Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
> 
>
> Key: FLINK-10872
> URL: https://issues.apache.org/jira/browse/FLINK-10872
> Project: Flink
>  Issue Type: Test
>  Components: E2E Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11

2018-11-14 Thread GitBox
yanghua commented on issue #7100: [FLINK-10872] Extend SQL client end-to-end to 
test KafkaTableSink for kafka connector 0.11
URL: https://github.com/apache/flink/pull/7100#issuecomment-43893
 
 
   @twalthr I cherry-picked your refactoring for FLINK-10624 and squashed it so 
that I can work on this issue in parallel; I kept it as a single separate 
commit. In essence, though, this is still blocked by FLINK-10624.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10891) Upgrade Kafka client version to 2.0.1

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10891:
---
Labels: pull-request-available  (was: )

> Upgrade Kafka client version to 2.0.1
> -
>
> Key: FLINK-10891
> URL: https://issues.apache.org/jira/browse/FLINK-10891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Since the modern kafka connector only keeps track of the latest version of 
> the kafka client, and Kafka 2.0.1 has been released, we should upgrade the 
> version of the kafka client maven dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua opened a new pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1

2018-11-14 Thread GitBox
yanghua opened a new pull request #7101: [FLINK-10891] Upgrade Kafka client 
version to 2.0.1
URL: https://github.com/apache/flink/pull/7101
 
 
   ## What is the purpose of the change
   
   *This pull request upgrades Kafka client version to 2.0.1*
   
   
   ## Brief change log
   
 - *Upgrade Kafka client version to 2.0.1*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**yes** / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10891) Upgrade Kafka client version to 2.0.1

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687538#comment-16687538
 ] 

ASF GitHub Bot commented on FLINK-10891:


yanghua opened a new pull request #7101: [FLINK-10891] Upgrade Kafka client 
version to 2.0.1
URL: https://github.com/apache/flink/pull/7101
 
 
   ## What is the purpose of the change
   
   *This pull request upgrades Kafka client version to 2.0.1*
   
   
   ## Brief change log
   
 - *Upgrade Kafka client version to 2.0.1*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**yes** / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Upgrade Kafka client version to 2.0.1
> -
>
> Key: FLINK-10891
> URL: https://issues.apache.org/jira/browse/FLINK-10891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Since the modern kafka connector only keeps track of the latest version of 
> the kafka client, and Kafka 2.0.1 has been released, we should upgrade the 
> version of the kafka client maven dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10891) Upgrade Kafka client version to 2.0.1

2018-11-14 Thread vinoyang (JIRA)
vinoyang created FLINK-10891:


 Summary: Upgrade Kafka client version to 2.0.1
 Key: FLINK-10891
 URL: https://issues.apache.org/jira/browse/FLINK-10891
 Project: Flink
  Issue Type: Sub-task
  Components: Kafka Connector
Reporter: vinoyang
Assignee: vinoyang


Since the modern kafka connector only keeps track of the latest version of the 
kafka client, and Kafka 2.0.1 has been released, we should upgrade the version 
of the kafka client maven dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10890) CLONE - Add DataStream HBase Sink

2018-11-14 Thread jocean....@gamil.com (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jocean@gamil.com closed FLINK-10890.

Resolution: Won't Do

> CLONE - Add DataStream HBase Sink
> -
>
> Key: FLINK-10890
> URL: https://issues.apache.org/jira/browse/FLINK-10890
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: jocean@gamil.com
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-14 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687524#comment-16687524
 ] 

Dian Fu commented on FLINK-8159:


Thanks a lot for the reply and advice. I will submit a PR later today.

> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extend 
> {{AbstractRichFunction}} and behave properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we also have to call open on control message arrival
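A minimal sketch of the delegation pattern described above, assuming a 
hypothetical wrapper name (RichSelectWrapper); the real SelectWrapper changes 
would also have to address the serialization-compatibility concern noted above.

```
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.configuration.Configuration;

/**
 * Illustrative sketch: extends AbstractRichFunction and forwards the lifecycle
 * calls to the wrapped select function; the FunctionUtils helpers are no-ops
 * when the wrapped function is not a RichFunction.
 */
public class RichSelectWrapper<IN, OUT> extends AbstractRichFunction
		implements PatternSelectFunction<IN, OUT> {

	private static final long serialVersionUID = 1L;

	private final PatternSelectFunction<IN, OUT> wrapped;

	public RichSelectWrapper(PatternSelectFunction<IN, OUT> wrapped) {
		this.wrapped = wrapped;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		FunctionUtils.setFunctionRuntimeContext(wrapped, getRuntimeContext());
		FunctionUtils.openFunction(wrapped, parameters);
	}

	@Override
	public void close() throws Exception {
		FunctionUtils.closeFunction(wrapped);
	}

	@Override
	public OUT select(Map<String, List<IN>> pattern) throws Exception {
		return wrapped.select(pattern);
	}
}
```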



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10890) CLONE - Add DataStream HBase Sink

2018-11-14 Thread jocean....@gamil.com (JIRA)
jocean@gamil.com created FLINK-10890:


 Summary: CLONE - Add DataStream HBase Sink
 Key: FLINK-10890
 URL: https://issues.apache.org/jira/browse/FLINK-10890
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: jocean@gamil.com
Assignee: Shimin Yang


Design documentation: 
[https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9718) Add enviroment variable in start-scala-shell.sh & flink to enable remote debug

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687507#comment-16687507
 ] 

ASF GitHub Bot commented on FLINK-9718:
---

zjffdu commented on issue #6245: [FLINK-9718][scala-shell]. Add enviroment 
variable in start-scala-shell.sh and flink to enable remote debug
URL: https://github.com/apache/flink/pull/6245#issuecomment-438922942
 
 
   ping @yanghua 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add enviroment variable in start-scala-shell.sh & flink to enable remote debug
> --
>
> Key: FLINK-9718
> URL: https://issues.apache.org/jira/browse/FLINK-9718
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zjffdu commented on issue #6245: [FLINK-9718][scala-shell]. Add enviroment variable in start-scala-shell.sh and flink to enable remote debug

2018-11-14 Thread GitBox
zjffdu commented on issue #6245: [FLINK-9718][scala-shell]. Add enviroment 
variable in start-scala-shell.sh and flink to enable remote debug
URL: https://github.com/apache/flink/pull/6245#issuecomment-438922942
 
 
   ping @yanghua 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687500#comment-16687500
 ] 

ASF GitHub Bot commented on FLINK-10887:


tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r233714795
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.watermark;
+
+import java.io.Serializable;
+
+/**
+ * This represents the watermark for a single source partition.
+ */
+public class SourceWatermark implements Serializable {
 
 Review comment:
   I wonder if it should be qualified as `SourceWatermark` vs. just 
`Watermark`? Perhaps there are use cases for exchanging watermarks across 
subtasks that don't necessarily belong to a source. One such example could be 
operators that perform asynchronous operations. Related, do we want to allow 
for an identifier for the watermark so that within an application multiple 
independent groupings could be formed? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  
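A minimal sketch of the kind of aggregation such an RPC could maintain on the 
JobMaster (the class and method names are illustrative assumptions, not the 
actual code from the pull request).

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustrative sketch: tracks the latest watermark reported by each source
 * subtask and exposes the global minimum/maximum, which sources could query
 * to slow down partitions that run ahead in event time.
 */
public class WatermarkTracker {

	private final Map<Integer, Long> watermarkPerSubtask = new ConcurrentHashMap<>();

	/** Called (e.g. via RPC) by each source subtask with its current watermark. */
	public void reportWatermark(int subtaskIndex, long watermark) {
		watermarkPerSubtask.merge(subtaskIndex, watermark, Math::max);
	}

	/** Global minimum across all subtasks that have reported so far. */
	public long globalMinWatermark() {
		return watermarkPerSubtask.values().stream()
				.mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
	}

	/** Global maximum across all subtasks that have reported so far. */
	public long globalMaxWatermark() {
		return watermarkPerSubtask.values().stream()
				.mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
	}
}
```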



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2018-11-14 Thread GitBox
tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r233715375
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.watermark;
+
+import java.io.Serializable;
+
+/**
+ * This represents the watermark for a single source partition.
+ */
+public class SourceWatermark implements Serializable {
 
 Review comment:
   This may be a bit far fetched, but can it be generalized further to 
something like a named counter/metric? Currently there isn't anything watermark 
specific here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687502#comment-16687502
 ] 

ASF GitHub Bot commented on FLINK-10887:


tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r233715375
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.watermark;
+
+import java.io.Serializable;
+
+/**
+ * This represents the watermark for a single source partition.
+ */
+public class SourceWatermark implements Serializable {
 
 Review comment:
   This may be a bit far fetched, but can it be generalized further to 
something like a named counter/metric? Currently there isn't anything watermark 
specific here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2018-11-14 Thread GitBox
tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r233714795
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.watermark;
+
+import java.io.Serializable;
+
+/**
+ * This represents the watermark for a single source partition.
+ */
+public class SourceWatermark implements Serializable {
 
 Review comment:
   I wonder if it should be qualified as `SourceWatermark` vs. just 
`Watermark`? Perhaps there are use cases for exchanging watermarks across 
subtasks that don't necessarily belong to a source. One such example could be 
operators that perform asynchronous operations. Related, do we want to allow 
for an identifier for the watermark so that within an application multiple 
independent groupings could be formed? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2018-11-14 Thread GitBox
tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r233714283
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.watermark;
+
+import java.io.Serializable;
+
+/**
+ * This represents the watermark for a single source partition.
+ */
+public class SourceWatermark implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private long timestamp;
 
 Review comment:
   What does the timestamp represent? Is it when the watermark last changed, or 
when it was last communicated by the subtask (even if it did not change, for 
example because the subtask is just reading a lot of data under the same 
watermark)? We will need a way to detect that a source subtask is idle so we 
can avoid waiting for it (similar to how we have to identify idleness within a 
subtask).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687496#comment-16687496
 ] 

ASF GitHub Bot commented on FLINK-10887:


tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r233714283
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.watermark;
+
+import java.io.Serializable;
+
+/**
+ * This represents the watermark for a single source partition.
+ */
+public class SourceWatermark implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private long timestamp;
 
 Review comment:
   What does the timestamp represent? Is it when the watermark last changed, or 
when it was last communicated by the subtask (even if it did not change, for 
example because the subtask is just reading a lot of data under the same 
watermark)? We will need a way to detect that a source subtask is idle so we 
can avoid waiting for it (similar to how we have to identify idleness within a 
subtask).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687465#comment-16687465
 ] 

ASF GitHub Bot commented on FLINK-10774:


stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when 
partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-438914628
 
 
   Got the evidence that it is a thread-safety problem:
   
   ```
   2018-11-15 04:24:17,795 ERROR 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - failed to 
close partitionDiscoverer
   java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1627)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
at java.lang.Thread.run(Thread.java:748)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> connection leak when partition discovery is disabled and open throws exception
> --
>
> Key: FLINK-10774
> URL: https://issues.apache.org/jira/browse/FLINK-10774
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2, 1.5.5, 1.6.2
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>
> Here is the scenario to reproduce the issue
>  * partition discovery is disabled
>  * open method throws an exception (e.g. when broker SSL authorization denies 
> request)
> In this scenario, the run method won't be executed. As a result, 
> _partitionDiscoverer.close()_ won't be called. That causes the connection 
> leak, because the KafkaConsumer is initialized but never closed. This caused an 
> outage that brought down our Kafka cluster when a high-parallelism job got 
> into a restart/failure loop.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2018-11-14 Thread GitBox
stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when 
partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-438914628
 
 
   Got the evidence that it is a thread-safety problem:
   
   ```
   2018-11-15 04:24:17,795 ERROR 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - failed to 
close partitionDiscoverer
   java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1627)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
at java.lang.Thread.run(Thread.java:748)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print

2018-11-14 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10889:


Assignee: vinoyang

> Semantic inconsistency between DataSet#print and DataStream#print
> -
>
> Key: FLINK-10889
> URL: https://issues.apache.org/jira/browse/FLINK-10889
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: Jeff Zhang
>Assignee: vinoyang
>Priority: Major
>
> DataSet#print will print the result on the client side, while DataStream#print 
> will print the result on the TM. This inconsistency will confuse users. IMHO, we 
> should make the behavior consistent between DataSet and DataStream, and I prefer 
> to print the result on the client side. Regarding DataStream#print, we can use 
> DataStreamUtils#collect to print it on the client side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print

2018-11-14 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-10889:
--

 Summary: Semantic inconsistency between DataSet#print and 
DataStream#print
 Key: FLINK-10889
 URL: https://issues.apache.org/jira/browse/FLINK-10889
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API, DataStream API
Reporter: Jeff Zhang


DataSet#print will print the result on the client side, while DataStream#print 
will print the result on the TM. This inconsistency will confuse users. IMHO, we 
should make the behavior consistent between DataSet and DataStream, and I prefer 
to print the result on the client side. Regarding DataStream#print, we can use 
DataStreamUtils#collect to print it on the client side.
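A minimal sketch of the client-side printing mentioned above via 
DataStreamUtils#collect (the package of DataStreamUtils has moved between Flink 
versions, so treat the import below as an assumption).

```
import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClientSidePrint {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Long> stream = env.fromElements(1L, 2L, 3L);

		// Collect the results back to the client and print them there,
		// instead of printing on the TaskManagers as DataStream#print does.
		Iterator<Long> results = DataStreamUtils.collect(stream);
		while (results.hasNext()) {
			System.out.println(results.next());
		}
	}
}
```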



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua opened a new pull request #7100: [Flink 10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11

2018-11-14 Thread GitBox
yanghua opened a new pull request #7100: [Flink 10872] Extend SQL client 
end-to-end to test KafkaTableSink for kafka connector 0.11
URL: https://github.com/apache/flink/pull/7100
 
 
   ## What is the purpose of the change
   
   *This pull request extends SQL client end-to-end to test KafkaTableSink for 
kafka connector 0.11*
   
   
   ## Brief change log
   
 - *Extend SQL client end-to-end to test KafkaTableSink for kafka connector 
0.11*
   
   ## Verifying this change
   
   
   This change is already covered by existing tests.
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10873) Remove tableEnv in DataSetConversions#toTable and DataStreamConversions#toTable

2018-11-14 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-10873:
---

Assignee: Hequn Cheng

> Remove tableEnv in DataSetConversions#toTable and 
> DataStreamConversions#toTable
> ---
>
> Key: FLINK-10873
> URL: https://issues.apache.org/jira/browse/FLINK-10873
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Assignee: Hequn Cheng
>Priority: Major
>
> What I would like to achieve is to change the following code
> {code}
> val table = data.flatMap(line=>line.split("\\s"))
>   .map(w => (w, 1))
>   .toTable(tEnv, 'word, 'count)
> {code}
> to this
> {code}
> val table = data.flatMap(line=>line.split("\\s"))
>   .map(w => (w, 1))
>   .toTable('word, 'count)
> {code}
> The only change is that tableEnv is removed in the method toTable. I think the 
> second piece of code is more readable. We can create the TableEnvironment based 
> on the ExecutionEnvironment of the DataSet/DataStream rather than asking the 
> user to pass it explicitly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10886) Event time synchronization across sources

2018-11-14 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687333#comment-16687333
 ] 

Jamie Grier commented on FLINK-10886:
-

ML discussion: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html]

 

> Event time synchronization across sources
> -
>
> Key: FLINK-10886
> URL: https://issues.apache.org/jira/browse/FLINK-10886
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> When reading from a source with many parallel partitions, especially when 
> reading lots of historical data (or recovering from downtime and there is a 
> backlog to read), it's quite common for an event-time skew to develop 
> across those partitions.
>  
> When doing event-time windowing -- or in fact any event-time driven 
> processing -- the event time skew across partitions results directly in 
> increased buffering in Flink and of course the corresponding state/checkpoint 
> size growth.
>  
> As the event-time skew and state size grows larger this can have a major 
> effect on application performance and in some cases result in a "death 
> spiral" where the application performance get's worse and worse as the state 
> size grows and grows.
>  
> So, one solution to this problem, outside of core changes in Flink itself, 
> seems to be to try to coordinate sources across partitions so that they make 
> progress through event time at roughly the same rate.  In fact if there is 
> large skew the idea would be to slow or even stop reading from some 
> partitions with newer data while first reading the partitions with older 
> data.  Anyway, to do this we need to share state somehow amongst sub-tasks.
>  
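A minimal sketch of the source-side alignment described above, assuming some 
shared view of the global minimum watermark; the names and the threshold are 
illustrative assumptions.

```
import java.util.function.LongSupplier;

/**
 * Illustrative sketch: a source subtask pauses reading from its partitions
 * while its local watermark runs too far ahead of the global minimum
 * watermark shared across all source subtasks.
 */
public class EventTimeAlignment {

	private final LongSupplier globalMinWatermark; // e.g. obtained from shared state
	private final long maxAheadMillis;             // allowed event-time skew

	public EventTimeAlignment(LongSupplier globalMinWatermark, long maxAheadMillis) {
		this.globalMinWatermark = globalMinWatermark;
		this.maxAheadMillis = maxAheadMillis;
	}

	/** Blocks while the local watermark is too far ahead of the global minimum. */
	public void alignBeforeEmitting(long localWatermark) throws InterruptedException {
		while (localWatermark - globalMinWatermark.getAsLong() > maxAheadMillis) {
			Thread.sleep(100); // back off and let slower subtasks catch up
		}
	}
}
```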



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687311#comment-16687311
 ] 

ASF GitHub Bot commented on FLINK-10887:


jgrier opened a new pull request #7099: [FLINK-10887] [jobmaster] Add source 
watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099
 
 
   ## What is the purpose of the change
   
   This commit adds a JobMaster RPC endpoint that is used to share information 
across source subtasks regarding event time progress.
   
   This will be used to implement event time source synchronization across sources.
   
   ## Brief change log
 - New RPC endpoint on JobMaster to track event time progress across source 
sub-tasks
 - Updated JobMaster Tests
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Added a test case to JobMasterTest to ensure watermark stats are computed 
correctly
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: No
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10887:
---
Labels: pull-request-available  (was: )

> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jgrier opened a new pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2018-11-14 Thread GitBox
jgrier opened a new pull request #7099: [FLINK-10887] [jobmaster] Add source 
watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099
 
 
   ## What is the purpose of the change
   
   This commit adds a JobMaster RPC endpoint that is used to share information 
across source subtasks regarding event time progress.
   
   This will be used to implement event time source synchronization across sources.
   
   ## Brief change log
 - New RPC endpoint on JobMaster to track event time progress across source 
sub-tasks
 - Updated JobMaster Tests
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Added a test case to JobMasterTest to ensure watermark stats are computed 
correctly
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: No
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687278#comment-16687278
 ] 

ASF GitHub Bot commented on FLINK-10774:


stevenzwu edited a comment on issue #7020: [FLINK-10774] [Kafka] connection 
leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-438544127
 
 
   We saw some exceptions caused by redundant calls to the close method; we will 
need to fix it.
   ```
   2018-11-13 18:26:54,928 ERROR 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - failed to 
close partitionDiscoverer
   java.lang.IllegalStateException: This consumer has already been closed.
at 
org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1613)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
at java.lang.Thread.run(Thread.java:748)
   ```
   
   Can the `run` method and the `cancel` method be executed in different threads? 
If so, we need to add `synchronized` to this method in `Kafka09PartitionDiscoverer`. 
   ```
@Override
protected void closeConnections() throws Exception {
	if (this.kafkaConsumer != null) {
		this.kafkaConsumer.close();

		// de-reference the consumer to avoid closing multiple times
		this.kafkaConsumer = null;
	}
}
   ```
   
   I looked at the `KafkaConsumer` code again. Its close method calls 
`acquire`. I thought it acquires a lock, which is not the case.
   ```
private void acquire() {
	this.ensureNotClosed();
	long threadId = Thread.currentThread().getId();
	if (threadId != this.currentThread.get() &&
			!this.currentThread.compareAndSet(-1L, threadId)) {
		throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
	} else {
		this.refcount.incrementAndGet();
	}
}
   ```
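A minimal sketch of the synchronized, idempotent close discussed above 
(illustrative only, not the actual Kafka09PartitionDiscoverer patch).

```
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Illustrative sketch: make closeConnections idempotent and serialized so that
 * redundant close attempts (e.g. cancel() racing with run()) do not close the
 * consumer twice.
 */
class SafeDiscoverer {

	private KafkaConsumer<byte[], byte[]> kafkaConsumer;

	synchronized void closeConnections() {
		if (kafkaConsumer != null) {
			kafkaConsumer.close();
			// de-reference so a second close() call becomes a no-op
			kafkaConsumer = null;
		}
	}
}
```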


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> connection leak when partition discovery is disabled and open throws exception
> --
>
> Key: FLINK-10774
> URL: https://issues.apache.org/jira/browse/FLINK-10774
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2, 1.5.5, 1.6.2
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>
> Here is the scenario to reproduce the issue
>  * partition discovery is disabled
>  * open method throws an exception (e.g. when broker SSL authorization denies 
> request)
> In this scenario, the run method won't be executed. As a result, 
> _partitionDiscoverer.close()_ won't be called. That causes the connection 
> leak, because the KafkaConsumer is initialized but never closed. This caused an 
> outage that brought down our Kafka cluster when a high-parallelism job got 
> into a restart/failure loop.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] stevenzwu edited a comment on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2018-11-14 Thread GitBox
stevenzwu edited a comment on issue #7020: [FLINK-10774] [Kafka] connection 
leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-438544127
 
 
   We saw some exceptions caused by redundant calls to the close method; we will 
need to fix it.
   ```
   2018-11-13 18:26:54,928 ERROR 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - failed to 
close partitionDiscoverer
   java.lang.IllegalStateException: This consumer has already been closed.
at 
org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1613)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
at java.lang.Thread.run(Thread.java:748)
   ```
   
   Can the `run` method and the `cancel` method be executed in different threads? 
If so, we need to add `synchronized` to this method in `Kafka09PartitionDiscoverer`. 
   ```
@Override
protected void closeConnections() throws Exception {
	if (this.kafkaConsumer != null) {
		this.kafkaConsumer.close();

		// de-reference the consumer to avoid closing multiple times
		this.kafkaConsumer = null;
	}
}
   ```
   
   I looked at the `KafkaConsumer` code again. Its close method calls 
`acquire`. I thought it acquires a lock, which is not the case.
   ```
private void acquire() {
	this.ensureNotClosed();
	long threadId = Thread.currentThread().getId();
	if (threadId != this.currentThread.get() &&
			!this.currentThread.compareAndSet(-1L, threadId)) {
		throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
	} else {
		this.refcount.incrementAndGet();
	}
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10888) Expose new global watermark RPC to sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10888:
---

 Summary: Expose new global watermark RPC to sources
 Key: FLINK-10888
 URL: https://issues.apache.org/jira/browse/FLINK-10888
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


Expose new JobMaster RPC for watermark tracking to Source implementations so it 
can be used to align reads across sources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10887) Add source watermark tracking to the JobMaster

2018-11-14 Thread Jamie Grier (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-10887:

Summary: Add source watermark tracking to the JobMaster  (was: Add source 
watermarking tracking to the JobMaster)

> Add source watermark tracking to the JobMaster
> --
>
> Key: FLINK-10887
> URL: https://issues.apache.org/jira/browse/FLINK-10887
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Jamie Grier
>Assignee: Jamie Grier
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We need to add a new RPC to the JobMaster such that the current watermark for 
> every source sub-task can be reported and the current global minimum/maximum 
> watermark can be retrieved so that each source can adjust their partition 
> read rates in an attempt to keep sources roughly aligned in event time.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10887) Add source watermarking tracking to the JobMaster

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10887:
---

 Summary: Add source watermarking tracking to the JobMaster
 Key: FLINK-10887
 URL: https://issues.apache.org/jira/browse/FLINK-10887
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: Jamie Grier
Assignee: Jamie Grier


We need to add a new RPC to the JobMaster such that the current watermark for 
every source sub-task can be reported and the current global minimum/maximum 
watermark can be retrieved so that each source can adjust their partition read 
rates in an attempt to keep sources roughly aligned in event time.
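A minimal sketch of what such a gateway could look like follows; the interface and 
method names are purely illustrative, not an agreed or existing Flink API:

{code:java}
// Illustrative sketch only, not the actual JobMaster API: a gateway through which
// each source sub-task reports its watermark and queries the global minimum.
public interface GlobalWatermarkTrackerGateway {

    /** Reports the current watermark of one source sub-task. */
    void reportWatermark(String sourceId, int subtaskIndex, long watermark);

    /** Returns the minimum watermark reported across all source sub-tasks. */
    long getGlobalMinimumWatermark();
}
{code}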

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10886) Event time synchronization across sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10886:
---

 Summary: Event time synchronization across sources
 Key: FLINK-10886
 URL: https://issues.apache.org/jira/browse/FLINK-10886
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


When reading from a source with many parallel partitions, especially when 
reading lots of historical data (or recovering from downtime and there is a 
backlog to read), it's quite common for there to develop an event-time skew 
across those partitions.
 
When doing event-time windowing -- or in fact any event-time driven processing 
-- the event time skew across partitions results directly in increased 
buffering in Flink and of course the corresponding state/checkpoint size growth.
 
As the event-time skew and state size grow larger, this can have a major effect 
on application performance and in some cases result in a "death spiral" where 
the application performance gets worse and worse as the state size grows and 
grows.
 
So, one solution to this problem, outside of core changes in Flink itself, 
seems to be to try to coordinate sources across partitions so that they make 
progress through event time at roughly the same rate.  In fact if there is 
large skew the idea would be to slow or even stop reading from some partitions 
with newer data while first reading the partitions with older data.  Anyway, to 
do this we need to share state somehow amongst sub-tasks.
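As a rough illustration of the idea (this is not existing Flink code; the functional 
interfaces below simply stand in for whatever shared state or RPC ends up being used), 
a source's fetch loop could back off whenever its local event time runs too far ahead 
of the global minimum watermark:

{code:java}
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical sketch: pause reading from a partition when this sub-task's
// event time is more than maxAheadMillis ahead of the global minimum watermark.
public final class AlignedFetchLoop {

    private final LongSupplier globalMinWatermark; // e.g. backed by a JobMaster RPC
    private final LongConsumer reportWatermark;    // reports this sub-task's watermark
    private final long maxAheadMillis = 30_000;    // tolerated event-time skew

    private volatile boolean running = true;
    private long localWatermark = Long.MIN_VALUE;

    public AlignedFetchLoop(LongSupplier globalMinWatermark, LongConsumer reportWatermark) {
        this.globalMinWatermark = globalMinWatermark;
        this.reportWatermark = reportWatermark;
    }

    public void run() throws InterruptedException {
        while (running) {
            if (localWatermark > globalMinWatermark.getAsLong() + maxAheadMillis) {
                // too far ahead: back off instead of buffering more state downstream
                Thread.sleep(100);
                continue;
            }
            localWatermark = emitNextRecord(); // connector-specific read/emit logic
            reportWatermark.accept(localWatermark);
        }
    }

    private long emitNextRecord() {
        return localWatermark + 1; // placeholder for real record emission
    }
}
{code}

Whether the backoff happens by sleeping, by pausing individual partition fetchers, or 
inside the RPC itself is an open design question.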
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686940#comment-16686940
 ] 

ASF GitHub Bot commented on FLINK-10869:


StephanEwen commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438763465
 
 
   Thanks for the reviews.
   
   The migration of credentials is not yet complete. End-to-end tests are 
missing and the Yarn ITCase as well.
   The old credentials still work; they will be phased out once we have migrated 
everything.
   
   +1 to adding a test utility to capture access to the env variables and the 
path construction.
   Will add a commit here that refactors that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-14 Thread GitBox
StephanEwen commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438763465
 
 
   Thanks for the reviews.
   
   The migration of credentials is not yet complete. End-to-end tests are 
missing and the Yarn ITCase as well.
   The old credentials still work; they will be phased out once we have migrated 
everything.
   
   +1 to adding a test utility to capture access to the env variables and the 
path construction.
   Will add a commit here that refactors that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686891#comment-16686891
 ] 

ASF GitHub Bot commented on FLINK-10869:


zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438748425
 
 
   btw, for verifying this change we can push it to a separate branch in the 
apache repo.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-14 Thread GitBox
zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438748425
 
 
   btw, for verifying this change we can push it to a separate branch in the 
apache repo.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686887#comment-16686887
 ] 

ASF GitHub Bot commented on FLINK-10869:


zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438746732
 
 
   @dawidwys 
   * all S3 tests already check whether credentials are provided, and if not 
are skipped via assumptions
   * this does not change anything in terms of behavior; these tests were 
always only run in the Apache Flink Travis
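   For reference, a minimal sketch of how such an assumption-based skip typically 
looks (illustrative only, not the actual Flink test utility; it reuses the environment 
variable names from the issue description):
   ```
   import org.junit.Assume;
   import org.junit.BeforeClass;

   // Skips all tests in the class when the S3 credential environment variables are not set.
   public class S3TestCredentialsCheck {

       @BeforeClass
       public static void checkCredentials() {
           Assume.assumeNotNull(System.getenv("IT_CASE_S3_BUCKET"));
           Assume.assumeNotNull(System.getenv("IT_CASE_S3_ACCESS_KEY"));
           Assume.assumeNotNull(System.getenv("IT_CASE_S3_SECRET_KEY"));
       }
   }
   ```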


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-14 Thread GitBox
zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438746732
 
 
   @dawidwys 
   * all S3 tests already check whether credentials are provided, and if not 
are skipped via assumptions
   * this does not change anything in terms of behavior; these tests were 
always only run in the Apache Flink Travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10531) State TTL RocksDb backend end-to-end test failed on Travis

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686867#comment-16686867
 ] 

ASF GitHub Bot commented on FLINK-10531:


kl0u commented on issue #7036: [FLINK-10531][e2e] Fix unstable TTL end-to-end 
test.
URL: https://github.com/apache/flink/pull/7036#issuecomment-438735965
 
 
   @azagrebin Please have another look and let me know what you think!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State TTL RocksDb backend end-to-end test failed on Travis
> --
>
> Key: FLINK-10531
> URL: https://issues.apache.org/jira/browse/FLINK-10531
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.1
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.7.0
>
>
> The {{State TTL RocksDb backend end-to-end test}} end-to-end test failed on 
> Travis.
> https://travis-ci.org/apache/flink/jobs/438226190
> https://api.travis-ci.org/v3/job/438226190/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u commented on issue #7036: [FLINK-10531][e2e] Fix unstable TTL end-to-end test.

2018-11-14 Thread GitBox
kl0u commented on issue #7036: [FLINK-10531][e2e] Fix unstable TTL end-to-end 
test.
URL: https://github.com/apache/flink/pull/7036#issuecomment-438735965
 
 
   @azagrebin Please have another look and let me know what you think!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10851) sqlUpdate support complex insert grammar

2018-11-14 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686774#comment-16686774
 ] 

Dawid Wysakowicz commented on FLINK-10851:
--

[~frank wang] have you resolved your issue? If so, can we close this ticket and 
the corresponding PR?

> sqlUpdate support complex insert grammar
> 
>
> Key: FLINK-10851
> URL: https://issues.apache.org/jira/browse/FLINK-10851
> Project: Flink
>  Issue Type: Bug
>Reporter: frank wang
>Priority: Major
>  Labels: pull-request-available
>
> my code is
> {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, 
> filedName2 from kafka.sdkafka.order_4");}}
> but Flink gave me an error saying "No table was registered under the 
> name kafka".
> I modified the code locally, and it works now. The current code in 
> TableEnvironment.scala is:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
> getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
> case insert: SqlInsert =>
>   // validate the SQL query
>   val query = insert.getSource
>   val validatedQuery = planner.validate(query)
>   // get query result as Table
>   val queryResult = new Table(this, 
> LogicalRelNode(planner.rel(validatedQuery).rel))
>   // get name of sink table
>   val targetTableName = 
> insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>   // insert query result into sink table
>   insertInto(queryResult, targetTableName, config)
> case _ =>
>   throw new TableException(
> "Unsupported SQL query! sqlUpdate() only accepts SQL statements of 
> type INSERT.")
>   }
> }
> {code}
> which should be modified to this:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
> getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
> case insert: SqlInsert =>
>   // validate the SQL query
>   val query = insert.getSource
>   val validatedQuery = planner.validate(query)
>   // get query result as Table
>   val queryResult = new Table(this, 
> LogicalRelNode(planner.rel(validatedQuery).rel))
>   // get name of sink table
>   //val targetTableName = 
> insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>   val targetTableName = insert.getTargetTable.toString
>   // insert query result into sink table
>   insertInto(queryResult, targetTableName, config)
> case _ =>
>   throw new TableException(
> "Unsupported SQL query! sqlUpdate() only accepts SQL statements of 
> type INSERT.")
>   }
> }
> {code}
>  
> I hope this can be accepted, thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686764#comment-16686764
 ] 

ASF GitHub Bot commented on FLINK-10869:


dawidwys commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438713201
 
 
   Does it mean that those tests will always fail on a user's Travis (unless they 
set up AWS credentials)?
   If so, I think we should at least skip those tests if credentials are not 
provided.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-14 Thread GitBox
dawidwys commented on issue #7098: [FLINK-10869] [build] Update S3 tests to 
reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098#issuecomment-438713201
 
 
   Does it mean that those tests will always fail on a user's Travis (unless they 
set up AWS credentials)?
   If so, I think we should at least skip those tests if credentials are not 
provided.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-14 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686705#comment-16686705
 ] 

Dawid Wysakowicz commented on FLINK-8159:
-

Ad. 1 I agree with the approach of adding an additional abstract class. My 
original question, however, was about using a new {{PatternSelectFunction}}; I don't 
think we should support dynamically changing it, that should be possible only 
for {{IterativeCondition}}.
Ad. 2 Agreed.
Ad. 3 I know we could, but I don't think we should expose everything. Actually, 
I would limit its capabilities to a minimum, e.g. prohibit state creation, 
accumulators etc. This is of great importance especially for 
{{IterativeCondition}}.

Since we have a somewhat similar view on this issue, I think we could start 
with the implementation if you are still interested ;). Last but not least, 
sorry for the late response.
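
To make the first point concrete, here is a rough sketch of the additional abstract 
class; the class name and its exact shape are only an illustration, not an agreed 
design:

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.cep.PatternSelectFunction;

// Sketch: users extend this to get open()/close() and the RuntimeContext from
// AbstractRichFunction while keeping the existing select() contract.
public abstract class RichPatternSelectFunction<IN, OUT>
        extends AbstractRichFunction
        implements PatternSelectFunction<IN, OUT> {

    @Override
    public abstract OUT select(Map<String, List<IN>> pattern) throws Exception;
}
{code}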

> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Flavio Pompermaier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686655#comment-16686655
 ] 

Flavio Pompermaier commented on FLINK-10864:


If you look at JarListInfo, the class returned by the /jars REST API, you 
can see that it was meant to support multiple main classes in a single jar (it 
contains a List). If I had to write this API I would also add a 
description of the job parameters/options.

I understand that removing complexity from the code is always good, but this 
looks like a step back to me.
Why not support two different flavors of *Main-Class*? One that does not 
implement any interface (and thus does not provide any info, like it is now) and 
one that implements a "brand new" interface FlinkJob which has at least two 
methods: getDescription() and List getJobParameters()?

In this way you can get rid of the legacy Program and ProgramDescription 
interfaces and read only the Main-Class attribute from the Manifest.
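
A sketch of what that interface could look like (the names are taken from the 
proposal above; the element type of the parameter list is just an assumption for 
illustration):

{code:java}
import java.util.List;

// Proposed (not existing) interface: jobs implement this to expose metadata
// to the REST API in addition to their main() entry point.
public interface FlinkJob {

    /** Human-readable description of what the job does. */
    String getDescription();

    /** Names/descriptions of the parameters and options the job accepts. */
    List<String> getJobParameters();
}
{code}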

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10885) Avro Confluent Schema Registry E2E test failed on Travis

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10885:


 Summary: Avro Confluent Schema Registry E2E test failed on Travis
 Key: FLINK-10885
 URL: https://issues.apache.org/jira/browse/FLINK-10885
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats, E2E Tests, 
Table API & SQL
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


https://travis-ci.org/zentol/flink/jobs/454943551

{code}
Waiting for schema registry...
[2018-11-14 12:20:59,394] ERROR Server died unexpectedly:  
(io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
org.apache.kafka.common.config.ConfigException: No supported Kafka endpoints 
are configured. Either kafkastore.bootstrap.servers must have at least one 
endpoint matching kafkastore.security.protocol or broker endpoints loaded from 
ZooKeeper must have at least one endpoint matching kafkastore.security.protocol.
at 
io.confluent.kafka.schemaregistry.storage.KafkaStore.endpointsToBootstrapServers(KafkaStore.java:313)
at 
io.confluent.kafka.schemaregistry.storage.KafkaStore.(KafkaStore.java:130)
at 
io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.(KafkaSchemaRegistry.java:144)
at 
io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:53)
at 
io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
at io.confluent.rest.Application.createServer(Application.java:149)
at 
io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2018-11-14 Thread wgcn (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wgcn updated FLINK-10884:
-
Labels: yarn  (was: )

> Flink on yarn  TM container will be killed by nodemanager because of  the 
> exceeded  physical memory.
> 
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Core
>Affects Versions: 1.6.2
> Environment: version  : 1.6.2 
> module : flink on yarn
> centos  jdk1.8
> hadoop 2.7
>Reporter: wgcn
>Priority: Major
>  Labels: yarn
>
> TM container will be killed by nodemanager because of  the exceeded  
> [physical|http://www.baidu.com/link?url=Y4LyfMDH59n9-Ey16Fo6EFAYltN1e9anB3y2ynhVmdvuIBCkJGdH0hTExKDZRvXNr6hqhwIXs8JjYqesYbx0BOpQDD0o1VjbVQlOC-9MgXi]
>  memory. I found the lanuch context   lanuching TM container  that  
> "container memory =   heap memory+ offHeapSizeMB"  at the class 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters   
> from line 160 to 166  I set a safety margin for the whole memory container 
> using. For example  if the container  limit 3g  memory,  the sum memory that  
>  "heap memory+ offHeapSizeMB"  is equal to  2.4g to prevent the container 
> being killed.Do we have the 
> [ready-made|http://www.baidu.com/link?url=ylC8cEafGU6DWAdU9ADcJPNugkjbx6IjtqIIxJ9foX4_Yfgc7ctWmpEpQRettVmBiOy7Wfph7S1UvN5LiJj-G1Rsb--oDw4Z2OEbA5Fj0bC]
>  solution  or I can commit my solution



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2018-11-14 Thread wgcn (JIRA)
wgcn created FLINK-10884:


 Summary: Flink on yarn  TM container will be killed by nodemanager 
because of  the exceeded  physical memory.
 Key: FLINK-10884
 URL: https://issues.apache.org/jira/browse/FLINK-10884
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management, Core
Affects Versions: 1.6.2
 Environment: version  : 1.6.2 

module : flink on yarn

centos  jdk1.8

hadoop 2.7
Reporter: wgcn


TM container will be killed by nodemanager because of  the exceeded  
[physical|http://www.baidu.com/link?url=Y4LyfMDH59n9-Ey16Fo6EFAYltN1e9anB3y2ynhVmdvuIBCkJGdH0hTExKDZRvXNr6hqhwIXs8JjYqesYbx0BOpQDD0o1VjbVQlOC-9MgXi]
 memory. I found the lanuch context   lanuching TM container  that  "container 
memory =   heap memory+ offHeapSizeMB"  at the class 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters   
from line 160 to 166  I set a safety margin for the whole memory container 
using. For example  if the container  limit 3g  memory,  the sum memory that   
"heap memory+ offHeapSizeMB"  is equal to  2.4g to prevent the container being 
killed.Do we have the 
[ready-made|http://www.baidu.com/link?url=ylC8cEafGU6DWAdU9ADcJPNugkjbx6IjtqIIxJ9foX4_Yfgc7ctWmpEpQRettVmBiOy7Wfph7S1UvN5LiJj-G1Rsb--oDw4Z2OEbA5Fj0bC]
 solution  or I can commit my solution



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686532#comment-16686532
 ] 

Chesnay Schepler commented on FLINK-10864:
--

Personally I'd throw that one out in the long run. The {{program-class}} 
thing looks like some ancient legacy stuff we kept for backwards 
compatibility; but as far as I can tell it is entirely subsumed by 
{{Main-Class}}.

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10882) Misleading job/task state for scheduled jobs

2018-11-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10882:
-
Description: 
When submitting a job when not enough resources are available, the job currently 
stays in a {{CREATED/SCHEDULED}} state.
There are 2 issues with how this is presented in the UI.

The {{Running Jobs}} page incorrectly states that the job is running.
(see list_view attachment)
EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING 
state.

The state display for individual tasks either
# States the task is in a CREATED state, when it is actually SCHEDULED
# States the task is in a CREATED state, but the count for all state boxes is 
zero.
(see task_view attachment)

  was:
When submitting a job when not enough resources are available currently cuases 
the job  stay in a {{CREATE/SCHEDULED}} state.
There are 2 issues with how this is presented in the UI.

The {{Running Jobs}} page incorrectly states that the job is running.
(see list_view attachment)

The state display for individual tasks either
# States the task is in a CREATED state, when it is actually SCHEDULED
# States the task is in a CREATED state, but the count for all state boxes is 
zero.
(see task_view attachment)


> Misleading job/task state for scheduled jobs
> 
>
> Key: FLINK-10882
> URL: https://issues.apache.org/jira/browse/FLINK-10882
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
> Attachments: list_view.png, task_view.png
>
>
> When submitting a job when not enough resources are available, the job currently 
> stays in a {{CREATED/SCHEDULED}} state.
> There are 2 issues with how this is presented in the UI.
> The {{Running Jobs}} page incorrectly states that the job is running.
> (see list_view attachment)
> EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING 
> state.
> The state display for individual tasks either
> # States the task is in a CREATED state, when it is actually SCHEDULED
> # States the task is in a CREATED state, but the count for all state boxes is 
> zero.
> (see task_view attachment)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686497#comment-16686497
 ] 

Chesnay Schepler commented on FLINK-10864:
--

When you suggest a separate _manifest_ entry i assumed you meant a custom 
manifest file that is packaged with the jar.

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution

2018-11-14 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686519#comment-16686519
 ] 

TisonKun commented on FLINK-10880:
--

Hi [~StephanEwen], I'd like to reach a consensus on where to document this.

My suggestion is the option description at 
{{JobManagerOptions#EXECUTION_FAILOVER_STRATEGY}} and the Javadoc of 
{{RestartPipelinedRegionStrategy}}. What do you think? Are these enough?

> Failover strategies should not be applied to Batch Execution
> 
>
> Key: FLINK-10880
> URL: https://issues.apache.org/jira/browse/FLINK-10880
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.6.2
>Reporter: Stephan Ewen
>Assignee: TisonKun
>Priority: Blocker
> Fix For: 1.6.3, 1.7.0
>
>
> When configuring a failover strategy other than "full", DataSet/Batch 
> execution is currently not correct.
> This is expected, the failover region strategy is an experimental WIP feature 
> for streaming that has not been extended to the DataSet API.
> We need to document this and prevent execution of DataSet features with other 
> failover strategies than "full".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10880) Failover strategies should not be applied to Batch Execution

2018-11-14 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-10880:


Assignee: TisonKun

> Failover strategies should not be applied to Batch Execution
> 
>
> Key: FLINK-10880
> URL: https://issues.apache.org/jira/browse/FLINK-10880
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.6.2
>Reporter: Stephan Ewen
>Assignee: TisonKun
>Priority: Blocker
> Fix For: 1.6.3, 1.7.0
>
>
> When configuring a failover strategy other than "full", DataSet/Batch 
> execution is currently not correct.
> This is expected, the failover region strategy is an experimental WIP feature 
> for streaming that has not been extended to the DataSet API.
> We need to document this and prevent execution of DataSet features with other 
> failover strategies than "full".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10883) Submitting a jobs without enough slots times out due to a unspecified timeout

2018-11-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10883:
-
Description: 
When submitting a job without enough slots being available the job will stay in 
a SCHEDULED/CREATED state. After some time (a few minutes) the job execution 
will fail with the following timeout exception:
{code}
2018-11-14 13:38:26,614 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Pending slot 
request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out.
2018-11-14 13:38:26,615 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
(1/$java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

That the job submission may time out is not documented; neither is which 
timeout is responsible in the first place, nor how/whether it can be disabled.

  was:
When submitting a job without enough slots being available the job will stay in 
a SCHEDULED/CREATED state. After some time (a few minutes) the job execution 
will fail with the following timeout exception:
{code}
2018-11-14 13:38:26,615 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
(1/$java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

That the job submission may time out is not documented, neither is which 
timeout is responsible in the first place nor how/whether this can be disabled.


> Submitting a jobs without enough slots times out due to a unspecified timeout
> -
>
> Key: FLINK-10883
> URL: https://issues.apache.org/jira/browse/FLINK-10883
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> When submitting a job without enough slots being available the job will stay 
> in a SCHEDULED/CREATED state. After some time (a few minutes) the job 
> execution will fail with the following timeout exception:
> {code}
> 2018-11-14 13:38:26,614 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Pending slot 
> request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out.
> 2018-11-14 13:38:26,615 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
> (1/$java.util.concurrent.TimeoutException
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 

[jira] [Updated] (FLINK-10883) Submitting a jobs without enough slots times out due to a unspecified timeout

2018-11-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10883:
-
Description: 
When submitting a job without enough slots being available the job will stay in 
a SCHEDULED/CREATED state. After some time (a few minutes) the job execution 
will fail with the following timeout exception:
{code}
2018-11-14 13:38:26,615 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
(1/$java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

That the job submission may time out is not documented; neither is which 
timeout is responsible in the first place, nor how/whether it can be disabled.

  was:
When submitting a job without enough slots being available the job will stay in 
a SCHEDULED/CREATED state. After some time (a few minutes) the job execution 
will fail with the following timeout exception:
{code}
2018-11-14 13:38:26,615 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
(1/$java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

That the job submission will time out is not documented, neither is which 
timeout is responsible in the first place or how/whether this can be disabled.


> Submitting a jobs without enough slots times out due to a unspecified timeout
> -
>
> Key: FLINK-10883
> URL: https://issues.apache.org/jira/browse/FLINK-10883
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> When submitting a job without enough slots being available the job will stay 
> in a SCHEDULED/CREATED state. After some time (a few minutes) the job 
> execution will fail with the following timeout exception:
> {code}
> 2018-11-14 13:38:26,615 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
> (1/$java.util.concurrent.TimeoutException
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 

[jira] [Created] (FLINK-10882) Misleading job/task state for scheduled jobs

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10882:


 Summary: Misleading job/task state for scheduled jobs
 Key: FLINK-10882
 URL: https://issues.apache.org/jira/browse/FLINK-10882
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
 Attachments: list_view.png, task_view.png

When submitting a job when not enough resources are available, the job currently 
stays in a {{CREATED/SCHEDULED}} state.
There are 2 issues with how this is presented in the UI.

The {{Running Jobs}} page incorrectly states that the job is running.
(see list_view attachment)

The state display for individual tasks either
# States the task is in a CREATED state, when it is actually SCHEDULED
# States the task is in a CREATED state, but the count for all state boxes is 
zero.
(see task_view attachment)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10883) Submitting a jobs without enough slots times out due to a unspecified timeout

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10883:


 Summary: Submitting a jobs without enough slots times out due to a 
unspecified timeout
 Key: FLINK-10883
 URL: https://issues.apache.org/jira/browse/FLINK-10883
 Project: Flink
  Issue Type: Improvement
  Components: Job-Submission
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


When submitting a job without enough slots being available the job will stay in 
a SCHEDULED/CREATED state. After some time (a few minutes) the job execution 
will fail with the following timeout exception:
{code}
2018-11-14 13:38:26,615 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
(1/$java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

That the job submission will time out is not documented; neither is which 
timeout is responsible in the first place, nor how/whether it can be disabled.
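
For what it's worth, the timeout that fires here appears to be the pending slot 
request timeout; assuming that is the case (not verified against this exact version), 
it should be tunable in milliseconds via the {{slot.request.timeout}} key, e.g.:

{code:java}
import org.apache.flink.configuration.Configuration;

public class SlotRequestTimeoutExample {
    public static void main(String[] args) {
        // Assumed configuration key for the pending slot request timeout (in ms);
        // raising it gives the cluster more time to provide the required slots.
        Configuration config = new Configuration();
        config.setLong("slot.request.timeout", 600_000L); // wait up to 10 minutes
    }
}
{code}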



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Flavio Pompermaier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686505#comment-16686505
 ] 

Flavio Pompermaier commented on FLINK-10864:


No, I suggest adding an attribute to the Manifest.

Currently the PackagedProgram class only checks for the existence of 
program-class or Main-Class.

I was arguing that, since Flink already handles program-class (which is not 
standard), it could quite easily also handle a list of comma-separated classes in 
another custom attribute. That seems like a reasonable trade-off to me.
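
As a rough sketch of that idea (the attribute name "Main-Classes" below is just an 
example, not an existing convention, and the helper is hypothetical):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper: read a comma-separated list of entry classes from a custom
// manifest attribute, falling back to the standard Main-Class entry.
public final class MultiMainClassReader {

    public static List<String> readMainClasses(String jarPath) throws Exception {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            String classes = manifest.getMainAttributes().getValue("Main-Classes");
            if (classes == null) {
                classes = manifest.getMainAttributes().getValue("Main-Class");
            }
            return classes == null
                    ? Collections.<String>emptyList()
                    : Arrays.asList(classes.split("\\s*,\\s*"));
        }
    }
}
{code}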

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10869) Update S3 testing settings

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10869:
---
Labels: pull-request-available  (was: )

> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen opened a new pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

2018-11-14 Thread GitBox
StephanEwen opened a new pull request #7098: [FLINK-10869] [build] Update S3 
tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098
 
 
   ## What is the purpose of the change
   
   This updates the S3 credentials used for Integration Tests that interact 
with S3.
   The S3 bucket and account were provided by data Artisans and our permission 
setup was changed to a cleaner and more isolated model.
   
   ## Verifying this change
   
   Not possible on the pull request. Only the master branch builds have access to 
the environment variables with S3 credentials, to prevent leaking the 
credentials.
   
   After merging, we have to check that the tests still execute the S3 
integration tests and do not skip them.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **yes**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10869) Update S3 testing settings

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686490#comment-16686490
 ] 

ASF GitHub Bot commented on FLINK-10869:


StephanEwen opened a new pull request #7098: [FLINK-10869] [build] Update S3 
tests to reference new access key environment variables.
URL: https://github.com/apache/flink/pull/7098
 
 
   ## What is the purpose of the change
   
   This updates the S3 credentials used for Integration Tests that interact 
with S3.
   The S3 bucket and account were provided by data Artisans and our permission 
setup was changed to a cleaner and more isolated model.
   
   ## Verifying this change
   
   Not possible on the pull request. Only the master branch builds have access to 
the environment variables with S3 credentials, to prevent leaking the 
credentials.
   
   After merging, we have to check that the tests still execute the S3 
integration tests and do not skip them.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **yes**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update S3 testing settings
> --
>
> Key: FLINK-10869
> URL: https://issues.apache.org/jira/browse/FLINK-10869
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently S3 tests go against a bucket hosted by 'data Artisans'.
> As part of reworking the AWS permission setup, we need to adapt the 
> credentials and buckets for these tests.
> Future tests should refer to the following environment variables for S3 tests:
> * `IT_CASE_S3_BUCKET`
> * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA`
> * `IT_CASE_S3_SECRET_KEY`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10874:
---
Labels: pull-request-available  (was: )

> Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
> ---
>
> Key: FLINK-10874
> URL: https://issues.apache.org/jira/browse/FLINK-10874
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
>
> https://api.travis-ci.org/v3/job/454449444/log.txt
> {noformat}
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  is running.
> 
> 16:35:07,894 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
> [transaction.timeout.ms] not specified. Setting it to 360 ms
> 16:35:07,903 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
> FlinkKafkaInternalProducer (1/1) to produce into default topic 
> testMigrateFromAtLeastOnceToExactlyOnce
> 16:35:08,785 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  failed with:
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> 

[jira] [Commented] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686473#comment-16686473
 ] 

ASF GitHub Bot commented on FLINK-10874:


pnowojski opened a new pull request #7097: [FLINK-10874][kafka-docs] Document 
likely cause of UnknownTopicOrPartitionException
URL: https://github.com/apache/flink/pull/7097
 
 
   This is a change in documentation only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
> ---
>
> Key: FLINK-10874
> URL: https://issues.apache.org/jira/browse/FLINK-10874
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
>
> https://api.travis-ci.org/v3/job/454449444/log.txt
> {noformat}
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  is running.
> 
> 16:35:07,894 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
> [transaction.timeout.ms] not specified. Setting it to 360 ms
> 16:35:07,903 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
> FlinkKafkaInternalProducer (1/1) to produce into default topic 
> testMigrateFromAtLeastOnceToExactlyOnce
> 16:35:08,785 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  failed with:
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at 

[GitHub] pnowojski opened a new pull request #7097: [FLINK-10874][kafka-docs] Document likely cause of UnknownTopicOrPartitionException

2018-11-14 Thread GitBox
pnowojski opened a new pull request #7097: [FLINK-10874][kafka-docs] Document 
likely cause of UnknownTopicOrPartitionException
URL: https://github.com/apache/flink/pull/7097
 
 
   This is a change in documentation only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10881) SavepointITCase deadlocks on travis

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10881:


 Summary: SavepointITCase deadlocks on travis
 Key: FLINK-10881
 URL: https://issues.apache.org/jira/browse/FLINK-10881
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Tests
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/jobs/454898424



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Flavio Pompermaier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686459#comment-16686459
 ] 

Flavio Pompermaier commented on FLINK-10864:


What about the program-class attribute? That one is not standard either, but it 
is used by PackagedProgram.

 

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs

2018-11-14 Thread GitBox
twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs
URL: https://github.com/apache/flink/pull/7045#issuecomment-438646738
 
 
   The changes have been applied to `cron-master-e2e` and `cron-1.7-e2e`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr closed pull request #7045: [hotfix] Update nightly master cron jobs

2018-11-14 Thread GitBox
twalthr closed pull request #7045: [hotfix] Update nightly master cron jobs
URL: https://github.com/apache/flink/pull/7045
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.travis.yml b/.travis.yml
index 3a5c6c9f858..cbd8dc0496c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -69,12 +69,12 @@ matrix:
   - REMOTE="apache"
   - BRANCH="master"
   - PROFILE="-Dhadoop.version=2.8.3"
-  - SCRIPT="split_docker_e2e.sh"
+  - SCRIPT="split_container.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
   - PROFILE="-Dhadoop.version=2.8.3"
-  - SCRIPT="split_kubernetes_e2e.sh"
+  - SCRIPT="split_heavy.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
@@ -99,12 +99,12 @@ matrix:
   - REMOTE="apache"
   - BRANCH="master"
   - PROFILE="-Dhadoop.version=2.8.3 -Dscala-2.12"
-  - SCRIPT="split_docker_e2e.sh"
+  - SCRIPT="split_container.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
   - PROFILE="-Dhadoop.version=2.8.3 -Dscala-2.12"
-  - SCRIPT="split_kubernetes_e2e.sh"
+  - SCRIPT="split_heavy.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
@@ -129,12 +129,12 @@ matrix:
   - REMOTE="apache"
   - BRANCH="master"
   - PROFILE="-DwithoutHadoop"
-  - SCRIPT="split_docker_e2e.sh"
+  - SCRIPT="split_container.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
   - PROFILE="-DwithoutHadoop"
-  - SCRIPT="split_kubernetes_e2e.sh"
+  - SCRIPT="split_heavy.sh"
 
 git:
   depth: 100
diff --git a/nightly.sh b/nightly.sh
index 19bc7b53287..c2d9ff6e1fc 100755
--- a/nightly.sh
+++ b/nightly.sh
@@ -44,7 +44,7 @@ LOG4J_PROPERTIES=${FLINK_DIR}/tools/log4j-travis.properties
 
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} 
-Dlog4j.configuration=file://$LOG4J_PROPERTIES 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
 MVN_COMMON_OPTIONS="-nsu -B -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 
-Dfast"
-MVN_COMPILE_OPTIONS="-DskipTests"
+MVN_COMPILE_OPTIONS="-T1C -DskipTests"
 
 git clone --single-branch -b ${BRANCH} https://github.com/${REMOTE}/flink
 
diff --git a/splits/split_checkpoints.sh b/splits/split_checkpoints.sh
index 930b5a4bd98..5dfe7e34402 100755
--- a/splits/split_checkpoints.sh
+++ b/splits/split_checkpoints.sh
@@ -49,9 +49,12 @@ run_test "Resuming Savepoint (file, async, scale up) 
end-to-end test" "$END_TO_E
 run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
 run_test "Resuming Savepoint (file, async, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
 run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
-run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks"
-run_test "Resuming Savepoint (rocks, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
-run_test "Resuming Savepoint (rocks, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
+run_test "Resuming Savepoint (rocks, no parallelism change, heap timers) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
rocks false heap"
+run_test "Resuming Savepoint (rocks, scale up, heap timers) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false heap"
+run_test "Resuming Savepoint (rocks, scale down, heap timers) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false heap"
+run_test "Resuming Savepoint (rocks, no parallelism change, rocks timers) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
rocks false rocks"
+run_test "Resuming Savepoint (rocks, scale up, rocks timers) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false rocks"
+run_test "Resuming Savepoint (rocks, scale down, rocks timers) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false 
rocks"
 
 run_test "Resuming Externalized Checkpoint (file, async, no parallelism 
change) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file 
true true"
 run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) 
end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file 
false true"
diff --git a/splits/split_docker_e2e.sh b/splits/split_container.sh
similarity index 95%
rename from 

[jira] [Reopened] (FLINK-10856) Harden resume from externalized checkpoint E2E test

2018-11-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-10856:
--

The latest fix introduced a new instability where we attempt to resume from an 
incomplete checkpoint (i.e. one that has no {{_metadata}} file).

We have to double-check that the checkpoint was actually completed.

https://travis-ci.org/zentol/flink/jobs/454943554

{code}
2018-11-14 12:11:45,071 ERROR 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Failed to 
submit job 9e7876f96c583177d183ce18b52bbd18.

java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Could not set up 
JobManager

at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set 
up JobManager

at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)

at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)

at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)

... 7 more

Caused by: java.io.FileNotFoundException: Cannot find meta data file 
'_metadata' in directory 
'file:/home/travis/build/zentol/flink/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-14313753102/externalized-chckpt-e2e-backend-dir/dfcac75e697394900e5088f962130d57/chk-10'.
 Please try to load the checkpoint/savepoint directly from the metadata file 
instead of the directory.

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:256)

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:109)

at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1100)

at 
org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1234)

at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1158)

at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)

at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)

... 10 more
{code}
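
A minimal sketch of the extra check the test script could perform before resuming 
(variable names here are assumptions, not the actual script's): only resume from 
the newest {{chk-}} directory that actually contains a {{_metadata}} file.

{code}
#!/usr/bin/env bash
# Sketch only: select the completed checkpoint with the highest counter,
# i.e. the newest chk-<n> directory that contains a _metadata file.
CHECKPOINT_ROOT="$1"   # e.g. .../externalized-chckpt-e2e-backend-dir/<job-id>

best_counter=-1
latest_completed=""
for dir in "${CHECKPOINT_ROOT}"/chk-*; do
  [ -f "${dir}/_metadata" ] || continue   # skip incomplete checkpoints
  counter="${dir##*chk-}"
  if [ "${counter}" -gt "${best_counter}" ]; then
    best_counter="${counter}"
    latest_completed="${dir}"
  fi
done

echo "${latest_completed}"
{code}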

> Harden resume from externalized checkpoint E2E test
> ---
>
> Key: FLINK-10856
> URL: https://issues.apache.org/jira/browse/FLINK-10856
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, State Backends, Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The resume from externalized checkpoints E2E test can fail due to 
> FLINK-10855. We should harden the test script to not expect a single 
> checkpoint directory being present but to take the checkpoint with the 
> highest checkpoint counter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686446#comment-16686446
 ] 

ASF GitHub Bot commented on FLINK-10419:


dawidwys opened a new pull request #7096: [FLINK-10419] Call JobMasterGateway 
through RpcCheckpointResponder in…
URL: https://github.com/apache/flink/pull/7096
 
 
   Improved test to call `JobMasterGateway` through `RpcCheckpointResponder`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ClassNotFoundException while deserializing user exceptions from checkpointing
> -
>
> Key: FLINK-10419
> URL: https://issues.apache.org/jira/browse/FLINK-10419
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
>Reporter: Nico Kruber
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> It seems that somewhere in the operator's failure handling, we hand a 
> user-code exception to the checkpoint coordinator via Java serialization but 
> it will then fail during the de-serialization because the class is not 
> available. This will result in the following error shadowing the real one:
> {code}
> java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher.loadClass(Launcher.java:338)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.resolveClass(InstantiationUtil.java:76)
> at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1859)
> at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:557)
> at java.lang.Throwable.readObject(Throwable.java:914)
> at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> at 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.readObject(RemoteRpcInvocation.java:222)
> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
> at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> at 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.deserializeMethodInvocation(RemoteRpcInvocation.java:118)
> at 
> 

[GitHub] dawidwys opened a new pull request #7096: [FLINK-10419] Call JobMasterGateway through RpcCheckpointResponder in…

2018-11-14 Thread GitBox
dawidwys opened a new pull request #7096: [FLINK-10419] Call JobMasterGateway 
through RpcCheckpointResponder in…
URL: https://github.com/apache/flink/pull/7096
 
 
   Improved test to call `JobMasterGateway` through `RpcCheckpointResponder`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10852) Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management

2018-11-14 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-10852.

   Resolution: Duplicate
Fix Version/s: (was: 1.8.0)

Not a bug in itself, rather a symptom of [FLINK-10880] 

> Decremented number of unfinished producers below 0. This is most likely a bug 
> in the execution state/intermediate result partition management
> -
>
> Key: FLINK-10852
> URL: https://issues.apache.org/jira/browse/FLINK-10852
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Priority: Major
>
>  
> {panel:title=Jobs using DataSet iteration operator, if set 
> jobmanager.execution.failover-strategy: region, will hang on FAILING state 
> when failover and has the following exception.}
> java.lang.IllegalStateException: Decremented number of unfinished producers 
> below 0. This is most likely a bug in the execution state/intermediate result 
> partition management. at 
> org.apache.flink.runtime.executiongraph.IntermediateResultPartition.markFinished(IntermediateResultPartition.java:103)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions(ExecutionVertex.java:707)
>  at 
> org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:939)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1568)
>  at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
>  at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>  at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>  at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>  at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at 
> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at 
> akka.actor.ActorCell.invoke(ActorCell.scala:495) at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at 
> akka.dispatch.Mailbox.run(Mailbox.scala:224) at 
> akka.dispatch.Mailbox.exec(Mailbox.scala:234) at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {panel}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10852) Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management

2018-11-14 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686417#comment-16686417
 ] 

Stephan Ewen commented on FLINK-10852:
--

{{jobmanager.execution.failover-strategy}} is very much a "work in progress" 
internal feature at the moment.

It is actually not working correctly with the DataSet API in general, 
iterations being only one part.

So this is expected not to work; it is an unfinished feature.

I created [FLINK-10880] to handle this.

> Decremented number of unfinished producers below 0. This is most likely a bug 
> in the execution state/intermediate result partition management
> -
>
> Key: FLINK-10852
> URL: https://issues.apache.org/jira/browse/FLINK-10852
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Priority: Major
> Fix For: 1.8.0
>
>
>  
> {panel:title=Jobs using DataSet iteration operator, if set 
> jobmanager.execution.failover-strategy: region, will hang on FAILING state 
> when failover and has the following exception.}
> java.lang.IllegalStateException: Decremented number of unfinished producers 
> below 0. This is most likely a bug in the execution state/intermediate result 
> partition management. at 
> org.apache.flink.runtime.executiongraph.IntermediateResultPartition.markFinished(IntermediateResultPartition.java:103)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions(ExecutionVertex.java:707)
>  at 
> org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:939)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1568)
>  at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
>  at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>  at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>  at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>  at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at 
> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at 
> akka.actor.ActorCell.invoke(ActorCell.scala:495) at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at 
> akka.dispatch.Mailbox.run(Mailbox.scala:224) at 
> akka.dispatch.Mailbox.exec(Mailbox.scala:234) at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {panel}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10880) Failover strategies should not be applied to Batch Execution

2018-11-14 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-10880:


 Summary: Failover strategies should not be applied to Batch 
Execution
 Key: FLINK-10880
 URL: https://issues.apache.org/jira/browse/FLINK-10880
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.6.2
Reporter: Stephan Ewen
 Fix For: 1.6.3, 1.7.0


When configuring a failover strategy other than "full", DataSet/Batch execution 
is currently not correct.

This is expected: the region failover strategy is an experimental WIP feature 
for streaming that has not been extended to the DataSet API.

We need to document this and prevent execution of DataSet programs with failover 
strategies other than "full".
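
For reference, a minimal configuration sketch (the key and its default value are 
the real ones; the comment only restates the restriction described above):

{code}
# flink-conf.yaml
# For DataSet / batch jobs, keep the default full-restart strategy for now;
# "region" is an experimental, streaming-only work-in-progress feature.
jobmanager.execution.failover-strategy: full
{code}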



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686413#comment-16686413
 ] 

Chesnay Schepler commented on FLINK-10864:
--

I don't like the idea of introducing a separate manifest mechanism that is 
entirely decoupled from the standard manifest file. I believe this is way out 
of the scope of Flink (or really, anything outside the JDK) and will cause many 
headaches down the line.

Uploading multiple jars is a reasonable alternative IMO.

One idea I had at times was to actually allow multiple jars to be submitted at 
once; one containing the job and N for additional dependencies (kind of like a 
job-scoped /lib directory). This would make it easier to re-use dependencies 
across jobs. But then you're dealing with the issue of safely conveying that 
"to run this jar you also have to submit X".

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686407#comment-16686407
 ] 

Stephan Ewen commented on FLINK-10864:
--

I am skeptical about that, to be honest. This seems very ad hoc: these are not 
common manifest entries, and there is an arbitrary difference between the main 
class and the other main classes.

This feels like pushing functionality into Flink that belongs in 
operations/deployment tooling.

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10879) Align Flink clients on env.execute()

2018-11-14 Thread Flavio Pompermaier (JIRA)
Flavio Pompermaier created FLINK-10879:
--

 Summary: Align Flink clients on env.execute()
 Key: FLINK-10879
 URL: https://issues.apache.org/jira/browse/FLINK-10879
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.6.2
Reporter: Flavio Pompermaier


Right now the REST APIs do not support any code after env.execute while the 
Flink API, CLI client or the code executed within the IDE do.

All clients should behave in the same way (either supporting code that continues 
after env.execute() and uses its return value, or not).
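
A minimal sketch of the pattern in question, i.e. driver code that continues after 
env.execute() and consumes its result (job logic and class name are placeholders):

{code}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class ExecuteAndContinue {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .output(new DiscardingOutputFormat<>());

        // execute() blocks until the job finishes and returns its result.
        JobExecutionResult result = env.execute("example job");

        // Code after execute(): only reached by clients that block on job completion,
        // e.g. the CLI client or an IDE run, but not the current REST-based submission.
        System.out.println("Job ran for " + result.getNetRuntime() + " ms");
    }
}
{code}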

See the discussion on the DEV ML for more details: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10862) REST API does not show program descriptions of "simple" ProgramDescription

2018-11-14 Thread Flavio Pompermaier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686391#comment-16686391
 ] 

Flavio Pompermaier commented on FLINK-10862:


What do you mean by a "proper artifact management system"? Is there any 
suggestion for one?

In that case the JobManager should be able to download (or use) the jar from 
such an external system, which at the moment is not possible, if I'm not wrong.

I think that a simple enhancement to the REST API would serve Flink just fine 
(at least for ordinary workloads), without the need to introduce another 
external component into the architecture. I would prefer an improvement in this 
direction over a deprecation.
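
For context, the interface in question is tiny; a minimal sketch of a main class 
exposing a description through it (class name and text are placeholders):

{code}
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

// Sketch: implements ProgramDescription only, without implementing Program
// (i.e. without having to return a Plan).
public class DescribedJob implements ProgramDescription {

    @Override
    public String getDescription() {
        return "Example job: doubles a handful of numbers and discards the result.";
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .output(new DiscardingOutputFormat<>());
        env.execute("described job");
    }
}
{code}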

> REST API does not show program descriptions of "simple" ProgramDescription
> --
>
> Key: FLINK-10862
> URL: https://issues.apache.org/jira/browse/FLINK-10862
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>  Labels: rest_api
>
> When uploading a jar containing a main class implementing ProgramDescription 
> interface, the REST API doesn't list its description. It works only if the 
> class implements Program (that I find pretty useless...why should I return 
> the plan?)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2018-11-14 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686380#comment-16686380
 ] 

Stephan Ewen commented on FLINK-10867:
--

[~mcoimbra] This is an interesting idea.

As you describe, the work of a cache operator is deeply interwoven with cluster 
setup, job and operator scheduling, etc. If we add this to the core APIs, people 
will expect it to work intuitively in all situations.

That would be a more significant effort, both in scheduling and in the DataSet API.
I would personally focus the effort on proper batch/streaming unification under 
the streaming APIs and operators, and see if we can get concepts of stream 
caching in there. This is, however, a longer-term effort.

If you believe that this extension is significant for your use case and should 
be available today, what do you think about offering this as a 
"library/extension" in your own GitHub project?


> Add a DataSet-based CacheOperator to reuse results between jobs
> ---
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, DataSet API, Local Runtime
>Affects Versions: 1.8.0
>Reporter: Miguel E. Coimbra
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.8.0
>
>
> *Motivation.*
> There are job scenarios where Flink batch processing users may be interested 
> in processing a large amount of data, outputting results to disk and then 
> reusing the results for another type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on 
> graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an 
> evolving graph while allowing for specific logic on challenges such as _when_ 
> and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy 
> with this existing Jira issue pertaining graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While it would be negligible to write the results to disk and then read them 
> back in a new job to be sent to the JobManager if they are small, this 
> becomes prohibitive if there are several gigabytes of data to write/read and 
> using a distributed storage (e.g. HDFS) is not an option.
> Even if there is a distributed storage available, as the number of sequential 
> jobs increases, even the benefits of the secondary storage being distributed 
> will diminish.
> *Existing alternatives.*
> I also considered, as a possibility, to compose the sequence of jobs in a 
> single big job to submit to the JobManager, thus allowing reuse of results 
> due to the natural forwarding of results to subsequent operators in dataflow 
> programing.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external 
> to Flink and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in 
> {{BulkIteration/DeltaIteration;}}
>  ** Composing a job with "too many" operators and inter-dependencies may lead 
> to the Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution 
> strategies, leading to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from 
> [~fhueske] last year.
>  His reply was on the 7th of December 2017:
>  Link: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse 
> between iterations"
>  
> *Idea.*
> Perhaps the better way to describe this *CacheOperator* feature is the 
> concept of "_job chaining_", where a new type of DataSink would receive data 
> that will:
>  - Be available to a subsequent job which somehow makes a reference to the 
> DataSink of the previous job;
>  - Have remained available (from the previous job execution) in the exact 
> same TaskManagers in the cluster.
> Likely, the optimal memory distribution will be pretty similar between 
> chained jobs - if the data was read from disk again between jobs, it would 
> likely be distributed with the same (automatic or not) strategies, hence the 
> same distribution would likely be of use to sequential jobs.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with 

[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar

2018-11-14 Thread Flavio Pompermaier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686368#comment-16686368
 ] 

Flavio Pompermaier commented on FLINK-10864:


I'm not suggesting scanning the entire jar, just using a different manifest entry 
(like *Other-Main-Classes* or *Main-classes*). E.g.:

*Main-Class:* org.apache.flink.WordCount
*Other-Main-Classes*: 
org.apache.flink.WordCount2,org.apache.flink.WordCount3,org.apache.flink.WordCount4

or

*Main-Classes:* 
org.apache.flink.WordCount,org.apache.flink.WordCount2,org.apache.flink.WordCount3,org.apache.flink.WordCount4

 

Then, the org.apache.flink.client.program.PackagedProgram class should just 
look for this extra tag and handle it.
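
A minimal sketch of how such a lookup could work with the standard manifest API 
(the *Main-Classes* attribute is the proposal above, not an existing convention, 
and the class/method names are placeholders):

{code}
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Sketch only: read a hypothetical "Main-Classes" manifest attribute and split it
// into candidate entry points. Not an existing PackagedProgram feature.
public final class MainClassesReader {

    public static List<String> readMainClasses(File jar) throws Exception {
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return Collections.emptyList();
            }
            String value = manifest.getMainAttributes().getValue("Main-Classes");
            if (value == null || value.trim().isEmpty()) {
                return Collections.emptyList();
            }
            return Arrays.asList(value.trim().split("\\s*,\\s*"));
        }
    }
}
{code}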

> Support multiple Main classes per jar
> -
>
> Key: FLINK-10864
> URL: https://issues.apache.org/jira/browse/FLINK-10864
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now all the REST API and job submission system assumes that a jar 
> contains only a single main class. In my experience this is rarely the case 
> in real scenario: a jar contains multiple jobs (with similar dependencies) 
> that performs different tasks.
> In our use case, for example, the shaded jar is around 200 MB and 10 jobs 
> within it...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2018-11-14 Thread Miguel E. Coimbra (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Miguel E. Coimbra updated FLINK-10867:
--
Labels:   (was: pull-request-available)

> Add a DataSet-based CacheOperator to reuse results between jobs
> ---
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, DataSet API, Local Runtime
>Affects Versions: 1.8.0
>Reporter: Miguel E. Coimbra
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.8.0
>
>
> *Motivation.*
> There are job scenarios where Flink batch processing users may be interested 
> in processing a large amount of data, outputting results to disk and then 
> reusing the results for another type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on 
> graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an 
> evolving graph while allowing for specific logic on challenges such as _when_ 
> and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy 
> with this existing Jira issue pertaining graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While it would be negligible to write the results to disk and then read them 
> back in a new job to be sent to the JobManager if they are small, this 
> becomes prohibitive if there are several gigabytes of data to write/read and 
> using a distributed storage (e.g. HDFS) is not an option.
> Even if there is a distributed storage available, as the number of sequential 
> jobs increases, even the benefits of the secondary storage being distributed 
> will diminish.
> *Existing alternatives.*
> I also considered, as a possibility, to compose the sequence of jobs in a 
> single big job to submit to the JobManager, thus allowing reuse of results 
> due to the natural forwarding of results to subsequent operators in dataflow 
> programing.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external 
> to Flink and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in 
> {{BulkIteration/DeltaIteration;}}
>  ** Composing a job with "too many" operators and inter-dependencies may lead 
> to the Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution 
> strategies, leading to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from 
> [~fhueske] last year.
>  His reply was on the 7th of December 2017:
>  Link: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse 
> between iterations"
>  
> *Idea.*
> Perhaps the better way to describe this *CacheOperator* feature is the 
> concept of "_job chaining_", where a new type of DataSink would receive data 
> that will:
>  - Be available to a subsequent job which somehow makes a reference to the 
> DataSink of the previous job;
>  - Have remained available (from the previous job execution) in the exact 
> same TaskManagers in the cluster.
> Likely, the optimal memory distribution will be pretty similar between 
> chained jobs - if the data was read from disk again between jobs, it would 
> likely be distributed with the same (automatic or not) strategies, hence the 
> same distribution would likely be of use to sequential jobs.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job 
> finishes in local mode, so it would be necessary to change local mode to keep 
> a FlinkMiniCluster alive - what was the reasoning behind destroying it?
>  Simplifying the implementation?
>  - How would this look like in the API?
>  I envisioned an example like this:
> {{DataSet> previousResult = 
> callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
>  {{CacheOperator>> op = previousResult.cache();}}
>  {{... // Other operations...}}
>  {{environment.execute();}}
>  {{... // The previous job has finished.}}
>  {{DataSet> sorted = op.sort(0); // the previous DataSet, 
> which resulted from callSomeFlinkDataflowOperator() int the previous Flink 
> job, remained in 

[jira] [Updated] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2018-11-14 Thread Miguel E. Coimbra (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Miguel E. Coimbra updated FLINK-10867:
--
Description: 
*Motivation.*

There are job scenarios where Flink batch processing users may be interested in 
processing a large amount of data, outputting results to disk and then reusing 
the results for another type of computation in Flink again.

This feature suggestion emerged from my work as a PhD researcher working on 
graph stream processing.

[https://arxiv.org/abs/1810.02781]

More specifically, in our use case this would be very useful to maintain an 
evolving graph while allowing for specific logic on challenges such as _when_ 
and _how_ to integrate updates in the graph and also how to represent it.

Furthermore, it would also be an enabler for rich use-cases that have synergy 
with this existing Jira issue pertaining to graph partitioning:

FLINK-1536 - Graph partitioning operators for Gelly

*Problem.*

While it would be negligible to write the results to disk and then read them 
back in a new job to be sent to the JobManager if they are small, this becomes 
prohibitive if there are several gigabytes of data to write/read and using a 
distributed storage (e.g. HDFS) is not an option.

Even if there is a distributed storage available, as the number of sequential 
jobs increases, even the benefits of the secondary storage being distributed 
will diminish.

*Existing alternatives.*

I also considered, as a possibility, to compose the sequence of jobs in a 
single big job to submit to the JobManager, thus allowing reuse of results due 
to the natural forwarding of results to subsequent operators in dataflow 
programming.

However, this becomes difficult due to two reasons:
 * The logic to connect the sequence of jobs may depend on factors external to 
Flink and not known at the start of the job composition.
 This also excludes limited iterative behavior like what is provided in 
{{BulkIteration/DeltaIteration;}}
 ** Composing a job with "too many" operators and inter-dependencies may lead 
to the Optimizer engaging an exponential optimization search space.
 This is particularly true for operators with multiple valid execution 
strategies, leading to a combinatorics problem.
 This leads to the Flink compiler _taking forever_ to even create a plan.
 I believe this is the current situation based on a reply I received from 
[~fhueske] last year.
 His reply was on the 7th of December 2017:
 Link: 
[http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
 Mailing list thread title: "Re: How to perform efficient DataSet reuse between 
iterations"

 

*Idea.*

Perhaps the better way to describe this *CacheOperator* feature is the concept 
of "_job chaining_", where a new type of DataSink would receive data that will:
 - Be available to a subsequent job which somehow makes a reference to the 
DataSink of the previous job;
 - Have remained available (from the previous job execution) in the exact same 
TaskManagers in the cluster.

Likely, the optimal memory distribution will be pretty similar between chained 
jobs - if the data was read from disk again between jobs, it would likely be 
distributed with the same (automatic or not) strategies, hence the same 
distribution would likely be of use to sequential jobs.

*Design.*

Potential conflicts with the current Flink cluster execution model:
 - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job 
finishes in local mode, so it would be necessary to change local mode to keep a 
FlinkMiniCluster alive - what was the reasoning behind destroying it?
 Simplifying the implementation?

 - What would this look like in the API?
 I envisioned an example like this:

{{DataSet> previousResult = callSomeFlinkDataflowOperator(); 
// The result of some previous computation.}}
 {{CacheOperator>> op = previousResult.cache();}}
 {{... // Other operations...}}
 {{environment.execute();}}
 {{... // The previous job has finished.}}
 {{DataSet> sorted = op.sort(0); // the previous DataSet, 
which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, 
remained in memory.}}
 {{environment.execute(); // Trigger a different job whose data depends on the 
previous one.}}

Besides adding appropriate classes to the Flink Java API, implementing this 
feature would require changing things so that:
 * JobManagers are aware that a completed job had cached operators - likely a 
new COMPLETED_AND_REUSABLE job state?
 * TaskManagers must keep references to the Flink memory management segments 
associated to the CacheOperator data;
 * CacheOperator must have a default number of usages and/or amount of time to 
be kept alive (I think both options should exist but the user may choose 
whether to use one or both);
 * Cluster coordination: should the JobManager be 

[jira] [Updated] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2018-11-14 Thread Miguel E. Coimbra (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Miguel E. Coimbra updated FLINK-10867:
--
Description: 
*Motivation.*

There are job scenarios where Flink batch processing users may be interested in 
processing a large amount of data, outputting results to disk and then reusing 
the results for another type of computation in Flink again.

This feature suggestion emerged from my work as a PhD researcher working on 
graph stream processing.

[https://arxiv.org/abs/1810.02781]

More specifically, in our use case this would be very useful to maintain an 
evolving graph while allowing for specific logic on challenges such as _when_ 
and _how_ to integrate updates in the graph and also how to represent it.

Furthermore, it would also be an enabler for rich use-cases that have synergy 
with this existing Jira issue pertaining to graph partitioning:

FLINK-1536 - Graph partitioning operators for Gelly

*Problem.*

While it would be negligible to write the results to disk and then read them 
back in a new job to be sent to the JobManager if they are small, this becomes 
prohibitive if there are several gigabytes of data to write/read and using a 
distributed storage (e.g. HDFS) is not an option.

Even if there is a distributed storage available, as the number of sequential 
jobs increases, even the benefits of the secondary storage being distributed 
will diminish.

*Existing alternatives.*

I also considered, as a possibility, to compose the sequence of jobs in a 
single big job to submit to the JobManager, thus allowing reuse of results due 
to the natural forwarding of results to subsequent operators in dataflow 
programming.

However, this becomes difficult due to two reasons:
 * The logic to connect the sequence of jobs may depend on factors external to 
Flink and not known at the start of the job composition.
 This also excludes limited iterative behavior like what is provided in 
{{BulkIteration/DeltaIteration;}}
 ** Composing a job with "too many" operators and inter-dependencies may lead 
to the Optimizer engaging an exponential optimization search space.
 This is particularly true for operators with multiple valid execution 
strategies, leading to a combinatorics problem.
 This leads to the Flink compiler _taking forever_ to even create a plan.
 I believe this is the current situation based on a reply I received from 
[~fhueske] last year.
 His reply was on the 7th of December 2017:
 Link: 
[http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
 Mailing list thread title: "Re: How to perform efficient DataSet reuse between 
iterations"

 

*Idea.*

Perhaps the better way to describe this *CachingOperator* feature is the 
concept of "_job chaining_", where a new type of DataSink would receive data 
that will:
 - Be available to a subsequent job which somehow makes a reference to the 
DataSink of the previous job;
 - Have remained available (from the previous job execution) in the exact same 
TaskManagers in the cluster.

Likely, the optimal memory distribution will be pretty similar between chained 
jobs - if the data was read from disk again between jobs, it would likely be 
distributed with the same (automatic or not) strategies, hence the same 
distribution would likely be of use to sequential jobs.

*Design.*

Potential conflicts with the current Flink cluster execution model:
 - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job 
finishes in local mode, so it would be necessary to change local mode to keep a 
FlinkMiniCluster alive - what was the reasoning behind destroying it?
 Simplifying the implementation?

 - What would this look like in the API?
 I envisioned an example like this:

{{DataSet> previousResult = callSomeFlinkDataflowOperator(); 
// The result of some previous computation.}}
 {{CacheOperator>> op = previousResult.cache();}}
 {{... // Other operations...}}
 {{environment.execute();}}
 {{... // The previous job has finished.}}
 {{DataSet> sorted = op.sort(0); // the previous DataSet, 
which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, 
remained in memory.}}
 {{environment.execute(); // Trigger a different job whose data depends on the 
previous one.}}
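
A cleaned-up sketch of the envisioned API is given below (the {{cache()}} call, the {{CacheOperator}} type and the element types are illustrative only; none of this exists in Flink today):

{code}
// Hypothetical API sketch, not existing Flink code.
DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator(); // Result of a previous computation.
CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();       // Pin the result in TaskManager memory.
// ... other operations ...
environment.execute();                           // First job finishes; the cached data survives on the TaskManagers.
DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // A later job reuses the in-memory result instead of re-reading it.
environment.execute();                           // Second job, whose data depends on the previous one.
{code}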

Besides adding appropriate classes to the Flink Java API, implementing this 
feature would require changing things so that:
 * JobManagers are aware that a completed job had cached operators - likely a 
new COMPLETED_AND_REUSABLE job state?
 * TaskManagers must keep references to the Flink memory management segments 
associated with the CacheOperator data;
 * CacheOperator must have a default number of usages and/or amount of time to 
be kept alive (I think both options should exist but the user may choose 
whether to use one or both);
 * Cluster coordination: should the JobManager 

[jira] [Commented] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-11-14 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686361#comment-16686361
 ] 

Piotr Nowojski commented on FLINK-10874:


A Google search for this error suggests that it usually happens around a broker 
restarting and re-electing the leaders. Maybe this is some kind of aftershock of 
restarting a broker from another test.

Maybe this will be fixed by: https://issues.apache.org/jira/browse/FLINK-10838

> Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
> ---
>
> Key: FLINK-10874
> URL: https://issues.apache.org/jira/browse/FLINK-10874
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Priority: Critical
>
> https://api.travis-ci.org/v3/job/454449444/log.txt
> {noformat}
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  is running.
> 
> 16:35:07,894 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
> [transaction.timeout.ms] not specified. Setting it to 360 ms
> 16:35:07,903 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
> FlinkKafkaInternalProducer (1/1) to produce into default topic 
> testMigrateFromAtLeastOnceToExactlyOnce
> 16:35:08,785 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  failed with:
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> 

[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686331#comment-16686331
 ] 

ASF GitHub Bot commented on FLINK-10867:


zentol opened a new pull request #7095: [FLINK-10867][metrics] Cache logical 
scopes separately for each reporter
URL: https://github.com/apache/flink/pull/7095
 
 
   ## What is the purpose of the change
   
   This PR fixes an issue in the metric system where the logical scope was not 
cached separately for each reporter. As a result, other reporters might access 
logical scopes for which the wrong filter and/or delimiter was used.
   
   ## Brief change log
   
   * cache logicalScopeStrings separately for each reporter (see the sketch below)
   * modify `FrontMetricGroup` to inject reporterIndex when computing logical 
scope
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   * run `AbstractMetricGroupTest#testLogicalScopeCachingForMultipleReporters`
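   
   To make the intent concrete, here is a minimal sketch of the caching idea (class and method names are illustrative, not the actual Flink internals): each reporter index gets its own cached logical-scope string, so reporters with different filters or delimiters no longer share a single cache entry.
   
   ```java
   // Illustrative sketch only; the real change lives in AbstractMetricGroup/FrontMetricGroup.
   final class PerReporterScopeCache {
   
       private final String[] logicalScopeStrings; // one slot per configured reporter
   
       PerReporterScopeCache(int numberOfReporters) {
           this.logicalScopeStrings = new String[numberOfReporters];
       }
   
       String getLogicalScope(int reporterIndex, java.util.function.Supplier<String> computeScope) {
           if (logicalScopeStrings[reporterIndex] == null) {
               // Computed with this reporter's filter and delimiter, cached for this reporter only.
               logicalScopeStrings[reporterIndex] = computeScope.get();
           }
           return logicalScopeStrings[reporterIndex];
       }
   }
   ```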
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a DataSet-based CacheOperator to reuse results between jobs
> ---
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, DataSet API, Local Runtime
>Affects Versions: 1.8.0
>Reporter: Miguel E. Coimbra
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> *Motivation.*
> There are job scenarios where Flink batch processing users may be interested 
> in processing a large amount of data, outputting results to disk and then 
> reusing the results for another type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on 
> graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an 
> evolving graph while allowing for specific logic on challenges such as _when_ 
> and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy 
> with this existing Jira issue pertaining to graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While it would be negligible to write the results to disk and then read them 
> back in a new job to be sent to the JobManager if they are small, this 
> becomes prohibitive if there are several gigabytes of data to write/read and 
> using a distributed storage (e.g. HDFS) is not an option.
> Even if there is a distributed storage available, as the number of sequential 
> jobs increases, even the benefits of the secondary storage being distributed 
> will diminish.
> *Existing alternatives.*
> I also considered, as a possibility, to compose the sequence of jobs in a 
> single big job to submit to the JobManager, thus allowing reuse of results 
> due to the natural forwarding of results to subsequent operators in dataflow 
> programming.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external 
> to Flink and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in 
> {{BulkIteration/DeltaIteration;}}
>  * Composing a job with "too many" operators and inter-dependencies may lead 
> to the Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution 
> strategies, leading to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from 
> Fabian Hueske last year.
>  His reply was on the 7th of December 2017:
>  Link: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse 
> between iterations"
>  
> *Idea.*
> Perhaps the better way to describe this *CachingOperator* feature is the 
> concept of "_job chaining_", where a new type of DataSink would receive data 
> that will:
>  - Be available to a subsequent job which somehow makes a reference to the 
> DataSink of the previous job;
>  - Have remained available (from the previous job execution) in the exact 
> same TaskManagers in the cluster.
> Likely, the 

[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2018-11-14 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686346#comment-16686346
 ] 

Chesnay Schepler commented on FLINK-10867:
--

[~yanghua] Yes, thanks! It should point to FLINK-10857 instead.

> Add a DataSet-based CacheOperator to reuse results between jobs
> ---
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, DataSet API, Local Runtime
>Affects Versions: 1.8.0
>Reporter: Miguel E. Coimbra
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> *Motivation.*
> There are job scenarios where Flink batch processing users may be interested 
> in processing a large amount of data, outputting results to disk and then 
> reusing the results for another type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on 
> graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an 
> evolving graph while allowing for specific logic on challenges such as _when_ 
> and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy 
> with this existing Jira issue pertaining to graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While it would be negligible to write the results to disk and then read them 
> back in a new job to be sent to the JobManager if they are small, this 
> becomes prohibitive if there are several gigabytes of data to write/read and 
> using a distributed storage (e.g. HDFS) is not an option.
> Even if there is a distributed storage available, as the number of sequential 
> jobs increases, even the benefits of the secondary storage being distributed 
> will diminish.
> *Existing alternatives.*
> I also considered, as a possibility, to compose the sequence of jobs in a 
> single big job to submit to the JobManager, thus allowing reuse of results 
> due to the natural forwarding of results to subsequent operators in dataflow 
> programming.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external 
> to Flink and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in 
> {{BulkIteration/DeltaIteration;}}
>  * Composing a job with "too many" operators and inter-dependencies may lead 
> to the Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution 
> strategies, leading to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from 
> Fabian Hueske last year.
>  His reply was on the 7th of December 2017:
>  Link: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse 
> between iterations"
>  
> *Idea.*
> Perhaps the better way to describe this *CachingOperator* feature is the 
> concept of "_job chaining_", where a new type of DataSink would receive data 
> that will:
>  - Be available to a subsequent job which somehow makes a reference to the 
> DataSink of the previous job;
>  - Have remained available (from the previous job execution) in the exact 
> same TaskManagers in the cluster.
> Likely, the optimal memory distribution will be pretty similar between 
> chained jobs - if the data was read from disk again between jobs, it would 
> likely be distributed with the same (automatic or not) strategies, hence the 
> same distribution would likely be of use to sequential jobs.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job 
> finishes in local mode, so it would be necessary to change local mode to keep 
> a FlinkMiniCluster alive - what was the reasoning behind destroying it?
>  Simplifying the implementation?
>  - What would this look like in the API?
>  I envisioned an example like this:
> {{DataSet> previousResult = 
> callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
>  {{CacheOperator>> op = previousResult.cache();}}
>  {{... // Other operations...}}
>  {{environment.execute();}}
>  {{... // The previous job has finished.}}
>  {{DataSet> sorted = op.sort(0); // the previous 

[GitHub] twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs

2018-11-14 Thread GitBox
twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs
URL: https://github.com/apache/flink/pull/7045#issuecomment-438621835
 
 
   Thanks @zentol. I will remove the kubernetes test and merge this once Travis 
gives the green light.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686356#comment-16686356
 ] 

ASF GitHub Bot commented on FLINK-10419:


tillrohrmann commented on a change in pull request #7093:  [FLINK-10419] Using 
DeclineCheckpoint message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093#discussion_r233398912
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -248,6 +256,75 @@ public static void teardownClass() {
}
}
 
+   @Test
+   public void testDeclineCheckpointInvocationWithUserException() throws 
Exception {
+   RpcService rpcService1 = null;
+   RpcService rpcService2 = null;
+   try {
+   final ActorSystem actorSystem1 = 
AkkaUtils.createDefaultActorSystem();
+   final ActorSystem actorSystem2 = 
AkkaUtils.createDefaultActorSystem();
+
+   rpcService1 = new AkkaRpcService(actorSystem1, 
testingTimeout);
+   rpcService2 = new AkkaRpcService(actorSystem2, 
testingTimeout);
+
+   final CompletableFuture 
declineCheckpointMessageFuture = new CompletableFuture<>();
+
+   final JobManagerSharedServices jobManagerSharedServices 
= new TestingJobManagerSharedServicesBuilder().build();
+   final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
+   final JobMaster jobMaster = new JobMaster(
+   rpcService1,
+   jobMasterConfiguration,
+   jmResourceId,
+   jobGraph,
+   haServices,
+   
DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
+   jobManagerSharedServices,
+   heartbeatServices,
+   blobServer,
+   
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+   new NoOpOnCompletionActions(),
+   testingFatalErrorHandler,
+   JobMasterTest.class.getClassLoader()) {
+   @Override
+   public void declineCheckpoint(DeclineCheckpoint 
declineCheckpoint) {
+   
declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
+   }
+   };
+
+   jobMaster.start(jobMasterId, testingTimeout).get();
+
+   final String className = "UserException";
+   final URLClassLoader userClassLoader = 
ClassLoaderUtils.compileAndLoadJava(
+   temporaryFolder.newFolder(),
+   className + ".java",
+   String.format("public class %s extends 
RuntimeException { public %s() {super(\"UserMessage\");} }",
+   className,
+   className));
+
+   Throwable userException = (Throwable) 
Class.forName(className, false, userClassLoader).newInstance();
+
+   CompletableFuture jobMasterGateway =
+   rpcService2.connect(jobMaster.getAddress(), 
jobMaster.getFencingToken(), JobMasterGateway.class);
+
+   jobMasterGateway.thenAccept(gateway -> {
+   gateway.declineCheckpoint(new DeclineCheckpoint(
 
 Review comment:
   Could we actually instantiate a `RpcCheckpointResponder` instead of calling 
directly on the gateway? I think this is also part of the code which should be 
tested.
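
   Something like the following, perhaps (a sketch only, reusing the variables from the test above and assuming `RpcCheckpointResponder` can be constructed directly from the gateway):
   
   ```java
   // Sketch: route the decline call through RpcCheckpointResponder instead of the raw gateway.
   jobMasterGateway.thenAccept(gateway -> {
       CheckpointResponder responder = new RpcCheckpointResponder(gateway);
       responder.declineCheckpoint(
           jobGraph.getJobID(),
           new ExecutionAttemptID(),
           1L,
           userException);
   });
   ```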


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ClassNotFoundException while deserializing user exceptions from checkpointing
> -
>
> Key: FLINK-10419
> URL: https://issues.apache.org/jira/browse/FLINK-10419
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
>Reporter: Nico Kruber
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: 

[GitHub] tillrohrmann commented on a change in pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint

2018-11-14 Thread GitBox
tillrohrmann commented on a change in pull request #7093:  [FLINK-10419] Using 
DeclineCheckpoint message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093#discussion_r233398912
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -248,6 +256,75 @@ public static void teardownClass() {
}
}
 
+   @Test
+   public void testDeclineCheckpointInvocationWithUserException() throws 
Exception {
+   RpcService rpcService1 = null;
+   RpcService rpcService2 = null;
+   try {
+   final ActorSystem actorSystem1 = 
AkkaUtils.createDefaultActorSystem();
+   final ActorSystem actorSystem2 = 
AkkaUtils.createDefaultActorSystem();
+
+   rpcService1 = new AkkaRpcService(actorSystem1, 
testingTimeout);
+   rpcService2 = new AkkaRpcService(actorSystem2, 
testingTimeout);
+
+   final CompletableFuture 
declineCheckpointMessageFuture = new CompletableFuture<>();
+
+   final JobManagerSharedServices jobManagerSharedServices 
= new TestingJobManagerSharedServicesBuilder().build();
+   final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
+   final JobMaster jobMaster = new JobMaster(
+   rpcService1,
+   jobMasterConfiguration,
+   jmResourceId,
+   jobGraph,
+   haServices,
+   
DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
+   jobManagerSharedServices,
+   heartbeatServices,
+   blobServer,
+   
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+   new NoOpOnCompletionActions(),
+   testingFatalErrorHandler,
+   JobMasterTest.class.getClassLoader()) {
+   @Override
+   public void declineCheckpoint(DeclineCheckpoint 
declineCheckpoint) {
+   
declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
+   }
+   };
+
+   jobMaster.start(jobMasterId, testingTimeout).get();
+
+   final String className = "UserException";
+   final URLClassLoader userClassLoader = 
ClassLoaderUtils.compileAndLoadJava(
+   temporaryFolder.newFolder(),
+   className + ".java",
+   String.format("public class %s extends 
RuntimeException { public %s() {super(\"UserMessage\");} }",
+   className,
+   className));
+
+   Throwable userException = (Throwable) 
Class.forName(className, false, userClassLoader).newInstance();
+
+   CompletableFuture jobMasterGateway =
+   rpcService2.connect(jobMaster.getAddress(), 
jobMaster.getFencingToken(), JobMasterGateway.class);
+
+   jobMasterGateway.thenAccept(gateway -> {
+   gateway.declineCheckpoint(new DeclineCheckpoint(
 
 Review comment:
   Could we actually instantiate a `RpcCheckpointResponder` instead of calling 
directly on the gateway? I think this is also part of the code which should be 
tested.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686347#comment-16686347
 ] 

ASF GitHub Bot commented on FLINK-10419:


dawidwys closed pull request #7093:  [FLINK-10419] Using DeclineCheckpoint 
message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 22244f6cb8d..b8dc5545706 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 public interface CheckpointCoordinatorGateway extends RpcGateway {
@@ -31,9 +32,5 @@ void acknowledgeCheckpoint(
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot subtaskState);
 
-   void declineCheckpoint(
-   JobID jobID,
-   ExecutionAttemptID executionAttemptID,
-   long checkpointId,
-   Throwable cause);
+   void declineCheckpoint(DeclineCheckpoint declineCheckpoint);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5d2d363cf71..40a675aca31 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -690,13 +690,7 @@ public void acknowledgeCheckpoint(
 
// TODO: This method needs a leader session ID
@Override
-   public void declineCheckpoint(
-   final JobID jobID,
-   final ExecutionAttemptID executionAttemptID,
-   final long checkpointID,
-   final Throwable reason) {
-   final DeclineCheckpoint decline = new DeclineCheckpoint(
-   jobID, executionAttemptID, checkpointID, 
reason);
+   public void declineCheckpoint(DeclineCheckpoint decline) {
final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
 
if (checkpointCoordinator != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index c90a8b5bbbc..2f656d09263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -87,6 +91,29 @@ public static void terminateRpcService(RpcService 
rpcService, Time timeout) thro
rpcService.stopService().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
}
 
+   /**
+* Shuts the given rpc services down and waits for their termination.
+*
+* @param rpcServices to shut down
+* @param timeout for this operation
+* @throws InterruptedException if the operation has been interrupted
+* @throws ExecutionException if a problem occurred
+* @throws TimeoutException if a timeout occurred
+*/
+   public static void terminateRpcServices(
+   Time timeout,
+   RpcService... rpcServices) throws InterruptedException, 
ExecutionException, TimeoutException {
+   final Collection> terminationFutures = new 
ArrayList<>(rpcServices.length);
+
+   for (RpcService service : rpcServices) {
+   if (service != null) {
+   terminationFutures.add(service.stopService());
+  

[GitHub] dawidwys closed pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint

2018-11-14 Thread GitBox
dawidwys closed pull request #7093:  [FLINK-10419] Using DeclineCheckpoint 
message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 22244f6cb8d..b8dc5545706 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 public interface CheckpointCoordinatorGateway extends RpcGateway {
@@ -31,9 +32,5 @@ void acknowledgeCheckpoint(
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot subtaskState);
 
-   void declineCheckpoint(
-   JobID jobID,
-   ExecutionAttemptID executionAttemptID,
-   long checkpointId,
-   Throwable cause);
+   void declineCheckpoint(DeclineCheckpoint declineCheckpoint);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5d2d363cf71..40a675aca31 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -690,13 +690,7 @@ public void acknowledgeCheckpoint(
 
// TODO: This method needs a leader session ID
@Override
-   public void declineCheckpoint(
-   final JobID jobID,
-   final ExecutionAttemptID executionAttemptID,
-   final long checkpointID,
-   final Throwable reason) {
-   final DeclineCheckpoint decline = new DeclineCheckpoint(
-   jobID, executionAttemptID, checkpointID, 
reason);
+   public void declineCheckpoint(DeclineCheckpoint decline) {
final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
 
if (checkpointCoordinator != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index c90a8b5bbbc..2f656d09263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -87,6 +91,29 @@ public static void terminateRpcService(RpcService 
rpcService, Time timeout) thro
rpcService.stopService().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
}
 
+   /**
+* Shuts the given rpc services down and waits for their termination.
+*
+* @param rpcServices to shut down
+* @param timeout for this operation
+* @throws InterruptedException if the operation has been interrupted
+* @throws ExecutionException if a problem occurred
+* @throws TimeoutException if a timeout occurred
+*/
+   public static void terminateRpcServices(
+   Time timeout,
+   RpcService... rpcServices) throws InterruptedException, 
ExecutionException, TimeoutException {
+   final Collection> terminationFutures = new 
ArrayList<>(rpcServices.length);
+
+   for (RpcService service : rpcServices) {
+   if (service != null) {
+   terminationFutures.add(service.stopService());
+   }
+   }
+
+   
FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+   }
+
// We don't want this class to be instantiable
private 

[jira] [Created] (FLINK-10878) State evolution E2E test failed on travis

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10878:


 Summary: State evolution E2E test failed on travis
 Key: FLINK-10878
 URL: https://issues.apache.org/jira/browse/FLINK-10878
 Project: Flink
  Issue Type: Bug
  Components: E2E Tests, State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/builds/454402843

{code}
2018-11-13 13:21:14,096 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
checkpoint 66 by task c375b648d817e2a1f25ab786094bfaac of job 
81af79ed99a57065b3f0cbeb3cd42b2b.
2018-11-13 13:21:14,097 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source (1/1) was not running
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

{code}
2018-11-13 13:21:14,035 INFO  
org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder  - Declining 
checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source (1/1) was not running
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:14,351 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing 
heap keyed state backend with stream factory.
2018-11-13 13:21:14,351 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing 
heap keyed state backend from snapshot.
2018-11-13 13:21:15,212 INFO  
org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder  - Declining 
checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException:
 Task received cancellation from one of its inputs
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyAbortOnCancellationBarrier(BarrierBuffer.java:404)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processCancellationBarrier(BarrierBuffer.java:304)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:204)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:19,680 INFO  org.apache.flink.runt
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10862) REST API does not show program descriptions of "simple" ProgramDescription

2018-11-14 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686343#comment-16686343
 ] 

Stephan Ewen commented on FLINK-10862:
--

The ProgramDescription is a bit of an artifact that we should probably 
deprecate.

It is a bit of a gimmick to have this in Flink. Proper artifact management 
systems that any bigger infrastructure uses have their own 
metadata/descriptions/versioning.


> REST API does not show program descriptions of "simple" ProgramDescription
> --
>
> Key: FLINK-10862
> URL: https://issues.apache.org/jira/browse/FLINK-10862
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>  Labels: rest_api
>
> When uploading a jar containing a main class implementing ProgramDescription 
> interface, the REST API doesn't list its description. It works only if the 
> class implements Program (which I find pretty useless... why should I return 
> the plan?)
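
For reference, a minimal job class of the kind described in the report (class name and description text are made up for illustration):

{code}
import org.apache.flink.api.common.ProgramDescription;

// A main class that implements ProgramDescription but not Program; per this report,
// the REST API does not list its description after the jar is uploaded.
public class DescribedJob implements ProgramDescription {

    @Override
    public String getDescription() {
        return "Example description that should appear when listing the uploaded jar.";
    }

    public static void main(String[] args) throws Exception {
        // ... build and execute the actual Flink job here ...
    }
}
{code}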



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2018-11-14 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686345#comment-16686345
 ] 

vinoyang commented on FLINK-10867:
--

[~Zentol] Maybe the PR you submitted should not bind to this issue?

> Add a DataSet-based CacheOperator to reuse results between jobs
> ---
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, DataSet API, Local Runtime
>Affects Versions: 1.8.0
>Reporter: Miguel E. Coimbra
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> *Motivation.*
> There are job scenarios where Flink batch processing users may be interested 
> in processing a large amount of data, outputting results to disk and then 
> reusing the results for another type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on 
> graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an 
> evolving graph while allowing for specific logic on challenges such as _when_ 
> and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy 
> with this existing Jira issue pertaining to graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While it would be negligible to write the results to disk and then read them 
> back in a new job to be sent to the JobManager if they are small, this 
> becomes prohibitive if there are several gigabytes of data to write/read and 
> using a distributed storage (e.g. HDFS) is not an option.
> Even if there is a distributed storage available, as the number of sequential 
> jobs increases, even the benefits of the secondary storage being distributed 
> will diminish.
> *Existing alternatives.*
> I also considered, as a possibility, to compose the sequence of jobs in a 
> single big job to submit to the JobManager, thus allowing reuse of results 
> due to the natural forwarding of results to subsequent operators in dataflow 
> programming.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external 
> to Flink and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in 
> {{BulkIteration/DeltaIteration;}}
>  * Composing a job with "too many" operators and inter-dependencies may lead 
> to the Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution 
> strategies, leading to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from 
> Fabian Hueske last year.
>  His reply was on the 7th of December 2017:
>  Link: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse 
> between iterations"
>  
> *Idea.*
> Perhaps the better way to describe this *CachingOperator* feature is the 
> concept of "_job chaining_", where a new type of DataSink would receive data 
> that will:
>  - Be available to a subsequent job which somehow makes a reference to the 
> DataSink of the previous job;
>  - Have remained available (from the previous job execution) in the exact 
> same TaskManagers in the cluster.
> Likely, the optimal memory distribution will be pretty similar between 
> chained jobs - if the data was read from disk again between jobs, it would 
> likely be distributed with the same (automatic or not) strategies, hence the 
> same distribution would likely be of use to sequential jobs.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job 
> finishes in local mode, so it would be necessary to change local mode to keep 
> a FlinkMiniCluster alive - what was the reasoning behind destroying it?
>  Simplifying the implementation?
>  - What would this look like in the API?
>  I envisioned an example like this:
> {{DataSet> previousResult = 
> callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
>  {{CacheOperator>> op = previousResult.cache();}}
>  {{... // Other operations...}}
>  {{environment.execute();}}
>  {{... // The previous job has finished.}}
>  {{DataSet> sorted = op.sort(0); // the previous DataSet, 
> 

[jira] [Commented] (FLINK-10634) End-to-end test: Metrics accessible via REST API

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686342#comment-16686342
 ] 

ASF GitHub Bot commented on FLINK-10634:


zentol commented on issue #7056: [FLINK-10634][metrics][rest] Add metrics 
availability e2e test
URL: https://github.com/apache/flink/pull/7056#issuecomment-438618845
 
 
   > Also it is worth adding a line in the README for the Prometheus and this 
test that states that you can run them locally with mvn clean verify 
-De2e-metrics -DdistDir=YOUR_DIR
   
   I don't really want to add these instructions right now since they are quite 
disconnected from the actual test. How the profile is called is determined in 
the pom, and what the distDir property is called in the `FlinkDistribution`. I'd 
like to avoid them becoming outdated, i.e. simply wrong.
   
   I have some ideas on how to better document how to run them, so that if the 
test is not executed you get a message saying what was missing. E.g. "Test was 
skipped since property 'e2e-metrics' was not specified."
   This way you would simply _try_ running them and get the info on how to do 
it right there.
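   
   A sketch of that behaviour (assuming a JUnit 4 test and that the activating property keeps the name `e2e-metrics`; the class name is made up):
   
   ```java
   import org.junit.Assume;
   import org.junit.Before;
   import org.junit.Test;
   
   public class MetricsAvailabilitySketchITCase {
   
       @Before
       public void checkActivationProperty() {
           // Skips (rather than fails) the test, with an explanatory message,
           // when the activating property was not passed on the command line.
           Assume.assumeTrue(
               "Test was skipped since property 'e2e-metrics' was not specified.",
               System.getProperty("e2e-metrics") != null);
       }
   
       @Test
       public void testMetricsAccessibleViaRest() {
           // actual REST assertions would go here
       }
   }
   ```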


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> End-to-end test: Metrics accessible via REST API
> 
>
> Key: FLINK-10634
> URL: https://issues.apache.org/jira/browse/FLINK-10634
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests, Metrics, REST
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> Verify that Flink's metrics can be accessed via the REST API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

