[jira] [Created] (FLINK-16140) Translate "Event Processing (CEP)" page into Chinese
shuai.xu created FLINK-16140:

Summary: Translate "Event Processing (CEP)" page into Chinese
Key: FLINK-16140
URL: https://issues.apache.org/jira/browse/FLINK-16140
Project: Flink
Issue Type: Task
Components: chinese-translation, Documentation
Affects Versions: 1.10.0
Reporter: shuai.xu

Translate the internal page [https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html] into Chinese. The doc is located in "flink/docs/dev/libs/cep.zh.md".

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16011) Normalize the within usage in Pattern
shuai.xu created FLINK-16011:

Summary: Normalize the within usage in Pattern
Key: FLINK-16011
URL: https://issues.apache.org/jira/browse/FLINK-16011
Project: Flink
Issue Type: Improvement
Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu

In CEP, we can use Pattern.within() to set a time window in which the pattern must be matched. However, the usage of within() is ambiguous and confusing to users. For example:

# Pattern.begin("a").within(t1).followedBy("b").within(t2) will use the minimum of t1 and t2 as the window time for the whole pattern.
# Pattern.begin("a").followedBy("b").within(t2) will use t2 as the window time.
# But Pattern.begin("a").within(t1).followedBy("b") will have no window time.
# While Pattern.begin("a").notFollowedBy("not").within(t1).followedBy("b").within(t2) will use t2 as the window time.

So I propose to normalize the usage of within() and apply strict checking when compiling the pattern. For example, we could allow within() only at the end of the pattern and report an error at compile time if the user sets it anywhere else.
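The strict check proposed in this issue can be illustrated with a small standalone sketch. It does not use the Flink API: a pattern is modeled here as a plain list of call names, and the class name WithinPlacementCheck and the method isValid are hypothetical names chosen for this example only.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model: a pattern is a list of call names such as "begin", "followedBy", "within". */
final class WithinPlacementCheck {

    /** Returns true only if "within" is absent, or appears exactly once as the last call. */
    static boolean isValid(List<String> calls) {
        int first = calls.indexOf("within");
        if (first < 0) {
            return true; // no window set at all
        }
        // indexOf returns the first occurrence, so equality with the last index
        // means there is a single within() and it closes the pattern.
        return first == calls.size() - 1;
    }

    public static void main(String[] args) {
        // begin("a").followedBy("b").within(t2)
        System.out.println(isValid(Arrays.asList("begin", "followedBy", "within")));
        // begin("a").within(t1).followedBy("b")
        System.out.println(isValid(Arrays.asList("begin", "within", "followedBy")));
        // begin("a").within(t1).followedBy("b").within(t2)
        System.out.println(isValid(Arrays.asList("begin", "within", "followedBy", "within")));
    }
}
```

Under this rule only the second of the four examples listed in the issue would be accepted; the other three would be rejected when the pattern is compiled.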
[jira] [Created] (FLINK-16010) Support notFollowedBy with interval as the last part of a Pattern
shuai.xu created FLINK-16010:

Summary: Support notFollowedBy with interval as the last part of a Pattern
Key: FLINK-16010
URL: https://issues.apache.org/jira/browse/FLINK-16010
Project: Flink
Issue Type: New Feature
Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu

Currently, Pattern.begin("a").notFollowedBy("b") is not allowed in CEP. But this is useful in many applications. For example, operators may want to find the users who created an order but didn't pay within 10 minutes. So I propose to support notFollowedBy() with an interval as the last part of a Pattern. For example, Pattern.begin("a").notFollowedBy("b").within(Time.minutes(10)) will be valid in the future.

The discussion on the dev mailing list is at [https://lists.apache.org/thread.html/rc505728048663d672ad379578ac67d3219f6076986c80a2362802ebb%40%3Cdev.flink.apache.org%3E]
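To make the "created an order but didn't pay within 10 minutes" use case concrete, here is a minimal batch sketch of the intended semantics in plain Java. It is not Flink CEP code and says nothing about how the feature would be implemented; the Event class, the "create" and "pay" type strings, and the unpaidOrders method are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UnpaidOrderSketch {

    /** Hypothetical event: type is either "create" or "pay". */
    static final class Event {
        final String orderId;
        final String type;
        final long timestampMs;

        Event(String orderId, String type, long timestampMs) {
            this.orderId = orderId;
            this.type = type;
            this.timestampMs = timestampMs;
        }
    }

    /** Returns ids of orders whose window has expired at nowMs without a "pay" event inside the window. */
    static List<String> unpaidOrders(List<Event> events, long windowMs, long nowMs) {
        // Remember when each order was created, in arrival order.
        Map<String, Long> createdAt = new LinkedHashMap<>();
        for (Event e : events) {
            if (e.type.equals("create")) {
                createdAt.put(e.orderId, e.timestampMs);
            }
        }
        // Drop every order that was paid inside its window.
        for (Event e : events) {
            Long created = createdAt.get(e.orderId);
            if (e.type.equals("pay") && created != null
                    && e.timestampMs >= created
                    && e.timestampMs - created <= windowMs) {
                createdAt.remove(e.orderId);
            }
        }
        // Whatever is left and already past its window counts as "not followed by pay".
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : createdAt.entrySet()) {
            if (nowMs - entry.getValue() > windowMs) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

The point of the sketch is that the absence of "b" can only be confirmed once the interval has elapsed, which is why the proposal ties notFollowedBy() at the end of a pattern to within().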
[jira] [Created] (FLINK-15980) The notFollowedBy in the end of GroupPattern may be ignored
shuai.xu created FLINK-15980:

Summary: The notFollowedBy in the end of GroupPattern may be ignored
Key: FLINK-15980
URL: https://issues.apache.org/jira/browse/FLINK-15980
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu

If we write a Pattern like this:

    Pattern group = Pattern.begin("A").notFollowedBy("B");
    Pattern pattern = Pattern.begin(group).followedBy("C");

that is, with notFollowedBy() as the last part of a GroupPattern, the pattern compiles without error, but the notFollowedBy("B") has no effect.
[jira] [Created] (FLINK-15964) Getting previous stage in notFollowedBy may throw exception
shuai.xu created FLINK-15964:

Summary: Getting previous stage in notFollowedBy may throw exception
Key: FLINK-15964
URL: https://issues.apache.org/jira/browse/FLINK-15964
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu

In a notFollowedBy() condition, an exception may be thrown when trying to get a value from a previous stage for comparison. For example:

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .notFollowedBy("not").where(new IterativeCondition<Event>() {
            private static final long serialVersionUID = -4702359359303151881L;

            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                // Compares against the first event matched by the "start" stage.
                return value.getName().equals(ctx.getEventsForPattern("start").iterator().next().getName());
            }
        })
        .followedBy("middle").where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                return value.getName().equals("b");
            }
        });

with inputs:

    Event a = new Event(40, "a", 1.0);
    Event b1 = new Event(41, "a", 2.0);
    Event b2 = new Event(43, "b", 3.0);

It will throw:

org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
    at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:698)
    at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:628)
    at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:292)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:228)
    at org.apache.flink.cep.utils.NFATestHarness.consumeRecord(NFATestHarness.java:107)
    at org.apache.flink.cep.utils.NFATestHarness.feedRecord(NFATestHarness.java:84)
    at org.apache.flink.cep.utils.NFATestHarness.feedRecords(NFATestHarness.java:77)
    at org.apache.flink.cep.nfa.NFAITCase.testEndWithNotFollow(NFAITCase.java:2914)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.util.NoSuchElementException
    at java.util.Collections$EmptyIterator.next(Collections.java:4189)
    at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2884)
    at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2879)
    at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:752)
    at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:688)
    ... 33 more
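The root cause in the trace above is a call to next() on an empty iterator. A user-side guard can avoid the NoSuchElementException, although it does not address why the "start" events are unavailable in the first place. The following is only a sketch in plain Java; SafeFirstEvent, firstOrNull and sameNameAsStart are hypothetical helper names, and event names are modeled as plain strings.

```java
import java.util.Iterator;

final class SafeFirstEvent {

    /** Returns the first element, or null when the iterable is empty, instead of throwing. */
    static <T> T firstOrNull(Iterable<T> events) {
        Iterator<T> it = events.iterator();
        return it.hasNext() ? it.next() : null;
    }

    /** Mirrors the comparison in the condition above, treating "no start event" as no match. */
    static boolean sameNameAsStart(String name, Iterable<String> startNames) {
        String start = firstOrNull(startNames);
        return start != null && start.equals(name);
    }
}
```

Inside a condition, the same hasNext() check would be applied to the result of ctx.getEventsForPattern("start") before reading from it.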
[jira] [Commented] (FLINK-15873) Matched result may not be output if existing earlier partial matches
[ https://issues.apache.org/jira/browse/FLINK-15873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029563#comment-17029563 ]

shuai.xu commented on FLINK-15873:

Hi [~dwysakowicz], I have submitted a [pull request|https://github.com/apache/flink/pull/11009] to fix this issue. Could you help review it?

> Matched result may not be output if existing earlier partial matches
> --------------------------------------------------------------------
>
> Key: FLINK-15873
> URL: https://issues.apache.org/jira/browse/FLINK-15873
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.9.0
> Reporter: shuai.xu
> Assignee: shuai.xu
> Priority: Major
>
> When running some CEP jobs with a skip strategy, I found that a matched
> result is not returned if an earlier partial match exists.
> I think this is due to a bug in processMatchesAccordingToSkipStrategy() in
> the NFA class. It should return the matched result without checking whether
> earlier partial matches exist.
[jira] [Created] (FLINK-15873) Matched result may not be output if existing earlier partial matches
shuai.xu created FLINK-15873:

Summary: Matched result may not be output if existing earlier partial matches
Key: FLINK-15873
URL: https://issues.apache.org/jira/browse/FLINK-15873
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu

When running some CEP jobs with a skip strategy, I found that a matched result is not returned if an earlier partial match exists.

I think this is due to a bug in processMatchesAccordingToSkipStrategy() in the NFA class. It should return the matched result without checking whether earlier partial matches exist.
[jira] [Created] (FLINK-15627) Correct the wrong naming of compareMaps in NFATestUtilities
shuai.xu created FLINK-15627:

Summary: Correct the wrong naming of compareMaps in NFATestUtilities
Key: FLINK-15627
URL: https://issues.apache.org/jira/browse/FLINK-15627
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.9.1
Reporter: shuai.xu

The compareMaps method in NFATestUtilities actually compares two lists, so renaming it to compareLists would be better.
[jira] [Commented] (FLINK-12038) YARNITCase stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-12038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888482#comment-16888482 ]

shuai.xu commented on FLINK-12038:

This failure can be easily reproduced on my local machine. I enabled the YARN logs and found the reason. You can find the log of unregisterAM in jobmanager.log. When the job finishes, it tries to unregister the AM from YARN (unregisterAM). In fact, it is not necessary to call killApplication, as the whole YARN mini cluster will be closed in the tearDown of the test case. The following is part of the logs of the job master:

2019-07-16 18:20:34,376 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (e13567c7f2d7a389c74f4583a67e34e8) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,376 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/2) (attempt #0) to container_1563272405568_0001_01_02 @ e011239174096.et15sqa (dataPort=42072)
2019-07-16 18:20:34,404 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (fc3d9d65a75eabaf00d7d9372d2b9884) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,405 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (2/2) (attempt #0) to container_1563272405568_0001_01_03 @ e011239174096.et15sqa (dataPort=41793)
2019-07-16 18:20:34,405 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (1/2) (65db57ac7166e0a96a3c5318bb262fb0) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,414 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Sink: Unnamed (1/2) (attempt #0) to container_1563272405568_0001_01_03 @ e011239174096.et15sqa (dataPort=41793)
2019-07-16 18:20:34,447 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/2) (22c3e0c0fd37dd00e75fcf855e2a6ca4) switched from SCHEDULED to DEPLOYING.
2019-07-16 18:20:34,447 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Sink: Unnamed (2/2) (attempt #0) to container_1563272405568_0001_01_02 @ e011239174096.et15sqa (dataPort=42072)
2019-07-16 18:20:34,897 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (e13567c7f2d7a389c74f4583a67e34e8) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:34,949 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (fc3d9d65a75eabaf00d7d9372d2b9884) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:35,056 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (1/2) (65db57ac7166e0a96a3c5318bb262fb0) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:35,067 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/2) (22c3e0c0fd37dd00e75fcf855e2a6ca4) switched from DEPLOYING to RUNNING.
2019-07-16 18:20:35,450 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (fc3d9d65a75eabaf00d7d9372d2b9884) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,480 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (e13567c7f2d7a389c74f4583a67e34e8) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,494 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/2) (22c3e0c0fd37dd00e75fcf855e2a6ca4) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,508 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (1/2) (65db57ac7166e0a96a3c5318bb262fb0) switched from RUNNING to FINISHED.
2019-07-16 18:20:35,513 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (2f9313ea4fd33bef68111ed380a2ae1b) switched from state RUNNING to FINISHED.
2019-07-16 18:20:35,513 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 2f9313ea4fd33bef68111ed380a2ae1b.
2019-07-16 18:20:35,513 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
2019-07-16 18:20:35,564 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher - Job 2f9313ea4fd33bef68111ed380a2ae1b reached globally terminal state FINISHED.
2019-07-16 18:20:35,573 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Streaming Job(2f9313ea4fd33bef68111ed380a2ae1b).
2019-07-16 18:20:35,664 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2019-07-16 18:20:35,666 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 165d22977dc31b3b410489789fdc1050: JobManager is shutting down..
2019-07-16 18:20:35,668 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2019-07-16 18:20:35,668 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job
[jira] [Commented] (FLINK-12038) YARNITCase stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-12038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16886925#comment-16886925 ]

shuai.xu commented on FLINK-12038:

[~till.rohrmann], I think this is caused by https://issues.apache.org/jira/browse/YARN-2853 in YARN 2.4. At the end of the test case, it tries to kill the application while YarnResourceManager is unregistering the AM from YARN. I think we could fix it by checking the application state before killing it. What do you think?

> YARNITCase stalls on travis
> ---------------------------
>
> Key: FLINK-12038
> URL: https://issues.apache.org/jira/browse/FLINK-12038
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN, Tests
> Affects Versions: 1.9.0
> Reporter: Chesnay Schepler
> Assignee: shuai.xu
> Priority: Critical
> Labels: test-stability
> Fix For: 1.9.0
>
> https://travis-ci.org/apache/flink/jobs/511932978

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
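The suggestion to check the application state before killing it could look roughly like the guard below. This is a standalone sketch, not YARN client code: the AppState enum and the shouldKill method are hypothetical stand-ins, and a real test would read the state from the report returned by the YARN client.

```java
import java.util.EnumSet;
import java.util.Set;

final class KillGuardSketch {

    /** Hypothetical application states used only for this illustration. */
    enum AppState { RUNNING, FINISHED, FAILED, KILLED }

    private static final Set<AppState> TERMINAL =
            EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    /** Only applications that have not yet reached a terminal state need to be killed. */
    static boolean shouldKill(AppState state) {
        return !TERMINAL.contains(state);
    }
}
```

Note that such a check narrows the window but does not remove it: the application can still start unregistering between the state check and the kill call.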
[jira] [Comment Edited] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278 ]

shuai.xu edited comment on FLINK-12863 at 6/17/19 3:27 AM:

Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition between RM and TM, and added a version to each slot to solve it.

I think adding a fencing token to AllocatedSlotReport may have some defects. Consider how you would update the fencing token: when offering slots succeeds, or before offering slots?

If it is updated when the offer succeeds, it may happen that the JM uses the new fencing token while the TM considers the offer failed, so the TM may not update the token, and the JM has no chance to use the old token any more.

If the TM updates the token before offering slots, it may happen that the JM doesn't receive the offer, so the JM doesn't update the token.

I think using a version may be more suitable, as we can compare two versions, and the bigger version will always be correct.

was (Author: tiemsn):
Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition between RM and TM, and added a version to each slot to solve it.

I think adding a fencing token to AllocatedSlotReport can solve it. But how would you update the fencing token? When offering slots succeeds, or before offering slots?

If it is updated when the offer succeeds, it may happen that the JM uses the new fencing token while the TM considers the offer failed, so the TM may not update the token, and the JM has no chance to use the old token any more.

If the TM updates the token before offering slots, it may happen that the JM doesn't receive the offer, so the JM doesn't update the token.

I think using a version may be more suitable, as we can compare two versions, and the bigger version will always be correct.

> Race condition between slot offerings and AllocatedSlotReport
> -------------------------------------------------------------
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by
> the {{TaskExecutor}} to synchronize its internal view on slot allocations
> with the view of the {{JobMaster}}. It seems that there is a race condition
> between offering slots and receiving the report because the
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a
> separate thread.
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just
> before getting new slots offered. Since the report is sent from a different
> thread, it can then happen that the response to the slot offerings is sent
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an
> outdated slot report on the {{TaskExecutor}} causing active slots to be
> released.
> In order to solve the problem I propose to add a fencing token to the
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the
> {{TaskExecutor}} we compare the current slot report fencing token with the
> received one and only process the report if they are equal. Otherwise we wait
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
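The "bigger version wins" idea from the comment above can be sketched as follows. This is only an illustration of the comparison rule, not the design used in Flink or in the commenter's internal fix; the class VersionedSlotView, its string payload, and the method names are assumptions made for this example.

```java
final class VersionedSlotView {

    private long version;                 // version of the last accepted update
    private String state = "unknown";     // hypothetical payload, e.g. "allocated" or "free"

    /** Applies the update only if it carries a strictly newer version; returns whether it was applied. */
    synchronized boolean applyIfNewer(long incomingVersion, String incomingState) {
        if (incomingVersion <= version) {
            return false; // stale or duplicate report: ignore it
        }
        version = incomingVersion;
        state = incomingState;
        return true;
    }

    synchronized long version() {
        return version;
    }

    synchronized String state() {
        return state;
    }
}
```

Unlike an equality check on a fencing token, an ordered version lets either side discard an outdated message even when the two sides disagree on whether the last offer succeeded, because the comparison never requires both sides to have switched tokens at the same moment.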
[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278 ]

shuai.xu commented on FLINK-12863:

Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition between RM and TM, and added a version to each slot to solve it.

I think adding a fencing token to AllocatedSlotReport can solve it. But how would you update the fencing token? When offering slots succeeds, or before offering slots?

If it is updated when the offer succeeds, it may happen that the JM uses the new fencing token while the TM considers the offer failed, so the TM may not update the token, and the JM has no chance to use the old token any more.

If the TM updates the token before offering slots, it may happen that the JM doesn't receive the offer, so the JM doesn't update the token.

I think using a version may be more suitable, as we can compare two versions, and the bigger version will always be correct.

> Race condition between slot offerings and AllocatedSlotReport
> -------------------------------------------------------------
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by
> the {{TaskExecutor}} to synchronize its internal view on slot allocations
> with the view of the {{JobMaster}}. It seems that there is a race condition
> between offering slots and receiving the report because the
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a
> separate thread.
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just
> before getting new slots offered. Since the report is sent from a different
> thread, it can then happen that the response to the slot offerings is sent
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an
> outdated slot report on the {{TaskExecutor}} causing active slots to be
> released.
> In order to solve the problem I propose to add a fencing token to the
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the
> {{TaskExecutor}} we compare the current slot report fencing token with the
> received one and only process the report if they are equal. Otherwise we wait
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.
[jira] [Created] (FLINK-12760) Implement ExecutionGraph to InputsLocationsRetriever Adapter
shuai.xu created FLINK-12760: Summary: Implement ExecutionGraph to InputsLocationsRetriever Adapter Key: FLINK-12760 URL: https://issues.apache.org/jira/browse/FLINK-12760 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: shuai.xu Assignee: shuai.xu Fix For: 1.9.0 Implement an [adapter|https://en.wikipedia.org/wiki/Adapter_pattern], which adapts the ExecutionGraph to the InputsLocationsRetriever interface. *Acceptance criteria* * The adapter always reflects an up to date view of the ExecutionGraph state -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12372) Implement ExecutionSlotAllocator
[ https://issues.apache.org/jira/browse/FLINK-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840166#comment-16840166 ]

shuai.xu edited comment on FLINK-12372 at 5/16/19 2:56 AM:

Hi [~gjy],

For the ScheduledUnit, I think your proposal works.

For getPreferredLocationsBasedOnInputs, I think we can put it in the implementation of ExecutionSlotAllocator. But this is an expedient; it needs to be refined later.

Also, do we need to change cancel(SlotExecutionVertexAssignment) in ExecutionSlotAllocator to cancel(ExecutionVertexID)? The Scheduler may need to call this method when resetting an ExecutionVertex, but it doesn't know the SlotExecutionVertexAssignment; it only knows the ExecutionVertexID.

was (Author: tiemsn):
Hi [~gjy],

For the ScheduledUnit, I think your proposal works.

For getPreferredLocationsBasedOnInputs, I think we can put it in the implementation of ExecutionSlotAllocator. But this is an expedient; it needs to be refined later.

> Implement ExecutionSlotAllocator
> --------------------------------
>
> Key: FLINK-12372
> URL: https://issues.apache.org/jira/browse/FLINK-12372
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Gary Yao
> Assignee: shuai.xu
> Priority: Major
>
> Add and implement {{ExecutionSlotAllocator}} interface
> Design document:
> https://docs.google.com/document/d/1fstkML72YBO1tGD_dmG2rwvd9bklhRVauh4FSsDDwXU
> *Acceptance criteria*
> * {{ExecutionSlotAllocator}} interface is defined and implemented
> * interface implementation is unit tested
[jira] [Commented] (FLINK-12372) Implement ExecutionSlotAllocator
[ https://issues.apache.org/jira/browse/FLINK-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840166#comment-16840166 ]

shuai.xu commented on FLINK-12372:

Hi [~gjy],

For the ScheduledUnit, I think your proposal works.

For getPreferredLocationsBasedOnInputs, I think we can put it in the implementation of ExecutionSlotAllocator. But this is an expedient; it needs to be refined later.

> Implement ExecutionSlotAllocator
> --------------------------------
>
> Key: FLINK-12372
> URL: https://issues.apache.org/jira/browse/FLINK-12372
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Gary Yao
> Assignee: shuai.xu
> Priority: Major
>
> Add and implement {{ExecutionSlotAllocator}} interface
> Design document:
> https://docs.google.com/document/d/1fstkML72YBO1tGD_dmG2rwvd9bklhRVauh4FSsDDwXU
> *Acceptance criteria*
> * {{ExecutionSlotAllocator}} interface is defined and implemented
> * interface implementation is unit tested
[jira] [Commented] (FLINK-12372) Implement ExecutionSlotAllocator
[ https://issues.apache.org/jira/browse/FLINK-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838492#comment-16838492 ]

shuai.xu commented on FLINK-12372:

I am now working on this issue. This is the implementation plan:

1. It will contain ExecutionSlotAllocator, DefaultExecutionSlotAllocator, ExecutionVertexSchedulingRequirements, SlotExecutionVertexAssignment and unit tests for DefaultExecutionSlotAllocator.
2. The DefaultExecutionSlotAllocator will use a SlotProvider to allocate slots and then return the SlotExecutionVertexAssignment to the caller. It will contain a Map to cache the unfulfilled slot requests.

There is one problem:

1. The SlotProvider needs a ScheduledUnit when allocating slots, but I think ExecutionSlotAllocator should not depend on Execution and ScheduledUnit, so it may need to create a new class extending ScheduledUnit and use it in the ExecutionSlotAllocator.

[~gjy], do you have any suggestions?

> Implement ExecutionSlotAllocator
> --------------------------------
>
> Key: FLINK-12372
> URL: https://issues.apache.org/jira/browse/FLINK-12372
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Gary Yao
> Assignee: shuai.xu
> Priority: Major
>
> Add and implement {{ExecutionSlotAllocator}} interface
> Design document:
> https://docs.google.com/document/d/1fstkML72YBO1tGD_dmG2rwvd9bklhRVauh4FSsDDwXU
> *Acceptance criteria*
> * {{ExecutionSlotAllocator}} interface is defined and implemented
> * interface implementation is unit tested
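The plan above (delegate to a slot provider, return a future to the caller, and keep a Map of unfulfilled requests) can be sketched in plain Java. This is not the Flink implementation: SlotAllocatorSketch, the SlotSource interface, and the use of strings for vertex ids and slots are simplifications assumed for this example, and the class is meant to be used from a single thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class SlotAllocatorSketch {

    /** Hypothetical stand-in for a slot provider: the future completes once a slot is available. */
    interface SlotSource {
        CompletableFuture<String> requestSlot(String vertexId);
    }

    private final SlotSource source;
    // Unfulfilled requests, keyed by vertex id. Not thread-safe by design.
    private final Map<String, CompletableFuture<String>> pending = new HashMap<>();

    SlotAllocatorSketch(SlotSource source) {
        this.source = source;
    }

    /** Requests a slot and remembers the request until it completes, fails, or is cancelled. */
    CompletableFuture<String> allocate(String vertexId) {
        CompletableFuture<String> future = source.requestSlot(vertexId);
        pending.put(vertexId, future);
        // Remove the entry only if it still maps to this exact request.
        future.whenComplete((slot, error) -> pending.remove(vertexId, future));
        return future;
    }

    /** Cancels an unfulfilled request by vertex id; returns false if nothing was pending. */
    boolean cancel(String vertexId) {
        CompletableFuture<String> future = pending.remove(vertexId);
        return future != null && future.cancel(false);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Keying the cache by vertex id means a caller that only knows the vertex id can still cancel an outstanding request without holding on to the returned assignment.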
[jira] [Assigned] (FLINK-12228) Implement Eager Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-12228: Assignee: shuai.xu (was: Gary Yao) > Implement Eager Scheduling Strategy > --- > > Key: FLINK-12228 > URL: https://issues.apache.org/jira/browse/FLINK-12228 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: shuai.xu >Priority: Major > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.EAGER}}, i.e., all vertices are scheduled at once. > Acceptance Criteria: > * Test implementation of {{SchedulerOperations}} exists > * Test implementation of {{AccessExecutionGraph}} exists > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12227) Introduce SchedulingStrategy interface
[ https://issues.apache.org/jira/browse/FLINK-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-12227: Assignee: shuai.xu (was: Gary Yao) > Introduce SchedulingStrategy interface > -- > > Key: FLINK-12227 > URL: https://issues.apache.org/jira/browse/FLINK-12227 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: shuai.xu >Priority: Major > > Introduce classes and interfaces required to implement a > {{SchedulingStrategy}}: > * {{SchedulingStrategy}} > * {{SchedulingStrategyFactory}} > * {{SchedulerOperations}} > * {{ExecutionVertexDeploymentOption}} > * {{DeploymentOption}} > Interface design document: > https://docs.google.com/document/d/1fstkML72YBO1tGD_dmG2rwvd9bklhRVauh4FSsDDwXU/edit#heading=h.dyw2hafals68 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed
shuai.xu created FLINK-11861:

Summary: JobMasterTriggerSavepointIT case is not executed
Key: FLINK-11861
URL: https://issues.apache.org/jira/browse/FLINK-11861
Project: Flink
Issue Type: Bug
Components: Tests
Affects Versions: 1.7.2
Reporter: shuai.xu

The [JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java] will not be executed because the class name does not follow the *ITCase naming style.
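A simple check shows why the class is skipped, assuming the build selects integration tests purely by the ITCase suffix of the class name (the issue only states the naming style, so the exact selection rule is an assumption). ITCaseNameCheck and followsITCaseStyle are hypothetical names for this sketch.

```java
final class ITCaseNameCheck {

    /** Returns true if the simple class name ends with the ITCase suffix. */
    static boolean followsITCaseStyle(String simpleClassName) {
        return simpleClassName.endsWith("ITCase");
    }

    public static void main(String[] args) {
        System.out.println(followsITCaseStyle("JobMasterTriggerSavepointIT"));
        System.out.println(followsITCaseStyle("JobMasterTriggerSavepointITCase"));
    }
}
```

Under that assumption, renaming the class so that it ends in ITCase would be enough for it to be picked up.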
[jira] [Updated] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly
[ https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shuai.xu updated FLINK-11375:

Description:
In SlotPool, AvailableSlots is not guarded by any lock, so all access to it should happen in the main thread of SlotPool. For that reason all the public methods are called through SlotPoolGateway, except releaseSlot, which is called directly by SlotSharingManager. This may cause a ConcurrentModificationException.

2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], fields:(f0)) -> correlate: table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), select: item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED.
2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices meet exception, need to fail global execution graph java.lang.reflect.UndeclaredThrowableException at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source) at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955) at org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965) at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503) at org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349) at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132) at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107) at org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163) at org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372) at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213) at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198) at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97) at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) at java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125) ... 23 more Caused by: java.util.ConcurrentModificationException at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643) at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464) at
[jira] [Created] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly
shuai.xu created FLINK-11375: Summary: Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly Key: FLINK-11375 URL: https://issues.apache.org/jira/browse/FLINK-11375 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.7.1 Reporter: shuai.xu In SlotPool, AvailableSlots is not protected by any lock, so all access to it should happen in the main thread of SlotPool. For this reason all public methods are called through SlotPoolGateway, except releaseSlot, which SlotSharingManager calls directly. This may cause a ConcurrentModificationException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
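The thread-confinement rule described in this report can be illustrated with a minimal sketch. It is not Flink's SlotPool: the class ConfinedSlotPool and its methods are hypothetical, and a single-threaded executor stands in for the SlotPool main thread. Every operation, including releaseSlot, is routed through that one thread, so the plain HashMap is never touched concurrently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a slot pool; names are illustrative, not Flink's API.
final class ConfinedSlotPool implements AutoCloseable {
    // Plain HashMap: safe only because every access runs on mainThread.
    private final Map<String, String> availableSlots = new HashMap<>();
    // Assumption: a single-threaded executor plays the role of the main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> offerSlot(String slotId, String owner) {
        return CompletableFuture.runAsync(() -> availableSlots.put(slotId, owner), mainThread);
    }

    // releaseSlot goes through the same thread instead of mutating the map directly.
    CompletableFuture<Boolean> releaseSlot(String slotId) {
        return CompletableFuture.supplyAsync(() -> availableSlots.remove(slotId) != null, mainThread);
    }

    CompletableFuture<Integer> size() {
        return CompletableFuture.supplyAsync(availableSlots::size, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

A caller on any thread only ever sees futures, for example `pool.releaseSlot("s1").join()`, which is the same shape as calling through a gateway rather than on the object itself.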
[jira] [Commented] (FLINK-11298) Scheduling job in the unit of concurrent groups
[ https://issues.apache.org/jira/browse/FLINK-11298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739249#comment-16739249 ] shuai.xu commented on FLINK-11298: -- Hi [~till.rohrmann], thanks for your attention. Yes, this issue depends on FLINK-10429. I will first add a design to it and then help to complete FLINK-10429 with Stefan. > Scheduling job in the unit of concurrent groups > --- > > Key: FLINK-11298 > URL: https://issues.apache.org/jira/browse/FLINK-11298 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > Currently Flink supports only two scheduling modes: scheduling all tasks > in EAGER mode for streaming jobs and scheduling all tasks in LAZY_FROM_SOURCE mode for batch > jobs. This is not flexible enough for the various requirements of different > jobs, such as FLINK-10240. We propose a new ConcurrentSchedulingGroup based > scheduling strategy which first splits a job into several concurrent groups and > then schedules it in units of concurrent groups. This strategy will support > not only the existing EAGER and LAZY_FROM_SOURCE modes but also other > situations, such as the Build/Probe case in FLINK-10240. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10433) Stepwise creation of the ExecutionGraph sub-structures
[ https://issues.apache.org/jira/browse/FLINK-10433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739159#comment-16739159 ] shuai.xu commented on FLINK-10433: -- I like this proposal. Autoscaling is very useful. Furthermore, we could support automatically deciding the result partition type, as in FLINK-11299. Further in the future, we could support dynamic job graphs. > Stepwise creation of the ExecutionGraph sub-structures > -- > > Key: FLINK-10433 > URL: https://issues.apache.org/jira/browse/FLINK-10433 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > In this step, we break the construction of ExecutionGraph substructures into > multiple steps. When translating a JobGraph to an ExecutionGraph, we could > only create a “skeleton” ExecutionGraph that is only built up to the > ExecutionJobVertex level. This enables scheduling in two steps. First we can > compute the minimal and desired amount of resources and ask the SlotPool for > them. Only when the available resources match the minimal requirements, the > actual scheduling would start. With this information, we can build the > ExecutionVertex and Execution objects already in the right parallelism and > avoid running out of resources during scheduling. This separation will help > us with the autoscaling use-case, where we figure out the actual parallelism > after we started with scheduling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11298) Scheduling job in the unit of concurrent groups
[ https://issues.apache.org/jira/browse/FLINK-11298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-11298: Assignee: shuai.xu > Scheduling job in the unit of concurrent groups > --- > > Key: FLINK-11298 > URL: https://issues.apache.org/jira/browse/FLINK-11298 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > Currently Flink supports only two scheduling modes: scheduling all tasks > in EAGER mode for streaming jobs and scheduling all tasks in LAZY_FROM_SOURCE mode for batch > jobs. This is not flexible enough for the various requirements of different > jobs, such as FLINK-10240. We propose a new ConcurrentSchedulingGroup based > scheduling strategy which first splits a job into several concurrent groups and > then schedules it in units of concurrent groups. This strategy will support > not only the existing EAGER and LAZY_FROM_SOURCE modes but also other > situations, such as the Build/Probe case in FLINK-10240. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11299) Decide the result partition type dynamically
[ https://issues.apache.org/jira/browse/FLINK-11299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-11299: Assignee: shuai.xu > Decide the result partition type dynamically > - > > Key: FLINK-11299 > URL: https://issues.apache.org/jira/browse/FLINK-11299 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > Currently, the result partition type is decided when compiling the JobGraph at the > client. That is to say, whether a task outputs its result to its > consumers as PIPELINED or BLOCKING is decided at the client and cannot be > changed afterwards. However, it is common in batch jobs that a task is > configured as PIPELINED but its consumers cannot be started due to a lack of > resources, which currently leads to failover. So we propose to support deciding > the result partition type dynamically at runtime, according to the > resources of the cluster and similar factors, to increase the success rate of batch jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11299) Decide the result partition type dynamically
shuai.xu created FLINK-11299: Summary: Decide the result partition type dynamically Key: FLINK-11299 URL: https://issues.apache.org/jira/browse/FLINK-11299 Project: Flink Issue Type: Improvement Components: JobManager Reporter: shuai.xu Currently, the result partition type is decided when compiling the JobGraph at the client. That is to say, whether a task outputs its result to its consumers as PIPELINED or BLOCKING is decided at the client and cannot be changed afterwards. However, it is common in batch jobs that a task is configured as PIPELINED but its consumers cannot be started due to a lack of resources, which currently leads to failover. So we propose to support deciding the result partition type dynamically at runtime, according to the resources of the cluster and similar factors, to increase the success rate of batch jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10432) Introduce bulk/group-aware scheduling
[ https://issues.apache.org/jira/browse/FLINK-10432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739120#comment-16739120 ] shuai.xu commented on FLINK-10432: -- Hi [~srichter], it's a really good idea, and I think we could benefit even more from it. With this work we could easily support a Concurrent Group based scheduling strategy to satisfy the differing requirements of different jobs, such as FLINK-10240. The idea of Concurrent Group based scheduling is described in FLINK-11298. > Introduce bulk/group-aware scheduling > - > > Key: FLINK-10432 > URL: https://issues.apache.org/jira/browse/FLINK-10432 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > In this step, we change our new Scheduler to support reasoning over a bulk of > Executions before interacting with the SlotPool instead of requesting slots > on a per-execution basis. This gives us the opportunity to group and schedule > tasks together as one unit and to apply scheduling algorithms that require a > more holistic view on the tasks at hand. For now, this step will probably > mainly address streaming / EAGER scheduling. After this step, we expect that > we can support the use-case of local recovery. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11298) Scheduling job in the unit of concurrent groups
shuai.xu created FLINK-11298: Summary: Scheduling job in the unit of concurrent groups Key: FLINK-11298 URL: https://issues.apache.org/jira/browse/FLINK-11298 Project: Flink Issue Type: Improvement Components: JobManager Reporter: shuai.xu Currently Flink supports only two scheduling modes: scheduling all tasks in EAGER mode for streaming jobs and scheduling all tasks in LAZY_FROM_SOURCE mode for batch jobs. This is not flexible enough for the various requirements of different jobs, such as FLINK-10240. We propose a new ConcurrentSchedulingGroup based scheduling strategy which first splits a job into several concurrent groups and then schedules it in units of concurrent groups. This strategy will support not only the existing EAGER and LAZY_FROM_SOURCE modes but also other situations, such as the Build/Probe case in FLINK-10240. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11059) JobMaster may continue using an invalid slot if releasing idle slot meet a timeout
shuai.xu created FLINK-11059: Summary: JobMaster may continue using an invalid slot if releasing idle slot meet a timeout Key: FLINK-11059 URL: https://issues.apache.org/jira/browse/FLINK-11059 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.7.0 Reporter: shuai.xu Assignee: shuai.xu When the job master releases an idle slot to the task executor, the call may fail with a timeout exception. In that case the task executor may already have released the slot, but the job master adds the slot back to its available slots, so the slot may be used again. The job master will then continue deploying tasks to the slot, which the task executor no longer recognizes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629735#comment-16629735 ] shuai.xu commented on FLINK-10429: -- +1 This proposal goes in the same direction as our approach. We have already initiated a discussion about how to make the scheduling strategy pluggable in [https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/edit]. I think we can make the scheduler more powerful together. > Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > This epic tracks the redesign of scheduling in Flink. Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on > the granularity of individual tasks, which make holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use-case like auto-scaling, > local-recovery, and resource optimized batch. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9932) If task executor offer slot to job master timeout the first time, the slot will leak
shuai.xu created FLINK-9932: --- Summary: If task executor offer slot to job master timeout the first time, the slot will leak Key: FLINK-9932 URL: https://issues.apache.org/jira/browse/FLINK-9932 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu When the task executor offers a slot to the job master, it first marks the slot as ACTIVE. If the offer call times out, the task executor calls offerSlotsToJobManager again, but it only offers slots in the ALLOCATED state. Because the slot has already been marked ACTIVE, it is never offered again, which causes a slot leak. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
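The leak described in this report can be reproduced with a small model. The sketch below is not Flink's task executor code: SlotOfferSketch, its states and its methods are hypothetical names. The first method offers only ALLOCATED slots, as the report describes. The second shows one possible remedy, assumed here for illustration only, which also re-offers ACTIVE slots whose offer was never acknowledged.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a task executor's slot bookkeeping; not Flink's API.
final class SlotOfferSketch {
    enum State { ALLOCATED, ACTIVE }

    private final Map<String, State> slots = new LinkedHashMap<>();
    // Slots whose offer the job master has acknowledged.
    private final Set<String> acknowledged = new HashSet<>();

    void allocate(String slotId) {
        slots.put(slotId, State.ALLOCATED);
    }

    void acknowledge(String slotId) {
        acknowledged.add(slotId);
    }

    // Behaviour described in the report: mark ACTIVE first, offer only ALLOCATED slots.
    List<String> collectOffersAllocatedOnly() {
        List<String> offers = new ArrayList<>();
        for (Map.Entry<String, State> entry : slots.entrySet()) {
            if (entry.getValue() == State.ALLOCATED) {
                entry.setValue(State.ACTIVE);
                offers.add(entry.getKey());
            }
        }
        return offers;
    }

    // Assumed remedy: ACTIVE slots without an acknowledgement are offered again.
    List<String> collectOffersIncludingUnacknowledged() {
        List<String> offers = new ArrayList<>();
        for (Map.Entry<String, State> entry : slots.entrySet()) {
            boolean unacknowledged =
                    entry.getValue() == State.ACTIVE && !acknowledged.contains(entry.getKey());
            if (entry.getValue() == State.ALLOCATED || unacknowledged) {
                entry.setValue(State.ACTIVE);
                offers.add(entry.getKey());
            }
        }
        return offers;
    }
}
```

After a timed-out first offer, a retry through collectOffersAllocatedOnly returns nothing for that slot, while collectOffersIncludingUnacknowledged returns it until acknowledge is called.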
[jira] [Created] (FLINK-9884) Slot request may not be removed when it has already be assigned in slot manager
shuai.xu created FLINK-9884: --- Summary: Slot request may not be removed when it has already be assigned in slot manager Key: FLINK-9884 URL: https://issues.apache.org/jira/browse/FLINK-9884 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu When a task executor reports slotA with allocationId1, it may happen that the slot manager has recorded slotA as assigned to allocationId2, while the slot request with allocationId1 is still unassigned. The slot manager then updates its record so that slotA is assigned to allocationId1, but it does not clear the slot request with allocationId1. For example: # There is one free slot in the slot manager. # Two slot requests arrive, with allocationId1 and allocationId2. # The slot is assigned to allocationId1, but the requestSlot call times out. # The SlotManager assigns the slot to allocationId2 and re-inserts a slot request with allocationId1. # The second requestSlot call to the task executor returns a SlotOccupiedException. # The SlotManager updates the slot to allocationId1, but the slot request with allocationId1 is left behind. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
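The last step of the scenario above can be sketched as follows. This is a simplified model with hypothetical names (SlotReportSketch, onSlotReport), not the SlotManager implementation. It shows the bookkeeping the report says is missing: when a reported allocation replaces the recorded one, the pending request for the reported allocation is removed. Putting the displaced allocation back into the pending set is an additional assumption made here, not something the report states.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified slot manager state; not Flink's SlotManager.
final class SlotReportSketch {
    final Map<String, String> slotToAllocation = new HashMap<>();
    final Set<String> pendingRequests = new LinkedHashSet<>();

    // Called when a task executor reports which allocation actually holds a slot.
    void onSlotReport(String slotId, String reportedAllocationId) {
        String previous = slotToAllocation.put(slotId, reportedAllocationId);
        // The step the report says is missing: the reported allocation is now
        // fulfilled, so its pending request must not be left behind.
        pendingRequests.remove(reportedAllocationId);
        // Assumption: the allocation that lost the slot needs to be requested again.
        if (previous != null && !previous.equals(reportedAllocationId)) {
            pendingRequests.add(previous);
        }
    }
}
```

Starting from the state after step 5 (slotA recorded for allocationId2, allocationId1 pending), a call to onSlotReport("slotA", "allocationId1") leaves slotA assigned to allocationId1 and only allocationId2 pending.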
[jira] [Assigned] (FLINK-9828) Resource manager should recover slot resource status after failover
[ https://issues.apache.org/jira/browse/FLINK-9828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-9828: --- Assignee: shuai.xu > Resource manager should recover slot resource status after failover > --- > > Key: FLINK-9828 > URL: https://issues.apache.org/jira/browse/FLINK-9828 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > After a resource manager failover, task executors report their slot > allocation status to the RM. But the report does not contain the slots' resources, so the RM only > knows that the slots are occupied and cannot know how many resources are used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9828) Resource manager should recover slot resource status after failover
shuai.xu created FLINK-9828: --- Summary: Resource manager should recover slot resource status after failover Key: FLINK-9828 URL: https://issues.apache.org/jira/browse/FLINK-9828 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu After a resource manager failover, task executors report their slot allocation status to the RM. But the report does not contain the slots' resources, so the RM only knows that the slots are occupied and cannot know how many resources are used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9827) ResourceManager may receive outdate report of slots status from task manager
shuai.xu created FLINK-9827: --- Summary: ResourceManager may receive outdate report of slots status from task manager Key: FLINK-9827 URL: https://issues.apache.org/jira/browse/FLINK-9827 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu The TaskExecutor reports its slot status to the resource manager in its heartbeat, but the heartbeat runs in a different thread from the main RPC thread. So it may happen that the RM requests a slot from the task executor and then receives a heartbeat saying the slot is not assigned. This causes the slot to be freed and assigned again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9826) Implement FLIP-6 YARN Resource Manager for SESSION mode
shuai.xu created FLINK-9826: --- Summary: Implement FLIP-6 YARN Resource Manager for SESSION mode Key: FLINK-9826 URL: https://issues.apache.org/jira/browse/FLINK-9826 Project: Flink Issue Type: New Feature Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu The Flink YARN Session Resource Manager communicates with YARN's Resource Manager to acquire and release containers. It will ask for N containers from YARN according to the config. It is also responsible for notifying the JobManager eagerly about container failures. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9632) SlotPool should notify the caller when allocateSlot meet an exception
[ https://issues.apache.org/jira/browse/FLINK-9632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-9632. --- Resolution: Invalid > SlotPool should notify the caller when allocateSlot meet an exception > - > > Key: FLINK-9632 > URL: https://issues.apache.org/jira/browse/FLINK-9632 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > In SlotPool, allocateSlot() returns a CompletableFuture, > but this future is only completed when slotAndLocalityFuture returns a > LogicSlot. If slotAndLocalityFuture completes exceptionally, the returned future is never > completed, so the caller never learns of the failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9632) SlotPool should notify the call when allocateSlot meet an exception
shuai.xu created FLINK-9632: --- Summary: SlotPool should notify the call when allocateSlot meet an exception Key: FLINK-9632 URL: https://issues.apache.org/jira/browse/FLINK-9632 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu In SlotPool, allocateSlot() returns a CompletableFuture, but this future is only completed when slotAndLocalityFuture returns a LogicSlot. If slotAndLocalityFuture completes exceptionally, the returned future is never completed, so the caller never learns of the failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
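The general pitfall this report describes can be shown with plain java.util.concurrent.CompletableFuture. The sketch below is not SlotPool code, and the issue was later closed as Invalid, so it only illustrates the pattern: FuturePropagationSketch and its two methods are hypothetical names. The first method completes the result only on success, so a failed source leaves the result pending forever. The second forwards both outcomes with whenComplete.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper illustrating the pattern; not taken from Flink.
final class FuturePropagationSketch {

    // Pitfall: the result is completed only when the source succeeds.
    // If the source fails, nothing ever completes the result.
    static <T> CompletableFuture<T> forwardSuccessOnly(CompletableFuture<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.thenAccept(result::complete);
        return result;
    }

    // Both outcomes are forwarded, so the caller always gets an answer.
    static <T> CompletableFuture<T> forwardBoth(CompletableFuture<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```

With a source that has been completed exceptionally, forwardSuccessOnly(source).isDone() stays false, while forwardBoth(source).isCompletedExceptionally() is true.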
[jira] [Updated] (FLINK-9632) SlotPool should notify the caller when allocateSlot meet an exception
[ https://issues.apache.org/jira/browse/FLINK-9632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-9632: Summary: SlotPool should notify the caller when allocateSlot meet an exception (was: SlotPool should notify the call when allocateSlot meet an exception) > SlotPool should notify the caller when allocateSlot meet an exception > - > > Key: FLINK-9632 > URL: https://issues.apache.org/jira/browse/FLINK-9632 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > In SlotPool, allocateSlot() returns a CompletableFuture, > but this future is only completed when slotAndLocalityFuture returns a > LogicSlot. If slotAndLocalityFuture completes exceptionally, the returned future is never > completed, so the caller never learns of the failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id
shuai.xu created FLINK-9293: --- Summary: SlotPool should check slot id when accepting a slot offer with existing allocation id Key: FLINK-9293 URL: https://issues.apache.org/jira/browse/FLINK-9293 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu With FLIP-6, two or more slots may be assigned to the same allocation. For example, taskExecutor1 registers and the RM assigns allocationID1 to its slot1, but from taskExecutor1's side the registration times out, so it registers again. The RM then fails allocationID1 and assigns slot2 on taskExecutor2 to it, but taskExecutor1 has already accepted allocationID1. So taskExecutor1 and taskExecutor2 both offer a slot to the job master with allocationID1. Currently the slot pool simply accepts all slot offers, and this may leak one slot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
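The check named in the summary can be sketched as below. This is an illustration under stated assumptions, not the SlotPool implementation: SlotOfferCheckSketch and offerSlot are hypothetical names, and allocation and slot ids are plain strings. An offer is accepted if the allocation id is new or if it repeats the slot already recorded for that allocation. An offer of a different slot for a known allocation id is rejected, so the offering task executor can free it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of slot-offer handling; not Flink's SlotPool.
final class SlotOfferCheckSketch {
    // Which slot currently fulfils each allocation id.
    private final Map<String, String> allocationToSlot = new HashMap<>();

    // Returns true if the offer is accepted, false if it must be rejected.
    boolean offerSlot(String allocationId, String slotId) {
        String existing = allocationToSlot.putIfAbsent(allocationId, slotId);
        // First offer for this allocation, or a repeated offer of the same slot.
        return existing == null || existing.equals(slotId);
    }
}
```

In the scenario above, whichever of taskExecutor1 and taskExecutor2 offers second for allocationID1 gets a rejection instead of a silently accepted duplicate.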
[jira] [Created] (FLINK-8938) Not remove job graph during job master failover
shuai.xu created FLINK-8938: --- Summary: Not remove job graph during job master failover Key: FLINK-8938 URL: https://issues.apache.org/jira/browse/FLINK-8938 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu When the job master fails over, the new dispatcher should not delete the job graph if it fails to start the job manager runner; otherwise the other dispatchers cannot recover the job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8442) Should recovery the input split when an execution failover with FailoverRegion
shuai.xu created FLINK-8442: --- Summary: Should recovery the input split when an execution failover with FailoverRegion Key: FLINK-8442 URL: https://issues.apache.org/jira/browse/FLINK-8442 Project: Flink Issue Type: Bug Components: JobManager, Scheduler Affects Versions: 1.4.0 Reporter: shuai.xu FLIP-1 makes it possible to restart only the executions in a FailoverRegion when a task fails. But input splits are currently assigned only when an ExecutionJobVertex is initialized, so when an execution restarts, the input splits it has already read may no longer be obtainable from the job master. The input splits need to be recovered so that they can be consumed again when the task restarts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-4504) Support user to decide whether the result of an operator is presistent
[ https://issues.apache.org/jira/browse/FLINK-4504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-4504. --- Resolution: Won't Fix > Support user to decide whether the result of an operator is presistent > -- > > Key: FLINK-4504 > URL: https://issues.apache.org/jira/browse/FLINK-4504 > Project: Flink > Issue Type: Sub-task > Components: DataSet API >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > Provide an API that lets users decide whether the result of an > operator should be pipelined, spilled to a local file, or persisted to a distributed > file system. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-4444) Add a DFSInputChannel and DFSSubPartition
[ https://issues.apache.org/jira/browse/FLINK-4444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-4444. --- Resolution: Won't Fix > Add a DFSInputChannel and DFSSubPartition > - > > Key: FLINK-4444 > URL: https://issues.apache.org/jira/browse/FLINK-4444 > Project: Flink > Issue Type: Sub-task > Components: Batch Connectors and Input/Output Formats >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > Add a new ResultPartitionType and ResultPartitionLocation type for DFS > Add DFSSubpartition and DFSInputChannel for writing and reading DFS -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-8289. --- Resolution: Fixed Fix Version/s: 1.5.0 > The RestServerEndpoint should return the address with real ip when > getRestAdddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Currently RestServerEndpoint.getRestAddress returns an address equal > to the value of the config option rest.address, whose default is 127.0.0.1:9067, but > this address cannot be accessed from another machine. The IPs of the > Dispatcher and JobMaster are usually dynamic, so users will configure it > to 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067. This address > is registered with YARN or Mesos, but it cannot be accessed > from another machine either. So getRestAddress needs to return the real ip:port so that users can > access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8434) The new yarn resource manager should take over the running task managers after failover
[ https://issues.apache.org/jira/browse/FLINK-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8434: Description: The app master, which contains the job master and the YARN resource manager, may fail over while running on YARN. The new resource manager should take over the running task managers after it has started. But currently the YarnResourceManager does not record the running containers in workerNodeMap, so when task managers register with it, it rejects them. (was: The app master, which contains the job master and the YARN resource manager, may fail over while running on YARN. The new resource manager should take over the running task managers after it has started. But currently the YarnResourceManager does not record the running containers in workerNodeMap, so when task managers register with it, it rejects them.) > The new yarn resource manager should take over the running task managers > after failover > --- > > Key: FLINK-8434 > URL: https://issues.apache.org/jira/browse/FLINK-8434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > The app master, which contains the job master and the YARN resource manager, may > fail over while running on YARN. The new resource manager should take over > the running task managers after it has started. But currently the YarnResourceManager does > not record the running containers in workerNodeMap, so when task managers > register with it, it rejects them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8434) The new yarn resource manager should take over the running task managers after failover
shuai.xu created FLINK-8434: --- Summary: The new yarn resource manager should take over the running task managers after failover Key: FLINK-8434 URL: https://issues.apache.org/jira/browse/FLINK-8434 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu The app master which container the job master and yarn resource manager may failover during running on yarn. The new resource manager should take over the running task managers after started. But now the YarnResourceManager does not record the running container to workerNodeMap, so when task managers register to it, it will reject them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
shuai.xu created FLINK-8399: --- Summary: Use independent configurations for the different timeouts in slot manager Key: FLINK-8399 URL: https://issues.apache.org/jira/browse/FLINK-8399 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu There are three parameter in slot manager to indicate the timeout for slot request to task manager, slot request to be discarded and task manager to be released. But now the all come from the value of AkkaOptions.ASK_TIMEOUT, need to use independent configurations for them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
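The idea of giving each slot manager timeout its own option can be sketched in a few lines of Java. This is only an illustration, not Flink's implementation: the three option keys and the class name are hypothetical, and a plain `Map` stands in for Flink's configuration object. The shared fallback plays the role that AkkaOptions.ASK_TIMEOUT plays in the description above.

```java
import java.time.Duration;
import java.util.Map;

// Sketch only: the option keys below are hypothetical and are not Flink's real keys.
final class SlotManagerTimeoutsSketch {
    static final String TM_REQUEST_KEY = "sketch.slotmanager.task-manager-request-timeout-ms";
    static final String SLOT_REQUEST_KEY = "sketch.slotmanager.slot-request-timeout-ms";
    static final String TM_RELEASE_KEY = "sketch.slotmanager.task-manager-release-timeout-ms";

    final Duration taskManagerRequestTimeout; // slot request sent to a task manager
    final Duration slotRequestTimeout;        // pending slot request before it is discarded
    final Duration taskManagerReleaseTimeout; // idle task manager before it is released

    private SlotManagerTimeoutsSketch(Duration tmRequest, Duration slotRequest, Duration tmRelease) {
        this.taskManagerRequestTimeout = tmRequest;
        this.slotRequestTimeout = slotRequest;
        this.taskManagerReleaseTimeout = tmRelease;
    }

    /** Reads each timeout from its own key and falls back to the shared default when a key is absent. */
    static SlotManagerTimeoutsSketch fromConfig(Map<String, String> config, Duration sharedDefault) {
        return new SlotManagerTimeoutsSketch(
                read(config, TM_REQUEST_KEY, sharedDefault),
                read(config, SLOT_REQUEST_KEY, sharedDefault),
                read(config, TM_RELEASE_KEY, sharedDefault));
    }

    private static Duration read(Map<String, String> config, String key, Duration fallback) {
        String raw = config.get(key);
        return raw == null ? fallback : Duration.ofMillis(Long.parseLong(raw.trim()));
    }
}
```

With this shape, raising the slot request timeout to five minutes leaves the other two timeouts at the shared default.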
[jira] [Updated] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
[ https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8399: Description: There are three parameter in slot manager to indicate the timeout for slot request to task manager, slot request to be discarded and task manager to be released. But now they all come from the value of AkkaOptions.ASK_TIMEOUT, need to use independent configurations for them. (was: There are three parameter in slot manager to indicate the timeout for slot request to task manager, slot request to be discarded and task manager to be released. But now the all come from the value of AkkaOptions.ASK_TIMEOUT, need to use independent configurations for them.) > Use independent configurations for the different timeouts in slot manager > - > > Key: FLINK-8399 > URL: https://issues.apache.org/jira/browse/FLINK-8399 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > There are three parameter in slot manager to indicate the timeout for slot > request to task manager, slot request to be discarded and task manager to be > released. But now they all come from the value of AkkaOptions.ASK_TIMEOUT, > need to use independent configurations for them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303117#comment-16303117 ] shuai.xu commented on FLINK-8289: - The RestServerEndpoint should never need to know whether there is a proxy, so I think getRestAddress should always return the endpoint's own server address. There are two ways to do this: 1. Let the endpoint always return its true server address, no matter what address it is bound to. 2. Bind it to the IP of the machine. > The RestServerEndpoint should return the address with real ip when > getRestAdddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address taken from > the rest.address config option, whose default is 127.0.0.1:9067, but this > address cannot be accessed from another machine. The IPs of the Dispatcher > and JobMaster are usually dynamic, so users will configure the address as > 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067. This address is > registered with YARN or Mesos, but it cannot be accessed from another > machine either. So getRestAddress needs to return the real ip:port so that > users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
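The first option in the comment above, returning a reachable address regardless of the bind address, might look roughly like the following Java sketch. It is an assumption-laden illustration rather than the fix that was merged: the class and method names are hypothetical, and the machine's own address is passed in as a parameter (in practice it could come from something like `InetAddress.getLocalHost()`).

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Sketch only: class and method names are hypothetical, not part of Flink.
final class AdvertisedAddressSketch {
    /**
     * Returns "host:port" for remote clients. A wildcard (0.0.0.0) or loopback
     * bind address is replaced by the machine's own address; any other bind
     * address is returned unchanged.
     */
    static String advertisedAddress(String bindAddress, int port, String machineAddress) {
        InetAddress bound;
        try {
            bound = InetAddress.getByName(bindAddress);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Cannot resolve bind address: " + bindAddress, e);
        }
        boolean reachableFromOutside = !bound.isAnyLocalAddress() && !bound.isLoopbackAddress();
        String host = reachableFromOutside ? bindAddress : machineAddress;
        return host + ":" + port;
    }
}
```

For example, a server bound to 0.0.0.0 on a machine whose address is 10.0.0.5 would advertise 10.0.0.5:9067, which is the value that makes sense to register with YARN or Mesos.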
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297920#comment-16297920 ] shuai.xu commented on FLINK-8289: - Hi [~eronwright], yes, what we need is the advertised address. But for Yarn Proxy / Mesos Admin Router, the advertised address is not the proxy address but the real ip:port. This address is registered with the proxy, so the router can redirect the proxy address to the real address of the REST server. > The RestServerEndpoint should return the address with real ip when > getRestAdddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address taken from > the rest.address config option, whose default is 127.0.0.1:9067, but this > address cannot be accessed from another machine. The IPs of the Dispatcher > and JobMaster are usually dynamic, so users will configure the address as > 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067. This address is > registered with YARN or Mesos, but it cannot be accessed from another > machine either. So getRestAddress needs to return the real ip:port so that > users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-7878. --- > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, Flink only lets users define how much CPU and memory an operator > uses, but the resources in a cluster are varied. For example, an > image-processing application may need GPUs, and others may need FPGAs. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-6434. --- > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Fix For: 1.5.0 > > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request remains in SlotPool. If a new > slot then registers with SlotPool, it may fulfill the outdated pending > request and be added to allocatedSlots, where it will never be used and > never be recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
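The leak described in this issue can be modelled with a toy pool. The Java sketch below is a simplified stand-in, not the real SlotPool: all names are hypothetical, and requests and slots are plain strings. It shows one way to avoid the leak, namely dropping the pending request when it times out, so that a slot arriving later stays available instead of being bound to a request nobody is waiting on.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model only: a hypothetical stand-in for SlotPool bookkeeping.
final class PendingRequestCleanupSketch {
    private final Deque<String> pendingRequests = new ArrayDeque<>();   // request ids, oldest first
    private final Map<String, String> allocatedSlots = new HashMap<>(); // slot id -> request id
    private final Deque<String> availableSlots = new ArrayDeque<>();    // idle slot ids

    void requestSlot(String requestId) {
        pendingRequests.addLast(requestId);
    }

    /** Called when the caller gave up waiting: forget the request so it cannot capture a slot later. */
    void onRequestTimeout(String requestId) {
        pendingRequests.remove(requestId);
    }

    /** A newly registered slot serves the oldest live request, or becomes available. */
    void offerSlot(String slotId) {
        String requestId = pendingRequests.pollFirst();
        if (requestId == null) {
            availableSlots.addLast(slotId);
        } else {
            allocatedSlots.put(slotId, requestId);
        }
    }

    int allocatedCount() {
        return allocatedSlots.size();
    }

    int availableCount() {
        return availableSlots.size();
    }
}
```

Without the `onRequestTimeout` call, the same sequence (request, timeout, slot offer) would leave the slot in `allocatedSlots` forever, which is the leak reported above.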
[jira] [Closed] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
[ https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-7928. --- > Extend the filed in ResourceProfile for precisely calculating the resource of > a task manager > > > Key: FLINK-7928 > URL: https://issues.apache.org/jira/browse/FLINK-7928 > Project: Flink > Issue Type: Improvement > Components: JobManager, ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Fix For: 1.5.0 > > > ResourceProfile records all the resource requirements for a slot. It is > generated by the JobMaster and then passed to the ResourceManager with the > slot request. > A task in the slot needs three kinds of resources: > 1. The resources for the operators, as specified by the user-defined > ResourceSpec. > 2. The resources the operators need to communicate with their upstream > tasks, for example the buffer pools. > 3. The resources the operators need to communicate with their downstream > tasks. Same as above. > So ResourceProfile should contain three parts: the first comes from > ResourceSpec, and the other two are generated by the JobMaster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5793) Running slot may not be add to AllocatedMap in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-5793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5793. --- > Running slot may not be add to AllocatedMap in SlotPool > --- > > Key: FLINK-5793 > URL: https://issues.apache.org/jira/browse/FLINK-5793 > Project: Flink > Issue Type: Bug > Components: Core >Reporter: shuai.xu >Assignee: shuai.xu > Fix For: 1.3.0 > > > In SlotPool, when a slot is returned by a finished task, the pool tries to > find a pending request matching it. If one is found, the slot is given to > that request but is not added to AllocatedMap. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5171) Wrong use of Preconditions.checkState in TaskManagerRunner
[ https://issues.apache.org/jira/browse/FLINK-5171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5171. --- > Wrong use of Preconditions.checkState in TaskManagerRunner > -- > > Key: FLINK-5171 > URL: https://issues.apache.org/jira/browse/FLINK-5171 > Project: Flink > Issue Type: Bug > Components: Core >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > Preconditions.checkState checks that its first parameter is true and throws > an exception if it is not. But in TaskManagerRunner it is used in such a way > that an exception is thrown when the RPC port is valid. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
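The contract at stake here is easy to get backwards, so a small Java sketch may help. It uses a local `checkState` helper that mirrors the behaviour described above (throw when the condition is false) instead of depending on Flink, and the accepted port range of 0 to 65535 is an assumption made for illustration. The key point is that the condition passed in must describe the valid state.

```java
// Sketch only: a local stand-in for the checkState contract; the port range is an assumption.
final class CheckStateSketch {
    /** Throws IllegalStateException when the condition is false, and does nothing otherwise. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    static boolean isValidPort(int port) {
        return port >= 0 && port <= 65535;
    }

    /** Correct usage: pass the condition that must hold. Negating it would reject every valid port. */
    static int requireValidPort(int port) {
        checkState(isValidPort(port), "Invalid RPC port: " + port);
        return port;
    }
}
```

Writing `checkState(!isValidPort(port), ...)` instead would reproduce the reported behaviour: valid ports raise an exception and invalid ones pass through.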
[jira] [Closed] (FLINK-5190) ZooKeeperLeaderRetrievalService should not close the zk client when stop
[ https://issues.apache.org/jira/browse/FLINK-5190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5190. --- > ZooKeeperLeaderRetrievalService should not close the zk client when stop > > > Key: FLINK-5190 > URL: https://issues.apache.org/jira/browse/FLINK-5190 > Project: Flink > Issue Type: Bug > Components: Core >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > The zk client is created outside of ZooKeeperLeaderRetrievalService and > passed to it. When ZooKeeperLeaderRetrievalService stops, it should not > close the zk client, because others outside the service may still be using > it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
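The ownership rule behind this issue (whoever creates a resource closes it) can be shown with a minimal Java sketch. The `Client` and `RetrievalService` classes below are hypothetical placeholders, not the ZooKeeper client or the Flink service; the only point is that `stop()` releases the service's own state and leaves the borrowed client open.

```java
// Sketch only: hypothetical stand-ins for a shared client and a service that borrows it.
final class BorrowedClientSketch {
    static final class Client implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    static final class RetrievalService {
        private final Client client; // borrowed: created and owned by the caller
        private boolean running = true;

        RetrievalService(Client client) {
            this.client = client;
        }

        /** Stops this service only. The client is deliberately left open for its other users. */
        void stop() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }
}
```

The caller that created the client remains responsible for closing it once every service that uses it has stopped.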
[jira] [Closed] (FLINK-5170) getAkkaConfig will use localhost if hostname is specified
[ https://issues.apache.org/jira/browse/FLINK-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5170. --- > getAkkaConfig will use localhost if hostname is specified > -- > > Key: FLINK-5170 > URL: https://issues.apache.org/jira/browse/FLINK-5170 > Project: Flink > Issue Type: Bug > Components: Core >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > In AkkaUtil.scala, > def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): > Config = { > getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) > else None) > } > the condition is inverted: when hostname is specified, it uses None. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
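The intended behaviour of the quoted Scala snippet is presumably the opposite of what it does: wrap the address when a hostname is given, and pass nothing otherwise. The Java sketch below restates that logic with `Optional` in place of Scala's `Option`. It is an illustration of the null check only; the class and method names are hypothetical, and it is not the patch that closed this issue.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

// Sketch only: a Java rendering of the corrected condition, with hypothetical names.
final class ExternalAddressSketch {
    /** Returns (hostname, port) when a hostname is specified, and an empty Optional when it is null. */
    static Optional<Map.Entry<String, Integer>> externalAddress(String hostname, int port) {
        if (hostname == null) {
            return Optional.empty();
        }
        Map.Entry<String, Integer> address = new SimpleImmutableEntry<>(hostname, port);
        return Optional.of(address);
    }
}
```

In the Scala original, the equivalent change would be testing `hostname != null` before building `Some((hostname, port))`.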
[jira] [Updated] (FLINK-8288) Register the web interface url to yarn for yarn job mode
[ https://issues.apache.org/jira/browse/FLINK-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8288: Affects Version/s: 1.5.0 > Register the web interface url to yarn for yarn job mode > > > Key: FLINK-8288 > URL: https://issues.apache.org/jira/browse/FLINK-8288 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > For flip-6 job mode, the resource manager is created before the web monitor, > so the web interface url is not set to resource manager, and the resource > manager can not register the url to yarn. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8289: Affects Version/s: 1.5.0 > The RestServerEndpoint should return the address with real ip when > getRestAdddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address taken from > the rest.address config option, whose default is 127.0.0.1:9067, but this > address cannot be accessed from another machine. The IPs of the Dispatcher > and JobMaster are usually dynamic, so users will configure the address as > 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067. This address is > registered with YARN or Mesos, but it cannot be accessed from another > machine either. So getRestAddress needs to return the real ip:port so that > users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress
shuai.xu created FLINK-8289: --- Summary: The RestServerEndpoint should return the address with real ip when getRestAdddress Key: FLINK-8289 URL: https://issues.apache.org/jira/browse/FLINK-8289 Project: Flink Issue Type: Bug Reporter: shuai.xu Currently, RestServerEndpoint.getRestAddress returns an address taken from the rest.address config option, whose default is 127.0.0.1:9067, but this address cannot be accessed from another machine. The IPs of the Dispatcher and JobMaster are usually dynamic, so users will configure the address as 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067. This address is registered with YARN or Mesos, but it cannot be accessed from another machine either. So getRestAddress needs to return the real ip:port so that users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8288) Register the web interface url to yarn for yarn job mode
shuai.xu created FLINK-8288: --- Summary: Register the web interface url to yarn for yarn job mode Key: FLINK-8288 URL: https://issues.apache.org/jira/browse/FLINK-8288 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu For flip-6 job mode, the resource manager is created before the web monitor, so the web interface url is not set to resource manager, and the resource manager can not register the url to yarn. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8266) Add network memory to ResourceProfile
[ https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8266: Labels: flip-6 (was: ) > Add network memory to ResourceProfile > - > > Key: FLINK-8266 > URL: https://issues.apache.org/jira/browse/FLINK-8266 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > On the task manager side, the network buffer pool should be allocated > according to the number of input channels and output subpartitions, but > when a worker is allocated, the resource profile does not contain any > information about this memory. > So I suggest adding a network memory field to ResourceProfile. The job > master should calculate it when scheduling a task, and the resource manager > can then allocate a container according to the resource profile. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8266) Add network memory to ResourceProfile
shuai.xu created FLINK-8266: --- Summary: Add network memory to ResourceProfile Key: FLINK-8266 URL: https://issues.apache.org/jira/browse/FLINK-8266 Project: Flink Issue Type: Improvement Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu On the task manager side, the network buffer pool should be allocated according to the number of input channels and output subpartitions, but when a worker is allocated, the resource profile does not contain any information about this memory. So I suggest adding a network memory field to ResourceProfile. The job master should calculate it when scheduling a task, and the resource manager can then allocate a container according to the resource profile. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8224) Should shudownApplication when job terminated in job mode
shuai.xu created FLINK-8224: --- Summary: Should shudownApplication when job terminated in job mode Key: FLINK-8224 URL: https://issues.apache.org/jira/browse/FLINK-8224 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu In job mode, one job is one application. When the job finishes, it should tell the resource manager to shut down the application; otherwise the resource manager cannot set the application status. For example, if the YARN resource manager does not report the application as finished to the YARN master, YARN will consider the application to be still running. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.
[ https://issues.apache.org/jira/browse/FLINK-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-7870. --- > SlotPool should cancel the slot request to RM if not need any more. > --- > > Key: FLINK-7870 > URL: https://issues.apache.org/jira/browse/FLINK-7870 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Fix For: 1.5.0 > > > 1. SlotPool requests slots from the RM if it does not have enough slots. > 2. If a slot request is not fulfilled within a certain time, SlotPool treats > the request as timed out and sends a new slot request by triggering a > failover in the JobMaster. The previous request is no longer needed, but the > RM does not know that. > 3. This may cause the RM to request many more resources than the job really > needs. > For example: > 1. A job needs 100 slots. The RM requests 100 containers from YARN. > 2. But YARN is busy and has no resources for the job. > 3. The job fails over because the resource request was not fulfilled in time. > 4. It asks for 100 slots again, so the RM has now requested 200 containers > from YARN. > 5. If the failover happens several times, the number of requested containers > keeps growing. > 6. Once YARN has resources, it finds that the job appears to need thousands > of containers. This is a waste of resources. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
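The arithmetic in the example above is worth spelling out. The Java sketch below is a toy accounting model, not Flink code, and all names are hypothetical. It counts how many container requests are outstanding at the resource manager after a number of failovers, with and without cancelling the timed-out request first.

```java
// Toy accounting model only: hypothetical names, no real RM or YARN interaction.
final class SlotRequestAccountingSketch {
    /**
     * Returns the number of outstanding container requests after the given number of failovers.
     * Each failover re-requests all slots; with cancelOnTimeout the stale request is withdrawn first.
     */
    static int outstandingContainers(int slotsNeeded, int failovers, boolean cancelOnTimeout) {
        int outstanding = slotsNeeded; // the initial request
        for (int i = 0; i < failovers; i++) {
            if (cancelOnTimeout) {
                outstanding -= slotsNeeded; // withdraw the request that timed out
            }
            outstanding += slotsNeeded; // the job asks for all its slots again
        }
        return outstanding;
    }
}
```

For a job needing 100 slots, one failover without cancellation leaves 200 requests outstanding and nine failovers leave 1000, while with cancellation the count stays at 100 no matter how often the job fails over.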
[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots
[ https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7970: Description: Now all slot allocation is one by one from execution to slot pool. If batch allocate a number of slots to slot pool, then slot pool can assign its cached slots according to the global slot requests of all executions, so it can make an optimal matching between slot and execution, this is especially usefully for failover. For example, it can assign a slot to the execution whose state is just on the machine when failover. !https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg! was: Now all slot allocation is one by one from execution to slot pool. If batch allocate a number of slots to slot pool, then slot pool can assign its cached slots according to the global slot requests of all executions, so it can make an optimal matching between slot and execution, this is especially usefully for failover. For example, it can assign a slot to the execution whose state is just on the machine when failover. !sp.jpg|thumbnail! > SlotPool support batch allocating slots > --- > > Key: FLINK-7970 > URL: https://issues.apache.org/jira/browse/FLINK-7970 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: sp.jpg > > > Now all slot allocation is one by one from execution to slot pool. If batch > allocate a number of slots to slot pool, then slot pool can assign its cached > slots according to the global slot requests of all executions, so it can make > an optimal matching between slot and execution, this is especially usefully > for failover. For example, it can assign a slot to the execution whose state > is just on the machine when failover. > !https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots
[ https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7970: Attachment: sp.jpg > SlotPool support batch allocating slots > --- > > Key: FLINK-7970 > URL: https://issues.apache.org/jira/browse/FLINK-7970 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: sp.jpg > > > Now all slot allocation is one by one from execution to slot pool. If batch > allocate a number of slots to slot pool, then slot pool can assign its cached > slots according to the global slot requests of all executions, so it can make > an optimal matching between slot and execution, this is especially usefully > for failover. For example, it can assign a slot to the execution whose state > is just on the machine when failover. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots
[ https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7970: Description: Now all slot allocation is one by one from execution to slot pool. If batch allocate a number of slots to slot pool, then slot pool can assign its cached slots according to the global slot requests of all executions, so it can make an optimal matching between slot and execution, this is especially usefully for failover. For example, it can assign a slot to the execution whose state is just on the machine when failover. !sp.jpg|thumbnail! was: Now all slot allocation is one by one from execution to slot pool. If batch allocate a number of slots to slot pool, then slot pool can assign its cached slots according to the global slot requests of all executions, so it can make an optimal matching between slot and execution, this is especially usefully for failover. For example, it can assign a slot to the execution whose state is just on the machine when failover. > SlotPool support batch allocating slots > --- > > Key: FLINK-7970 > URL: https://issues.apache.org/jira/browse/FLINK-7970 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: sp.jpg > > > Now all slot allocation is one by one from execution to slot pool. If batch > allocate a number of slots to slot pool, then slot pool can assign its cached > slots according to the global slot requests of all executions, so it can make > an optimal matching between slot and execution, this is especially usefully > for failover. For example, it can assign a slot to the execution whose state > is just on the machine when failover. > !sp.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7970) SlotPool support batch allocating slots
shuai.xu created FLINK-7970: --- Summary: SlotPool support batch allocating slots Key: FLINK-7970 URL: https://issues.apache.org/jira/browse/FLINK-7970 Project: Flink Issue Type: Improvement Components: JobManager Reporter: shuai.xu Assignee: shuai.xu Now all slot allocation is one by one from execution to slot pool. If batch allocate a number of slots to slot pool, then slot pool can assign its cached slots according to the global slot requests of all executions, so it can make an optimal matching between slot and execution, this is especially usefully for failover. For example, it can assign a slot to the execution whose state is just on the machine when failover. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
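Why a batch view helps can be seen in a small Java sketch. It is a simplified two-pass matcher written for illustration, not the algorithm proposed for SlotPool: executions and slots are plain strings, each execution names the host where its state lives, and all class and method names are hypothetical. The first pass gives every execution a slot on its preferred host when one is free; the second pass hands out whatever is left.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Sketch only: a simplified matcher with hypothetical names, not Flink's SlotPool logic.
final class BatchSlotMatchingSketch {
    /** Maps each execution id to a slot id, preferring slots on the host that holds the execution's state. */
    static Map<String, String> assign(
            Map<String, String> preferredHostByExecution, Map<String, String> hostBySlot) {
        Map<String, String> slotByExecution = new LinkedHashMap<>();
        Set<String> freeSlots = new LinkedHashSet<>(hostBySlot.keySet());

        // Pass 1: satisfy locality wherever a slot on the preferred host is still free.
        for (Map.Entry<String, String> entry : preferredHostByExecution.entrySet()) {
            String localSlot = null;
            for (String slot : freeSlots) {
                if (Objects.equals(hostBySlot.get(slot), entry.getValue())) {
                    localSlot = slot;
                    break;
                }
            }
            if (localSlot != null) {
                freeSlots.remove(localSlot);
                slotByExecution.put(entry.getKey(), localSlot);
            }
        }

        // Pass 2: give the remaining executions any slot that is left.
        Iterator<String> remaining = freeSlots.iterator();
        for (String execution : preferredHostByExecution.keySet()) {
            if (!slotByExecution.containsKey(execution) && remaining.hasNext()) {
                slotByExecution.put(execution, remaining.next());
            }
        }
        return slotByExecution;
    }
}
```

With one-by-one allocation, the first execution to ask could take a slot on a host that another execution needed for its local state; seeing all requests at once lets the pool avoid that.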
[jira] [Assigned] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-7969: --- Assignee: shuai.xu > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > Now resource manager only support requesting slot one by one, it's better to > make it support batch allocating alots, so that it can make a global decision > with all the resource requests. For example: it can decide how many slots > should be put into one task manager. > !attachment-name.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Labels: flip-6 (was: ) > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu > Labels: flip-6 > > Now resource manager only support requesting slot one by one, it's better to > make it support batch allocating alots, so that it can make a global decision > with all the resource requests. For example: it can decide how many slots > should be put into one task manager. > !attachment-name.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7969) Resource manager support batch request slots
shuai.xu created FLINK-7969: --- Summary: Resource manager support batch request slots Key: FLINK-7969 URL: https://issues.apache.org/jira/browse/FLINK-7969 Project: Flink Issue Type: Improvement Components: ResourceManager Reporter: shuai.xu Now resource manager only support requesting slot one by one, it's better to make it support batch allocating alots, so that it can make a global decision with all the resource requests. For example: it can decide how many slots should be put into one task manager. !attachment-name.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Now resource manager only support requesting slot one by one, it's better to make it support batch allocating alots, so that it can make a global decision with all the resource requests. For example: it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! was: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Now resource manager only support requesting slot one by one, it's better to make it support batch allocating alots, so that it can make a global decision with all the resource requests. For example: it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.jpg > > > !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Now > resource manager only support requesting slot one by one, it's better to make > it support batch allocating alots, so that it can make a global decision with > all the resource requests. For example: it can decide how many slots should > be put into one task manager. > !rm.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Now resource manager only support requesting slot one by one, it's better to make it support batch allocating alots, so that it can make a global decision with all the resource requests. For example: it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! was: Now resource manager only support requesting slot one by one, it's better to make it support batch allocating alots, so that it can make a global decision with all the resource requests. For example: it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.jpg > > > !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Now > resource manager only support requesting slot one by one, it's better to make > it support batch allocating alots, so that it can make a global decision with > all the resource requests. For example: it can decide how many slots should > be put into one task manager. > !rm.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: Now resource manager only support requesting slot one by one, it's better to make it support batch allocating alots, so that it can make a global decision with all the resource requests. For example: it can decide how many slots should be put into one task manager. !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! was: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Now resource manager only support requesting slot one by one, it's better to make it support batch allocating alots, so that it can make a global decision with all the resource requests. For example: it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.jpg > > > Now resource manager only support requesting slot one by one, it's better to > make it support batch allocating alots, so that it can make a global decision > with all the resource requests. For example: it can decide how many slots > should be put into one task manager. > !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Attachment: (was: rm.png) > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.jpg > > > Currently the resource manager only supports requesting slots one by one. It would be better to > make it support allocating slots in batches, so that it can make a global decision > based on all the resource requests. For example, it can decide how many slots should > be put into one task manager. > !rm.png|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: Currently the resource manager only supports requesting slots one by one. It would be better to make it support allocating slots in batches, so that it can make a global decision based on all the resource requests. For example, it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! was: Currently the resource manager only supports requesting slots one by one. It would be better to make it support allocating slots in batches, so that it can make a global decision based on all the resource requests. For example, it can decide how many slots should be put into one task manager. !rm.png|thumbnail! > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.jpg > > > Currently the resource manager only supports requesting slots one by one. It would be better to > make it support allocating slots in batches, so that it can make a global decision > based on all the resource requests. For example, it can decide how many slots should > be put into one task manager. > !rm.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Attachment: rm.jpg > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.jpg > > > Currently the resource manager only supports requesting slots one by one. It would be better to > make it support allocating slots in batches, so that it can make a global decision > based on all the resource requests. For example, it can decide how many slots should > be put into one task manager. > !rm.png|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Attachment: rm.png > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.png > > > Currently the resource manager only supports requesting slots one by one. It would be better to > make it support allocating slots in batches, so that it can make a global decision > based on all the resource requests. For example, it can decide how many slots should > be put into one task manager. > !attachment-name.jpg|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: Currently the resource manager only supports requesting slots one by one. It would be better to make it support allocating slots in batches, so that it can make a global decision based on all the resource requests. For example, it can decide how many slots should be put into one task manager. !rm.png|thumbnail! was: Currently the resource manager only supports requesting slots one by one. It would be better to make it support allocating slots in batches, so that it can make a global decision based on all the resource requests. For example, it can decide how many slots should be put into one task manager. !attachment-name.jpg|thumbnail! > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: rm.png > > > Currently the resource manager only supports requesting slots one by one. It would be better to > make it support allocating slots in batches, so that it can make a global decision > based on all the resource requests. For example, it can decide how many slots should > be put into one task manager. > !rm.png|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
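The "global decision" mentioned in FLINK-7969 can be illustrated with a minimal sketch. It assumes that a batch of slot requests arrives together and that every task manager offers a fixed number of slots. The class `BatchSlotPlanner`, its `plan` method, and the request IDs are hypothetical names for illustration only; they are not part of Flink's ResourceManager.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Flink's ResourceManager API. */
final class BatchSlotPlanner {

    /**
     * Groups a whole batch of slot requests into task managers that hold
     * at most slotsPerTaskManager slots each. Seeing the full batch lets the
     * planner fill each task manager before starting the next one.
     */
    static List<List<String>> plan(List<String> slotRequestIds, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        List<List<String>> taskManagers = new ArrayList<>();
        for (int start = 0; start < slotRequestIds.size(); start += slotsPerTaskManager) {
            int end = Math.min(start + slotsPerTaskManager, slotRequestIds.size());
            // Copy the sub-list so each task manager owns its own assignment.
            taskManagers.add(new ArrayList<>(slotRequestIds.subList(start, end)));
        }
        return taskManagers;
    }
}
```

With one-by-one requests the resource manager cannot know how many requests will follow, so it cannot pack them this way up front.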
[jira] [Assigned] (FLINK-7871) SlotPool should release its unused slot to RM
[ https://issues.apache.org/jira/browse/FLINK-7871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-7871: --- Assignee: shuai.xu > SlotPool should release its unused slot to RM > - > > Key: FLINK-7871 > URL: https://issues.apache.org/jira/browse/FLINK-7871 > Project: Flink > Issue Type: Bug >Reporter: shuai.xu >Assignee: shuai.xu > > As described in the design wiki > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, > _*The SlotPool releases slots that are unused to the ResourceManager. Slots > count as unused if they are not used when the job is fully running (fully > recovered).*_ > Currently, however, the SlotPool keeps every slot offered to it until the job > finishes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
[ https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7928: Labels: flip-6 (was: ) > Extend the filed in ResourceProfile for precisely calculating the resource of > a task manager > > > Key: FLINK-7928 > URL: https://issues.apache.org/jira/browse/FLINK-7928 > Project: Flink > Issue Type: Improvement > Components: JobManager, ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > ResourceProfile records all the resource requirements for a slot. It is > generated by the JobMaster and then passed to the ResourceManager with the slot > request. > A task in the slot needs three kinds of resources: > 1. The resources for the operators, as specified by the user-defined > ResourceSpec. > 2. The resources the operators need to communicate with their upstream operators, for > example, the resources for buffer pools. > 3. The resources the operators need to communicate with their downstream operators, > same as above. > So ResourceProfile should contain three parts: the first comes > from ResourceSpec, and the other two are generated by the JobMaster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
shuai.xu created FLINK-7928: --- Summary: Extend the filed in ResourceProfile for precisely calculating the resource of a task manager Key: FLINK-7928 URL: https://issues.apache.org/jira/browse/FLINK-7928 Project: Flink Issue Type: Improvement Components: JobManager, ResourceManager Reporter: shuai.xu Assignee: shuai.xu ResourceProfile records all the resource requirements for a slot. It is generated by the JobMaster and then passed to the ResourceManager with the slot request. A task in the slot needs three kinds of resources: 1. The resources for the operators, as specified by the user-defined ResourceSpec. 2. The resources the operators need to communicate with their upstream operators, for example, the resources for buffer pools. 3. The resources the operators need to communicate with their downstream operators, same as above. So ResourceProfile should contain three parts: the first comes from ResourceSpec, and the other two are generated by the JobMaster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
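The three-part profile proposed in FLINK-7928 could look roughly like the sketch below. It is only an illustration under stated assumptions: `ResourceAmount`, `SlotResourceProfileSketch`, and the choice of CPU cores and memory in MB as the tracked quantities are hypothetical and do not reflect Flink's actual ResourceProfile class.

```java
/** Hypothetical resource amount: CPU cores plus memory in MB. */
final class ResourceAmount {
    final double cpuCores;
    final int memoryMb;

    ResourceAmount(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Returns the component-wise sum of this amount and the other one. */
    ResourceAmount plus(ResourceAmount other) {
        return new ResourceAmount(cpuCores + other.cpuCores, memoryMb + other.memoryMb);
    }
}

/** Hypothetical sketch of a slot profile split into the three parts named in the issue. */
final class SlotResourceProfileSketch {
    final ResourceAmount operators;   // taken from the user-defined ResourceSpec
    final ResourceAmount upstream;    // e.g. buffer pools for inputs, computed by the JobMaster
    final ResourceAmount downstream;  // e.g. buffer pools for outputs, computed by the JobMaster

    SlotResourceProfileSketch(ResourceAmount operators, ResourceAmount upstream, ResourceAmount downstream) {
        this.operators = operators;
        this.upstream = upstream;
        this.downstream = downstream;
    }

    /** Total requirement a resource manager would have to reserve for the slot. */
    ResourceAmount total() {
        return operators.plus(upstream).plus(downstream);
    }
}
```

Keeping the parts separate, rather than storing only the sum, lets the two communication parts be recomputed by the JobMaster without touching the user-specified part.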
[jira] [Updated] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7878: Labels: flip-6 (was: ) > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > Currently, Flink only lets users define how much CPU and memory an operator uses, > but the resources in a cluster are diverse. For example, an > image-processing application may need GPUs, and others may need FPGAs. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-7878: --- Assignee: shuai.xu > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > > Currently, Flink only lets users define how much CPU and memory an operator uses, > but the resources in a cluster are diverse. For example, an > image-processing application may need GPUs, and others may need FPGAs. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7878) Extend the resource type user can define in ResourceSpec
shuai.xu created FLINK-7878: --- Summary: Extend the resource type user can define in ResourceSpec Key: FLINK-7878 URL: https://issues.apache.org/jira/browse/FLINK-7878 Project: Flink Issue Type: Bug Components: DataSet API, DataStream API Reporter: shuai.xu Currently, Flink only lets users define how much CPU and memory an operator uses, but the resources in a cluster are diverse. For example, an image-processing application may need GPUs, and others may need FPGAs. CPU and memory alone are not enough, and the number of resource types keeps growing, so we need to make ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
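One possible shape for an extensible spec, as requested in FLINK-7878, is to keep CPU and memory as fixed fields and store every other resource type in a name-to-amount map. The sketch below assumes exactly that; the class `ExtensibleResourceSpecSketch`, its methods, and the resource names "GPU" and "FPGA" as map keys are hypothetical and are not Flink's ResourceSpec API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a resource spec that accepts arbitrary named resources. */
final class ExtensibleResourceSpecSketch {
    private final double cpuCores;
    private final int memoryMb;
    // Extended resources such as "GPU" or "FPGA", keyed by name.
    private final Map<String, Double> extendedResources = new HashMap<>();

    ExtensibleResourceSpecSketch(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Adds (or accumulates) an extended resource and returns this spec for chaining. */
    ExtensibleResourceSpecSketch with(String name, double amount) {
        extendedResources.merge(name, amount, Double::sum);
        return this;
    }

    /** Returns the requested amount of a named resource, or 0.0 if none was requested. */
    double get(String name) {
        return extendedResources.getOrDefault(name, 0.0);
    }

    double cpuCores() {
        return cpuCores;
    }

    int memoryMb() {
        return memoryMb;
    }
}
```

A map keeps the class stable when a new resource type appears, at the cost of losing compile-time checks on resource names.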
[jira] [Created] (FLINK-7871) SlotPool should release its unused slot to RM
shuai.xu created FLINK-7871: --- Summary: SlotPool should release its unused slot to RM Key: FLINK-7871 URL: https://issues.apache.org/jira/browse/FLINK-7871 Project: Flink Issue Type: Bug Reporter: shuai.xu As described in the design wiki https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, _*The SlotPool releases slots that are unused to the ResourceManager. Slots count as unused if they are not used when the job is fully running (fully recovered).*_ Currently, however, the SlotPool keeps every slot offered to it until the job finishes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.
shuai.xu created FLINK-7870: --- Summary: SlotPool should cancel the slot request to RM if not need any more. Key: FLINK-7870 URL: https://issues.apache.org/jira/browse/FLINK-7870 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu 1. SlotPool requests slots from the RM if it does not have enough slots. 2. If a slot request is not fulfilled within a certain time, SlotPool treats the request as timed out and sends a new slot request by triggering a failover in JobMaster. The previous request is no longer needed, but the RM does not know this. 3. This may cause the RM to request many more resources than the job really needs. For example: 1. A job needs 100 slots, so the RM requests 100 containers from YARN. 2. YARN is busy and has no resources for the job. 3. The job fails over because the resource request was not fulfilled in time. 4. It asks for 100 slots again, so the RM has now requested 200 containers from YARN. 5. If the failover happens several times, the number of requested containers keeps growing. 6. When YARN has resources again, it finds that the job seems to need thousands of containers. This is a waste of resources. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
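The accumulation described in FLINK-7870 can be reproduced with a small bookkeeping sketch. It assumes that each failover attempt issues a request under its own ID and that the resource manager sums all outstanding requests. `PendingRequestTracker` and the attempt IDs are hypothetical names; this is not how Flink's ResourceManager is implemented.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the resource manager's view of outstanding container requests. */
final class PendingRequestTracker {
    private final Map<String, Integer> pendingByRequestId = new HashMap<>();

    /** Registers a request for the given number of containers. */
    void request(String requestId, int containers) {
        pendingByRequestId.put(requestId, containers);
    }

    /** Cancels a request that the SlotPool no longer needs. */
    void cancel(String requestId) {
        pendingByRequestId.remove(requestId);
    }

    /** Total containers still being asked of the cluster manager. */
    int totalPending() {
        int total = 0;
        for (int containers : pendingByRequestId.values()) {
            total += containers;
        }
        return total;
    }
}
```

If the timed-out request from the first attempt is never cancelled, two attempts of 100 slots each leave 200 containers pending; cancelling the first request before retrying keeps the total at 100.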
[jira] [Closed] (FLINK-5868) Implement a new RestartStrategy that works for the FailoverRegion.
[ https://issues.apache.org/jira/browse/FLINK-5868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5868. --- Resolution: Fixed Fix Version/s: 1.3.0 > Implement a new RestartStrategy that works for the FailoverRegion. > -- > > Key: FLINK-5868 > URL: https://issues.apache.org/jira/browse/FLINK-5868 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > Fix For: 1.3.0 > > > We will first implement a restart strategy based on failure rate. > For simplicity, the new restart strategy only counts the failovers of > FailoverRegions. A global failure of the execution graph causes > several FailoverRegions to fail over, and each of them simply counts toward the total. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
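A failure-rate strategy that counts region failovers, as outlined in FLINK-5868, might be sketched as a sliding time window over failover timestamps. The class `RegionFailureRateSketch`, its parameters, and the eviction rule are assumptions made for illustration and are not the strategy that was actually committed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of a failure-rate check that counts FailoverRegion failovers. */
final class RegionFailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failoverTimestamps = new ArrayDeque<>();

    RegionFailureRateSketch(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records one region failover and reports whether a restart is still allowed. */
    boolean recordRegionFailover(long nowMillis) {
        failoverTimestamps.addLast(nowMillis);
        // Drop failovers that fell out of the sliding window.
        while (nowMillis - failoverTimestamps.peekFirst() >= intervalMillis) {
            failoverTimestamps.removeFirst();
        }
        return failoverTimestamps.size() <= maxFailuresPerInterval;
    }

    /** A global failure is counted as one failover per affected region. */
    boolean recordGlobalFailure(int affectedRegions, long nowMillis) {
        boolean canRestart = true;
        for (int i = 0; i < affectedRegions; i++) {
            canRestart = recordRegionFailover(nowMillis);
        }
        return canRestart;
    }
}
```

Counting a global failure once per affected region means a single global failure can exhaust the budget on its own when many regions are involved.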
[jira] [Closed] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy
[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5867. --- Resolution: Fixed Fix Version/s: 1.3.0 > The implementation of RestartPipelinedRegionStrategy > > > Key: FLINK-5867 > URL: https://issues.apache.org/jira/browse/FLINK-5867 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > Fix For: 1.3.0 > > > The RestartPipelinedRegionStrategy's responsibility is the following: > 1. Calculate all FailoverRegions and their relations when initializing. > 2. Listen for the failure of the job and executions, and find corresponding > FailoverRegions to do the failover. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5866) The implementation of FailoverRegion.
[ https://issues.apache.org/jira/browse/FLINK-5866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5866. --- Resolution: Fixed Fix Version/s: 1.3.0 > The implementation of FailoverRegion. > - > > Key: FLINK-5866 > URL: https://issues.apache.org/jira/browse/FLINK-5866 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > Fix For: 1.3.0 > > > In FLIP-1, a FailoverRegion manages the failover of a minimal set of pipelined > connected executions. > One job graph may have several FailoverRegions, and they are calculated > statically after the job graph is attached. > A FailoverRegion has several states: CREATED, CANCELING, CANCELED, and RUNNING. It > transitions between states according to the state of the ExecutionVertex instances it > contains, and it drives the failover during these state transitions. > A FailoverRegion may have preceding and succeeding FailoverRegions. It > waits for its preceding ones and notifies its succeeding ones when finishing. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
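The four states listed in FLINK-5866 suggest a small state machine. The issue does not spell out which transitions are legal, so the cycle used below (CREATED to RUNNING, RUNNING to CANCELING, CANCELING to CANCELED, CANCELED back to CREATED) is an assumption, and `RegionState` and `FailoverRegionSketch` are hypothetical names rather than Flink's classes.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

/** The four states named in the issue. */
enum RegionState { CREATED, CANCELING, CANCELED, RUNNING }

/** Hypothetical sketch of a region that only accepts an assumed set of transitions. */
final class FailoverRegionSketch {
    // Assumed transition table; the real rules are not given in the issue text.
    private static final Map<RegionState, EnumSet<RegionState>> ALLOWED =
            new EnumMap<>(RegionState.class);

    static {
        ALLOWED.put(RegionState.CREATED, EnumSet.of(RegionState.RUNNING));
        ALLOWED.put(RegionState.RUNNING, EnumSet.of(RegionState.CANCELING));
        ALLOWED.put(RegionState.CANCELING, EnumSet.of(RegionState.CANCELED));
        ALLOWED.put(RegionState.CANCELED, EnumSet.of(RegionState.CREATED));
    }

    private RegionState state = RegionState.CREATED;

    RegionState state() {
        return state;
    }

    /** Moves to the target state if the assumed table allows it; returns whether it moved. */
    boolean transitionTo(RegionState target) {
        if (ALLOWED.get(state).contains(target)) {
            state = target;
            return true;
        }
        return false;
    }
}
```

In a real implementation the transitions would be triggered by the states of the contained ExecutionVertex instances, which this sketch leaves out.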
[jira] [Closed] (FLINK-5857) Recycle idle containers in time for yarn mode
[ https://issues.apache.org/jira/browse/FLINK-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-5857. --- Resolution: Duplicate > Recycle idle containers in time for yarn mode > - > > Key: FLINK-5857 > URL: https://issues.apache.org/jira/browse/FLINK-5857 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > When we run a Flink batch job like MapReduce, after a map is finished, its > container may be idle for a long time. We need a strategy to > recycle these containers to reduce resource usage. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5857) Recycle idle containers in time for yarn mode
[ https://issues.apache.org/jira/browse/FLINK-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16076347#comment-16076347 ] shuai.xu commented on FLINK-5857: - [~till.rohrmann] sure > Recycle idle containers in time for yarn mode > - > > Key: FLINK-5857 > URL: https://issues.apache.org/jira/browse/FLINK-5857 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > When we run a Flink batch job like MapReduce, after a map is finished, its > container may be idle for a long time. We need a strategy to > recycle these containers to reduce resource usage. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16073081#comment-16073081 ] shuai.xu commented on FLINK-6434: - [~till.rohrmann] Sorry, I have not started working on it yet. > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request is still in the SlotPool. If a > new slot then registers with the SlotPool, it may fulfill the outdated pending request > and be added to allocatedSlots, where it will never be used and never be > recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000435#comment-16000435 ] shuai.xu commented on FLINK-6434: - Yes, adding a request ID can solve the problem. > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request is still in the SlotPool. If a > new slot then registers with the SlotPool, it may fulfill the outdated pending request > and be added to allocatedSlots, where it will never be used and never be > recycled. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
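The request-ID idea from this comment can be sketched as follows. The sketch assumes that pending requests and allocations are both keyed by a request ID chosen by the caller, so that a timeout can clean up whichever of the two exists, regardless of which slot (and allocation ID) ended up serving the request. `SlotPoolSketch` and all of its methods are hypothetical and differ from Flink's real SlotPool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a slot pool that tracks everything by request ID. */
final class SlotPoolSketch {
    private final Set<String> pendingRequestIds = new LinkedHashSet<>();
    private final Map<String, String> allocatedSlots = new HashMap<>(); // request ID -> slot ID
    private final Deque<String> availableSlots = new ArrayDeque<>();

    void requestSlot(String requestId) {
        pendingRequestIds.add(requestId);
    }

    /** On timeout, drop the pending request and free any slot already assigned to it. */
    void timeoutRequest(String requestId) {
        pendingRequestIds.remove(requestId);
        String slotId = allocatedSlots.remove(requestId);
        if (slotId != null) {
            availableSlots.addLast(slotId);
        }
    }

    /** A newly registered slot serves the oldest pending request, or becomes available. */
    void offerSlot(String slotId) {
        Iterator<String> it = pendingRequestIds.iterator();
        if (it.hasNext()) {
            String requestId = it.next();
            it.remove();
            allocatedSlots.put(requestId, slotId);
        } else {
            availableSlots.addLast(slotId);
        }
    }

    int allocatedCount() {
        return allocatedSlots.size();
    }

    int availableCount() {
        return availableSlots.size();
    }
}
```

Because the timeout removes the pending entry, a slot that registers afterwards is kept as available instead of being bound to an outdated request.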
[jira] [Comment Edited] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000232#comment-16000232 ] shuai.xu edited comment on FLINK-6434 at 5/8/17 7:11 AM: - [~till.rohrmann] This does not seem to fix the bug completely. Between allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot with allocationID2 may fulfill the pending request, and allocatedSlots records it with allocationID2. failAllocation(allocationID1) cannot release it, so the slot is still leaked. was (Author: tiemsn): [~till.rohrmann] This does not seem to fix the bug totally. Between allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot with allocationID2 may fulfill the pending request, and allocatedSlots records it with allocationID2. failAllocation(allocationID1) cannot release it, so the slot is still leaked. > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request is still in the SlotPool. If a > new slot then registers with the SlotPool, it may fulfill the outdated pending request > and be added to allocatedSlots, where it will never be used and never be > recycled. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000232#comment-16000232 ] shuai.xu commented on FLINK-6434: - [~till.rohrmann] This does not seem to fix the bug totally. Between allocateSlot(allocationID1) and failAllocation(allocationID1), another free slot with allocationID2 may fulfill the pending request, and allocatedSlots records it with allocationID2. failAllocation(allocationID1) cannot release it, so the slot is still leaked. > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request is still in the SlotPool. If a > new slot then registers with the SlotPool, it may fulfill the outdated pending request > and be added to allocatedSlots, where it will never be used and never be > recycled. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-6434: Description: If the allocateSlot() call from Execution to SlotPool times out, the job begins to fail over, but the pending request is still in the SlotPool. If a new slot then registers with the SlotPool, it may fulfill the outdated pending request and be added to allocatedSlots, where it will never be used and never be recycled. (was: If the allocateSlot() call from Execution to SlotPool times out, the job begins to fail over, but the pending request is still in the SlotPool. If a need slot then registers with the SlotPool, it may fulfill the outdated pending request and be added to allocatedSlots, where it will never be used and never be recycled.) > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request is still in the SlotPool. If a > new slot then registers with the SlotPool, it may fulfill the outdated pending request > and be added to allocatedSlots, where it will never be used and never be > recycled. -- This message was sent by Atlassian JIRA (v6.3.15#6346)