[jira] [Created] (FLINK-16140) Translate "Event Processing (CEP)" page into Chinese

2020-02-18 Thread shuai.xu (Jira)
shuai.xu created FLINK-16140:


 Summary: Translate "Event Processing (CEP)" page into Chinese
 Key: FLINK-16140
 URL: https://issues.apache.org/jira/browse/FLINK-16140
 Project: Flink
  Issue Type: Task
  Components: chinese-translation, Documentation
Affects Versions: 1.10.0
Reporter: shuai.xu


Translate the internal page 
"[https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html]" 
into Chinese.

The doc is located at "flink/docs/dev/libs/cep.zh.md".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16011) Normalize the within usage in Pattern

2020-02-11 Thread shuai.xu (Jira)
shuai.xu created FLINK-16011:


 Summary: Normalize the within usage in Pattern
 Key: FLINK-16011
 URL: https://issues.apache.org/jira/browse/FLINK-16011
 Project: Flink
  Issue Type: Improvement
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


In CEP, we can use Pattern.within() to set a time window in which the pattern 
must be matched. However, the usage of within() is ambiguous and confusing to 
users.

For example:
 # Pattern.begin("a").within(t1).followedBy("b").within(t2) uses the minimum of 
t1 and t2 as the window time for the whole pattern.
 # Pattern.begin("a").followedBy("b").within(t2) uses t2 as the window time.
 # But Pattern.begin("a").within(t1).followedBy("b") has no window time.
 # While 
Pattern.begin("a").notFollowedBy("not").within(t1).followedBy("b").within(t2) 
uses t2 as the window time.

So I propose to normalize the usage of within() and to perform strict checking 
when compiling the pattern. 

For example, we could allow within() only at the end of the pattern and report 
an error at compile time if the user sets it anywhere else.
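
The proposed strict check could look roughly like the sketch below. It is not 
Flink code: Stage and WithinCheck are hypothetical stand-ins for the CEP 
Pattern chain and its compiler, and only the rule "within() is allowed on the 
last stage only" is modelled.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical, simplified model of one pattern stage; not the Flink CEP Pattern class. */
final class Stage {
    final String name;
    final Duration within; // null when within() was not called on this stage

    Stage(String name, Duration within) {
        this.name = name;
        this.within = within;
    }
}

/** Hypothetical compile-time check for the proposed rule. */
final class WithinCheck {
    /**
     * Returns the window of the whole pattern, or null if no window was set.
     * Throws if within() was set on any stage other than the last one.
     */
    static Duration validate(List<Stage> stages) {
        for (int i = 0; i < stages.size() - 1; i++) {
            Stage stage = stages.get(i);
            if (stage.within != null) {
                throw new IllegalArgumentException(
                        "within() is only allowed at the end of the pattern, but was set on '"
                                + stage.name + "'");
            }
        }
        return stages.isEmpty() ? null : stages.get(stages.size() - 1).within;
    }
}
```

With such a check, the first, third and fourth examples above would be 
rejected, and only the second form would compile.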





[jira] [Created] (FLINK-16010) Support notFollowedBy with interval as the last part of a Pattern

2020-02-11 Thread shuai.xu (Jira)
shuai.xu created FLINK-16010:


 Summary: Support notFollowedBy with interval as the last part of a 
Pattern
 Key: FLINK-16010
 URL: https://issues.apache.org/jira/browse/FLINK-16010
 Project: Flink
  Issue Type: New Feature
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


Currently, Pattern.begin("a").notFollowedBy("b") is not allowed in CEP. But it 
is useful in many applications. For example, operators may want to find the 
users who created an order but didn't pay within 10 minutes.

So I propose to allow notFollowedBy() with an interval as the last part of a 
Pattern. 

For example, Pattern.begin("a").notFollowedBy("b").within(Time.minutes(10)) 
will be valid in the future.

The discussion on the dev mailing list is at 
[https://lists.apache.org/thread.html/rc505728048663d672ad379578ac67d3219f6076986c80a2362802ebb%40%3Cdev.flink.apache.org%3E]
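
To make the intended semantics concrete, here is a minimal plain-Java sketch 
of the "order created but not paid within 10 minutes" case. It does not use 
the CEP API. OrderEvent, UnpaidOrders and the watermark parameter are 
assumptions made for illustration, and it assumes events arrive in timestamp 
order with at most one open order per user.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical event type: type is either "create" or "pay". */
final class OrderEvent {
    final String userId;
    final String type;
    final long timestampMillis;

    OrderEvent(String userId, String type, long timestampMillis) {
        this.userId = userId;
        this.type = type;
        this.timestampMillis = timestampMillis;
    }
}

final class UnpaidOrders {
    /**
     * Returns the users whose order was not followed by a payment within timeoutMillis.
     * An order only counts as unpaid once watermarkMillis has passed the end of its window.
     */
    static List<String> usersWithoutPayment(
            List<OrderEvent> events, long timeoutMillis, long watermarkMillis) {
        Map<String, Long> pending = new LinkedHashMap<>(); // userId -> order creation time
        for (OrderEvent event : events) {
            if (event.type.equals("create")) {
                pending.put(event.userId, event.timestampMillis);
            } else if (event.type.equals("pay")) {
                Long created = pending.get(event.userId);
                // A payment only clears the order if it falls inside the window.
                if (created != null && event.timestampMillis - created <= timeoutMillis) {
                    pending.remove(event.userId);
                }
            }
        }
        List<String> unpaid = new ArrayList<>();
        for (Map.Entry<String, Long> entry : pending.entrySet()) {
            if (watermarkMillis - entry.getValue() > timeoutMillis) {
                unpaid.add(entry.getKey());
            }
        }
        return unpaid;
    }
}
```

The point of the sketch is that the match can only be emitted once time has 
advanced past the end of the window, which is why the trailing notFollowedBy() 
needs an interval.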
 





[jira] [Created] (FLINK-15980) The notFollowedBy in the end of GroupPattern may be ignored

2020-02-10 Thread shuai.xu (Jira)
shuai.xu created FLINK-15980:


 Summary: The notFollowedBy in the end of GroupPattern may be 
ignored
 Key: FLINK-15980
 URL: https://issues.apache.org/jira/browse/FLINK-15980
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


If we write a Pattern like this, with notFollowedBy as the last part of a 
GroupPattern:
Pattern group = Pattern.begin("A").notFollowedBy("B");
Pattern pattern = Pattern.begin(group).followedBy("C");

This pattern compiles normally, but the notFollowedBy("B") actually has no 
effect.





[jira] [Created] (FLINK-15964) Getting previous stage in notFollowedBy may throw exception

2020-02-09 Thread shuai.xu (Jira)
shuai.xu created FLINK-15964:


 Summary: Getting previous stage in notFollowedBy may throw 
exception
 Key: FLINK-15964
 URL: https://issues.apache.org/jira/browse/FLINK-15964
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


In a notFollowedBy() condition, an exception may be thrown when the condition 
tries to read a value from a previous stage for comparison.

For example:

Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
    .notFollowedBy("not").where(new IterativeCondition<Event>() {
        private static final long serialVersionUID = -4702359359303151881L;

        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            // Compare with the event already matched by the "start" stage.
            return value.getName().equals(
                ctx.getEventsForPattern("start").iterator().next().getName());
        }
    })
    .followedBy("middle").where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            return value.getName().equals("b");
        }
    });

with inputs:
Event a = new Event(40, "a", 1.0);
Event b1 = new Event(41, "a", 2.0);
Event b2 = new Event(43, "b", 3.0);

It will throw:

org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
    at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:698)
    at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:628)
    at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:292)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:228)
    at org.apache.flink.cep.utils.NFATestHarness.consumeRecord(NFATestHarness.java:107)
    at org.apache.flink.cep.utils.NFATestHarness.feedRecord(NFATestHarness.java:84)
    at org.apache.flink.cep.utils.NFATestHarness.feedRecords(NFATestHarness.java:77)
    at org.apache.flink.cep.nfa.NFAITCase.testEndWithNotFollow(NFAITCase.java:2914)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.util.NoSuchElementException
    at java.util.Collections$EmptyIterator.next(Collections.java:4189)
    at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2884)
    at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2879)
    at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:752)
    at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:688)
    ... 33 more





[jira] [Commented] (FLINK-15873) Matched result may not be output if existing earlier partial matches

2020-02-03 Thread shuai.xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029563#comment-17029563
 ] 

shuai.xu commented on FLINK-15873:
--

Hi [~dwysakowicz], I have submitted a [pull 
request|https://github.com/apache/flink/pull/11009] to fix this issue. Could 
you help review it?

> Matched result may not be output if existing earlier partial matches
> 
>
> Key: FLINK-15873
> URL: https://issues.apache.org/jira/browse/FLINK-15873
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.9.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> When running some CEP jobs with a skip strategy, I found that a matched 
> result is not returned if an earlier partial match exists.
> I think this is due to a bug in processMatchesAccordingToSkipStrategy() in 
> the NFA class. It should return the matched result without checking whether 
> earlier partial matches exist.





[jira] [Created] (FLINK-15873) Matched result may not be output if existing earlier partial matches

2020-02-03 Thread shuai.xu (Jira)
shuai.xu created FLINK-15873:


 Summary: Matched result may not be output if existing earlier 
partial matches
 Key: FLINK-15873
 URL: https://issues.apache.org/jira/browse/FLINK-15873
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


When running some CEP jobs with a skip strategy, I found that a matched result 
is not returned if an earlier partial match exists.

I think this is due to a bug in processMatchesAccordingToSkipStrategy() in the 
NFA class. It should return the matched result without checking whether earlier 
partial matches exist.





[jira] [Created] (FLINK-15627) Correct the wrong naming of compareMaps in NFATestUtilities

2020-01-17 Thread shuai.xu (Jira)
shuai.xu created FLINK-15627:


 Summary: Correct the wrong naming of compareMaps in 
NFATestUtilities
 Key: FLINK-15627
 URL: https://issues.apache.org/jira/browse/FLINK-15627
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.1
Reporter: shuai.xu


The compareMaps method in NFATestUtilities actually compares two lists, so it 
would be better to rename it to compareLists.





[jira] [Commented] (FLINK-12038) YARNITCase stalls on travis

2019-07-18 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888482#comment-16888482
 ] 

shuai.xu commented on FLINK-12038:
--

This failure can be easily reproduced on my local machine. I enabled the YARN 
logs and found the reason. You can find the unregisterAM log entry in 
jobmanager.log. When the job finishes, it will try to unregister the AM from 
YARN. In fact, it is not necessary to call killApplication, as the whole YARN 
mini cluster will be shut down in the tearDown of the test case. 

The following is part of the job master log:

2019-07-16 18:20:34,376 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
(1/2) (e13567c7f2d7a389c74f4583a67e34e8) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,376 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: 
Custom Source (1/2) (attempt #0) to container_1563272405568_0001_01_02 @ 
e011239174096.et15sqa (dataPort=42072)
2019-07-16 18:20:34,404 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
(2/2) (fc3d9d65a75eabaf00d7d9372d2b9884) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,405 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: 
Custom Source (2/2) (attempt #0) to container_1563272405568_0001_01_03 @ 
e011239174096.et15sqa (dataPort=41793)
2019-07-16 18:20:34,405 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (1/2) 
(65db57ac7166e0a96a3c5318bb262fb0) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,414 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Sink: 
Unnamed (1/2) (attempt #0) to container_1563272405568_0001_01_03 @ 
e011239174096.et15sqa (dataPort=41793)
2019-07-16 18:20:34,447 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/2) 
(22c3e0c0fd37dd00e75fcf855e2a6ca4) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,447 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Sink: 
Unnamed (2/2) (attempt #0) to container_1563272405568_0001_01_02 @ 
e011239174096.et15sqa (dataPort=42072)
2019-07-16 18:20:34,897 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
(1/2) (e13567c7f2d7a389c74f4583a67e34e8) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:34,949 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
(2/2) (fc3d9d65a75eabaf00d7d9372d2b9884) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:35,056 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (1/2) 
(65db57ac7166e0a96a3c5318bb262fb0) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:35,067 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/2) 
(22c3e0c0fd37dd00e75fcf855e2a6ca4) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:35,450 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
(2/2) (fc3d9d65a75eabaf00d7d9372d2b9884) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,480 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
(1/2) (e13567c7f2d7a389c74f4583a67e34e8) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,494 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/2) 
(22c3e0c0fd37dd00e75fcf855e2a6ca4) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,508 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (1/2) 
(65db57ac7166e0a96a3c5318bb262fb0) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,513 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming 
Job (2f9313ea4fd33bef68111ed380a2ae1b) switched from state RUNNING to FINISHED.
2019-07-16 18:20:35,513 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint 
coordinator for job 2f9313ea4fd33bef68111ed380a2ae1b.
2019-07-16 18:20:35,513 INFO 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - 
Shutting down
2019-07-16 18:20:35,564 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher 
- Job 2f9313ea4fd33bef68111ed380a2ae1b reached globally terminal state FINISHED.
2019-07-16 18:20:35,573 INFO org.apache.flink.runtime.jobmaster.JobMaster - 
Stopping the JobMaster for job Flink Streaming 
Job(2f9313ea4fd33bef68111ed380a2ae1b).
2019-07-16 18:20:35,664 INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2019-07-16 18:20:35,666 INFO org.apache.flink.runtime.jobmaster.JobMaster - 
Close ResourceManager connection 165d22977dc31b3b410489789fdc1050: JobManager 
is shutting down..
2019-07-16 18:20:35,668 INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2019-07-16 18:20:35,668 INFO org.apache.flink.yarn.YarnResourceManager - 
Disconnect job 

[jira] [Commented] (FLINK-12038) YARNITCase stalls on travis

2019-07-17 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16886925#comment-16886925
 ] 

shuai.xu commented on FLINK-12038:
--

[~till.rohrmann], I think this is caused by 
https://issues.apache.org/jira/browse/YARN-2853 in YARN 2.4. At the end of the 
test case, it tries to kill the application while YarnResourceManager is 
unregistering the AM from YARN. I think we could fix it by adding a check of 
the application state before killing it. What do you think?
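
A minimal sketch of such a guard follows. AppState and AppClient are 
hypothetical stand-ins, not the Hadoop API; in a real test the state would 
come from the YARN client's application report. Note that a check followed by 
a kill still leaves a small window in which the state can change.

```java
/** Hypothetical stand-ins for the YARN client types; not the real Hadoop API. */
final class GuardedKill {
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    interface AppClient {
        AppState getState(String applicationId);

        void kill(String applicationId);
    }

    /**
     * Kills the application only if it has not reached a terminal state.
     * Returns true if a kill request was issued.
     */
    static boolean killIfNotTerminal(AppClient client, String applicationId) {
        AppState state = client.getState(applicationId);
        boolean terminal = state == AppState.FINISHED
                || state == AppState.FAILED
                || state == AppState.KILLED;
        if (terminal) {
            return false; // nothing left to kill, so do not race with unregistration
        }
        client.kill(applicationId);
        return true;
    }
}
```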

> YARNITCase stalls on travis
> ---
>
> Key: FLINK-12038
> URL: https://issues.apache.org/jira/browse/FLINK-12038
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Tests
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: shuai.xu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> https://travis-ci.org/apache/flink/jobs/511932978



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278
 ] 

shuai.xu edited comment on FLINK-12863 at 6/17/19 3:27 AM:
---

Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition 
between RM and TM, and added a version to each slot to solve it. I think 
adding a fencing token to AllocatedSlotReport may have some defects. 
Consider how you would update the fencing token: when offering slots 
succeeds, or before offering slots? If it is updated when offering slots 
succeeds, it may happen that the JM uses the new fencing token while the TM 
considers the slot offering failed, so the TM may not update the token, and 
the JM has no chance to use the old token any more. If the TM updates the 
token before offering slots, it may happen that the JM doesn't receive the 
offering, so the JM doesn't update the token. I think using a version may be 
more suitable, as we can compare two versions, and the bigger version will 
always be the correct one.
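
One possible reading of the version idea, as a sketch only: the names 
VersionedSlotReport and SlotView are hypothetical, and a single counter per 
TM/JM pair is assumed here rather than one version per slot. The receiver 
drops any report that is older than its latest offer, so ordering replaces 
the equality check of a fencing token.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical report carrying the highest offer version the sender has seen. */
final class VersionedSlotReport {
    final long version;
    final Set<String> allocatedSlots;

    VersionedSlotReport(long version, Set<String> allocatedSlots) {
        this.version = version;
        this.allocatedSlots = allocatedSlots;
    }
}

/** Hypothetical TaskExecutor-side view of its active slots. */
final class SlotView {
    private long version = 0;
    private final Set<String> activeSlots = new HashSet<>();

    /** Records newly offered slots and returns the version sent along with the offer. */
    long offer(Set<String> slots) {
        activeSlots.addAll(slots);
        return ++version;
    }

    /** Applies a report unless it predates the latest offer; returns true if applied. */
    boolean onReport(VersionedSlotReport report) {
        if (report.version < version) {
            return false; // stale report: generated before the latest offer was seen
        }
        activeSlots.retainAll(report.allocatedSlots);
        return true;
    }

    Set<String> activeSlots() {
        return new HashSet<>(activeSlots);
    }
}
```

Because versions are ordered, a lost or failed offer does not leave the two 
sides stuck with tokens that can never match; the next report with an 
equal or higher version is simply accepted.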


was (Author: tiemsn):
Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition 
between RM and TM, and added a version to each slot to solve it. I think 
adding a fencing token to AllocatedSlotReport can solve it. But how would you 
update the fencing token: when offering slots succeeds, or before offering 
slots? If it is updated when offering slots succeeds, it may happen that the JM 
uses the new fencing token while the TM considers the slot offering failed, so 
the TM may not update the token, and the JM has no chance to use the old token 
any more. If the TM updates the token before offering slots, it may happen that 
the JM doesn't receive the offering, so the JM doesn't update the token. I 
think using a version may be more suitable, as we can compare two versions, and 
the bigger version will always be the correct one.

> Race condition between slot offerings and AllocatedSlotReport
> -
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by 
> the {{TaskExecutor}} to synchronize its internal view on slot allocations 
> with the view of the {{JobMaster}}. It seems that there is a race condition 
> between offering slots and receiving the report because the 
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a 
> separate thread. 
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just 
> before getting new slots offered. Since the report is sent from a different 
> thread, it can then happen that the response to the slot offerings is sent 
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an 
> outdated slot report on the {{TaskExecutor}} causing active slots to be 
> released.
> In order to solve the problem I propose to add a fencing token to the 
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to 
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the 
> {{TaskExecutor}} we compare the current slot report fencing token with the 
> received one and only process the report if they are equal. Otherwise we wait 
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278
 ] 

shuai.xu commented on FLINK-12863:
--

Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition 
between RM and TM, and added a version to each slot to solve it. I think 
adding a fencing token to AllocatedSlotReport can solve it. But how would you 
update the fencing token: when offering slots succeeds, or before offering 
slots? If it is updated when offering slots succeeds, it may happen that the JM 
uses the new fencing token while the TM considers the slot offering failed, so 
the TM may not update the token, and the JM has no chance to use the old token 
any more. If the TM updates the token before offering slots, it may happen that 
the JM doesn't receive the offering, so the JM doesn't update the token. I 
think using a version may be more suitable, as we can compare two versions, and 
the bigger version will always be the correct one.



[jira] [Created] (FLINK-12760) Implement ExecutionGraph to InputsLocationsRetriever Adapter

2019-06-06 Thread shuai.xu (JIRA)
shuai.xu created FLINK-12760:


 Summary: Implement ExecutionGraph to InputsLocationsRetriever 
Adapter
 Key: FLINK-12760
 URL: https://issues.apache.org/jira/browse/FLINK-12760
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: shuai.xu
Assignee: shuai.xu
 Fix For: 1.9.0


Implement an [adapter|https://en.wikipedia.org/wiki/Adapter_pattern], which 
adapts the ExecutionGraph to the InputsLocationsRetriever interface.

*Acceptance criteria*
 * The adapter always reflects an up to date view of the ExecutionGraph state





[jira] [Comment Edited] (FLINK-12372) Implement ExecutionSlotAllocator

2019-05-15 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840166#comment-16840166
 ] 

shuai.xu edited comment on FLINK-12372 at 5/16/19 2:56 AM:
---

Hi [~gjy]

For the ScheduledUnit, I think your proposal works.

For getPreferredLocationsBasedOnInputs, I think we can put it in the 
implementation of ExecutionSlotAllocator. But this is an expedient; it needs to 
be refined later.

Also, do we need to change cancel(SlotExecutionVertexAssignment) in 
ExecutionSlotAllocator to cancel(ExecutionVertexID)? The Scheduler may need to 
call this method when resetting the ExecutionVertex, but it doesn't know the 
SlotExecutionVertexAssignment; it only knows the ExecutionVertexID.
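
A rough sketch of what cancelling by vertex ID could look like, assuming 
pending requests are kept in a map keyed by that ID. PendingSlotRequests is a 
hypothetical class, and plain strings stand in for ExecutionVertexID and the 
allocated slot; this is not the actual Flink interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical allocator fragment; strings stand in for ExecutionVertexID and slots. */
final class PendingSlotRequests {
    private final Map<String, CompletableFuture<String>> pending = new HashMap<>();

    /** Registers a slot request for the vertex and returns the future slot. */
    CompletableFuture<String> allocate(String executionVertexId) {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        pending.put(executionVertexId, slotFuture);
        return slotFuture;
    }

    /** Completes the pending request of the vertex; returns false if none is pending. */
    boolean fulfill(String executionVertexId, String slotId) {
        CompletableFuture<String> slotFuture = pending.remove(executionVertexId);
        return slotFuture != null && slotFuture.complete(slotId);
    }

    /** Cancels by vertex ID, so the caller does not need the assignment object. */
    boolean cancel(String executionVertexId) {
        CompletableFuture<String> slotFuture = pending.remove(executionVertexId);
        return slotFuture != null && slotFuture.cancel(false);
    }
}
```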


was (Author: tiemsn):
Hi [~gjy]

For the ScheduledUnit, I think your proposal works.

For getPreferredLocationsBasedOnInputs, I think we can put it in the 
implementation of ExecutionSlotAllocator. But this is an expedient; it needs to 
be refined later.

> Implement ExecutionSlotAllocator
> 
>
> Key: FLINK-12372
> URL: https://issues.apache.org/jira/browse/FLINK-12372
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: shuai.xu
>Priority: Major
>
> Add and implement {{ExecutionSlotAllocator}} interface
> Design document: 
> https://docs.google.com/document/d/1fstkML72YBO1tGD_dmG2rwvd9bklhRVauh4FSsDDwXU
> *Acceptance criteria*
> * {{ExecutionSlotAllocator}} interface is defined and implemented
> * interface implementation is unit tested





[jira] [Commented] (FLINK-12372) Implement ExecutionSlotAllocator

2019-05-15 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840166#comment-16840166
 ] 

shuai.xu commented on FLINK-12372:
--

Hi [~gjy]

For the ScheduledUnit, I think your proposal works.

For getPreferredLocationsBasedOnInputs, I think we can put it in the 
implementation of ExecutionSlotAllocator. But this is an expedient; it needs to 
be refined later.



[jira] [Commented] (FLINK-12372) Implement ExecutionSlotAllocator

2019-05-13 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838492#comment-16838492
 ] 

shuai.xu commented on FLINK-12372:
--

I am now working on this issue. This is the implementation plan.
1. It will contain ExecutionSlotAllocator, DefaultExecutionSlotAllocator, 
ExecutionVertexSchedulingRequirements, SlotExecutionVertexAssignment and unit 
tests for DefaultExecutionSlotAllocator.
2. The DefaultExecutionSlotAllocator will use a SlotProvider to allocate slots 
and then return the SlotExecutionVertexAssignment to the caller. It will 
contain a Map to cache the unfulfilled slot requests.
There is one problem:
1. The SlotProvider needs a ScheduledUnit when allocating slots, but I think 
ExecutionSlotAllocator should not depend on Execution and ScheduledUnit, so we 
may need to create a new class extending ScheduledUnit and use it in the 
ExecutionSlotAllocator.

[~gjy], do you have any suggestions?



[jira] [Assigned] (FLINK-12228) Implement Eager Scheduling Strategy

2019-04-22 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-12228:


Assignee: shuai.xu  (was: Gary Yao)

> Implement Eager Scheduling Strategy
> ---
>
> Key: FLINK-12228
> URL: https://issues.apache.org/jira/browse/FLINK-12228
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: shuai.xu
>Priority: Major
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.EAGER}}, i.e., all vertices are scheduled at once.
> Acceptance Criteria:
> * Test implementation of {{SchedulerOperations}} exists
> * Test implementation of {{AccessExecutionGraph}} exists
> * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)





[jira] [Assigned] (FLINK-12227) Introduce SchedulingStrategy interface

2019-04-21 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-12227:


Assignee: shuai.xu  (was: Gary Yao)

> Introduce SchedulingStrategy interface
> --
>
> Key: FLINK-12227
> URL: https://issues.apache.org/jira/browse/FLINK-12227
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: shuai.xu
>Priority: Major
>
> Introduce classes and interfaces required to implement a 
> {{SchedulingStrategy}}:
> * {{SchedulingStrategy}}
> * {{SchedulingStrategyFactory}}
> * {{SchedulerOperations}}
> * {{ExecutionVertexDeploymentOption}}
> * {{DeploymentOption}}
> Interface design document: 
> https://docs.google.com/document/d/1fstkML72YBO1tGD_dmG2rwvd9bklhRVauh4FSsDDwXU/edit#heading=h.dyw2hafals68





[jira] [Created] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed

2019-03-08 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11861:


 Summary: JobMasterTriggerSavepointIT case is not executed
 Key: FLINK-11861
 URL: https://issues.apache.org/jira/browse/FLINK-11861
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.7.2
Reporter: shuai.xu


The 
[JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java]
 will not be executed because its class name does not follow the ***ITCase 
naming style.





[jira] [Updated] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-01-18 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-11375:
-
Description: 
In SlotPool, AvailableSlots is not guarded by any lock, so all accesses to it 
should happen in the main thread of SlotPool. For this reason, all public 
methods are called through SlotPoolGateway, except releaseSlot, which is called 
directly by SlotSharingManager. This may cause a ConcurrentModificationException.

2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], fields:(f0)) -> correlate: table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), select: item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED.
2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices meet exception, need to fail global execution graph
java.lang.reflect.UndeclaredThrowableException
    at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
    at org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
    at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
    at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
    at org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
    at java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
    ... 23 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643)
    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
    at

[jira] [Updated] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-01-16 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-11375:
-
Description: 
In SlotPool, AvailableSlots is not protected by any lock, so all access to it 
must happen in the main thread of SlotPool. For that reason all public methods 
are called through SlotPoolGateway, except releaseSlot, which 
SlotSharingManager calls directly. This may cause a 
ConcurrentModificationException.

2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], fields:(f0)) -> correlate: table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), select: item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED.
2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices meet exception, need to fail global execution graph
java.lang.reflect.UndeclaredThrowableException
    at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
    at org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
    at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
    at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
    at org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
    at java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
    ... 23 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643)
    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
    at

[jira] [Created] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-01-16 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11375:


 Summary: Concurrent modification to slot pool due to 
SlotSharingManager releaseSlot directly 
 Key: FLINK-11375
 URL: https://issues.apache.org/jira/browse/FLINK-11375
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.7.1
Reporter: shuai.xu


In SlotPool, AvailableSlots is not protected by any lock, so all access to it 
must happen in the main thread of SlotPool. For that reason all public methods 
are called through SlotPoolGateway, except releaseSlot, which 
SlotSharingManager calls directly. This may cause a 
ConcurrentModificationException.
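
The confinement rule described above can be sketched as follows. This is only an illustration under stated assumptions, not Flink's SlotPool code: ConfinedSlotRegistry, its method names and the String ids are hypothetical, and a single-threaded executor stands in for the SlotPool main thread. Every operation, including the release path, is routed through that one thread, so the plain HashMap is never touched concurrently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a component whose state is confined to one thread. */
final class ConfinedSlotRegistry implements AutoCloseable {
    // Plain HashMap: safe only because every access runs on mainThread.
    private final Map<String, String> availableSlots = new HashMap<>();
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** Registers a slot; the mutation is executed on the main thread. */
    CompletableFuture<Void> addSlot(String slotId, String owner) {
        return CompletableFuture.runAsync(() -> availableSlots.put(slotId, owner), mainThread);
    }

    /** Releases a slot on the main thread as well, instead of mutating the map directly. */
    CompletableFuture<Boolean> releaseSlot(String slotId) {
        return CompletableFuture.supplyAsync(() -> availableSlots.remove(slotId) != null, mainThread);
    }

    /** Reads are also confined to the main thread. */
    CompletableFuture<Integer> size() {
        return CompletableFuture.supplyAsync(availableSlots::size, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

A caller on any thread only ever receives futures; a direct call that bypassed the executor would reintroduce the race described in this issue.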

 





[jira] [Commented] (FLINK-11298) Scheduling job in the unit of concurrent groups

2019-01-10 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739249#comment-16739249
 ] 

shuai.xu commented on FLINK-11298:
--

Hi [~till.rohrmann], thanks for your attention. Yes, this issue depends on 
FLINK-10429. I will first add a design to it and then help to complete 
FLINK-10429 with Stefan.

> Scheduling job in the unit of concurrent groups
> ---
>
> Key: FLINK-11298
> URL: https://issues.apache.org/jira/browse/FLINK-11298
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Flink currently supports only two scheduling modes: all tasks are scheduled 
> with EAGER for streaming jobs and with LAZY_FROM_SOURCE for batch jobs. 
> This is not flexible enough for the varied requirements of different jobs, 
> such as FLINK-10240. We propose a new ConcurrentSchedulingGroup-based 
> scheduling strategy that first splits a job into several concurrent groups 
> and then schedules it in units of concurrent groups. This strategy will 
> support not only the existing EAGER and LAZY_FROM_SOURCE modes but also 
> other situations, such as the Build/Probe case in FLINK-10240.





[jira] [Commented] (FLINK-10433) Stepwise creation of the ExecutionGraph sub-structures

2019-01-09 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739159#comment-16739159
 ] 

shuai.xu commented on FLINK-10433:
--

I like this proposal. Autoscaling is very useful. Furthermore, we could support 
automatically deciding the result partition type, as in FLINK-11299. Further 
in the future, we could support a dynamic job graph.

> Stepwise creation of the ExecutionGraph sub-structures
> --
>
> Key: FLINK-10433
> URL: https://issues.apache.org/jira/browse/FLINK-10433
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> In this step, we break the construction of ExecutionGraph substructures into 
> multiple steps. When translating a JobGraph to an ExecutionGraph, we could 
> only create a “skeleton” ExecutionGraph that is only built up to the 
> ExecutionJobVertex level. This enables scheduling in two steps. First we can 
> compute the minimal and desired amount of resources and ask the SlotPool for 
> them. Only when the available resources match the minimal requirements, the 
> actual scheduling would start. With this information, we can build the 
> ExecutionVertex and Execution objects already in the right parallelism and 
> avoid running out of resources during scheduling. This separation will help 
> us with the autoscaling use-case, where we figure out the actual parallelism 
> after we started with scheduling.





[jira] [Assigned] (FLINK-11298) Scheduling job in the unit of concurrent groups

2019-01-09 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-11298:


Assignee: shuai.xu

> Scheduling job in the unit of concurrent groups
> ---
>
> Key: FLINK-11298
> URL: https://issues.apache.org/jira/browse/FLINK-11298
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Flink currently supports only two scheduling modes: all tasks are scheduled 
> with EAGER for streaming jobs and with LAZY_FROM_SOURCE for batch jobs. 
> This is not flexible enough for the varied requirements of different jobs, 
> such as FLINK-10240. We propose a new ConcurrentSchedulingGroup-based 
> scheduling strategy that first splits a job into several concurrent groups 
> and then schedules it in units of concurrent groups. This strategy will 
> support not only the existing EAGER and LAZY_FROM_SOURCE modes but also 
> other situations, such as the Build/Probe case in FLINK-10240.





[jira] [Assigned] (FLINK-11299) Decide the result partition type dynamically

2019-01-09 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-11299:


Assignee: shuai.xu

> Decide the result partition type dynamically 
> -
>
> Key: FLINK-11299
> URL: https://issues.apache.org/jira/browse/FLINK-11299
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Currently, the result partition type is decided when the JobGraph is 
> compiled at the client. That is, whether a task outputs its result to its 
> consumers as PIPELINED or BLOCKING is decided at the client and cannot be 
> changed afterwards. However, in batch jobs it is common that a task is 
> configured as PIPELINED but its consumers cannot be started due to a lack of 
> resources, which currently leads to a failover. So we propose to support 
> deciding the result partition type dynamically at runtime, based on the 
> cluster's resources and other factors, to increase the success rate of 
> batch jobs.





[jira] [Created] (FLINK-11299) Decide the result partition type dynamically

2019-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11299:


 Summary: Decide the result partition type dynamically 
 Key: FLINK-11299
 URL: https://issues.apache.org/jira/browse/FLINK-11299
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu


Currently, the result partition type is decided when the JobGraph is compiled 
at the client. That is, whether a task outputs its result to its consumers as 
PIPELINED or BLOCKING is decided at the client and cannot be changed 
afterwards. However, in batch jobs it is common that a task is configured as 
PIPELINED but its consumers cannot be started due to a lack of resources, 
which currently leads to a failover. So we propose to support deciding the 
result partition type dynamically at runtime, based on the cluster's resources 
and other factors, to increase the success rate of batch jobs.





[jira] [Commented] (FLINK-10432) Introduce bulk/group-aware scheduling

2019-01-09 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739120#comment-16739120
 ] 

shuai.xu commented on FLINK-10432:
--

Hi [~srichter], it's really a good idea. I think we could benefit more from it. 
With this work we could easily support a Concurrent Group based scheduling 
strategy to satisfy the flexible requirements of different jobs, such as 
FLINK-10240. The idea of Concurrent Group based scheduling is described in 
FLINK-11298.

> Introduce bulk/group-aware scheduling
> -
>
> Key: FLINK-10432
> URL: https://issues.apache.org/jira/browse/FLINK-10432
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> In this step, we change our new Scheduler to support reasoning over a bulk of 
> Executions before interacting with the SlotPool instead of requesting slots 
> on a per-execution basis. This gives us the opportunity to group and schedule 
> tasks together as one unit and to apply scheduling algorithms that require a 
> more holistic view on the tasks at hand. For now, this step will probably 
> mainly address streaming / EAGER scheduling. After this step, we expect that 
> we can support the use-case of local recovery.





[jira] [Created] (FLINK-11298) Scheduling job in the unit of concurrent groups

2019-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11298:


 Summary: Scheduling job in the unit of concurrent groups
 Key: FLINK-11298
 URL: https://issues.apache.org/jira/browse/FLINK-11298
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu


Flink currently supports only two scheduling modes: all tasks are scheduled 
with EAGER for streaming jobs and with LAZY_FROM_SOURCE for batch jobs. 
This is not flexible enough for the varied requirements of different jobs, 
such as FLINK-10240. We propose a new ConcurrentSchedulingGroup-based 
scheduling strategy that first splits a job into several concurrent groups 
and then schedules it in units of concurrent groups. This strategy will 
support not only the existing EAGER and LAZY_FROM_SOURCE modes but also 
other situations, such as the Build/Probe case in FLINK-10240.





[jira] [Created] (FLINK-11059) JobMaster may continue using an invalid slot if releasing idle slot meet a timeout

2018-12-03 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11059:


 Summary: JobMaster may continue using an invalid slot if releasing 
idle slot meet a timeout
 Key: FLINK-11059
 URL: https://issues.apache.org/jira/browse/FLINK-11059
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.7.0
Reporter: shuai.xu
Assignee: shuai.xu


When the job master releases an idle slot to the task executor, the call may 
hit a timeout exception even though the task executor has already released 
the slot. The job master will then add the slot back to the available slots, 
and the slot may be used again. The job master will continue deploying tasks 
to the slot, but the task executor does not recognize it.





[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2018-09-26 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629735#comment-16629735
 ] 

shuai.xu commented on FLINK-10429:
--

+1

This proposal goes in the same direction as our approach. We have already 
initiated a discussion about how to make the scheduling strategy pluggable in 
[https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/edit].
 I think we can make the scheduler more powerful together.

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on 
> the granularity of individual tasks, which makes holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use cases like auto-scaling, 
> local recovery, and resource-optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing





[jira] [Created] (FLINK-9932) If task executor offer slot to job master timeout the first time, the slot will leak

2018-07-24 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9932:
---

 Summary: If task executor offer slot to job master timeout the 
first time, the slot will leak
 Key: FLINK-9932
 URL: https://issues.apache.org/jira/browse/FLINK-9932
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


When the task executor offers a slot to the job master, it first marks the 
slot as active.

If the offer call times out, the task executor will try to call 
offerSlotsToJobManager again, but it will only offer slots in the ALLOCATED 
state. As the slot has already been marked ACTIVE, it will never be offered 
again, and this causes a slot leak.
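
The leak can be reproduced with a toy model. This is a sketch only, not the TaskExecutor implementation: SlotOfferModel, collectSlotsToOffer and onOfferTimeout are hypothetical names, and reverting the state on timeout is just one conceivable remedy, shown here as an assumption rather than the fix that was adopted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the slot states named in the description (hypothetical, not Flink code). */
final class SlotOfferModel {
    enum State { ALLOCATED, ACTIVE }

    private final Map<String, State> slots = new LinkedHashMap<>();

    void allocate(String slotId) {
        slots.put(slotId, State.ALLOCATED);
    }

    /**
     * Mirrors the described behavior: only ALLOCATED slots are offered, and they are
     * marked ACTIVE before the offer has been acknowledged.
     */
    List<String> collectSlotsToOffer() {
        List<String> offer = new ArrayList<>();
        for (Map.Entry<String, State> entry : slots.entrySet()) {
            if (entry.getValue() == State.ALLOCATED) {
                offer.add(entry.getKey());
                entry.setValue(State.ACTIVE);
            }
        }
        return offer;
    }

    /** Assumed remedy: put timed-out offers back into ALLOCATED so a retry sees them. */
    void onOfferTimeout(List<String> offeredSlotIds) {
        for (String slotId : offeredSlotIds) {
            slots.replace(slotId, State.ACTIVE, State.ALLOCATED);
        }
    }
}
```

Without the call to onOfferTimeout, a second collectSlotsToOffer returns an empty list, which is the leak described in this issue.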





[jira] [Created] (FLINK-9884) Slot request may not be removed when it has already be assigned in slot manager

2018-07-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9884:
---

 Summary: Slot request may not be removed when it has already be 
assigned in slot manager
 Key: FLINK-9884
 URL: https://issues.apache.org/jira/browse/FLINK-9884
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


When a task executor reports slotA with allocationId1, it may happen that the 
slot manager has recorded slotA as assigned to allocationId2 and the slot 
request with allocationId1 as not assigned. The slot manager will then update 
itself with slotA assigned to allocationId1, but it does not clear the slot 
request with allocationId1.

For example:
 # There is one free slot in the slot manager.
 # Two slot requests arrive, with allocationId1 and allocationId2.
 # The slot is assigned to allocationId1, but the requestSlot call times out.
 # SlotManager assigns the slot to allocationId2 and inserts a slot request 
with allocationId1.
 # The second requestSlot call to the task executor returns 
SlotOccupiedException.
 # SlotManager updates the slot to allocationId1, but the slot request is left 
behind.
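
The last step of the list above can be sketched with a small bookkeeping class. This is an illustration under assumptions, not the SlotManager code: SlotRequestBook and its methods are hypothetical, and ids are plain strings. The point is only that updating the slot's allocation and removing the matching pending request belong together.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical bookkeeping of pending requests and slot assignments (not Flink code). */
final class SlotRequestBook {
    private final Set<String> pendingRequests = new HashSet<>();
    private final Map<String, String> allocationBySlot = new HashMap<>();

    void addPendingRequest(String allocationId) {
        pendingRequests.add(allocationId);
    }

    /** Called when a task executor reports that slotId is occupied by allocationId. */
    void onSlotReportedOccupied(String slotId, String allocationId) {
        allocationBySlot.put(slotId, allocationId);
        // The step the issue reports as missing: drop the now-fulfilled pending request.
        pendingRequests.remove(allocationId);
    }

    boolean isPending(String allocationId) {
        return pendingRequests.contains(allocationId);
    }

    String allocationOf(String slotId) {
        return allocationBySlot.get(slotId);
    }
}
```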





[jira] [Assigned] (FLINK-9828) Resource manager should recover slot resource status after failover

2018-07-11 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-9828:
---

Assignee: shuai.xu

> Resource manager should recover slot resource status after failover
> ---
>
> Key: FLINK-9828
> URL: https://issues.apache.org/jira/browse/FLINK-9828
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> After a resource manager failover, task executors report their slot 
> allocation status to the RM. But the report does not contain resource 
> information, so the RM only knows that the slots are occupied but not how 
> much resource is used.





[jira] [Created] (FLINK-9828) Resource manager should recover slot resource status after failover

2018-07-11 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9828:
---

 Summary: Resource manager should recover slot resource status 
after failover
 Key: FLINK-9828
 URL: https://issues.apache.org/jira/browse/FLINK-9828
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu


After a resource manager failover, task executors report their slot 
allocation status to the RM. But the report does not contain resource 
information, so the RM only knows that the slots are occupied but not how 
much resource is used.





[jira] [Created] (FLINK-9827) ResourceManager may receive outdate report of slots status from task manager

2018-07-11 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9827:
---

 Summary: ResourceManager may receive outdate report of slots 
status from task manager
 Key: FLINK-9827
 URL: https://issues.apache.org/jira/browse/FLINK-9827
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


TaskExecutor reports its slot status to the resource manager in heartbeats, 
but this happens in a different thread from the main RPC thread. So it may 
happen that the RM requests a slot from the task executor but then receives a 
heartbeat saying the slot is not assigned. This causes the slot to be freed 
and assigned again.





[jira] [Created] (FLINK-9826) Implement FLIP-6 YARN Resource Manager for SESSION mode

2018-07-11 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9826:
---

 Summary: Implement FLIP-6 YARN Resource Manager for SESSION mode
 Key: FLINK-9826
 URL: https://issues.apache.org/jira/browse/FLINK-9826
 Project: Flink
  Issue Type: New Feature
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


The Flink YARN Session Resource Manager communicates with YARN's Resource 
Manager to acquire and release containers. It will ask for N containers from 
YARN according to the config.

It is also responsible for eagerly notifying the JobManager about container 
failures.





[jira] [Closed] (FLINK-9632) SlotPool should notify the caller when allocateSlot meet an exception

2018-06-21 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-9632.
---
Resolution: Invalid

> SlotPool should notify the caller when allocateSlot meet an exception
> -
>
> Key: FLINK-9632
> URL: https://issues.apache.org/jira/browse/FLINK-9632
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> In SlotPool, allocateSlot() returns a CompletableFuture, but this future is 
> completed only when slotAndLocalityFuture returns a LogicSlot. If 
> slotAndLocalityFuture is completed exceptionally, the returned future is 
> never completed, so the caller never learns about the failure.





[jira] [Created] (FLINK-9632) SlotPool should notify the call when allocateSlot meet an exception

2018-06-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9632:
---

 Summary: SlotPool should notify the call when allocateSlot meet an 
exception
 Key: FLINK-9632
 URL: https://issues.apache.org/jira/browse/FLINK-9632
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


In SlotPool, allocateSlot() returns a CompletableFuture, but this future is 
completed only when slotAndLocalityFuture returns a LogicSlot. If 
slotAndLocalityFuture is completed exceptionally, the returned future is 
never completed, so the caller never learns about the failure.
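
The difference between the two behaviors can be shown with plain java.util.concurrent, independent of Flink. This is a sketch, not the SlotPool code: FutureForwarding and both method names are hypothetical. The first method mimics the reported problem by reacting only to success, the second forwards the failure as well.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helpers illustrating the reported problem (not Flink code). */
final class FutureForwarding {
    private FutureForwarding() {}

    /** Problematic variant: the result is completed only if the source succeeds. */
    static <T> CompletableFuture<T> forwardOnlyOnSuccess(CompletableFuture<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.thenAccept(result::complete);
        return result;
    }

    /** Forwards both outcomes, so a failure of the source reaches the caller. */
    static <T> CompletableFuture<T> forwardBothOutcomes(CompletableFuture<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```

With a source that has failed, the future returned by forwardOnlyOnSuccess stays incomplete forever, while the one returned by forwardBothOutcomes is completed exceptionally.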





[jira] [Updated] (FLINK-9632) SlotPool should notify the caller when allocateSlot meet an exception

2018-06-20 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-9632:

Summary: SlotPool should notify the caller when allocateSlot meet an 
exception  (was: SlotPool should notify the call when allocateSlot meet an 
exception)

> SlotPool should notify the caller when allocateSlot meet an exception
> -
>
> Key: FLINK-9632
> URL: https://issues.apache.org/jira/browse/FLINK-9632
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> In SlotPool, allocateSlot() returns a CompletableFuture, but this future is 
> completed only when slotAndLocalityFuture returns a LogicSlot. If 
> slotAndLocalityFuture is completed exceptionally, the returned future is 
> never completed, so the caller never learns about the failure.





[jira] [Created] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id

2018-05-03 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9293:
---

 Summary: SlotPool should check slot id when accepting a slot offer 
with existing allocation id
 Key: FLINK-9293
 URL: https://issues.apache.org/jira/browse/FLINK-9293
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


For flip-6, two or more slots may be assigned to the same slot allocation. 
For example, taskExecutor1 registers and allocationID1 is assigned to its 
slot1, but from taskExecutor1's side the registration times out and it 
registers again. The RM will then fail allocationID1 and assign slot2 on 
taskExecutor2 to it, but taskExecutor1 has already accepted allocationID1. 

So taskExecutor1 and taskExecutor2 both offer a slot to the job master with 
allocationID1. Currently the slot pool just accepts all slot offers, and this 
may leak one slot.





[jira] [Created] (FLINK-8938) Not remove job graph during job master failover

2018-03-13 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8938:
---

 Summary: Not remove job graph during job master failover
 Key: FLINK-8938
 URL: https://issues.apache.org/jira/browse/FLINK-8938
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


When the job master fails over, the new dispatcher should not delete the job 
graph if it fails to start the job manager runner, or else the other 
dispatchers cannot recover it.





[jira] [Created] (FLINK-8442) Should recovery the input split when an execution failover with FailoverRegion

2018-01-17 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8442:
---

 Summary: Should recovery the input split when an execution 
failover with FailoverRegion
 Key: FLINK-8442
 URL: https://issues.apache.org/jira/browse/FLINK-8442
 Project: Flink
  Issue Type: Bug
  Components: JobManager, Scheduler
Affects Versions: 1.4.0
Reporter: shuai.xu


FLIP-1 makes it possible to restart only the executions in a FailoverRegion 
when a task fails. However, input splits are currently assigned only while an 
ExecutionJobVertex is initializing, so when an execution restarts, the input 
splits it has already read may no longer be obtainable from the job master. We 
need to recover the input splits so they can be consumed again when the task 
restarts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-4504) Support user to decide whether the result of an operator is presistent

2018-01-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-4504.
---
Resolution: Won't Fix

> Support user to decide whether the result of an operator is presistent
> --
>
> Key: FLINK-4504
> URL: https://issues.apache.org/jira/browse/FLINK-4504
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Provide an API that lets users decide whether the result of an operator is 
> pipelined, spilled to a local file, or persisted to a distributed file 
> system.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-4444) Add a DFSInputChannel and DFSSubPartition

2018-01-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-4444.
---
Resolution: Won't Fix

> Add a DFSInputChannel and DFSSubPartition
> -
>
> Key: FLINK-4444
> URL: https://issues.apache.org/jira/browse/FLINK-4444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Batch Connectors and Input/Output Formats
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Add a new ResultPartitionType and ResultPartitionLocation type for DFS
> Add DFSSubpartition and DFSInputChannel for writing and reading DFS



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2018-01-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-8289.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently RestServerEndpoint.getRestAddress returns the same address as the 
> config value rest.address. The default is 127.0.0.1:9067, which cannot be 
> accessed from another machine. Because the IPs of the Dispatcher and 
> JobMaster are usually dynamic, users configure rest.address as 0.0.0.0, so 
> getRestAddress returns 0.0.0.0:9067. This address is registered with YARN 
> or Mesos, but it cannot be accessed from another machine either. So 
> getRestAddress needs to return the real ip:port so that users can access 
> the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8434) The new yarn resource manager should take over the running task managers after failover

2018-01-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8434:

Description: The app master, which contains the job master and the YARN 
resource manager, may fail over while running on YARN. After it starts, the new 
resource manager should take over the running task managers. But currently the 
YarnResourceManager does not record the running containers in workerNodeMap, so 
when task managers register with it, it rejects them.  (was: The app master 
which container the job master and yarn resource manager may failover during 
running on yarn. The new resource manager should take over the running task 
managers after started. But now the YarnResourceManager does not record the 
running container to 

workerNodeMap, so when task managers register to it, it will reject them.)

> The new yarn resource manager should take over the running task managers 
> after failover
> ---
>
> Key: FLINK-8434
> URL: https://issues.apache.org/jira/browse/FLINK-8434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> The app master, which contains the job master and the YARN resource manager, 
> may fail over while running on YARN. After it starts, the new resource 
> manager should take over the running task managers. But currently the 
> YarnResourceManager does not record the running containers in workerNodeMap, 
> so when task managers register with it, it rejects them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8434) The new yarn resource manager should take over the running task managers after failover

2018-01-15 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8434:
---

 Summary: The new yarn resource manager should take over the 
running task managers after failover
 Key: FLINK-8434
 URL: https://issues.apache.org/jira/browse/FLINK-8434
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


The app master, which contains the job master and the YARN resource manager, 
may fail over while running on YARN. After it starts, the new resource manager 
should take over the running task managers. But currently the 
YarnResourceManager does not record the running containers in workerNodeMap, so 
when task managers register with it, it rejects them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8399:
---

 Summary: Use independent configurations for the different timeouts 
in slot manager
 Key: FLINK-8399
 URL: https://issues.apache.org/jira/browse/FLINK-8399
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


The slot manager has three timeout parameters: for a slot request to a task 
manager, for discarding a slot request, and for releasing a task manager. 
Currently they all take their value from AkkaOptions.ASK_TIMEOUT; they should 
use independent configurations.
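
One way to picture the proposal is a small holder that reads each timeout from 
its own key. This is a sketch under stated assumptions: the class name, the 
three option keys, and the plain Map used as configuration are invented for 
illustration and are not Flink's actual configuration options.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical sketch: the option keys below are invented for illustration
// and are not actual Flink configuration keys.
final class SlotManagerTimeouts {
    final Duration taskManagerRequestTimeout; // slot request to a task manager
    final Duration slotRequestTimeout;        // slot request is discarded
    final Duration taskManagerIdleTimeout;    // task manager is released

    SlotManagerTimeouts(Map<String, String> config, Duration fallback) {
        taskManagerRequestTimeout =
            read(config, "example.slot-manager.tm-request-timeout-ms", fallback);
        slotRequestTimeout =
            read(config, "example.slot-manager.slot-request-timeout-ms", fallback);
        taskManagerIdleTimeout =
            read(config, "example.slot-manager.tm-idle-timeout-ms", fallback);
    }

    // Each timeout has its own key; the shared fallback applies only when
    // that key is absent.
    private static Duration read(Map<String, String> config, String key, Duration fallback) {
        String value = config.get(key);
        return value == null ? fallback : Duration.ofMillis(Long.parseLong(value));
    }
}
```

Setting only one key changes only that timeout; the other two keep the 
fallback value.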



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-09 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8399:

Description: The slot manager has three timeout parameters: for a slot request 
to a task manager, for discarding a slot request, and for releasing a task 
manager. Currently they all take their value from AkkaOptions.ASK_TIMEOUT; 
they should use independent configurations.  
(was: There are three parameter in slot manager to indicate the timeout for 
slot request to task manager, slot request to be discarded and task manager to 
be released. But now the all come from the value of AkkaOptions.ASK_TIMEOUT, 
need to use independent configurations for them.)

> Use independent configurations for the different timeouts in slot manager
> -
>
> Key: FLINK-8399
> URL: https://issues.apache.org/jira/browse/FLINK-8399
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> The slot manager has three timeout parameters: for a slot request to a task 
> manager, for discarding a slot request, and for releasing a task manager. 
> Currently they all take their value from AkkaOptions.ASK_TIMEOUT; they 
> should use independent configurations.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-24 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303117#comment-16303117
 ] 

shuai.xu commented on FLINK-8289:
-

The RestServerEndpoint should never need to know whether there is a proxy, so 
I think getRestAddress should always return the endpoint's own server address. 
There are two ways: 1. let the endpoint always return its true server address 
no matter what address it is bound to; 2. bind it to the IP of the machine. 

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently RestServerEndpoint.getRestAddress returns the same address as the 
> config value rest.address. The default is 127.0.0.1:9067, which cannot be 
> accessed from another machine. Because the IPs of the Dispatcher and 
> JobMaster are usually dynamic, users configure rest.address as 0.0.0.0, so 
> getRestAddress returns 0.0.0.0:9067. This address is registered with YARN 
> or Mesos, but it cannot be accessed from another machine either. So 
> getRestAddress needs to return the real ip:port so that users can access 
> the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-19 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297920#comment-16297920
 ] 

shuai.xu commented on FLINK-8289:
-

Hi [~eronwright], yes, what we need is the advertised address. But for Yarn 
Proxy / Mesos Admin Router, the advertised address is not the proxy address 
but the ip:port. This address is registered with the proxy, so the router can 
redirect the proxy address to the real address of the rest server.

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently RestServerEndpoint.getRestAddress returns the same address as the 
> config value rest.address. The default is 127.0.0.1:9067, which cannot be 
> accessed from another machine. Because the IPs of the Dispatcher and 
> JobMaster are usually dynamic, users configure rest.address as 0.0.0.0, so 
> getRestAddress returns 0.0.0.0:9067. This address is registered with YARN 
> or Mesos, but it cannot be accessed from another machine either. So 
> getRestAddress needs to return the real ip:port so that users can access 
> the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-7878.
---

> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an 
> image-processing application may need GPUs, and others may need FPGAs. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-12-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-6434.
---

> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> If the allocateSlot() call from Execution to SlotPool times out, the job 
> begins to fail over, but the pending request remains in SlotPool. If a new 
> slot then registers with SlotPool, it may fulfill the outdated pending 
> request and be added to allocatedSlots, where it will never be used and 
> never be recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager

2017-12-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-7928.
---

> Extend the filed in ResourceProfile for precisely calculating the resource of 
> a task manager
> 
>
> Key: FLINK-7928
> URL: https://issues.apache.org/jira/browse/FLINK-7928
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> ResourceProfile records all the resource requirements for a slot. It is 
> generated by the JobMaster and then passed to the ResourceManager with the 
> slot request. 
> A task in the slot needs three kinds of resources: 
> 1. The resources for the operators, as specified by the user-defined 
> ResourceSpec.
> 2. The resources the operators need to communicate with their upstream 
> operators, for example buffer pools.
> 3. The resources the operators need to communicate with their downstream 
> operators. Same as above.
> So ResourceProfile should contain three parts: the first comes from 
> ResourceSpec, and the other two are generated by the JobMaster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5793) Running slot may not be add to AllocatedMap in SlotPool

2017-12-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5793.
---

> Running slot may not be add to AllocatedMap in SlotPool
> ---
>
> Key: FLINK-5793
> URL: https://issues.apache.org/jira/browse/FLINK-5793
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
> Fix For: 1.3.0
>
>
> In SlotPool, when a slot is returned by a finished task, the pool tries to 
> find a pending request matching it. If one is found, the slot is given to 
> the request but is not added to AllocatedMap.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5171) Wrong use of Preconditions.checkState in TaskManagerRunner

2017-12-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5171.
---

> Wrong use of Preconditions.checkState in TaskManagerRunner
> --
>
> Key: FLINK-5171
> URL: https://issues.apache.org/jira/browse/FLINK-5171
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Preconditions.checkState checks that its first parameter is true and throws 
> an exception if it is not. But TaskManagerRunner uses it in such a way that 
> an exception is thrown when the rpc port is valid.
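
The intended usage can be illustrated with a small stand-in. The helper below 
is a hypothetical re-implementation written for this example, not Flink's 
Preconditions class, and the 0..65535 port range is an assumed validity rule.

```java
// Hypothetical stand-in for a checkState-style precondition helper:
// it throws when the condition is false.
final class PortCheck {
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Assumed validity rule for the example: ports 0..65535 are valid.
    static boolean isValidPort(int port) {
        return port >= 0 && port <= 65535;
    }

    // Correct usage: pass the condition that must hold, so that only an
    // invalid port triggers the exception.
    static void requireValidRpcPort(int rpcPort) {
        checkState(isValidPort(rpcPort), "Invalid rpc port: " + rpcPort);
    }
}
```

Passing the negated condition instead would reproduce the reported behaviour: 
valid ports would throw and invalid ones would pass.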



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5190) ZooKeeperLeaderRetrievalService should not close the zk client when stop

2017-12-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5190.
---

> ZooKeeperLeaderRetrievalService should not close the zk client when stop
> 
>
> Key: FLINK-5190
> URL: https://issues.apache.org/jira/browse/FLINK-5190
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> The zk client is created outside of ZooKeeperLeaderRetrievalService and 
> passed to it. When ZooKeeperLeaderRetrievalService stops, it should not 
> close the zk client, as others may be using it outside.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5170) getAkkaConfig will use localhost if hostname is specified

2017-12-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5170.
---

> getAkkaConfig will use localhost if hostname is specified 
> --
>
> Key: FLINK-5170
> URL: https://issues.apache.org/jira/browse/FLINK-5170
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> In AkkaUtil.scala: 
> def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
>   getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None)
> }
> The condition is inverted: when hostname is specified, it uses None.
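
The intended condition can be sketched in Java as follows. The class and method 
names are hypothetical, and Optional stands in for the Scala Option used in the 
quoted snippet; this is not the actual fix that went into Flink.

```java
import java.util.Optional;

// Hypothetical sketch of the intended branch selection.
final class AddressSelection {
    // Returns "hostname:port" when a hostname is given, empty otherwise.
    static Optional<String> externalAddress(String hostname, int port) {
        // The reported code tests hostname == null here, which inverts the result.
        return hostname != null
            ? Optional.of(hostname + ":" + port)
            : Optional.empty();
    }
}
```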



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8288) Register the web interface url to yarn for yarn job mode

2017-12-18 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8288:

Affects Version/s: 1.5.0

> Register the web interface url to yarn for yarn job mode
> 
>
> Key: FLINK-8288
> URL: https://issues.apache.org/jira/browse/FLINK-8288
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> In flip-6 job mode, the resource manager is created before the web monitor, 
> so the web interface URL is not set on the resource manager, and the 
> resource manager cannot register the URL with YARN.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-18 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8289:

Affects Version/s: 1.5.0

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently RestServerEndpoint.getRestAddress returns the same address as the 
> config value rest.address. The default is 127.0.0.1:9067, which cannot be 
> accessed from another machine. Because the IPs of the Dispatcher and 
> JobMaster are usually dynamic, users configure rest.address as 0.0.0.0, so 
> getRestAddress returns 0.0.0.0:9067. This address is registered with YARN 
> or Mesos, but it cannot be accessed from another machine either. So 
> getRestAddress needs to return the real ip:port so that users can access 
> the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8289:
---

 Summary: The RestServerEndpoint should return the address with 
real ip when getRestAdddress
 Key: FLINK-8289
 URL: https://issues.apache.org/jira/browse/FLINK-8289
 Project: Flink
  Issue Type: Bug
Reporter: shuai.xu


Currently RestServerEndpoint.getRestAddress returns the same address as the 
config value rest.address. The default is 127.0.0.1:9067, which cannot be 
accessed from another machine. Because the IPs of the Dispatcher and JobMaster 
are usually dynamic, users configure rest.address as 0.0.0.0, so 
getRestAddress returns 0.0.0.0:9067. This address is registered with YARN or 
Mesos, but it cannot be accessed from another machine either. So 
getRestAddress needs to return the real ip:port so that users can access the 
web monitor from anywhere.
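
The behaviour asked for could look roughly like this. It is a sketch under 
stated assumptions: the class and method are hypothetical, and the caller is 
assumed to already know an externally reachable IP of the machine.

```java
// Hypothetical sketch: choose the address to advertise for a REST endpoint.
final class AdvertisedAddress {
    // bindAddress: what the server is bound to.
    // machineIp: an externally reachable IP of this machine, supplied by the
    // caller (how it is obtained is out of scope for this example).
    static String resolve(String bindAddress, String machineIp, int port) {
        // Wildcard and loopback addresses cannot be reached from other machines.
        boolean unreachable =
            bindAddress.equals("0.0.0.0") || bindAddress.startsWith("127.");
        return (unreachable ? machineIp : bindAddress) + ":" + port;
    }
}
```

For the two cases in the description, both 127.0.0.1 and 0.0.0.0 would be 
replaced by the machine's IP, while an explicitly configured routable address 
is returned unchanged.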



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8288) Register the web interface url to yarn for yarn job mode

2017-12-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8288:
---

 Summary: Register the web interface url to yarn for yarn job mode
 Key: FLINK-8288
 URL: https://issues.apache.org/jira/browse/FLINK-8288
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


In flip-6 job mode, the resource manager is created before the web monitor, so 
the web interface URL is not set on the resource manager, and the resource 
manager cannot register the URL with YARN.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8266) Add network memory to ResourceProfile

2017-12-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8266:

Labels: flip-6  (was: )

> Add network memory to ResourceProfile
> -
>
> Key: FLINK-8266
> URL: https://issues.apache.org/jira/browse/FLINK-8266
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> On the task manager side, the network buffer pool should be allocated 
> according to the number of input channels and output subpartitions, but when 
> a worker is allocated, the resource profile contains no information about 
> this memory. 
> So I suggest adding a network memory field to ResourceProfile. The job 
> master should calculate it when scheduling a task, and the resource manager 
> can then allocate a container with the resource profile.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8266) Add network memory to ResourceProfile

2017-12-15 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8266:
---

 Summary: Add network memory to ResourceProfile
 Key: FLINK-8266
 URL: https://issues.apache.org/jira/browse/FLINK-8266
 Project: Flink
  Issue Type: Improvement
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


On the task manager side, the network buffer pool should be allocated 
according to the number of input channels and output subpartitions, but when a 
worker is allocated, the resource profile contains no information about this 
memory. 
So I suggest adding a network memory field to ResourceProfile. The job master 
should calculate it when scheduling a task, and the resource manager can then 
allocate a container with the resource profile.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8224) Should shudownApplication when job terminated in job mode

2017-12-08 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8224:
---

 Summary: Should shudownApplication when job terminated in job mode
 Key: FLINK-8224
 URL: https://issues.apache.org/jira/browse/FLINK-8224
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


In job mode, one job is one application. When the job finishes, it should tell 
the resource manager to shut down the application; otherwise the resource 
manager cannot set the application status. For example, if the YARN resource 
manager does not report the application as finished to the YARN master, YARN 
will consider the application still running.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.

2017-11-08 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-7870.
---

> SlotPool should cancel the slot request to RM if not need any more.
> ---
>
> Key: FLINK-7870
> URL: https://issues.apache.org/jira/browse/FLINK-7870
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> 1. SlotPool requests slots from the RM if it does not have enough.
> 2. If a slot request is not fulfilled within a certain time, SlotPool treats 
> the request as timed out and sends a new slot request by triggering a 
> failover in JobMaster. The previous request is no longer needed, but the RM 
> does not know that.
> 3. This may cause the RM to request many more resources than the job really 
> needs.
> For example:
> 1. A job needs 100 slots. The RM requests 100 containers from YARN.
> 2. But YARN is busy and has no resources for the job.
> 3. The job fails over because the resource request is not fulfilled in time.
> 4. It asks for 100 slots again; now the RM has requested 200 containers from 
> YARN.
> 5. If failover happens several times, the container requests keep growing.
> 6. When YARN has resources again, it finds that the job appears to need 
> thousands of containers. This is a waste of resources.
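
The arithmetic in the example above can be modelled in a few lines. This is a 
toy model with hypothetical names, not the actual SlotPool or ResourceManager 
logic; it only counts how many container requests remain outstanding after a 
given number of failovers.

```java
// Hypothetical model of container requests outstanding at the cluster manager.
final class PendingRequestModel {
    // Requests outstanding after the initial request plus `failovers` retries.
    static int outstanding(int slotsNeeded, int failovers, boolean cancelOnTimeout) {
        int pending = 0;
        for (int attempt = 0; attempt <= failovers; attempt++) {
            if (cancelOnTimeout) {
                pending = 0; // the timed-out batch is cancelled before re-requesting
            }
            pending += slotsNeeded;
        }
        return pending;
    }
}
```

In this model, a job needing 100 slots has 200 requests outstanding after one 
failover without cancellation and 1000 after nine, while cancelling the 
timed-out requests keeps the number at 100.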



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7970:

Description: 
Currently slots are allocated one by one, from each execution to the slot pool. 
If a batch of slots is requested from the slot pool at once, the slot pool can 
assign its cached slots according to the global slot requests of all 
executions, and so make an optimal matching between slots and executions. This 
is especially useful for failover: for example, it can assign a slot to the 
execution whose state is already on that slot's machine.
!https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg!

  was:
Currently slots are allocated one by one, from each execution to the slot pool. 
If a batch of slots is requested from the slot pool at once, the slot pool can 
assign its cached slots according to the global slot requests of all 
executions, and so make an optimal matching between slots and executions. This 
is especially useful for failover: for example, it can assign a slot to the 
execution whose state is already on that slot's machine.
!sp.jpg|thumbnail!


> SlotPool support batch allocating slots
> ---
>
> Key: FLINK-7970
> URL: https://issues.apache.org/jira/browse/FLINK-7970
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: sp.jpg
>
>
> Currently slots are allocated one by one, from each execution to the slot 
> pool. If a batch of slots is requested from the slot pool at once, the slot 
> pool can assign its cached slots according to the global slot requests of all 
> executions, and so make an optimal matching between slots and executions. 
> This is especially useful for failover: for example, it can assign a slot to 
> the execution whose state is already on that slot's machine.
> !https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7970:

Attachment: sp.jpg

> SlotPool support batch allocating slots
> ---
>
> Key: FLINK-7970
> URL: https://issues.apache.org/jira/browse/FLINK-7970
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: sp.jpg
>
>
> Currently slots are allocated one by one, from each execution to the slot 
> pool. If a batch of slots is requested from the slot pool at once, the slot 
> pool can assign its cached slots according to the global slot requests of all 
> executions, and so make an optimal matching between slots and executions. 
> This is especially useful for failover: for example, it can assign a slot to 
> the execution whose state is already on that slot's machine.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7970:

Description: 
Currently slots are allocated one by one, from each execution to the slot pool. 
If a batch of slots is requested from the slot pool at once, the slot pool can 
assign its cached slots according to the global slot requests of all 
executions, and so make an optimal matching between slots and executions. This 
is especially useful for failover: for example, it can assign a slot to the 
execution whose state is already on that slot's machine.
!sp.jpg|thumbnail!

  was:
Currently slots are allocated one by one, from each execution to the slot pool. 
If a batch of slots is requested from the slot pool at once, the slot pool can 
assign its cached slots according to the global slot requests of all 
executions, and so make an optimal matching between slots and executions. This 
is especially useful for failover: for example, it can assign a slot to the 
execution whose state is already on that slot's machine.



> SlotPool support batch allocating slots
> ---
>
> Key: FLINK-7970
> URL: https://issues.apache.org/jira/browse/FLINK-7970
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: sp.jpg
>
>
> Currently slots are allocated one by one, from each execution to the slot 
> pool. If a batch of slots is requested from the slot pool at once, the slot 
> pool can assign its cached slots according to the global slot requests of all 
> executions, and so make an optimal matching between slots and executions. 
> This is especially useful for failover: for example, it can assign a slot to 
> the execution whose state is already on that slot's machine.
> !sp.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7970:
---

 Summary: SlotPool support batch allocating slots
 Key: FLINK-7970
 URL: https://issues.apache.org/jira/browse/FLINK-7970
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu
Assignee: shuai.xu


Currently, each execution requests its slot from the slot pool one at a time. 
If executions could instead request a number of slots from the slot pool as a 
batch, the slot pool could assign its cached slots based on the slot requests 
of all executions together and find an optimal matching between slots and 
executions. This is especially useful for failover. For example, the slot pool 
can assign a slot to the execution whose state is already on that machine.
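
A minimal sketch of why batching helps, assuming each execution names at most 
one preferred host (the one holding its state). The function name and the data 
shapes are hypothetical and are not Flink's SlotPool API. The matching is a 
simple greedy two-pass scheme, not necessarily what an implementation would use.

```python
def match_slots(requests, free_slots):
    """Assign free slots to a batch of requests, preferring state-local hosts.

    requests:   dict of execution id -> host holding its state (or None)
    free_slots: list of (slot id, host) pairs cached in the pool
    Returns a dict of execution id -> slot id.
    """
    remaining = list(free_slots)
    assignment = {}

    # First pass: give each execution a slot on the host that holds its state.
    for execution, preferred_host in requests.items():
        if preferred_host is None:
            continue
        for slot in remaining:
            if slot[1] == preferred_host:
                assignment[execution] = slot[0]
                remaining.remove(slot)
                break

    # Second pass: serve the still-unmatched executions with any leftover slot.
    for execution in requests:
        if execution not in assignment and remaining:
            assignment[execution] = remaining.pop(0)[0]

    return assignment
```

Served one by one in arrival order, the first execution would simply take the 
first cached slot, even if that slot sits on the host another execution needs.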






[jira] [Assigned] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-7969:
---

Assignee: shuai.xu

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !attachment-name.jpg|thumbnail!





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Labels: flip-6  (was: )

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !attachment-name.jpg|thumbnail!





[jira] [Created] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7969:
---

 Summary: Resource manager support batch request slots
 Key: FLINK-7969
 URL: https://issues.apache.org/jira/browse/FLINK-7969
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Reporter: shuai.xu


Currently the resource manager only supports requesting slots one by one. It 
would be better to support allocating slots in batches, so that it can make a 
global decision based on all the resource requests. For example, it can decide 
how many slots should be put into one task manager.
!attachment-name.jpg|thumbnail!
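
As a rough illustration of the global decision mentioned above, the sketch 
below packs one batch of slot requests into task managers. The function name 
and the `max_slots_per_tm` limit are assumptions made for the example and do 
not come from the issue or from Flink's ResourceManager.

```python
def plan_task_managers(requested_slots, max_slots_per_tm):
    """Return how many slots each task manager gets for one batch of requests."""
    if requested_slots < 0 or max_slots_per_tm <= 0:
        raise ValueError("requested_slots must be >= 0 and max_slots_per_tm > 0")
    # Fill as many task managers as possible, then one more for the remainder.
    full, rest = divmod(requested_slots, max_slots_per_tm)
    return [max_slots_per_tm] * full + ([rest] if rest else [])
```

With one request at a time, the resource manager never sees the total and 
cannot size the task managers this way.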





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Currently the 
resource manager only supports requesting slots one by one. It would be better 
to support allocating slots in batches, so that it can make a global decision 
based on all the resource requests. For example, it can decide how many slots 
should be put into one task manager.
!rm.jpg|thumbnail!

  was:
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Currently the 
resource manager only supports requesting slots one by one. It would be better 
to support allocating slots in batches, so that it can make a global decision 
based on all the resource requests. For example, it can decide how many slots 
should be put into one task manager.
!rm.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Currently 
> the resource manager only supports requesting slots one by one. It would be 
> better to support allocating slots in batches, so that it can make a global 
> decision based on all the resource requests. For example, it can decide how 
> many slots should be put into one task manager.
> !rm.jpg|thumbnail!





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Currently the 
resource manager only supports requesting slots one by one. It would be better 
to support allocating slots in batches, so that it can make a global decision 
based on all the resource requests. For example, it can decide how many slots 
should be put into one task manager.
!rm.jpg|thumbnail!

  was:
Currently the resource manager only supports requesting slots one by one. It 
would be better to support allocating slots in batches, so that it can make a 
global decision based on all the resource requests. For example, it can decide 
how many slots should be put into one task manager.
!rm.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Currently 
> the resource manager only supports requesting slots one by one. It would be 
> better to support allocating slots in batches, so that it can make a global 
> decision based on all the resource requests. For example, it can decide how 
> many slots should be put into one task manager.
> !rm.jpg|thumbnail!





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
Currently the resource manager only supports requesting slots one by one. It 
would be better to support allocating slots in batches, so that it can make a 
global decision based on all the resource requests. For example, it can decide 
how many slots should be put into one task manager.
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! 

  was:
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Currently the 
resource manager only supports requesting slots one by one. It would be better 
to support allocating slots in batches, so that it can make a global decision 
based on all the resource requests. For example, it can decide how many slots 
should be put into one task manager.
!rm.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! 





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Attachment: (was: rm.png)

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !rm.png|thumbnail!





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
Currently the resource manager only supports requesting slots one by one. It 
would be better to support allocating slots in batches, so that it can make a 
global decision based on all the resource requests. For example, it can decide 
how many slots should be put into one task manager.
!rm.jpg|thumbnail!

  was:
Currently the resource manager only supports requesting slots one by one. It 
would be better to support allocating slots in batches, so that it can make a 
global decision based on all the resource requests. For example, it can decide 
how many slots should be put into one task manager.
!rm.png|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !rm.jpg|thumbnail!





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Attachment: rm.jpg

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !rm.png|thumbnail!





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Attachment: rm.png

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.png
>
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !attachment-name.jpg|thumbnail!





[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
Currently the resource manager only supports requesting slots one by one. It 
would be better to support allocating slots in batches, so that it can make a 
global decision based on all the resource requests. For example, it can decide 
how many slots should be put into one task manager.
!rm.png|thumbnail!

  was:
Currently the resource manager only supports requesting slots one by one. It 
would be better to support allocating slots in batches, so that it can make a 
global decision based on all the resource requests. For example, it can decide 
how many slots should be put into one task manager.
!attachment-name.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.png
>
>
> Currently the resource manager only supports requesting slots one by one. It 
> would be better to support allocating slots in batches, so that it can make a 
> global decision based on all the resource requests. For example, it can decide 
> how many slots should be put into one task manager.
> !rm.png|thumbnail!





[jira] [Assigned] (FLINK-7871) SlotPool should release its unused slot to RM

2017-10-27 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-7871:
---

Assignee: shuai.xu

> SlotPool should release its unused slot to RM
> -
>
> Key: FLINK-7871
> URL: https://issues.apache.org/jira/browse/FLINK-7871
> Project: Flink
>  Issue Type: Bug
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> As described in the design wiki 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, 
> _*The SlotPool releases slots that are unused to the ResourceManager. Slots 
> count as unused if they are not used when the job is fully running (fully 
> recovered).*_
> Currently, however, the slot pool keeps every slot offered to it until the 
> job finishes.





[jira] [Updated] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager

2017-10-25 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7928:

Labels: flip-6  (was: )

> Extend the filed in ResourceProfile for precisely calculating the resource of 
> a task manager
> 
>
> Key: FLINK-7928
> URL: https://issues.apache.org/jira/browse/FLINK-7928
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> ResourceProfile records all the resource requirements for a slot. It is 
> generated by the JobMaster and then passed to the ResourceManager with the 
> slot request. 
> A task in the slot needs three kinds of resources: 
> 1. The resources for the operators, as specified by the user-defined 
> ResourceSpec.
> 2. The resources the operators need to communicate with their upstream 
> operators, for example the resources for buffer pools.
> 3. The resources the operators need to communicate with their downstream 
> operators, as above.
> So ResourceProfile should contain three parts: the first taken from the 
> ResourceSpec, and the other two generated by the JobMaster.





[jira] [Created] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager

2017-10-25 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7928:
---

 Summary: Extend the filed in ResourceProfile for precisely 
calculating the resource of a task manager
 Key: FLINK-7928
 URL: https://issues.apache.org/jira/browse/FLINK-7928
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, ResourceManager
Reporter: shuai.xu
Assignee: shuai.xu


ResourceProfile records all the resource requirements for a slot. It is 
generated by the JobMaster and then passed to the ResourceManager with the slot 
request. 
A task in the slot needs three kinds of resources: 
1. The resources for the operators, as specified by the user-defined 
ResourceSpec.
2. The resources the operators need to communicate with their upstream 
operators, for example the resources for buffer pools.
3. The resources the operators need to communicate with their downstream 
operators, as above.
So ResourceProfile should contain three parts: the first taken from the 
ResourceSpec, and the other two generated by the JobMaster.
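
The three-part structure described above could be modelled roughly as follows. 
This is only a sketch: the class names `Resources` and `SlotProfile` and the 
fields `cpu_cores` and `memory_mb` are invented for illustration and are not 
the fields of Flink's actual ResourceProfile.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    cpu_cores: float
    memory_mb: int

    def __add__(self, other):
        return Resources(self.cpu_cores + other.cpu_cores,
                         self.memory_mb + other.memory_mb)


@dataclass(frozen=True)
class SlotProfile:
    operators: Resources      # taken from the user-defined spec
    upstream_io: Resources    # derived by the job master (e.g. input buffers)
    downstream_io: Resources  # derived by the job master (e.g. output buffers)

    def total(self):
        """Everything a task manager must reserve for this slot."""
        return self.operators + self.upstream_io + self.downstream_io
```

Keeping the three parts separate lets the job master recompute the 
communication parts without touching what the user specified.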





[jira] [Updated] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7878:

Labels: flip-6  (was: )

> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an application 
> for image processing may need a GPU, and others may need an FPGA. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make the ResourceSpec extensible.





[jira] [Assigned] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-7878:
---

Assignee: shuai.xu

> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> Currently, Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an application 
> for image processing may need a GPU, and others may need an FPGA. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make the ResourceSpec extensible.





[jira] [Created] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-19 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7878:
---

 Summary: Extend the resource type user can define in ResourceSpec
 Key: FLINK-7878
 URL: https://issues.apache.org/jira/browse/FLINK-7878
 Project: Flink
  Issue Type: Bug
  Components: DataSet API, DataStream API
Reporter: shuai.xu


Currently, Flink only lets users define how much CPU and memory an operator 
uses, but the resources in a cluster are varied. For example, an application 
for image processing may need a GPU, and others may need an FPGA. 
CPU and memory alone are not enough, and the number of resource types keeps 
growing, so we need to make the ResourceSpec extensible.
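
One way to picture an extensible spec is a mapping from resource name to 
amount, so that new types such as GPU or FPGA need no new fields. The sketch 
below assumes that representation; the function name and the key names are 
hypothetical and do not reflect how ResourceSpec is implemented.

```python
def merge_specs(first, second):
    """Combine two resource specs given as {resource name: amount} mappings.

    Resource names missing from one spec count as zero, so specs with
    different extended resources can still be added together.
    """
    names = set(first) | set(second)
    return {name: first.get(name, 0) + second.get(name, 0) for name in names}
```

For example, merging an operator that needs a GPU with one that needs an FPGA 
yields a spec that lists both alongside the summed CPU and memory.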





[jira] [Created] (FLINK-7871) SlotPool should release its unused slot to RM

2017-10-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7871:
---

 Summary: SlotPool should release its unused slot to RM
 Key: FLINK-7871
 URL: https://issues.apache.org/jira/browse/FLINK-7871
 Project: Flink
  Issue Type: Bug
Reporter: shuai.xu


As described in the design wiki 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, 
_*The SlotPool releases slots that are unused to the ResourceManager. Slots 
count as unused if they are not used when the job is fully running (fully 
recovered).*_
Currently, however, the slot pool keeps every slot offered to it until the job 
finishes.
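
The rule quoted from the design wiki can be sketched as a set difference that 
is only evaluated once the job is fully running. The function and parameter 
names are made up for this example and are not part of the SlotPool.

```python
def slots_to_release(offered_slots, slots_in_use, job_fully_running):
    """Return the slots the pool could hand back to the resource manager."""
    if not job_fully_running:
        # While the job is still starting or recovering, keep everything.
        return set()
    return set(offered_slots) - set(slots_in_use)
```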





[jira] [Created] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.

2017-10-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7870:
---

 Summary: SlotPool should cancel the slot request to RM if not need 
any more.
 Key: FLINK-7870
 URL: https://issues.apache.org/jira/browse/FLINK-7870
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


1. The SlotPool requests slots from the RM if it does not have enough.
2. If a slot request is not fulfilled within a certain time, the SlotPool treats 
the request as timed out and sends a new slot request by triggering a failover 
in the JobMaster. The previous request is no longer needed, but the RM does not 
know that.
3. This may cause the RM to request far more resources than the job really 
needs.
For example:
1. A job needs 100 slots. The RM requests 100 containers from YARN.
2. YARN is busy and has no resources for the job.
3. The job fails over because the resource request was not fulfilled in time.
4. It asks for 100 slots again, so the RM now requests 200 containers from YARN.
5. If the failover happens several times, the number of requested containers 
keeps growing.
6. Once YARN has resources, it will find that the job appears to need thousands 
of containers. This is a waste of resources.
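
The arithmetic of the example above can be replayed with a small model. It 
assumes every timeout re-requests the full number of slots and that YARN 
grants nothing in the meantime; the function and parameter names are 
hypothetical.

```python
def outstanding_containers(slots_needed, timeouts, cancel_stale):
    """Containers still requested from YARN after `timeouts` request timeouts."""
    outstanding = slots_needed  # the initial request
    for _ in range(timeouts):
        if cancel_stale:
            # Proposed behaviour: withdraw the timed-out request first.
            outstanding -= slots_needed
        # The failover issues a fresh request for all slots.
        outstanding += slots_needed
    return outstanding
```

Without cancellation, a 100-slot job that times out nine times is left with 
1000 outstanding containers; with cancellation it stays at 100.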





[jira] [Closed] (FLINK-5868) Implement a new RestartStrategy that works for the FailoverRegion.

2017-07-07 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5868.
---
   Resolution: Fixed
Fix Version/s: 1.3.0

> Implement a new RestartStrategy that works for the FailoverRegion.
> --
>
> Key: FLINK-5868
> URL: https://issues.apache.org/jira/browse/FLINK-5868
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
> Fix For: 1.3.0
>
>
> We will first implement a restart strategy based on failure rate. 
> For simplicity, the new restart strategy only counts the failovers of 
> FailoverRegions. A global failure of the execution graph causes several 
> FailoverRegions to fail over; in that case it just counts the total number.
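
A failure-rate check of this kind can be sketched with a sliding time window. 
The class below is an illustration only: its name, its parameters, and the 
exact comparison used are assumptions, not the strategy that was implemented 
for this issue.

```python
from collections import deque


class FailureRateTracker:
    def __init__(self, max_failures, interval_seconds):
        self.max_failures = max_failures
        self.interval_seconds = interval_seconds
        self.failures = deque()  # timestamps of region failovers, oldest first

    def record_failover(self, now, regions_failed=1):
        # A global failure is recorded as one failover per affected region.
        for _ in range(regions_failed):
            self.failures.append(now)

    def can_restart(self, now):
        # Forget failovers that fell out of the measuring interval.
        while self.failures and now - self.failures[0] > self.interval_seconds:
            self.failures.popleft()
        return len(self.failures) <= self.max_failures
```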





[jira] [Closed] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy

2017-07-07 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5867.
---
   Resolution: Fixed
Fix Version/s: 1.3.0

> The implementation of RestartPipelinedRegionStrategy
> 
>
> Key: FLINK-5867
> URL: https://issues.apache.org/jira/browse/FLINK-5867
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
> Fix For: 1.3.0
>
>
> The RestartPipelinedRegionStrategy's responsibility is the following:
> 1. Calculate all FailoverRegions and their relations when initializing.
> 2. Listen for the failure of the job and executions, and find corresponding 
> FailoverRegions to do the failover.





[jira] [Closed] (FLINK-5866) The implementation of FailoverRegion.

2017-07-07 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5866.
---
   Resolution: Fixed
Fix Version/s: 1.3.0

> The implementation of FailoverRegion.
> -
>
> Key: FLINK-5866
> URL: https://issues.apache.org/jira/browse/FLINK-5866
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
> Fix For: 1.3.0
>
>
> In FLIP-1, a FailoverRegion manages the failover of a minimal set of 
> pipelined connected executions.
> One job graph may have several FailoverRegions, and they are calculated 
> statically after the job graph is attached.
> A FailoverRegion has several states: CREATED, CANCELING, CANCELED, and 
> RUNNING. It changes its state according to the states of the 
> ExecutionVertexes it contains and drives the failover during state 
> transitions.
> A FailoverRegion may have preceding and succeeding FailoverRegions. It 
> waits for its preceding ones and notifies its succeeding ones when finishing.




[jira] [Closed] (FLINK-5857) Recycle idle containers in time for yarn mode

2017-07-06 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-5857.
---
Resolution: Duplicate

> Recycle idle containers in time for yarn mode
> -
>
> Key: FLINK-5857
> URL: https://issues.apache.org/jira/browse/FLINK-5857
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> When we run a Flink batch job such as MapReduce, the container for a map task 
> may stay idle for a long time after the task has finished. We need a strategy 
> to recycle these containers to reduce resource usage.





[jira] [Commented] (FLINK-5857) Recycle idle containers in time for yarn mode

2017-07-06 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16076347#comment-16076347
 ] 

shuai.xu commented on FLINK-5857:
-

[~till.rohrmann] sure

> Recycle idle containers in time for yarn mode
> -
>
> Key: FLINK-5857
> URL: https://issues.apache.org/jira/browse/FLINK-5857
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> When we run a Flink batch job such as MapReduce, the container for a map task 
> may stay idle for a long time after the task has finished. We need a strategy 
> to recycle these containers to reduce resource usage.





[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-07-03 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16073081#comment-16073081
 ] 

shuai.xu commented on FLINK-6434:
-

[~till.rohrmann] Sorry, I have not started working on it yet.

> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> If the allocateSlot() call from Execution to SlotPool times out, the job will 
> begin to fail over, but the pending request is still in the SlotPool. If a 
> new slot then registers with the SlotPool, it may fulfill the outdated pending 
> request and be added to allocatedSlots, but it will never be used and never 
> be recycled.





[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-08 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000435#comment-16000435
 ] 

shuai.xu commented on FLINK-6434:
-

Yes, adding a request ID can solve the problem.
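
One possible reading of the request-ID idea is sketched below: pending 
requests are keyed by an ID, a timeout removes the entry, and a slot that 
arrives later can only be bound to a request that is still live. The class and 
method names are hypothetical and are not the SlotPool's API.

```python
class PendingRequests:
    def __init__(self):
        self.pending = {}    # request id -> requesting execution
        self.allocated = {}  # slot id -> request id it was bound to

    def request(self, request_id, execution):
        self.pending[request_id] = execution

    def timeout(self, request_id):
        # Drop the request so a slot arriving later cannot be bound to it.
        self.pending.pop(request_id, None)

    def offer_slot(self, slot_id):
        """Bind the slot to the oldest live request; return its id, or None."""
        if not self.pending:
            return None
        request_id = next(iter(self.pending))
        del self.pending[request_id]
        self.allocated[slot_id] = request_id
        return request_id
```

Because the timed-out request is gone, a late slot stays free instead of 
ending up in the allocated map where nothing would ever release it.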

> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> If the allocateSlot() call from Execution to SlotPool times out, the job will 
> begin to fail over, but the pending request is still in the SlotPool. If a 
> new slot then registers with the SlotPool, it may fulfill the outdated pending 
> request and be added to allocatedSlots, but it will never be used and never 
> be recycled.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-08 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000232#comment-16000232
 ] 

shuai.xu edited comment on FLINK-6434 at 5/8/17 7:11 AM:
-

[~till.rohrmann] This does not seem to fix the bug completely. Between 
allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot 
with allocationID2 may fulfill the pending request, and allocatedSlots records 
it with allocationID2. failAllocation(allocationID1) then cannot release it, so 
the slot is still leaked.


was (Author: tiemsn):
[~till.rohrmann] This does not seem to fix the bug totally. Between 
allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot 
with allocationID2 may fulfill the pending request, and allocatedSlots records 
it with allocationID2. failAllocation(allocationID1) then cannot release it, so 
the slot is still leaked.

> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> If the allocateSlot() call from Execution to SlotPool times out, the job will 
> begin to fail over, but the pending request is still in the SlotPool. If a 
> new slot then registers with the SlotPool, it may fulfill the outdated pending 
> request and be added to allocatedSlots, but it will never be used and never 
> be recycled.





[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-07 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000232#comment-16000232
 ] 

shuai.xu commented on FLINK-6434:
-

[~till.rohrmann] This does not seem to fix the bug totally. Between 
allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot 
with allocationID2 may fulfill the pending request, and allocatedSlots records 
it with allocationID2. failAllocation(allocationID1) then cannot release it, so 
the slot is still leaked.

> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> If the allocateSlot() call from Execution to SlotPool times out, the job will 
> begin to fail over, but the pending request is still in the SlotPool. If a 
> new slot then registers with the SlotPool, it may fulfill the outdated pending 
> request and be added to allocatedSlots, but it will never be used and never 
> be recycled.





[jira] [Updated] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-04 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-6434:

Description: If the allocateSlot() call from Execution to SlotPool times out, 
the job will begin to fail over, but the pending request is still in the 
SlotPool. If a new slot then registers with the SlotPool, it may fulfill the 
outdated pending request and be added to allocatedSlots, but it will never be 
used and never be recycled.  (was: If the allocateSlot() call from Execution to 
SlotPool times out, the job will begin to fail over, but the pending request is 
still in the SlotPool. If a need slot then registers with the SlotPool, it may 
fulfill the outdated pending request and be added to allocatedSlots, but it 
will never be used and never be recycled.)

> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> If the allocateSlot() call from Execution to SlotPool times out, the job will 
> begin to fail over, but the pending request is still in the SlotPool. If a 
> new slot then registers with the SlotPool, it may fulfill the outdated pending 
> request and be added to allocatedSlots, but it will never be used and never 
> be recycled.




