[jira] [Commented] (FLINK-9640) Checkpointing is always aborted if any task has been finished

2018-06-28 Thread Hai Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527141#comment-16527141
 ] 

Hai Zhou commented on FLINK-9640:
-

CC [~StephanEwen], what do you think about this ticket?




> Checkpointing is always aborted if any task has been finished
> 
>
> Key: FLINK-9640
> URL: https://issues.apache.org/jira/browse/FLINK-9640
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> steps to reproduce:
> 1. build a standalone flink cluster.
> 2. submit a test job like the one below:
> {code:java}
> public class DemoJob {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.disableOperatorChaining();
>         env.setParallelism(4);
>         env.enableCheckpointing(3000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>         DataStream<String> inputStream = env.addSource(new StringGeneratorParallelSourceFunction());
>         inputStream.map(String::hashCode).print();
>         env.execute();
>     }
>
>     public static class StringGeneratorParallelSourceFunction extends RichParallelSourceFunction<String> {
>
>         private static final Logger LOG = LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
>         private volatile boolean running = true;
>         private int index;
>         private int subtask_nums;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             index = getRuntimeContext().getIndexOfThisSubtask();
>             subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
>         }
>
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             while (running) {
>                 String data = UUID.randomUUID().toString();
>                 ctx.collect(data);
>                 LOG.info("subtask_index = {}, emit string = {}", index, data);
>                 Thread.sleep(50);
>                 if (index == subtask_nums / 2) {
>                     running = false;
>                     LOG.info("subtask_index = {}, finished.", index);
>                 }
>             }
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
> }
> {code}
> 3. observe the JM and TM logs; the following entries can be found.
> *taskmanager.log*
> {code:java}
> 2018-06-21 17:05:54,144 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
> 2018-06-21 17:05:54,151 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
> 2018-06-21 17:05:54,195 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, finished.
> 2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9).
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
> 2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
> 2018-06-21 17:05:54,211 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
> {code}
> *jobmanager.log*
> {code:java}
> 2018-06-21 17:05:52,682 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:52,683 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:54,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
> {code}
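The behaviour described above can be modelled with a small self-contained sketch (not Flink code; `canTriggerCheckpoint` is a hypothetical stand-in for the CheckpointCoordinator's precondition): once any task reaches FINISHED, the all-tasks-RUNNING check fails, so every subsequent checkpoint attempt is aborted.

```java
import java.util.Arrays;
import java.util.List;

public class CheckpointPreconditionDemo {

    public enum ExecutionState { RUNNING, FINISHED }

    // Mirrors the "all tasks must be RUNNING" precondition: a single
    // FINISHED task makes the whole check fail.
    public static boolean canTriggerCheckpoint(List<ExecutionState> tasks) {
        return tasks.stream().allMatch(s -> s == ExecutionState.RUNNING);
    }

    public static void main(String[] args) {
        List<ExecutionState> allRunning =
            Arrays.asList(ExecutionState.RUNNING, ExecutionState.RUNNING);
        List<ExecutionState> oneFinished =
            Arrays.asList(ExecutionState.RUNNING, ExecutionState.FINISHED);
        System.out.println(canTriggerCheckpoint(allRunning));  // true
        System.out.println(canTriggerCheckpoint(oneFinished)); // false
    }
}
```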

[jira] [Issue Comment Deleted] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-06-28 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9091:
--
Comment: was deleted

(was: What should be the next step?

Thanks)

> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) on project flink-json: Some Enforcer rules have failed. Look above for specific messages explaining why the rule failed. -> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9236) Use Apache Parent POM 19

2018-06-28 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9236:
--
Description: 
Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.

This will also fix Javadoc generation with JDK 10+

  was:
Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.


This will also fix Javadoc generation with JDK 10+


> Use Apache Parent POM 19
> 
>
> Key: FLINK-9236
> URL: https://issues.apache.org/jira/browse/FLINK-9236
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Stephen Jason
>Priority: Major
>
> Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.
> This will also fix Javadoc generation with JDK 10+
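For illustration, the upgrade amounts to a one-line version bump in the `<parent>` declaration of the root pom.xml (a hedged sketch; the rest of the POM is omitted):

```xml
<!-- Sketch only: bump the Apache parent from 18 to 19 in the root pom.xml -->
<parent>
	<groupId>org.apache</groupId>
	<artifactId>apache</artifactId>
	<version>19</version>
</parent>
```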



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9393) LocatableInputSplit#hashCode should take hostnames into account

2018-06-28 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-9393.
---
Resolution: Later

> LocatableInputSplit#hashCode should take hostnames into account
> ---
>
> Key: FLINK-9393
> URL: https://issues.apache.org/jira/browse/FLINK-9393
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>
> Currently:
> {code}
>   public int hashCode() {
>     return this.splitNumber;
>   }
> {code}
> This is not symmetrical with {{equals}} method where hostnames are compared.
> LocatableInputSplit#hashCode should take hostnames into account.
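A minimal sketch of the proposed fix (class and field names are illustrative, not Flink's actual implementation): include the hostnames in `hashCode` so it stays consistent with an `equals` that compares them, satisfying the contract that equal objects must have equal hash codes.

```java
import java.util.Arrays;

// Hypothetical sketch of a split whose equals() compares both the split
// number and the hostnames; hashCode() must then include both as well.
public class LocatableSplitSketch {

    private final int splitNumber;
    private final String[] hostnames;

    public LocatableSplitSketch(int splitNumber, String[] hostnames) {
        this.splitNumber = splitNumber;
        this.hostnames = hostnames;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LocatableSplitSketch)) {
            return false;
        }
        LocatableSplitSketch that = (LocatableSplitSketch) o;
        return splitNumber == that.splitNumber
            && Arrays.equals(hostnames, that.hostnames);
    }

    @Override
    public int hashCode() {
        // include hostnames so that equal splits always hash equally
        return 31 * splitNumber + Arrays.hashCode(hostnames);
    }
}
```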



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9363) Bump up the Jackson version

2018-06-28 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9363:
--
Labels: security  (was: )

> Bump up the Jackson version
> ---
>
> Key: FLINK-9363
> URL: https://issues.apache.org/jira/browse/FLINK-9363
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: security
>
> CVE's for Jackson:
> CVE-2017-17485
> CVE-2018-5968
> CVE-2018-7489
> We can upgrade to 2.9.5
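As an illustration, the upgrade could be pinned through a `dependencyManagement` entry (a hedged sketch; where exactly Flink manages its Jackson version in the build is not shown here):

```xml
<!-- Sketch only: pin the patched Jackson release centrally -->
<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.9.5</version>
		</dependency>
	</dependencies>
</dependencyManagement>
```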



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7588) Document RocksDB tuning for spinning disks

2018-06-28 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258309#comment-16258309
 ] 

Ted Yu edited comment on FLINK-7588 at 6/29/18 2:56 AM:


bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.


was (Author: yuzhih...@gmail.com):
bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.

> Document RocksDB tuning for spinning disks
> --
>
> Key: FLINK-7588
> URL: https://issues.apache.org/jira/browse/FLINK-7588
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ted Yu
>Priority: Major
>  Labels: performance
>
> In docs/ops/state/large_state_tuning.md, it was mentioned that:
> bq. the default configuration is tailored towards SSDs and performs 
> suboptimal on spinning disks
> We should add recommendation targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-28 Thread xueyumusic
Github user xueyumusic commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199040249
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
     TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
+  * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+    * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
+    * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
+    * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+    *
+    * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
+    */
+  def apply(
+      unit: Expression,
--- End diff --

Thanks for your review and advice, @fhueske. Yes, at the beginning I considered both kinds of function signature and chose the current one to follow the SQL signature. I think your suggestion is reasonable: it follows the Table API style and makes the code concise.
There are two problems I found: one is that `1.quarter` is already used with another meaning (the quarter Table API), and the other is that `1.week` does not seem to exist yet (maybe we can add it). Thank you.


---


[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527093#comment-16527093
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user xueyumusic commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199040249
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
     TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
+  * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+    * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
+    * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
+    * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+    *
+    * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
+    */
+  def apply(
+      unit: Expression,
--- End diff --

Thanks for your review and advice, @fhueske. Yes, at the beginning I considered both kinds of function signature and chose the current one to follow the SQL signature. I think your suggestion is reasonable: it follows the Table API style and makes the code concise.
There are two problems I found: one is that `1.quarter` is already used with another meaning (the quarter Table API), and the other is that `1.week` does not seem to exist yet (maybe we can add it). Thank you.


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527083#comment-16527083
 ] 

ASF GitHub Bot commented on FLINK-9666:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6212
  
hi @tillrohrmann, what does this mean? I don't understand it; do I need to delete the branch?

![image](https://user-images.githubusercontent.com/20113411/42070083-75c684f2-7b87-11e8-96d8-575e11345eff.png)




> short-circuit logic should be used in boolean contexts
> --
>
> Key: FLINK-9666
> URL: https://issues.apache.org/jira/browse/FLINK-9666
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> short-circuit logic should be used in boolean contexts
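As a minimal illustration of the issue (plain Java, not the Flink code in question): the non-short-circuit operators `&`/`|` always evaluate both operands, while `&&`/`||` skip the right-hand side once the left side already decides the result, avoiding wasted work and possible side effects.

```java
public class ShortCircuitDemo {

    // Records evaluation order so we can observe which operands ran.
    public static boolean op(StringBuilder trace, String name, boolean value) {
        trace.append(name);
        return value;
    }

    public static String evalEager() {
        StringBuilder t = new StringBuilder();
        boolean r = op(t, "L", false) & op(t, "R", true); // '&' evaluates both sides
        return t.toString(); // "LR"
    }

    public static String evalShortCircuit() {
        StringBuilder t = new StringBuilder();
        boolean r = op(t, "L", false) && op(t, "R", true); // '&&' skips the right side
        return t.toString(); // "L"
    }

    public static void main(String[] args) {
        System.out.println(evalEager() + " " + evalShortCircuit()); // prints "LR L"
    }
}
```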



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6212: [FLINK-9666] short-circuit logic should be used in boolea...

2018-06-28 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6212
  
hi @tillrohrmann, what does this mean? I don't understand it; do I need to delete the branch?

![image](https://user-images.githubusercontent.com/20113411/42070083-75c684f2-7b87-11e8-96d8-575e11345eff.png)




---


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527046#comment-16527046
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user Clark commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r199036840
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
			if (yarnWorkerNode != null) {
				// Container completed unexpectedly ~> start a new one
				final Container container = yarnWorkerNode.getContainer();
-				requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
-				closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
+				// check WorkerRegistration status to avoid requesting containers more than required
+				if (checkWorkerRegistrationWithResourceId(resourceId)) {
--- End diff --

Yes, it might happen. The problem is not as simple as I thought. The actual cause is that the resource was released before a full restart, but the onContainersCompleted callback fired after the full restart. Since the full restart requests all the containers needed as configured, an onContainersCompleted call arriving after that will request a new container and keep it, even though it is not needed.


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in YARN cluster mode, Flink sometimes does not 
> release TaskManager containers in some specific cases. In the worst case, I 
> had a job configured for 5 task managers that ended up possessing more than 
> 100 containers. Although the job did not fail, it affected other jobs in the 
> YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, which 
> should be invoked by YARN's *AMRMAsyncClient*. After the restart, as we can 
> see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
> When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, 
> it no longer had a connection to the TaskManager on container 24, so it simply 
> ignored the close of the TaskManager:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot 
> return this container even though it is not required. Meanwhile, the restart 
> logic has already allocated enough containers for the TaskManagers, so Flink 
> holds the extra container for a long time for nothing. 
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> ps: Another strange thing I found is that sometimes when requesting a YARN 
> container, many more are returned than requested. Is that a normal scenario 
> for AMRMAsyncClient?
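The guard proposed in the PR can be illustrated with a toy model (hypothetical names; not the actual YarnResourceManager code): a completion callback only triggers a replacement request if the completed container still has a registered executor, so stale callbacks arriving after a full restart no longer inflate the pending-request counter.

```java
import java.util.HashSet;
import java.util.Set;

public class ContainerAccountingDemo {

    private final Set<String> registeredExecutors = new HashSet<>();
    public int numPendingContainerRequests = 0;

    public void registerExecutor(String containerId) {
        registeredExecutors.add(containerId);
    }

    // A full restart re-requests all needed containers and drops old registrations.
    public void fullRestart(int neededContainers) {
        numPendingContainerRequests = neededContainers;
        registeredExecutors.clear();
    }

    public void onContainerCompleted(String containerId) {
        // Guard: a stale completion (executor no longer registered) is ignored,
        // so no extra replacement container is requested after a full restart.
        if (registeredExecutors.remove(containerId)) {
            numPendingContainerRequests++;
        }
    }
}
```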



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-06-28 Thread Clarkkkkk
Github user Clark commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r199036840
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
			if (yarnWorkerNode != null) {
				// Container completed unexpectedly ~> start a new one
				final Container container = yarnWorkerNode.getContainer();
-				requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
-				closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
+				// check WorkerRegistration status to avoid requesting containers more than required
+				if (checkWorkerRegistrationWithResourceId(resourceId)) {
--- End diff --

Yes, it might happen. The problem is not as simple as I thought. The actual cause is that the resource was released before a full restart, but the onContainersCompleted callback fired after the full restart. Since the full restart requests all the containers needed as configured, an onContainersCompleted call arriving after that will request a new container and keep it, even though it is not needed.


---


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527034#comment-16527034
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user Clark commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r199035271
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
		}};
	}
+
+	@Test
+	public void testOnContainerCompleted() throws Exception {
+		new Context() {{
+			startResourceManager();
+			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+				rmServices.slotManager.registerSlotRequest(
+					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+				return null;
+			});
+			// wait for the registerSlotRequest completion
+			registerSlotRequestFuture.get();
+			// Callback from YARN when container is allocated.
+			Container testingContainer = mock(Container.class);
+			when(testingContainer.getId()).thenReturn(
+				ContainerId.newInstance(
+					ApplicationAttemptId.newInstance(
+						ApplicationId.newInstance(System.currentTimeMillis(), 1),
+						1),
+					1));
+			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+			ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
+			resourceManager.onContainersAllocated(testingContainerList);
+			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+			// Remote task executor registers with YarnResourceManager.
+			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
--- End diff --

Sure, I will modify it later.


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in YARN cluster mode, Flink sometimes does not 
> release TaskManager containers in some specific cases. In the worst case, I 
> had a job configured for 5 task managers that ended up possessing more than 
> 100 containers. Although the job did not fail, it affected other jobs in the 
> YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, which 
> should be invoked by YARN's *AMRMAsyncClient*. After the restart, as we can 
> see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
> When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, 
> it no longer had a connection to the TaskManager on container 24, so it simply 
> ignored the close of the TaskManager:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> 

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-06-28 Thread Clarkkkkk
Github user Clark commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r199035271
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
		}};
	}
+
+	@Test
+	public void testOnContainerCompleted() throws Exception {
+		new Context() {{
+			startResourceManager();
+			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+				rmServices.slotManager.registerSlotRequest(
+					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+				return null;
+			});
+			// wait for the registerSlotRequest completion
+			registerSlotRequestFuture.get();
+			// Callback from YARN when container is allocated.
+			Container testingContainer = mock(Container.class);
+			when(testingContainer.getId()).thenReturn(
+				ContainerId.newInstance(
+					ApplicationAttemptId.newInstance(
+						ApplicationId.newInstance(System.currentTimeMillis(), 1),
+						1),
+					1));
+			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+			ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
+			resourceManager.onContainersAllocated(testingContainerList);
+			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+			// Remote task executor registers with YarnResourceManager.
+			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
--- End diff --

Sure, I will modify it later.


---


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527033#comment-16527033
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user Clark commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r199035236
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, Void payload) {
		return CompletableFuture.completedFuture(null);
	}
}
+
+	// ------------------------------------------------------------------------
+	//  Work Registration status checking
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Check if the executor with the given resourceID is still in the taskExecutors map.
+	 * @param resourceID an ID mapping to a task executor
+	 * @return true if the task executor is still registered
+	 */
+	protected boolean checkWorkerRegistrationWithResourceId(ResourceID resourceID) {
+		boolean status = taskExecutors.containsKey(resourceID);
+		if (!status) {
+			log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID);
+		}
+		return status;
+	}
+
+	@VisibleForTesting
+	public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
--- End diff --

OK.


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in YARN cluster mode, Flink sometimes does not 
> release TaskManager containers in some specific cases. In the worst case, I 
> had a job configured for 5 task managers that ended up possessing more than 
> 100 containers. Although the job did not fail, it affected other jobs in the 
> YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, which 
> should be invoked by YARN's *AMRMAsyncClient*. After the restart, as we can 
> see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. 
> When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, 
> it no longer had a connection to the TaskManager on container 24, so it simply 
> ignored the close of the TaskManager:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot 
> return this container even though it is not required. Meanwhile, the restart 
> logic has already allocated enough containers for the TaskManagers, so Flink 
> holds the extra container for a long time for nothing. 
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> ps: Another strange thing I found is that sometimes when requesting a YARN 
> container, many more are returned than requested. Is that a normal scenario 
> for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-06-28 Thread Clarkkkkk
Github user Clark commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r199035236
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, Void payload) {
		return CompletableFuture.completedFuture(null);
	}
}
+
+	// ------------------------------------------------------------------------
+	//  Work Registration status checking
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Check if the executor with the given resourceID is still in the taskExecutors map.
+	 * @param resourceID an ID mapping to a task executor
+	 * @return true if the task executor is still registered
+	 */
+	protected boolean checkWorkerRegistrationWithResourceId(ResourceID resourceID) {
+		boolean status = taskExecutors.containsKey(resourceID);
+		if (!status) {
+			log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID);
+		}
+		return status;
+	}
+
+	@VisibleForTesting
+	public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
--- End diff --

OK.


---


[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527026#comment-16527026
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6132
  
@tillrohrmann Thanks for your review and good suggestions; I am updating the 
code accordingly.


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

2018-06-28 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6132
  
@tillrohrmann Thanks for your review and good suggestions; I am updating the 
code accordingly.


---


[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526910#comment-16526910
 ] 

ASF GitHub Bot commented on FLINK-9687:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
The failure info in the Travis log shows that the `checkClusterEmpty` test 
failed. Did this happen due to the reuse of the YARN cluster? It seems unrelated 
to this pull request.


> Delay the state fetch only when the triggerResult is fire
> -
>
> Key: FLINK-9687
> URL: https://issues.apache.org/jira/browse/FLINK-9687
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> When the window operator is fired by an event-time or processing-time timer, it 
> fetches the state content first. I think it only needs to fetch the content from 
> windowState when the triggerResult is Fire, so we should change the order to 
> avoid this cost (fetching the content from state is more expensive than checking 
> the triggerResult). 
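The reordering proposed above can be sketched as follows. This is an illustrative sketch, not the actual Flink `WindowOperator` code — the enum, method names, and the `stateFetches` counter are stand-ins introduced only to make the cost difference visible:

```java
import java.util.Collections;

// Illustrative sketch (not Flink's WindowOperator): consult the trigger result
// first, and fetch window contents from state only when it says FIRE.
public class WindowFireSketch {
    enum TriggerResult { CONTINUE, FIRE, PURGE }

    static int stateFetches = 0;

    /** Stands in for the comparatively expensive state-backend access. */
    static Iterable<String> fetchWindowContents() {
        stateFetches++;
        return Collections.singletonList("window-contents");
    }

    /** Timer callback: cheap trigger check first, state access only on FIRE. */
    static void onTimer(TriggerResult triggerResult) {
        if (triggerResult == TriggerResult.FIRE) {
            Iterable<String> contents = fetchWindowContents();
            // emitting `contents` downstream would go here
        }
        // CONTINUE and PURGE paths never pay the state-fetch cost.
    }
}
```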





[jira] [Updated] (FLINK-9687) Delay the state fetch only when the triggerResult is fire

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9687:
--
Labels: pull-request-available  (was: )

> Delay the state fetch only when the triggerResult is fire
> -
>
> Key: FLINK-9687
> URL: https://issues.apache.org/jira/browse/FLINK-9687
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> When the window operator is fired by an event-time or processing-time timer, it 
> fetches the state content first. I think it only needs to fetch the content from 
> windowState when the triggerResult is Fire, so we should change the order to 
> avoid this cost (fetching the content from state is more expensive than checking 
> the triggerResult). 





[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...

2018-06-28 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6224
  
The failure info in the Travis log shows that the `checkClusterEmpty` test 
failed. Did this happen due to the reuse of the YARN cluster? It seems unrelated 
to this pull request.


---


[jira] [Created] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis

2018-06-28 Thread Lakshmi Rao (JIRA)
Lakshmi Rao created FLINK-9691:
--

 Summary: Modify run loop in Kinesis ShardConsumer to not sleep for 
a fixed fetchIntervalMillis
 Key: FLINK-9691
 URL: https://issues.apache.org/jira/browse/FLINK-9691
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Lakshmi Rao


Currently the ShardConsumer in the Kinesis connector sleeps for a fixed 
[fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210],
 which can result in the shard consumer sleeping longer than necessary and not 
reading from Kinesis optimally. It should only sleep for 
(fetchIntervalMillis - time taken to process records) before making the 
subsequent getRecords call. 
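The proposed adjustment boils down to a one-line computation. The sketch below is not the actual ShardConsumer code — the class and method names are invented for illustration — but shows the interval arithmetic, including the case where processing already consumed the whole interval:

```java
// Sketch of the proposed change (not the actual ShardConsumer run loop):
// sleep only for whatever remains of the fetch interval after the time
// already spent processing the previous batch of records.
public class FetchIntervalSleep {
    /** Remaining time to sleep before the next getRecords call. */
    static long remainingSleepMillis(long fetchIntervalMillis, long processingTimeMillis) {
        // If processing already took the whole interval (or more), don't sleep.
        return Math.max(0L, fetchIntervalMillis - processingTimeMillis);
    }
}
```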





[jira] [Commented] (FLINK-9690) Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails

2018-06-28 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526709#comment-16526709
 ] 

Piotr Nowojski commented on FLINK-9690:
---

Yep, it looks that way. At one point I was trying to upgrade our connector to 
Kafka 1.0.0, and it required (very minor) changes in FlinkKafkaProducer. I never 
committed it because there were also some failures in our consumer tests that I 
didn't have time to fix. From the stack trace it looks like there were even 
further changes in 1.1.0; I don't know how big or how easy to fix they are.

However, what is the actual problem? We have never claimed that our connector 
supports Kafka producers > 0.11.2.0.

> Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
> 
>
> Key: FLINK-9690
> URL: https://issues.apache.org/jira/browse/FLINK-9690
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Ufuk Celebi
>Priority: Major
>
> Restoring a job from a savepoint that includes {{FlinkKafkaProducer}} 
> packaged with {{kafka.version}} set to {{1.1.0}} fails in Flink 1.4.2 with:
> {code}
> java.lang.RuntimeException: Incompatible KafkaProducer version
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchFieldException: sequenceNumbers
> at java.lang.Class.getDeclaredField(Class.java:2070)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297)
> ... 16 more
> {code}
> [~pnowojski] Any ideas about this issue? Judging from the stack trace it was 
> anticipated that reflective access might break with Kafka versions > 0.11.2.0.
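The failure mode hinted at in the stack trace can be illustrated in isolation. In the sketch below, `TransactionStateV0`/`TransactionStateV1` are invented stand-ins, not real Kafka classes: the idea is that one client version exposes a private `sequenceNumbers` field that reflection can find, while a newer version has restructured it away, so `Class.getDeclaredField` throws `NoSuchFieldException` — the `Caused by` seen above:

```java
import java.lang.reflect.Field;

// Illustrative sketch of how version-dependent reflective field access breaks.
// The two inner classes are stand-ins for different Kafka client versions.
public class ReflectiveFieldLookup {
    static class TransactionStateV0 { private Object sequenceNumbers; }
    static class TransactionStateV1 { private Object topicPartitionState; }

    static boolean hasDeclaredField(Class<?> clazz, String name) {
        try {
            Field f = clazz.getDeclaredField(name);
            f.setAccessible(true);
            return true;
        } catch (NoSuchFieldException e) {
            // This is the path that surfaces as
            // "RuntimeException: Incompatible KafkaProducer version" above.
            return false;
        }
    }
}
```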





[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526671#comment-16526671
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198947650
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction<T, ACC>
+   extends AbstractTtlDecorator<FoldFunction<T, ACC>>
+   implements FoldFunction<T, TtlValue<ACC>> {
+   TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), value));
--- End diff --

It should be covered with `updateExpired` in `testExactExpirationOnWrite`


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update(V value) { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder in 
> its TTL wrapper from FLINK-9513
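The decorator idea in the pseudocode above can be made concrete in a compilable form. The sketch below is simplified and hypothetical — plain fields instead of Flink's `ValueState`/`TtlValue` interfaces, and a `LongSupplier` clock standing in for `InternalTimeService`; all names are illustrative, not the FLINK-9514 implementation:

```java
import java.util.function.LongSupplier;

// Simplified, self-contained sketch of a TTL value-state wrapper: the stored
// value carries a timestamp, and reads past the TTL return null and drop it.
public class TtlValueSketch<V> {
    static final class TtlValue<T> {
        final T value;
        final long lastAccessTimestamp;
        TtlValue(T value, long ts) { this.value = value; this.lastAccessTimestamp = ts; }
    }

    private TtlValue<V> underlying;          // stands in for underlyingState
    private final long ttlMillis;
    private final LongSupplier timeProvider; // stands in for InternalTimeService

    TtlValueSketch(long ttlMillis, LongSupplier timeProvider) {
        this.ttlMillis = ttlMillis;
        this.timeProvider = timeProvider;
    }

    /** Returns null when the stored value has outlived its TTL. */
    V value() {
        if (underlying == null) {
            return null;
        }
        long now = timeProvider.getAsLong();
        if (now - underlying.lastAccessTimestamp > ttlMillis) {
            underlying = null;               // expired: drop it
            return null;
        }
        return underlying.value;
    }

    /** Stores the value together with the current timestamp. */
    void update(V value) {
        underlying = new TtlValue<>(value, timeProvider.getAsLong());
    }
}
```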





[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-28 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198947650
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction<T, ACC>
+   extends AbstractTtlDecorator<FoldFunction<T, ACC>>
+   implements FoldFunction<T, TtlValue<ACC>> {
+   TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), value));
--- End diff --

It should be covered with `updateExpired` in `testExactExpirationOnWrite`


---


[jira] [Created] (FLINK-9690) Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails

2018-06-28 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-9690:
--

 Summary: Restoring state with FlinkKafkaProducer and Kafka 1.1.0 
client fails
 Key: FLINK-9690
 URL: https://issues.apache.org/jira/browse/FLINK-9690
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Ufuk Celebi


Restoring a job from a savepoint that includes {{FlinkKafkaProducer}} packaged 
with {{kafka.version}} set to {{1.1.0}} fails in Flink 1.4.2 with:

{code}
java.lang.RuntimeException: Incompatible KafkaProducer version
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldException: sequenceNumbers
at java.lang.Class.getDeclaredField(Class.java:2070)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297)
... 16 more
{code}

[~pnowojski] Any ideas about this issue? Judging from the stack trace it was 
anticipated that reflective access might break with Kafka versions > 0.11.2.0.





[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...

2018-06-28 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r198928207
  
--- Diff: docs/dev/table/sql.md ---
@@ -115,6 +115,10 @@ The following BNF-grammar describes the superset of 
supported SQL features in ba
 
 {% highlight sql %}
 
+insert:
+  INSERT INTO tableReference
--- End diff --

Move up to be sync with the similar Calcite's doc


---


[jira] [Commented] (FLINK-8650) Add tests and documentation for WINDOW clause

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526602#comment-16526602
 ] 

ASF GitHub Bot commented on FLINK-8650:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r198928207
  
--- Diff: docs/dev/table/sql.md ---
@@ -115,6 +115,10 @@ The following BNF-grammar describes the superset of 
supported SQL features in ba
 
 {% highlight sql %}
 
+insert:
+  INSERT INTO tableReference
--- End diff --

Move up to be sync with the similar Calcite's doc


> Add tests and documentation for WINDOW clause
> -
>
> Key: FLINK-8650
> URL: https://issues.apache.org/jira/browse/FLINK-8650
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> We support queries with a {{WINDOW}} clause like:
> {code}
> SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM MyTable WINDOW w AS (PARTITION BY 
> a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)
> {code}
> But this is neither documented nor tested.





[jira] [Commented] (FLINK-8650) Add tests and documentation for WINDOW clause

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526601#comment-16526601
 ] 

ASF GitHub Bot commented on FLINK-8650:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r198928117
  
--- Diff: docs/dev/table/sql.md ---
@@ -139,7 +143,8 @@ select:
   [ WHERE booleanExpression ]
   [ GROUP BY { groupItem [, groupItem ]* } ]
   [ HAVING booleanExpression ]
-
+  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
--- End diff --

Done in a way Calcite does it


> Add tests and documentation for WINDOW clause
> -
>
> Key: FLINK-8650
> URL: https://issues.apache.org/jira/browse/FLINK-8650
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> We support queries with a {{WINDOW}} clause like:
> {code}
> SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM MyTable WINDOW w AS (PARTITION BY 
> a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)
> {code}
> But this is neither documented nor tested.





[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...

2018-06-28 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r198928117
  
--- Diff: docs/dev/table/sql.md ---
@@ -139,7 +143,8 @@ select:
   [ WHERE booleanExpression ]
   [ GROUP BY { groupItem [, groupItem ]* } ]
   [ HAVING booleanExpression ]
-
+  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
--- End diff --

Done in a way Calcite does it


---


[jira] [Updated] (FLINK-8650) Add tests and documentation for WINDOW clause

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8650:
--
Labels: pull-request-available  (was: )

> Add tests and documentation for WINDOW clause
> -
>
> Key: FLINK-8650
> URL: https://issues.apache.org/jira/browse/FLINK-8650
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> We support queries with a {{WINDOW}} clause like:
> {code}
> SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM MyTable WINDOW w AS (PARTITION BY 
> a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)
> {code}
> But this is neither documented nor tested.





[jira] [Commented] (FLINK-8650) Add tests and documentation for WINDOW clause

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526597#comment-16526597
 ] 

ASF GitHub Bot commented on FLINK-8650:
---

GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6226

[FLINK-8650] Tests for WINDOW clause and documentation update

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change
test and documentation coverage of WINDOW clause


## Brief change log

  - *Test that the same queries but with different specification of windows 
have the same plan*
  - *Mentioning in doc WINDOW syntax* 


## Verifying this change

This change added tests and can be verified as follows:
via running of org.apache.flink.table.api.stream.sql.OverWindowTest

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FLINK_8560

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6226.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6226


commit 85451a2f9e6d1cd5d8b98fa2bba5c92326e817ce
Author: snuyanzin 
Date:   2018-06-28T16:19:25Z

[FLINK-8650] Tests for WINDOW clause and documentation update




> Add tests and documentation for WINDOW clause
> -
>
> Key: FLINK-8650
> URL: https://issues.apache.org/jira/browse/FLINK-8650
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> We support queries with a {{WINDOW}} clause like:
> {code}
> SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM 

[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...

2018-06-28 Thread snuyanzin
GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6226

[FLINK-8650] Tests for WINDOW clause and documentation update

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change
test and documentation coverage of WINDOW clause


## Brief change log

  - *Test that the same queries but with different specification of windows 
have the same plan*
  - *Mentioning in doc WINDOW syntax* 


## Verifying this change

This change added tests and can be verified as follows:
via running of org.apache.flink.table.api.stream.sql.OverWindowTest

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FLINK_8560

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6226.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6226


commit 85451a2f9e6d1cd5d8b98fa2bba5c92326e817ce
Author: snuyanzin 
Date:   2018-06-28T16:19:25Z

[FLINK-8650] Tests for WINDOW clause and documentation update




---


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526563#comment-16526563
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r198914433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, 
Void payload) {
return CompletableFuture.completedFuture(null);
}
}
+
+   // ------------------------------------------------------------------------
+   //  Work Registration status checking
+   // ------------------------------------------------------------------------
+
+   /**
+* Check if the executor with given resourceID is still in 
taskExecutors map
+* @param resourceID an ID mapping to a task executor
+* @return
+*/
+   protected boolean checkWorkerRegistrationWithResourceId(ResourceID 
resourceID) {
+   boolean status = taskExecutors.containsKey(resourceID);
+   if (!status) {
+   log.debug("No open TaskExecutor connection {}. Ignoring 
close TaskExecutor connection.", resourceID);
+   }
+   return status;
+   }
+
+   @VisibleForTesting
+   public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
--- End diff --

Let's not add this method, which is only used for testing purposes, to the 
production code. Instead, I would suggest subclassing `ResourceManager` in your 
test and adding this method there.
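The pattern Till suggests — keeping test-only triggers in a test subclass rather than adding `@VisibleForTesting` entry points to production code — can be sketched in isolation. The class and method names below are simplified stand-ins, not Flink's actual `ResourceManager` API:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-ins, not Flink's actual ResourceManager API.
class ProductionResourceManager {
    private final Set<String> taskExecutors = new HashSet<>();

    void registerTaskExecutor(String resourceId) {
        taskExecutors.add(resourceId);
    }

    // Protected hook already invoked by the production heartbeat logic.
    protected void handleTaskManagerHeartbeatTimeout(String resourceId) {
        taskExecutors.remove(resourceId);
    }

    boolean isRegistered(String resourceId) {
        return taskExecutors.contains(resourceId);
    }
}

// The test-only trigger lives in a test subclass, so no
// @VisibleForTesting entry point leaks into production code.
class TestingResourceManager extends ProductionResourceManager {
    void triggerTaskManagerHeartbeatTimeout(String resourceId) {
        handleTaskManagerHeartbeatTimeout(resourceId);
    }
}

public class TestSubclassSketch {
    public static void main(String[] args) {
        TestingResourceManager rm = new TestingResourceManager();
        rm.registerTaskExecutor("container_42");
        rm.triggerTaskManagerHeartbeatTimeout("container_42");
        System.out.println(rm.isRegistered("container_42")); // prints "false"
    }
}
```

The production class exposes only the protected hook it needs internally; the test subclass widens access for the test alone.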


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in YARN cluster mode, Flink sometimes does 
> not release TaskManager containers in certain specific cases. In the worst 
> case, I had a job configured with 5 task managers that ended up holding 
> more than 100 containers. Although the job did not fail, it affected other 
> jobs in the YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, 
> which should be invoked by YARN's *AMRMAsyncClient*. After the restart, as 
> we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which runs on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, this 
> container cannot be returned even though it is no longer required. 
> Meanwhile, the restart logic has already allocated enough containers for 
> the TaskManagers, so Flink holds the extra container for a long time for 
> nothing.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> PS: Another strange thing I noticed is that a request for a YARN container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for *AMRMAsyncClient*?
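The accounting bug described in the report boils down to incrementing a pending-request counter on every container completion without a matching decrement when the close is ignored. A minimal, self-contained simulation — all names here are hypothetical stand-ins for Flink internals, not the real `YarnResourceManager` code:

```java
import java.util.HashSet;
import java.util.Set;

// Minimal simulation of the accounting bug: a replacement container is
// requested for every completed container, even when the matching
// TaskManager connection no longer exists, so the pending-request counter
// ends up higher than the number of containers actually needed.
public class PendingRequestLeak {
    final Set<String> openConnections = new HashSet<>();
    int numPendingContainerRequests = 0;

    void onContainerCompleted(String resourceId) {
        // Buggy order: a replacement container is requested unconditionally...
        numPendingContainerRequests++;
        // ...but the close is silently ignored if the connection is gone,
        // so nothing ever offsets the request made for the stale container.
        if (!openConnections.remove(resourceId)) {
            System.out.println("No open TaskExecutor connection " + resourceId
                    + ". Ignoring close TaskExecutor connection.");
        }
    }

    public static void main(String[] args) {
        PendingRequestLeak rm = new PendingRequestLeak();
        rm.openConnections.add("container_23");
        // container_24 was killed before the restart; no connection exists.
        rm.onContainerCompleted("container_23");
        rm.onContainerCompleted("container_24");
        // Both completions incremented the counter, although only one
        // replacement is actually needed.
        System.out.println(rm.numPendingContainerRequests); // prints "2"
    }
}
```

Since excess containers are only returned while the counter is zero, the stale increment keeps the surplus container alive indefinitely.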



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526564#comment-16526564
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r198915014
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws 
Exception {
assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
}};
}
+
+   @Test
+   public void testOnContainerCompleted() throws Exception {
+   new Context() {{
+   startResourceManager();
+   CompletableFuture registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
+   rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, taskHost));
+   return null;
+   });
+   // wait for the registerSlotRequest completion
+   registerSlotRequestFuture.get();
+   // Callback from YARN when container is allocated.
+   Container testingContainer = mock(Container.class);
+   when(testingContainer.getId()).thenReturn(
+   ContainerId.newInstance(
+   ApplicationAttemptId.newInstance(
+   
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+   1),
+   1));
+   
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
+   
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+   
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
ImmutableList<Container> testingContainerList = 
ImmutableList.of(testingContainer);
+   
resourceManager.onContainersAllocated(testingContainerList);
+   
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+   
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
+
+   // Remote task executor registers with 
YarnResourceManager.
+   TaskExecutorGateway mockTaskExecutorGateway = 
mock(TaskExecutorGateway.class);
--- End diff --

Can we use the `TestingTaskExecutorGateway` here?


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in YARN cluster mode, Flink sometimes does 
> not release TaskManager containers in certain specific cases. In the worst 
> case, I had a job configured with 5 task managers that ended up holding 
> more than 100 containers. Although the job did not fail, it affected other 
> jobs in the YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, 
> which should be invoked by YARN's *AMRMAsyncClient*. After the restart, as 
> we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which runs on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 

[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526565#comment-16526565
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r198917467
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -334,8 +335,11 @@ public void onContainersCompleted(final 
List<ContainerStatus> list) {
if (yarnWorkerNode != null) {
// Container completed 
unexpectedly ~> start a new one
final Container container = 
yarnWorkerNode.getContainer();
-   
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
-   
closeTaskManagerConnection(resourceId, new 
Exception(containerStatus.getDiagnostics()));
+   // check WorkerRegistration 
status to avoid requesting containers more than required
+   if 
(checkWorkerRegistrationWithResourceId(resourceId)) {
--- End diff --

Wouldn't that prevent container restarts if the container failure happened 
before the `TaskManager` registered at the `ResourceManager`, because then, 
`ResourceManager#taskExecutors` would not contain the given `ResourceID`?
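Till's concern can be made concrete with a small simulation: if the replacement request is guarded by the registration check, a container that fails before its TaskManager ever registers gets no replacement at all. The names below are hypothetical simplifications, not Flink's actual code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplification of the guarded onContainersCompleted logic
// under discussion: gating the replacement request on registration avoids
// double-counting pending requests, but also skips restarts for containers
// that failed before their TaskManager registered.
public class GuardedRestartSketch {
    final Set<String> taskExecutors = new HashSet<>(); // registered TMs
    int replacementsRequested = 0;

    void onContainerCompleted(String resourceId) {
        if (taskExecutors.remove(resourceId)) {
            // only registered workers get a replacement container
            replacementsRequested++;
        }
        // unregistered failures fall through: no replacement is requested
    }

    public static void main(String[] args) {
        GuardedRestartSketch rm = new GuardedRestartSketch();
        rm.taskExecutors.add("registered_tm");
        rm.onContainerCompleted("registered_tm");   // replaced
        rm.onContainerCompleted("unregistered_tm"); // silently lost
        System.out.println(rm.replacementsRequested); // prints "1"
    }
}
```

The trade-off is exactly the one raised in the review: the guard fixes the over-request, but at the cost of dropping restarts for pre-registration failures.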


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in YARN cluster mode, Flink sometimes does 
> not release TaskManager containers in certain specific cases. In the worst 
> case, I had a job configured with 5 task managers that ended up holding 
> more than 100 containers. Although the job did not fail, it affected other 
> jobs in the YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, 
> which should be invoked by YARN's *AMRMAsyncClient*. After the restart, as 
> we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which runs on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, this 
> container cannot be returned even though it is no longer required. 
> Meanwhile, the restart logic has already allocated enough containers for 
> the TaskManagers, so Flink holds the extra container for a long time for 
> nothing.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> PS: Another strange thing I noticed is that a request for a YARN container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for *AMRMAsyncClient*?





[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r198914433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, 
Void payload) {
return CompletableFuture.completedFuture(null);
}
}
+
+   // 

+   //  Work Registration status checking
+   // 

+
+   /**
+* Check if the executor with given resourceID is still in 
taskExecutors map
+* @param resourceID an ID mapping to a task executor
+* @return
+*/
+   protected boolean checkWorkerRegistrationWithResourceId(ResourceID 
resourceID) {
+   boolean status = taskExecutors.containsKey(resourceID);
+   if (!status) {
+   log.debug("No open TaskExecutor connection {}. Ignoring 
close TaskExecutor connection.", resourceID);
+   }
+   return status;
+   }
+
+   @VisibleForTesting
+   public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
--- End diff --

Let's not add this method, which is only used for testing purposes, to the 
production code. Instead, I would suggest subclassing `ResourceManager` in your 
test and adding this method there.


---


[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r198915014
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws 
Exception {
assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
}};
}
+
+   @Test
+   public void testOnContainerCompleted() throws Exception {
+   new Context() {{
+   startResourceManager();
+   CompletableFuture registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
+   rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, taskHost));
+   return null;
+   });
+   // wait for the registerSlotRequest completion
+   registerSlotRequestFuture.get();
+   // Callback from YARN when container is allocated.
+   Container testingContainer = mock(Container.class);
+   when(testingContainer.getId()).thenReturn(
+   ContainerId.newInstance(
+   ApplicationAttemptId.newInstance(
+   
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+   1),
+   1));
+   
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
+   
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+   
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
ImmutableList<Container> testingContainerList = 
ImmutableList.of(testingContainer);
+   
resourceManager.onContainersAllocated(testingContainerList);
+   
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+   
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
+
+   // Remote task executor registers with 
YarnResourceManager.
+   TaskExecutorGateway mockTaskExecutorGateway = 
mock(TaskExecutorGateway.class);
--- End diff --

Can we use the `TestingTaskExecutorGateway` here?


---


[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r198917467
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -334,8 +335,11 @@ public void onContainersCompleted(final 
List<ContainerStatus> list) {
if (yarnWorkerNode != null) {
// Container completed 
unexpectedly ~> start a new one
final Container container = 
yarnWorkerNode.getContainer();
-   
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
-   
closeTaskManagerConnection(resourceId, new 
Exception(containerStatus.getDiagnostics()));
+   // check WorkerRegistration 
status to avoid requesting containers more than required
+   if 
(checkWorkerRegistrationWithResourceId(resourceId)) {
--- End diff --

Wouldn't that prevent container restarts if the container failure happened 
before the `TaskManager` registered at the `ResourceManager`, because then, 
`ResourceManager#taskExecutors` would not contain the given `ResourceID`?


---


[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526534#comment-16526534
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491333
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 * not available (yet).
 */
CompletableFuture<OperatorBackPressureStats> 
requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+   /**
+* Notifies that the task manager has terminated.
+*
+* @param resourceID identifying the task manager
+* @param allocationIDs held by this job that belong to the task manager
+* @param cause of the task manager termination
+*/
void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> 
allocationIDs, Exception cause);
--- End diff --

methods should usually be a verb. What about `notifyTaskManagerTermination`?


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[jira] [Updated] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9456:
--
Labels: pull-request-available  (was: )

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526541#comment-16526541
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490029
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
 ---
@@ -45,6 +46,9 @@
/** Allocation id for which this slot has been allocated. */
private AllocationID allocationId;
 
+   /** Allocation id for which this slot has been allocated. */
+   private JobID jobId;
--- End diff --

Should be annotated with `@Nullable`


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526537#comment-16526537
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491884
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ---
@@ -53,4 +56,13 @@
 * @param cause of the allocation failure
 */
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
Exception cause);
+
+   /**
+* Notifies that the task manager has been terminated.
--- End diff --

line break is missing here


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198910651
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ---
@@ -53,4 +56,13 @@
 * @param cause of the allocation failure
 */
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
Exception cause);
+
+   /**
+* Notifies that the task manager has been terminated.
+* @param jobId to be notified
+* @param resourceID identifying the terminated task manager
+* @param allocationIDs of the job held that belong to this task manager
+* @param cause of the task manager termination.
+*/
+   void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, 
Set<AllocationID> allocationIDs, Exception cause);
--- End diff --

I think the notification about a terminated `TaskManager` should not come 
from the `SlotManager` but from the `ResourceManager`. Thus, we should not need 
this method.
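The flow Till describes — the `ResourceManager` itself pushing the termination notification to the `JobManager` rather than routing it through the `SlotManager` — might look like the following simplified sketch. All types here are illustrative stand-ins, not Flink's actual gateway interfaces:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the JobMaster-facing gateway.
interface JobMasterGatewayStub {
    void notifyTaskManagerTermination(String resourceId, Exception cause);
}

public class RmNotificationSketch {
    final List<JobMasterGatewayStub> registeredJobMasters = new ArrayList<>();

    // The ResourceManager learns about the dead TaskManager from the
    // cluster framework and fans the notification out to the JobMasters
    // directly; the SlotManager is not on the notification path.
    void onTaskManagerTerminated(String resourceId, Exception cause) {
        for (JobMasterGatewayStub jm : registeredJobMasters) {
            jm.notifyTaskManagerTermination(resourceId, cause);
        }
    }

    public static void main(String[] args) {
        RmNotificationSketch rm = new RmNotificationSketch();
        List<String> received = new ArrayList<>();
        rm.registeredJobMasters.add((id, cause) -> received.add(id));
        rm.onTaskManagerTerminated("tm-1", new Exception("container killed"));
        System.out.println(received); // prints "[tm-1]"
    }
}
```

Keeping the notification in the `ResourceManager` matches the review's point: the component that talks to the resource framework is the one that learns about terminations first.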


---


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491021
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 * not available (yet).
 */
CompletableFuture<OperatorBackPressureStats> 
requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+   /**
+* Notifies that the task manager has terminated.
+*
+* @param resourceID identifying the task manager
+* @param allocationIDs held by this job that belong to the task manager
--- End diff --

I think this parameter is not needed.


---


[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526540#comment-16526540
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198489897
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ---
@@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated 
message) {
startNewWorker(launched.profile());
}
 
-   closeTaskManagerConnection(id, new 
Exception(status.getMessage()));
+   final Exception terminatedCause = new 
Exception(status.getMessage());
--- End diff --

let's call it `terminationCause`


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[jira] [Updated] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9456:
--
Labels: pull-request-available  (was: )

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final 
CheckpointCoordinator checkpointCoor
operatorBackPressureStats.orElse(null)));
}
 
+   @Override
+   public void taskManagerTerminated(ResourceID resourceID, 
Set<AllocationID> allocationIds, Exception cause) {
--- End diff --

For what do we need the `allocationIds` parameter here?


---


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491683
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, 
Void payload) {
return CompletableFuture.completedFuture(null);
}
}
+
+   protected void notifyTaskManagerCompleted(ResourceID resourceID, 
Exception cause) {
+   WorkerRegistration workerRegistration = 
taskExecutors.remove(resourceID);
+   if (workerRegistration != null) {
+   slotManager.notifyTaskManagerFailed(resourceID, 
workerRegistration.getInstanceID(), cause);
+   } else {
+   log.warn("TaskManager failed before registering with 
ResourceManager successfully.");
--- End diff --

This should be a debug log message.


---


[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526535#comment-16526535
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491021
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 * not available (yet).
 */
CompletableFuture<OperatorBackPressureStats> 
requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+   /**
+* Notifies that the task manager has terminated.
+*
+* @param resourceID identifying the task manager
+* @param allocationIDs held by this job that belong to the task manager
--- End diff --

I think this parameter is not needed.


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.





[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198493145
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 ---
@@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws 
Exception {
}
}
 
+   /**
+* Tests notify the job manager when the task manager is failed/killed.
+*/
+   @Test
+   public void testNotifyTaskManagerFailed() throws Exception {
+
+   final List<Tuple4<JobID, ResourceID, Set<AllocationID>, Exception>> notifiedTaskManagerInfos = new ArrayList<>();
+
+   try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+   @Override
+   public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause) {
+   notifiedTaskManagerInfos.add(new 
Tuple4<>(jobId, resourceID, allocationIDs, cause));
+   }
+   })) {
--- End diff --

Indentation looks a bit off here


---


[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526538#comment-16526538
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198912464
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot 
taskManagerSlot, PendingSlotRequest pe
mainThreadExecutor);
}
 
+   public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID 
instanceID, Exception cause) {
+   final TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceID);
+   if (taskManagerRegistration != null) {
+   final HashMap<JobID, Set<AllocationID>> jobAndAllocationIDMap = new HashMap<>(4);
+   for (SlotID slotID : 
taskManagerRegistration.getSlots()) {
+   TaskManagerSlot taskManagerSlot = 
slots.get(slotID);
+   AllocationID allocationID = 
taskManagerSlot.getAllocationId();
+   if (allocationID != null) {
+   JobID jobId = 
taskManagerSlot.getJobId();
+   Set<AllocationID> jobAllocationIDSet = jobAndAllocationIDMap.get(jobId);
+   if (jobAllocationIDSet == null) {
+   jobAllocationIDSet = new 
HashSet<>(2);
+   
jobAndAllocationIDMap.put(jobId, jobAllocationIDSet);
+   }
+   jobAllocationIDSet.add(allocationID);
+   }
+   }
+
+   for (Map.Entry<JobID, Set<AllocationID>> entry : jobAndAllocationIDMap.entrySet()) {
+   
resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, 
entry.getValue(), cause);
+   }
+   } else {
+   LOG.warn("TaskManager failed before registering with 
slot manager successfully.");
+   }
--- End diff --

This looks a little bit complicated. Moreover, I don't really like that the 
control flow is: ResourceManager -> SlotManager -> ResourceManager -> 
JobManager.

What about leveraging the existing `ResourceAction#notifyAllocationFailure` 
method. We could say that we not only call this method in case of a failed 
pending slot request but also if we remove a slot. Then unregistering a 
`TaskManager` from the `SlotManager` would remove the slots which then would 
trigger for each allocated slot the `notifyAllocationFailure` message. We would 
then have to introduce a `JobMasterGateway#notifyAllocationFailure` which we 
can call from `ResourceActionsImpl#notifyAllocationFailure`. The implementation 
on the `JobMaster` side would then simply call `SlotPool#failAllocation`.

By doing it that way, we send multiple messages (might not be ideal) but we 
reuse most of the existing code paths without introducing special case logic. 
What do you think?
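The chain proposed above (SlotManager slot removal -> ResourceActions#notifyAllocationFailure -> JobMasterGateway#notifyAllocationFailure -> SlotPool#failAllocation) can be sketched with simplified stand-in types. This is a minimal, self-contained illustration of the control flow only; the class and method names mirror the Flink interfaces under discussion but are not the real APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AllocationFailureFlowSketch {

    interface JobMasterGateway {
        // Proposed JobMasterGateway#notifyAllocationFailure; the real JobMaster
        // implementation would simply forward to SlotPool#failAllocation.
        void notifyAllocationFailure(String allocationId, Exception cause);
    }

    // Stands in for ResourceActionsImpl: routes each per-allocation failure
    // to the JobMaster that owns the allocation.
    static class ResourceActions {
        private final Map<String, JobMasterGateway> jobMasters = new HashMap<>();
        void register(String jobId, JobMasterGateway gateway) { jobMasters.put(jobId, gateway); }
        void notifyAllocationFailure(String jobId, String allocationId, Exception cause) {
            JobMasterGateway gateway = jobMasters.get(jobId);
            if (gateway != null) {
                gateway.notifyAllocationFailure(allocationId, cause);
            }
        }
    }

    static class Slot {
        final String jobId;
        final String allocationId;
        Slot(String jobId, String allocationId) { this.jobId = jobId; this.allocationId = allocationId; }
    }

    // Unregistering a TaskManager removes its slots; each allocated slot fires
    // one notifyAllocationFailure message -- no special-case notification path.
    static class SlotManager {
        private final ResourceActions resourceActions;
        private final List<Slot> slots = new ArrayList<>();
        SlotManager(ResourceActions resourceActions) { this.resourceActions = resourceActions; }
        void addSlot(Slot slot) { slots.add(slot); }
        void unregisterTaskManager(Exception cause) {
            for (Slot slot : slots) {
                resourceActions.notifyAllocationFailure(slot.jobId, slot.allocationId, cause);
            }
            slots.clear();
        }
    }

    public static void main(String[] args) {
        List<String> failedAllocations = new ArrayList<>();
        ResourceActions actions = new ResourceActions();
        // The lambda stands in for the JobMaster side calling SlotPool#failAllocation.
        actions.register("job-1", (allocationId, cause) -> failedAllocations.add(allocationId));

        SlotManager slotManager = new SlotManager(actions);
        slotManager.addSlot(new Slot("job-1", "alloc-a"));
        slotManager.addSlot(new Slot("job-1", "alloc-b"));
        slotManager.unregisterTaskManager(new Exception("TaskManager terminated"));

        System.out.println(failedAllocations);
    }
}
```

The point of the design is visible in the sketch: the TaskManager-level event fans out into the already-existing per-allocation failure path, so no dedicated TaskManager-termination RPC is needed.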






[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526536#comment-16526536
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final 
CheckpointCoordinator checkpointCoor
operatorBackPressureStats.orElse(null)));
}
 
+   @Override
+   public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --

For what do we need the `allocationIds` parameter here?




[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526542#comment-16526542
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491683
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, 
Void payload) {
return CompletableFuture.completedFuture(null);
}
}
+
+   protected void notifyTaskManagerCompleted(ResourceID resourceID, 
Exception cause) {
+   WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+   if (workerRegistration != null) {
+   slotManager.notifyTaskManagerFailed(resourceID, 
workerRegistration.getInstanceID(), cause);
+   } else {
+   log.warn("TaskManager failed before registering with 
ResourceManager successfully.");
--- End diff --

This should be a debug log message.




[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491884
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ---
@@ -53,4 +56,13 @@
 * @param cause of the allocation failure
 */
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
Exception cause);
+
+   /**
+* Notifies that the task manager has been terminated.
--- End diff --

line break is missing here


---


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490029
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
 ---
@@ -45,6 +46,9 @@
/** Allocation id for which this slot has been allocated. */
private AllocationID allocationId;
 
+   /** Allocation id for which this slot has been allocated. */
+   private JobID jobId;
--- End diff --

Should be annotated with `@Nullable`


---


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198489897
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ---
@@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated 
message) {
startNewWorker(launched.profile());
}
 
-   closeTaskManagerConnection(id, new 
Exception(status.getMessage()));
+   final Exception terminatedCause = new 
Exception(status.getMessage());
--- End diff --

let's call it `terminationCause`


---


[jira] [Created] (FLINK-9689) Flink consumer deserialization example

2018-06-28 Thread Satheesh (JIRA)
Satheesh created FLINK-9689:
---

 Summary: Flink consumer deserialization example
 Key: FLINK-9689
 URL: https://issues.apache.org/jira/browse/FLINK-9689
 Project: Flink
  Issue Type: Improvement
Reporter: Satheesh


It's hard to find a relevant custom deserialization example for the Flink Kafka 
consumer. It would be very useful to add a sample program implementing 
custom deserialization to the flink-examples folder.
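As a rough illustration of what such an example could cover: the hook in Flink is a `DeserializationSchema` passed to the Kafka consumer constructor. The stub below only mirrors the `deserialize(byte[])` contract so it runs without Flink on the classpath; the `Event` type and the comma-separated record format are made up for illustration:

```java
import java.nio.charset.StandardCharsets;

public class CsvEventSchemaSketch {

    static class Event {
        final String user;
        final long count;
        Event(String user, long count) { this.user = user; this.count = count; }
        @Override public String toString() { return user + ":" + count; }
    }

    // In Flink this method would come from DeserializationSchema<Event>;
    // here it is a plain static method with the same byte[] -> Event shape.
    static Event deserialize(byte[] message) {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",");
        return new Event(parts[0], Long.parseLong(parts[1].trim()));
    }

    public static void main(String[] args) {
        // Simulate a raw Kafka record value.
        byte[] record = "alice, 42".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(record));
    }
}
```

A real example in flink-examples would additionally implement `isEndOfStream` and `getProducedType` and wire the schema into the Kafka consumer.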





[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526533#comment-16526533
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198910651
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ---
@@ -53,4 +56,13 @@
 * @param cause of the allocation failure
 */
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
Exception cause);
+
+   /**
+* Notifies that the task manager has been terminated.
+* @param jobId to be notified
+* @param resourceID identifying the terminated task manager
+* @param allocationIDs of the job held that belong to this task manager
+* @param cause of the task manager termination.
+*/
+   void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --

I think the notification about a terminated `TaskManager` should not come 
from the `SlotManager` but from the `ResourceManager`. Thus, we should not need 
this method.




[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491333
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 * not available (yet).
 */
CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+   /**
+* Notifies that the task manager has terminated.
+*
+* @param resourceID identifying the task manager
+* @param allocationIDs held by this job that belong to the task manager
+* @param cause of the task manager termination
+*/
+   void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --

Method names should usually be verbs. What about `notifyTaskManagerTermination`?


---




[jira] [Commented] (FLINK-9672) Fail fatally if we cannot submit job on added JobGraph signal

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526528#comment-16526528
 ] 

ASF GitHub Bot commented on FLINK-9672:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6213#discussion_r198912529
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -322,9 +322,6 @@ private JobManagerRunner 
createJobManagerRunner(JobGraph jobGraph) throws Except
 
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
-   if (jobManagerRunners.isEmpty()) {
--- End diff --

nice


> Fail fatally if we cannot submit job on added JobGraph signal
> -
>
> Key: FLINK-9672
> URL: https://issues.apache.org/jira/browse/FLINK-9672
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The {{SubmittedJobGraphStore}} signals when new {{JobGraphs}} are added. If 
> this happens, then the leader should recover this job and submit it. If the 
> recovery/submission should fail for some reason, then we should fail fatally 
> to restart the process which will then try to recover the jobs again.







[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6083


---


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526502#comment-16526502
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6083


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.





[jira] [Closed] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-28 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8983.

   Resolution: Fixed
Fix Version/s: 1.6.0

Added via
a36b56999240d1ead0793be7acb4ad13cd0559f2
9b366045697f80534b8eb2e8b559f02d1452f0cf



[jira] [Updated] (FLINK-8745) Reduce travis usage

2018-06-28 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8745:

Affects Version/s: (was: 1.5.0)
   (was: 1.4.0)
   1.6.0

> Reduce travis usage
> ---
>
> Key: FLINK-8745
> URL: https://issues.apache.org/jira/browse/FLINK-8745
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Travis
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> We've been notified by INFRA that our travis usage is exceedingly high.
> There are various things we could look into short- and long term:
> h2. Short-term
> h3. Reduce number of jobs
> We currently run 12 jobs for each PR/push.
> The first 10 jobs belong to 2 groups, with each group representing one test 
> run of Flink against a specific hadoop version.
> Given that the majority of changes made to Flink do not impact our 
> compatibility with hadoop we could drop one of the groups and instead rely on 
> daily cron jobs. This alone would cut our travis usage by 40%.
> Once the migration to flip6 is done we can drop the remaining 2 jobs, 
> increasing the reduction to 60%.
> h3. Reduce number of builds
> Travis is run for every PR, regardless of what change was made, even if it 
> was something as trivial as removing a trailing space in a documentation file. 
> From time to time it also happens that new commits are pushed to a PR solely 
> to trigger a new build to get that perfect green build.
> Instead we could look into manually triggering travis for pull requests, that 
> is, with a bot.
> h2. Long-term
> h3. Incremental builds
> Making the build dependent on the changes made has been brought up a few 
> times now. This would in particular benefit cases where connectors/libraries 
> are modified as they generally have few dependents. We would still have to 
> run everything though if changes are made to the core modules.
> h3. Repository split
> The most painful of them all, but in my opinion also the most promising. With 
> separate repositories for the core flink modules (flink-runtime etc), 
> flink-connectors and flink-libraries, we could outright skip the compilation 
> of a large number of modules.





[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire

2018-06-28 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526454#comment-16526454
 ] 

Hequn Cheng commented on FLINK-9687:


Good catch. When isFire=false, there is no need to get contents from window 
state.

> Delay the state fetch only when the triggerResult is fire
> -
>
> Key: FLINK-9687
> URL: https://issues.apache.org/jira/browse/FLINK-9687
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> When the window operator is fired by an event-time or processing-time timer, it 
> fetches the state content first. It only needs to fetch the content from 
> windowState when the TriggerResult is FIRE, so we should change the order to 
> avoid this cost (fetching the content from state is more expensive than 
> checking the TriggerResult). 
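The reordering described above can be sketched in a few lines. This is a minimal standalone illustration with stub types standing in for Flink's Trigger/window-state APIs, not the real operator code:

```java
public class FireCheckOrderSketch {

    enum TriggerResult { CONTINUE, FIRE }

    static int stateFetches = 0;

    // Stands in for the (comparatively expensive) window-state read
    // we want to skip when the trigger does not fire.
    static String fetchWindowContents() {
        stateFetches++;
        return "contents";
    }

    static void onTimer(TriggerResult result) {
        // Check the trigger result first; touch state only on FIRE.
        if (result == TriggerResult.FIRE) {
            String contents = fetchWindowContents();
            // emitWindowContents(contents) would go here in the real operator
        }
    }

    public static void main(String[] args) {
        onTimer(TriggerResult.CONTINUE); // no state access
        onTimer(TriggerResult.FIRE);     // one state access
        System.out.println(stateFetches);
    }
}
```

With the original order, both timer firings would have paid for a state read; with the check first, only the FIRE path does.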





[jira] [Commented] (FLINK-9684) HistoryServerArchiveFetcher not working properly with secure hdfs cluster

2018-06-28 Thread Ethan Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526449#comment-16526449
 ] 

Ethan Li commented on FLINK-9684:
-

PR: https://github.com/apache/flink/pull/6225

> HistoryServerArchiveFetcher not working properly with secure hdfs cluster
> -
>
> Key: FLINK-9684
> URL: https://issues.apache.org/jira/browse/FLINK-9684
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: Ethan Li
>Priority: Major
>
> With my current setup, jobmanager and taskmanager are able to talk to hdfs 
> cluster (with kerberos setup). However, running history server gets:
>  
>  
> {code:java}
> 2018-06-27 19:03:32,080 WARN org.apache.hadoop.ipc.Client - Exception 
> encountered while connecting to the server : 
> java.lang.IllegalArgumentException: Failed to specify server's Kerberos 
> principal name
> 2018-06-27 19:03:32,085 ERROR 
> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher - 
> Failed to access job archive location for path 
> hdfs://openqe11blue-n2.blue.ygrid.yahoo.com/tmp/flink/openstorm10-blue/jmarchive.
> java.io.IOException: Failed on local exception: java.io.IOException: 
> java.lang.IllegalArgumentException: Failed to specify server's Kerberos 
> principal name; Host Details : local host is: 
> "openstorm10blue-n2.blue.ygrid.yahoo.com/10.215.79.35"; destination host is: 
> "openqe11blue-n2.blue.ygri
> d.yahoo.com":8020;
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
> at org.apache.hadoop.ipc.Client.call(Client.java:1414)
> at org.apache.hadoop.ipc.Client.call(Client.java:1363)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> at com.sun.proxy.$Proxy9.getListing(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
> at com.sun.proxy.$Proxy9.getListing(Unknown Source)
> at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:515)
> at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743)
> at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1726)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:650)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
> at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:146)
> at 
> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:139)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.lang.IllegalArgumentException: Failed to 
> specify server's Kerberos principal name
> at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:677)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> at 
> org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:640)
> at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
> at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367)
> at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462)
> at org.apache.hadoop.ipc.Client.call(Client.java:1381)
> ... 

[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire

2018-06-28 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526330#comment-16526330
 ] 

aitozi commented on FLINK-9687:
---

Hi, [~kkl0u]

I have added my thoughts and a description to this issue. Could you help review it?

> Delay the state fetch only when the triggerResult is fire
> -
>
> Key: FLINK-9687
> URL: https://issues.apache.org/jira/browse/FLINK-9687
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> When the window operator is fired by an event-time or processing-time timer, 
> it fetches the state content first. It only needs to fetch the content from 
> the windowState when the triggerResult is FIRE, so we should change the order 
> to avoid this cost (fetching content from state costs more than evaluating 
> the triggerResult). 
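The proposed reordering can be sketched as follows. This is a minimal illustration, not Flink's real window-operator code; `TriggerResult`, `onTimer`, and the state stand-in are all assumed names:

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the proposed change: evaluate the (cheap) trigger result first
 * and fetch the (expensive) window state only when the result is FIRE.
 * All names here are illustrative, not Flink's actual internals.
 */
public class TriggerOrderSketch {
    enum TriggerResult { CONTINUE, FIRE }

    // Counts how often the expensive state fetch actually runs.
    static final AtomicInteger stateFetches = new AtomicInteger();

    // Stand-in for windowState.get(), the costly state access.
    static String fetchWindowContents() {
        stateFetches.incrementAndGet();
        return "window-contents";
    }

    // Proposed order: check the trigger result before touching state.
    static void onTimer(TriggerResult result) {
        if (result == TriggerResult.FIRE) {
            String contents = fetchWindowContents();
            // ... emit contents downstream ...
        }
        // On CONTINUE, no state is read at all.
    }
}
```

With this ordering, a timer that evaluates to CONTINUE never pays for the state access.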



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9687) Delay the state fetch only when the triggerResult is fire

2018-06-28 Thread aitozi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi updated FLINK-9687:
--
Description: When the window operator is fired by an event-time or 
processing-time timer, it fetches the state content first. It only needs to 
fetch the content from the windowState when the triggerResult is FIRE, so we 
should change the order to avoid this cost (fetching content from state costs 
more than evaluating the triggerResult). 

> Delay the state fetch only when the triggerResult is fire
> -
>
> Key: FLINK-9687
> URL: https://issues.apache.org/jira/browse/FLINK-9687
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> When the window operator is fired by an event-time or processing-time timer, 
> it fetches the state content first. It only needs to fetch the content from 
> the windowState when the triggerResult is FIRE, so we should change the order 
> to avoid this cost (fetching content from state costs more than evaluating 
> the triggerResult). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire

2018-06-28 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526286#comment-16526286
 ] 

Kostas Kloudas commented on FLINK-9687:
---

Hi [~aitozi].

Can you provide a description here of what you mean and why this is 
interesting?

The discussion of an issue should happen in the JIRA, not in PRs. This is not 
only a matter of "taste"; it is required, since JIRA is hosted by Apache while 
GitHub is not.

> Delay the state fetch only when the triggerResult is fire
> -
>
> Key: FLINK-9687
> URL: https://issues.apache.org/jira/browse/FLINK-9687
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9662) Task manager isolation for jobs

2018-06-28 Thread Renjie Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526247#comment-16526247
 ] 

Renjie Liu commented on FLINK-9662:
---

Would it be better if we reused the existing job id field in the SlotStatus
class?

Till Rohrmann (JIRA)  于 2018年6月28日周四 下午7:29写道:

-- 
Liu, Renjie
Software Engineer, MVAD


> Task manager isolation for jobs
> ---
>
> Key: FLINK-9662
> URL: https://issues.apache.org/jira/browse/FLINK-9662
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: job isolation sequence.jpg
>
>
> Disable task manager sharing for different jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9666) short-circuit logic should be used in boolean contexts

2018-06-28 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9666.
--
Resolution: Fixed

Fixed via da37daa8ba387435abf6a3bc5629ce7d21a6b017

> short-circuit logic should be used in boolean contexts
> --
>
> Key: FLINK-9666
> URL: https://issues.apache.org/jira/browse/FLINK-9666
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> short-circuit logic should be used in boolean contexts
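As a generic illustration of why this matters (not code from the Flink patch itself): the non-short-circuit `&` operator evaluates both operands even when the left-hand side already decides the result, while `&&` skips the right-hand side:

```java
/**
 * Illustrates non-short-circuit (&) vs. short-circuit (&&) boolean
 * operators. Generic example, not code from the Flink change itself.
 */
public class ShortCircuitDemo {
    static int rhsEvaluations = 0;

    static boolean rhs() {
        rhsEvaluations++;
        return true;
    }

    public static void main(String[] args) {
        // Non-short-circuit '&': the right-hand side runs even though the
        // left-hand side already makes the result false.
        boolean a = false & rhs();
        // Short-circuit '&&': the right-hand side is skipped entirely.
        boolean b = false && rhs();
        System.out.println(a + " " + b + " " + rhsEvaluations); // false false 1

        // The classic safety case: '&&' guards the dereference, '&' does not.
        String s = null;
        boolean ok = (s != null && !s.isEmpty()); // false, no exception
        // boolean boom = (s != null & !s.isEmpty()); // would throw a NullPointerException
        System.out.println(ok); // false
    }
}
```

Besides saving work, the short-circuit forms are what make guard patterns like `s != null && !s.isEmpty()` safe.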



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9666) short-circuit logic should be used in boolean contexts

2018-06-28 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9666:


Assignee: lamber-ken

> short-circuit logic should be used in boolean contexts
> --
>
> Key: FLINK-9666
> URL: https://issues.apache.org/jira/browse/FLINK-9666
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> short-circuit logic should be used in boolean contexts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9662) Task manager isolation for jobs

2018-06-28 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526224#comment-16526224
 ] 

Till Rohrmann commented on FLINK-9662:
--

I think you can still solve the problem of task manager isolation with the 
tags. You simply need to specify a predicate that strictly requires the job id 
tag to be present.
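A hypothetical sketch of the tag predicate described above. The slot-tag map, the `"job-id"` key, and the predicate shape are all illustrative assumptions, not Flink's actual slot-matching API:

```java
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical tag-based slot matching: a slot qualifies for a job only if
 * it carries a "job-id" tag equal to that job's id, which effectively pins
 * the task manager to a single job. Illustrative only.
 */
public class JobIsolationPredicate {
    static Predicate<Map<String, String>> forJob(String jobId) {
        // Strictly require the tag to be present and to match.
        return slotTags -> jobId.equals(slotTags.get("job-id"));
    }
}
```

A slot without the tag (or tagged for another job) would simply never match, giving the isolation behavior without a dedicated mechanism.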

> Task manager isolation for jobs
> ---
>
> Key: FLINK-9662
> URL: https://issues.apache.org/jira/browse/FLINK-9662
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: job isolation sequence.jpg
>
>
> Disable task manager sharing for different jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9624) Move jar/artifact upload logic out of JobGraph

2018-06-28 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9624.
---
Resolution: Fixed

master: dd4c8469b11184b633d2b9514b9910622734270f

> Move jar/artifact upload logic out of JobGraph
> --
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded isn't a concern of the {{JobGraph}} but 
> the submission-method, like the {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526175#comment-16526175
 ] 

Fabian Hueske commented on FLINK-9688:
--

We of course aim for a large set of built-in functions, so such new features 
are highly welcome.
Thanks for creating this issue!

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}
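For reference, the value Calcite prints matches Java's own `Math.atan2`, which a Table API implementation would presumably delegate to (an assumption; this is a plain JDK check, not Flink code):

```java
public class Atan2Check {
    // atan2(y, x) gives the angle of the point (x, y); atan2(1, 2) = atan(0.5).
    static double value() {
        return Math.atan2(1.0, 2.0);
    }

    public static void main(String[] args) {
        System.out.println(value()); // ~0.4636476090008061, matching the sqlline output
    }
}
```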



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Sergey Nuyanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526166#comment-16526166
 ] 

Sergey Nuyanzin commented on FLINK-9688:


OK, I see. Thank you for the clarification.

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526161#comment-16526161
 ] 

Fabian Hueske commented on FLINK-9688:
--

This issue requests a new feature: {{ATAN2}} is not supported yet, and adding 
it is a new feature.
It would be a bug if the function were already supported but computed an 
incorrect result.

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-9688:
-
Issue Type: New Feature  (was: Bug)

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Sergey Nuyanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-9688:
---
Issue Type: Bug  (was: New Feature)

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-9688:
-
Priority: Minor  (was: Major)

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-9688:
-
Issue Type: New Feature  (was: Bug)

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-06-28 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9682:
---

Assignee: vinoyang

> Add setDescription to execution environment and display it in the UI
> 
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given to {{execute}}.  But the job name is used 
> for other purposes, such as for namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Sergey Nuyanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-9688:
---
Description: 
A simple query fails: {code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
config());

DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
tableEnv.registerDataSet("t1", ds, "x, y, z");

String sqlQuery = "SELECT atan2(1,2)";
Table result = tableEnv.sqlQuery(sqlQuery);
{code}
while at the same time Calcite supports it, and in Calcite's sqlline it works 
like {noformat}
0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
+--------------------+
|       EXPR$0       |
+--------------------+
| 0.4636476090008061 |
+--------------------+
1 row selected (0.173 seconds)
{noformat}

  was:
simple query fails {code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());

DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
tableEnv.registerDataSet("t1", ds, "x, y, z");

String sqlQuery = "SELECT atan2(1,2)";
Table result = tableEnv.sqlQuery(sqlQuery);
{code}
while at the same time Calcite supports it and in Calcite's sqlline it works 
like {noformat}
0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
+--------------------+
|       EXPR$0       |
+--------------------+
| 0.4636476090008061 |
+--------------------+
1 row selected (0.173 seconds)
{noformat}


> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> A simple query fails: {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet<Tuple3<Integer, Long, String>> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it, and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> |       EXPR$0       |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9688) ATAN2 Sql Function support

2018-06-28 Thread Sergey Nuyanzin (JIRA)
Sergey Nuyanzin created FLINK-9688:
--

 Summary: ATAN2 Sql Function support
 Key: FLINK-9688
 URL: https://issues.apache.org/jira/browse/FLINK-9688
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


A simple query fails: {code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());

DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
tableEnv.registerDataSet("t1", ds, "x, y, z");

String sqlQuery = "SELECT atan2(1,2)";
Table result = tableEnv.sqlQuery(sqlQuery);
{code}
while at the same time Calcite supports it, and in Calcite's sqlline it works 
like {noformat}
0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
+--------------------+
|       EXPR$0       |
+--------------------+
| 0.4636476090008061 |
+--------------------+
1 row selected (0.173 seconds)
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9580) Potentially unclosed ByteBufInputStream in RestClient#readRawResponse

2018-06-28 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9580.
---
   Resolution: Fixed
Fix Version/s: 1.5.1
   1.6.0

master: b4574c9bb5398713dd5501baf596f284ea19817f

1.5: 1bdc7194dff23b3ce3181c687be53f8a66a31bfe

> Potentially unclosed ByteBufInputStream in RestClient#readRawResponse
> -
>
> Key: FLINK-9580
> URL: https://issues.apache.org/jira/browse/FLINK-9580
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Here is the related code:
> {code}
>   ByteBufInputStream in = new ByteBufInputStream(content);
>   byte[] data = new byte[in.available()];
>   in.readFully(data);
> {code}
> In the catch block, ByteBufInputStream is not closed.
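The usual fix pattern is try-with-resources, which closes the stream on both the success and the exception path. Sketched here with a plain `DataInputStream` over a byte array so the snippet is self-contained, rather than Netty's `ByteBufInputStream` that the handler actually uses:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ReadRawResponseSketch {
    // try-with-resources guarantees close() runs even if readFully throws,
    // which is exactly the leak the issue describes on the exception path.
    static byte[] readRaw(byte[] content) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) {
            byte[] data = new byte[in.available()];
            in.readFully(data);
            return data;
        } catch (IOException e) {
            // Propagate unchecked; the real handler would turn this into an error response.
            throw new UncheckedIOException(e);
        }
    }
}
```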



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies

2018-06-28 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9629.
---
   Resolution: Fixed
Fix Version/s: 1.5.1
   1.6.0

master: afcb513e21afaeab7289c0e51222c261d5d0150a

1.5: 0dae5a1aee771f17b086d6dbd54bf0b95bb436f2

> Datadog metrics reporter does not have shaded dependencies
> --
>
> Key: FLINK-9629
> URL: https://issues.apache.org/jira/browse/FLINK-9629
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Georgii Gobozov
>Assignee: Georgii Gobozov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for 
> okhttp3 and okio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies

2018-06-28 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9629:

Affects Version/s: (was: 1.5.1)

> Datadog metrics reporter does not have shaded dependencies
> --
>
> Key: FLINK-9629
> URL: https://issues.apache.org/jira/browse/FLINK-9629
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Georgii Gobozov
>Assignee: Georgii Gobozov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for 
> okhttp3 and okio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9580) Potentially unclosed ByteBufInputStream in RestClient#readRawResponse

2018-06-28 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9580:

Affects Version/s: 1.6.0
   1.5.0

> Potentially unclosed ByteBufInputStream in RestClient#readRawResponse
> -
>
> Key: FLINK-9580
> URL: https://issues.apache.org/jira/browse/FLINK-9580
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Here is related code:
> {code}
>   ByteBufInputStream in = new ByteBufInputStream(content);
>   byte[] data = new byte[in.available()];
>   in.readFully(data);
> {code}
> In the catch block, ByteBufInputStream is not closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8785:
--
Labels: flip-6 pull-request-available  (was: flip-6)

> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6, pull-request-available
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a 
> {{JobSubmissionException}}, the {{JobSubmitHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can, for example, occur if the transmitted execution graph is faulty, as 
> tested by {{JobSubmissionFailsITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526130#comment-16526130
 ] 

ASF GitHub Bot commented on FLINK-8785:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6222

[FLINK-8785][rest] Handle JobSubmissionExceptions

## What is the purpose of the change

This PR modifies the `JobSubmitHandler` to handle exceptions contained in 
the future returned by `DispatcherGateway#submitJob`.

An exception handler was added via `CompletableFuture#exceptionally` to 
return a proper `ErrorResponseBody` signaling that the job submission has 
failed.

This PR is pretty much the bare-bones solution; in the JIRA I advocated for 
including error messages from exceptions since there are various reasons why 
the submission could fail, but I can't find a satisfying solution.

## Verifying this change

* see new test in `JobSubmitHandlerTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8785_basic

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6222


commit 32fe49270596cdcf2f91f822c3a6504a14ba40eb
Author: zentol 
Date:   2018-06-28T08:57:01Z

[FLINK-8785][rest] Handle JobSubmissionExceptions




> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6, pull-request-available
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a 
> {{JobSubmissionException}}, the {{JobSubmitHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can, for example, occur if the transmitted execution graph is faulty, as 
> tested by {{JobSubmissionFailsITCase}}.
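The `CompletableFuture#exceptionally` pattern the PR describes can be sketched like this. The `submit` helper and the message strings are illustrative stand-ins, not the handler's real code:

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallyDemo {
    // Maps a failed submission future to an error payload instead of letting
    // the exception surface as a generic "Internal server error".
    static CompletableFuture<String> submit(boolean fail) {
        CompletableFuture<String> ack = new CompletableFuture<>();
        if (fail) {
            ack.completeExceptionally(new IllegalStateException("faulty execution graph"));
        } else {
            ack.complete("job accepted");
        }
        // exceptionally() only runs on the failure path; successes pass through.
        return ack.exceptionally(t -> "error: " + t.getMessage());
    }
}
```

In the real handler the lambda would build an `ErrorResponseBody` rather than a string, but the control flow is the same.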



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions

2018-06-28 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6222

[FLINK-8785][rest] Handle JobSubmissionExceptions

## What is the purpose of the change

This PR modifies the `JobSubmitHandler` to handle exceptions contained in 
the future returned by `DispatcherGateway#submitJob`.

An exception handler was added via `CompletableFuture#exceptionally` to 
return a proper `ErrorResponseBody` signaling that the job submission has 
failed.

This PR is pretty much the bare-bones solution; in the JIRA I advocated for 
including error messages from exceptions since there are various reasons why 
the submission could fail, but I can't find a satisfying solution.

## Verifying this change

* see new test in `JobSubmitHandlerTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8785_basic

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6222


commit 32fe49270596cdcf2f91f822c3a6504a14ba40eb
Author: zentol 
Date:   2018-06-28T08:57:01Z

[FLINK-8785][rest] Handle JobSubmissionExceptions




---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526115#comment-16526115
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198757244
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction<T, ACC>
+   extends AbstractTtlDecorator<FoldFunction<T, ACC>>
+   implements FoldFunction<T, TtlValue<ACC>> {
+   TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
+       super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
+       return wrapWithTs(original.fold(getUnexpried(accumulator), value));
--- End diff --

As I understand it, the wrapped states should already provide the default 
values. My idea was to wrap the original default value [in the TTL 
factory](https://github.com/apache/flink/pull/6196/commits/4dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174)
 with the expiration timestamp `Long.MAX_VALUE`, so it basically never expires. Good 
point about test cases for it; I will add them for the appending states.
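The never-expiring default discussed above can be illustrated with a small self-contained sketch; the class and method names here are illustrative, not Flink's actual TTL types:

```java
// Minimal sketch: a value tagged with its last-access timestamp, where
// Long.MAX_VALUE marks a default that never expires. All names are assumptions.
public class TtlDefaultSketch {
    static final class TtlValue<V> {
        final V value;
        final long lastAccessTs;
        TtlValue(V value, long lastAccessTs) { this.value = value; this.lastAccessTs = lastAccessTs; }
    }

    // Wrap a default value so that no TTL can ever expire it.
    static <V> TtlValue<V> neverExpiring(V defaultValue) {
        return new TtlValue<>(defaultValue, Long.MAX_VALUE);
    }

    static boolean expired(TtlValue<?> v, long ttlMillis, long now) {
        // Guard against overflow: Long.MAX_VALUE + ttl would wrap around,
        // so MAX_VALUE is treated explicitly as "never expires".
        return v.lastAccessTs != Long.MAX_VALUE && v.lastAccessTs + ttlMillis <= now;
    }

    public static void main(String[] args) {
        TtlValue<String> def = neverExpiring("default");
        System.out.println(expired(def, 1000, System.currentTimeMillis())); // false for any clock
        System.out.println(expired(new TtlValue<>("x", 0), 1000, 5000));    // true: 0 + 1000 <= 5000
    }
}
```

Note the explicit overflow guard: naively adding the TTL to `Long.MAX_VALUE` would wrap negative and make the default look expired.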


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder in 
> its TTL wrapper from FLINK-9513
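The decorator pseudocode in the description can be made runnable as a toy version, with a plain field standing in for the real backend state and a controllable clock standing in for the time service (all names and types below are illustrative, not Flink's actual API):

```java
import java.util.function.LongSupplier;

public class TtlValueStateSketch {
    static final class TtlValue<V> {
        final V value;
        final long ts;
        TtlValue(V value, long ts) { this.value = value; this.ts = ts; }
    }

    // Toy decorator: wraps a single stored TtlValue and applies expiry on read.
    static final class TtlValueState<V> {
        private TtlValue<V> underlying;          // stands in for ValueState<TtlValue<V>>
        private final long ttl;
        private final LongSupplier timeProvider; // stands in for the internal time service

        TtlValueState(long ttl, LongSupplier timeProvider) {
            this.ttl = ttl;
            this.timeProvider = timeProvider;
        }

        V value() {
            TtlValue<V> v = underlying;
            if (v == null || v.ts + ttl <= timeProvider.getAsLong()) {
                underlying = null; // expired entries read as absent
                return null;
            }
            return v.value;
        }

        void update(V value) {
            underlying = new TtlValue<>(value, timeProvider.getAsLong());
        }
    }

    public static void main(String[] args) {
        long[] clock = {0};
        TtlValueState<String> state = new TtlValueState<>(100, () -> clock[0]);
        state.update("a");
        System.out.println(state.value()); // a (still fresh)
        clock[0] = 200;
        System.out.println(state.value()); // null (TTL elapsed)
    }
}
```

The real wrapper delegates to the underlying state backend instead of a field, but the read-path check and write-path timestamping follow the same shape.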



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Created] (FLINK-9687) Delay the state fetch only when the triggerResult is fire

2018-06-28 Thread aitozi (JIRA)
aitozi created FLINK-9687:
-

 Summary: Delay the state fetch only when the triggerResult is fire
 Key: FLINK-9687
 URL: https://issues.apache.org/jira/browse/FLINK-9687
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: aitozi
Assignee: aitozi






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526096#comment-16526096
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198751723
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <UK> Type of the user entry key of state with TTL
+ * @param <UV> Type of the user entry value of state with TTL
+ */
+class TtlMapState<K, N, UK, UV>
+   extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
+   implements InternalMapState<K, N, UK, UV> {
+   TtlMapState(
+       InternalMapState<K, N, UK, TtlValue<UV>> original,
+       TtlConfig config,
+       TtlTimeProvider timeProvider,
+       TypeSerializer<Map<UK, TtlValue<UV>>> valueSerializer) {
+       super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+       return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map<UK, UV> map) throws Exception {
+   if (map == null) {
+   return;
+   }
+       Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
+           .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
+   original.putAll(ttlMap);
+   }
+
+   @Override
+   public void remove(UK key) throws Exception {
+   original.remove(key);
+   }
+
+   @Override
+   public boolean contains(UK key) throws Exception {
+   return get(key) != null;
+   }
+
+   @Override
+   public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
+   return entriesStream()::iterator;
+   }
+
+   private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+       Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
+   withTs = withTs == null ? Collections.emptyList() : withTs;
+   return StreamSupport
+   .stream(withTs.spliterator(), false)
--- End diff --

As I understand it, it depends on the use case. For parallelizable, lazy 
operations over big collections (like filter and map over lists), streams can give 
a boost over loops, but for short collections or non-parallelizable spliterators 
the overhead kills the performance. Though it might be hard to predict the 
type of spliterator used. I agree that real benchmarking should be done to make 
sure.
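For the short-collection case discussed above, the loop and stream forms produce identical results and differ only in their cost profile (the stream pays a fixed pipeline/spliterator setup cost per call). A minimal side-by-side sketch, with illustrative types rather than the TTL map entries:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LoopVsStreamSketch {
    // Stream form: lazy pipeline, parallelizable, but with per-call setup cost.
    static List<Integer> viaStream(List<Integer> in) {
        return in.stream().map(x -> x * 2).collect(Collectors.toList());
    }

    // Loop form: no pipeline setup, usually faster for short collections.
    static List<Integer> viaLoop(List<Integer> in) {
        List<Integer> out = new ArrayList<>(in.size());
        for (int x : in) {
            out.add(x * 2);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> in = Arrays.asList(1, 2, 3);
        System.out.println(viaStream(in).equals(viaLoop(in))); // true: same result either way
    }
}
```

Which form actually wins for a given collection size and spliterator is exactly the question a JMH-style benchmark would have to settle; this sketch only shows the behavioral equivalence.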


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL 


