[jira] [Created] (FLINK-16140) Translate "Event Processing (CEP)" page into Chinese
shuai.xu created FLINK-16140: Summary: Translate "Event Processing (CEP)" page into Chinese Key: FLINK-16140 URL: https://issues.apache.org/jira/browse/FLINK-16140 Project: Flink Issue Type: Task Components: chinese-translation, Documentation Affects Versions: 1.10.0 Reporter: shuai.xu
Translate the internal page https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html into Chinese. The document is located at "flink/docs/dev/libs/cep.zh.md".
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16011) Normalize the within usage in Pattern
shuai.xu created FLINK-16011: Summary: Normalize the within usage in Pattern Key: FLINK-16011 URL: https://issues.apache.org/jira/browse/FLINK-16011 Project: Flink Issue Type: Improvement Components: Library / CEP Affects Versions: 1.9.0 Reporter: shuai.xu
In CEP, Pattern.within() sets a time window in which the pattern must be matched. However, the usage of within() is ambiguous and confusing to users. For example:
1. Pattern.begin("a").within(t1).followedBy("b").within(t2) uses the minimum of t1 and t2 as the window time for the whole pattern.
2. Pattern.begin("a").followedBy("b").within(t2) uses t2 as the window time.
3. But Pattern.begin("a").within(t1).followedBy("b") has no window time.
4. While Pattern.begin("a").notFollowedBy("not").within(t1).followedBy("b").within(t2) uses t2 as the window time.
So I propose to normalize the usage of within() and check it strictly when compiling the pattern. For example, we could allow within() only at the end of the pattern and report an error at compile time if the user sets it anywhere else.
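The strict check proposed here could look roughly like the sketch below. It is plain Java and does not use the Flink API: Stage and WithinCheck are hypothetical stand-ins for a compiled pattern, and the rule "within() only on the last stage" is the one example rule named in the issue, not a decided design.

```java
import java.util.List;

// Hypothetical stand-in for one pattern stage; this is not a Flink class.
final class Stage {
    final String name;
    final Long withinMillis; // null means within() was not called on this stage

    Stage(String name, Long withinMillis) {
        this.name = name;
        this.withinMillis = withinMillis;
    }
}

final class WithinCheck {
    /**
     * Returns the window of the whole pattern in milliseconds, or null if no
     * window is set. Rejects a window on any stage other than the last one.
     */
    static Long validate(List<Stage> stages) {
        for (int i = 0; i < stages.size() - 1; i++) {
            Stage stage = stages.get(i);
            if (stage.withinMillis != null) {
                throw new IllegalArgumentException(
                        "within() is only allowed at the end of the pattern, but was set on '"
                                + stage.name + "'");
            }
        }
        return stages.isEmpty() ? null : stages.get(stages.size() - 1).withinMillis;
    }
}
```

Under this rule, cases 1, 3 and 4 from the list would be rejected at compile time, and only case 2 would be accepted.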
[jira] [Created] (FLINK-16010) Support notFollowedBy with interval as the last part of a Pattern
shuai.xu created FLINK-16010: Summary: Support notFollowedBy with interval as the last part of a Pattern Key: FLINK-16010 URL: https://issues.apache.org/jira/browse/FLINK-16010 Project: Flink Issue Type: New Feature Components: Library / CEP Affects Versions: 1.9.0 Reporter: shuai.xu
Currently, Pattern.begin("a").notFollowedBy("b") is not allowed in CEP, but it is useful in many applications. For example, operators may want to find the users who created an order but did not pay within 10 minutes. So I propose to allow notFollowedBy() with an interval as the last part of a Pattern. For example, Pattern.begin("a").notFollowedBy("b").within(Time.minutes(10)) would become valid.
The discussion on the dev mailing list is at https://lists.apache.org/thread.html/rc505728048663d672ad379578ac67d3219f6076986c80a2362802ebb%40%3Cdev.flink.apache.org%3E
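To make the intended semantics concrete, here is a minimal plain-Java sketch of the "order created but not paid within 10 minutes" case. It does not use Flink. OrderEvent, UnpaidOrders and the nowMillis parameter (standing in for a watermark) are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type: type is either "create" or "pay".
final class OrderEvent {
    final String userId;
    final String type;
    final long timestampMillis;

    OrderEvent(String userId, String type, long timestampMillis) {
        this.userId = userId;
        this.type = type;
        this.timestampMillis = timestampMillis;
    }
}

final class UnpaidOrders {
    /** Returns the users whose "create" was not followed by a "pay" within the window. */
    static List<String> findUnpaid(List<OrderEvent> events, long windowMillis, long nowMillis) {
        List<String> unpaid = new ArrayList<>();
        for (OrderEvent created : events) {
            if (!created.type.equals("create")) {
                continue;
            }
            long deadline = created.timestampMillis + windowMillis;
            if (deadline > nowMillis) {
                continue; // window still open, so the absence of "pay" is not decided yet
            }
            boolean paid = false;
            for (OrderEvent e : events) {
                if (e.type.equals("pay")
                        && e.userId.equals(created.userId)
                        && e.timestampMillis >= created.timestampMillis
                        && e.timestampMillis <= deadline) {
                    paid = true;
                    break;
                }
            }
            if (!paid) {
                unpaid.add(created.userId);
            }
        }
        return unpaid;
    }
}
```

The point of the sketch is that a trailing "not followed by" can only be decided once the interval has elapsed, which is why the proposal ties it to within().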
[jira] [Created] (FLINK-15980) The notFollowedBy in the end of GroupPattern may be ignored
shuai.xu created FLINK-15980: Summary: The notFollowedBy in the end of GroupPattern may be ignored Key: FLINK-15980 URL: https://issues.apache.org/jira/browse/FLINK-15980 Project: Flink Issue Type: Bug Components: Library / CEP Affects Versions: 1.9.0 Reporter: shuai.xu
If we write a Pattern like this:
    Pattern group = Pattern.begin("A").notFollowedBy("B");
    Pattern pattern = Pattern.begin(group).followedBy("C");
that is, with notFollowedBy as the last part of a GroupPattern, the pattern compiles normally, but notFollowedBy("B") does not actually take effect.
[jira] [Created] (FLINK-15964) Getting previous stage in notFollowedBy may throw exception
shuai.xu created FLINK-15964: Summary: Getting previous stage in notFollowedBy may throw exception Key: FLINK-15964 URL: https://issues.apache.org/jira/browse/FLINK-15964 Project: Flink Issue Type: Bug Components: Library / CEP Affects Versions: 1.9.0 Reporter: shuai.xu
In a notFollowedBy() condition, an exception may be thrown when trying to get a value from a previous stage for comparison. For example:

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .notFollowedBy("not").where(new IterativeCondition<Event>() {
            private static final long serialVersionUID = -4702359359303151881L;

            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                // Compares the current event with the first event matched by "start".
                return value.getName().equals(ctx.getEventsForPattern("start").iterator().next().getName());
            }
        })
        .followedBy("middle").where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                return value.getName().equals("b");
            }
        });

with inputs:

    Event a = new Event(40, "a", 1.0);
    Event b1 = new Event(41, "a", 2.0);
    Event b2 = new Event(43, "b", 3.0);

It throws:

    org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:698)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:628)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:292)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:228)
        at org.apache.flink.cep.utils.NFATestHarness.consumeRecord(NFATestHarness.java:107)
        at org.apache.flink.cep.utils.NFATestHarness.feedRecord(NFATestHarness.java:84)
        at org.apache.flink.cep.utils.NFATestHarness.feedRecords(NFATestHarness.java:77)
        at org.apache.flink.cep.nfa.NFAITCase.testEndWithNotFollow(NFAITCase.java:2914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    Caused by: java.util.NoSuchElementException
        at java.util.Collections$EmptyIterator.next(Collections.java:4189)
        at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2884)
        at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2879)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:752)
        at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:688)
        ... 33 more
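The NoSuchElementException in the trace comes from calling next() on an empty iterator. Until the underlying bug is fixed, a user condition could guard the lookup as sketched below. This is plain Java without Flink types: PreviousStageGuard is a hypothetical helper, and treating "no previous event" as "condition not met" is an assumption that may not suit every pattern.

```java
import java.util.Iterator;

final class PreviousStageGuard {
    /**
     * Returns true only if at least one previous name exists and the first
     * one equals currentName. An empty input yields false instead of throwing.
     */
    static boolean matchesFirst(Iterable<String> previousNames, String currentName) {
        Iterator<String> it = previousNames.iterator();
        if (!it.hasNext()) {
            return false; // nothing recorded for the previous stage yet
        }
        return currentName.equals(it.next());
    }
}
```

This only avoids the exception in the condition; it does not explain why the events of the "start" stage are missing when the condition is evaluated, which is the actual subject of the issue.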
[jira] [Created] (FLINK-15873) Matched result may not be output if existing earlier partial matches
shuai.xu created FLINK-15873: Summary: Matched result may not be output if existing earlier partial matches Key: FLINK-15873 URL: https://issues.apache.org/jira/browse/FLINK-15873 Project: Flink Issue Type: Bug Components: Library / CEP Affects Versions: 1.9.0 Reporter: shuai.xu
When running some CEP jobs with a skip strategy, I found that a matched result is not returned if there are earlier partial matches. I think this is due to a bug in processMatchesAccordingToSkipStrategy() in the NFA class: it should return the matched result without checking whether there are partial matches.
[jira] [Created] (FLINK-15627) Correct the wrong naming of compareMaps in NFATestUtilities
shuai.xu created FLINK-15627: Summary: Correct the wrong naming of compareMaps in NFATestUtilities Key: FLINK-15627 URL: https://issues.apache.org/jira/browse/FLINK-15627 Project: Flink Issue Type: Bug Components: Library / CEP Affects Versions: 1.9.1 Reporter: shuai.xu
compareMaps in NFATestUtilities actually compares two lists, so renaming it to compareLists may be better.
[jira] [Created] (FLINK-12760) Implement ExecutionGraph to InputsLocationsRetriever Adapter
shuai.xu created FLINK-12760: Summary: Implement ExecutionGraph to InputsLocationsRetriever Adapter Key: FLINK-12760 URL: https://issues.apache.org/jira/browse/FLINK-12760 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: shuai.xu Assignee: shuai.xu Fix For: 1.9.0
Implement an adapter (https://en.wikipedia.org/wiki/Adapter_pattern) that adapts the ExecutionGraph to the InputsLocationsRetriever interface.
Acceptance criteria: the adapter always reflects an up-to-date view of the ExecutionGraph state.
[jira] [Created] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed
shuai.xu created FLINK-11861: Summary: JobMasterTriggerSavepointIT case is not executed Key: FLINK-11861 URL: https://issues.apache.org/jira/browse/FLINK-11861 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.7.2 Reporter: shuai.xu
JobMasterTriggerSavepointIT (https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java) is not executed because the class name does not follow the *ITCase naming style.
[jira] [Created] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly
shuai.xu created FLINK-11375: Summary: Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly Key: FLINK-11375 URL: https://issues.apache.org/jira/browse/FLINK-11375 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.7.1 Reporter: shuai.xu
In SlotPool, AvailableSlots is not protected by any lock, so all access to it should happen in the main thread of SlotPool. For that reason all public methods are called through SlotPoolGateway, except releaseSlot, which is called directly by SlotSharingManager. This may cause a ConcurrentModificationException.
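The thread-confinement rule described here can be illustrated with a small plain-Java sketch. ConfinedSlotSet is a hypothetical class, not Flink's SlotPool: the unsynchronized set is only ever touched from one executor thread, and every caller, including the releasing one, has to go through that executor.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConfinedSlotSet {
    // Not thread-safe on purpose: it must only be accessed from mainThread.
    private final Set<String> availableSlots = new HashSet<>();
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> addSlot(String slotId) {
        return CompletableFuture.runAsync(() -> availableSlots.add(slotId), mainThread);
    }

    /** Completes with true if the slot was present and has been removed. */
    CompletableFuture<Boolean> releaseSlot(String slotId) {
        return CompletableFuture.supplyAsync(() -> availableSlots.remove(slotId), mainThread);
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```

A direct call that mutates availableSlots from another thread, which is what the issue reports for releaseSlot, bypasses this confinement and can race with an iteration running on the main thread.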
[jira] [Created] (FLINK-11299) Decide the result partition type dynamically
shuai.xu created FLINK-11299: Summary: Decide the result partition type dynamically Key: FLINK-11299 URL: https://issues.apache.org/jira/browse/FLINK-11299 Project: Flink Issue Type: Improvement Components: JobManager Reporter: shuai.xu
Currently, the result partition type is decided when the JobGraph is compiled at the client. That is, whether a task outputs its result to its consumers as PIPELINED or BLOCKING is decided at the client and cannot be changed afterwards. However, in batch jobs it is common that a task is configured as PIPELINED but its consumers cannot be started due to a lack of resources, which currently leads to a failover. So we propose to decide the result partition type dynamically at runtime, according to the resources of the cluster and other factors, to increase the success rate of batch jobs.
[jira] [Created] (FLINK-11298) Scheduling job in the unit of concurrent groups
shuai.xu created FLINK-11298: Summary: Scheduling job in the unit of concurrent groups Key: FLINK-11298 URL: https://issues.apache.org/jira/browse/FLINK-11298 Project: Flink Issue Type: Improvement Components: JobManager Reporter: shuai.xu
Currently Flink supports only two scheduling modes: scheduling all tasks with EAGER for streaming jobs and with LAZY_FROM_SOURCE for batch jobs. This is not flexible enough for the varied requirements of different jobs, such as FLINK-10240. We propose a new scheduling strategy based on ConcurrentSchedulingGroup, which first splits a job into several concurrent groups and then schedules it in units of concurrent groups. This strategy will support not only the existing EAGER and LAZY_FROM_SOURCE modes but also other situations, such as the Build/Probe case in FLINK-10240.
[jira] [Created] (FLINK-11059) JobMaster may continue using an invalid slot if releasing idle slot meet a timeout
shuai.xu created FLINK-11059: Summary: JobMaster may continue using an invalid slot if releasing idle slot meet a timeout Key: FLINK-11059 URL: https://issues.apache.org/jira/browse/FLINK-11059 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.7.0 Reporter: shuai.xu Assignee: shuai.xu
When the job master releases an idle slot to the task executor, the call may hit a timeout exception. In that case the task executor may have already released the slot, but the job master adds the slot back to its available slots, so the slot may be used again. The job master then continues deploying tasks to the slot, although the task executor no longer recognizes it.
[jira] [Created] (FLINK-9932) If task executor offer slot to job master timeout the first time, the slot will leak
shuai.xu created FLINK-9932: Summary: If task executor offer slot to job master timeout the first time, the slot will leak Key: FLINK-9932 URL: https://issues.apache.org/jira/browse/FLINK-9932 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu
When a task executor offers a slot to the job master, it first marks the slot as ACTIVE. If the offer call times out, the task executor tries to call offerSlotsToJobManager again, but it only offers slots in the ALLOCATED state. As the slot has already been marked ACTIVE, it will never be offered again, which causes a slot leak.
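The leak can be reproduced with a toy state table. The sketch below is plain Java and not Flink's TaskSlotTable: SlotState, SlotTable and onOfferTimeout are hypothetical names, and reverting the slot to ALLOCATED on timeout is only one possible remedy, assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum SlotState { ALLOCATED, ACTIVE }

final class SlotTable {
    private final Map<String, SlotState> slots = new LinkedHashMap<>();

    void allocate(String slotId) {
        slots.put(slotId, SlotState.ALLOCATED);
    }

    /** Only slots still in ALLOCATED state are offered, as described in the issue. */
    List<String> slotsToOffer() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, SlotState> entry : slots.entrySet()) {
            if (entry.getValue() == SlotState.ALLOCATED) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Called before the offer RPC is sent. */
    void markActive(String slotId) {
        slots.put(slotId, SlotState.ACTIVE);
    }

    /** Assumed remedy: undo the ACTIVE marking so that a retry offers the slot again. */
    void onOfferTimeout(String slotId) {
        slots.put(slotId, SlotState.ALLOCATED);
    }
}
```

Without the onOfferTimeout step, slotsToOffer() stays empty after the first timed-out offer, which is the leak the issue describes.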
[jira] [Created] (FLINK-9884) Slot request may not be removed when it has already be assigned in slot manager
shuai.xu created FLINK-9884: Summary: Slot request may not be removed when it has already be assigned in slot manager Key: FLINK-9884 URL: https://issues.apache.org/jira/browse/FLINK-9884 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu
When a task executor reports slotA with allocationId1, it may happen that the slot manager has recorded slotA as assigned to allocationId2 while the slot request with allocationId1 is still unassigned. The slot manager then updates itself so that slotA is assigned to allocationId1, but it does not clear the slot request with allocationId1. For example:
1. There is one free slot in the slot manager.
2. Two slot requests arrive, with allocationId1 and allocationId2.
3. The slot is assigned to allocationId1, but the requestSlot call times out.
4. The SlotManager assigns the slot to allocationId2 and re-inserts a slot request with allocationId1.
5. The second requestSlot call to the task executor returns a SlotOccupiedException.
6. The SlotManager updates the slot to allocationId1, but the slot request is left behind.
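The last step of the example is where the bookkeeping goes wrong. A minimal plain-Java sketch of the missing cleanup is shown below. MiniSlotManager is a hypothetical model with two maps, not Flink's SlotManager, and it only covers the update performed when a slot report arrives.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MiniSlotManager {
    private final Map<String, String> slotToAllocation = new HashMap<>();
    private final Set<String> pendingRequests = new HashSet<>();

    void addPendingRequest(String allocationId) {
        pendingRequests.add(allocationId);
    }

    /** Applies a slot report and drops the pending request it already fulfils. */
    void onSlotReport(String slotId, String reportedAllocationId) {
        slotToAllocation.put(slotId, reportedAllocationId);
        // The cleanup the issue says is missing: the request is satisfied by this slot.
        pendingRequests.remove(reportedAllocationId);
    }

    boolean hasPendingRequest(String allocationId) {
        return pendingRequests.contains(allocationId);
    }

    String allocationOf(String slotId) {
        return slotToAllocation.get(slotId);
    }
}
```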
[jira] [Created] (FLINK-9828) Resource manager should recover slot resource status after failover
shuai.xu created FLINK-9828: Summary: Resource manager should recover slot resource status after failover Key: FLINK-9828 URL: https://issues.apache.org/jira/browse/FLINK-9828 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu
After a resource manager failover, task executors report their slot allocation status to the RM. But the report does not contain the resource information, so the RM only knows that the slots are occupied and cannot know how much resource is used.
[jira] [Created] (FLINK-9827) ResourceManager may receive outdate report of slots status from task manager
shuai.xu created FLINK-9827: Summary: ResourceManager may receive outdate report of slots status from task manager Key: FLINK-9827 URL: https://issues.apache.org/jira/browse/FLINK-9827 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu
The TaskExecutor reports its slot status to the resource manager in the heartbeat, but this happens in a different thread from the main RPC thread. So it may happen that the RM requests a slot from the task executor and then receives a heartbeat saying that the slot is not assigned. This causes the slot to be freed and assigned again.
[jira] [Created] (FLINK-9826) Implement FLIP-6 YARN Resource Manager for SESSION mode
shuai.xu created FLINK-9826: Summary: Implement FLIP-6 YARN Resource Manager for SESSION mode Key: FLINK-9826 URL: https://issues.apache.org/jira/browse/FLINK-9826 Project: Flink Issue Type: New Feature Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu
The Flink YARN Session Resource Manager communicates with YARN's Resource Manager to acquire and release containers. It asks YARN for N containers according to the config. It is also responsible for eagerly notifying the JobManager about container failures.
[jira] [Created] (FLINK-9632) SlotPool should notify the call when allocateSlot meet an exception
shuai.xu created FLINK-9632: Summary: SlotPool should notify the call when allocateSlot meet an exception Key: FLINK-9632 URL: https://issues.apache.org/jira/browse/FLINK-9632 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu
In SlotPool, allocateSlot() returns a CompletableFuture, but this future is only completed when slotAndLocalityFuture returns a LogicalSlot. If slotAndLocalityFuture completes exceptionally, the returned future is never completed, so the caller never learns about the failure.
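The difference between the two behaviours can be shown with java.util.concurrent.CompletableFuture alone. The sketch below is not the SlotPool code: FuturePropagation and its two methods are hypothetical, and String stands in for the slot type.

```java
import java.util.concurrent.CompletableFuture;

final class FuturePropagation {
    /** Problematic shape: the result is completed only when the inner future succeeds. */
    static CompletableFuture<String> successOnly(CompletableFuture<String> inner) {
        CompletableFuture<String> result = new CompletableFuture<>();
        inner.thenAccept(result::complete); // skipped entirely if inner fails
        return result;
    }

    /** Forwards both outcomes, so a failure of the inner future reaches the caller. */
    static CompletableFuture<String> propagating(CompletableFuture<String> inner) {
        CompletableFuture<String> result = new CompletableFuture<>();
        inner.whenComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```

With a failed inner future, successOnly(...) returns a future that stays pending forever, while propagating(...) returns one that is completed exceptionally.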
[jira] [Created] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id
shuai.xu created FLINK-9293: Summary: SlotPool should check slot id when accepting a slot offer with existing allocation id Key: FLINK-9293 URL: https://issues.apache.org/jira/browse/FLINK-9293 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu
For FLIP-6, two or more slots may be assigned to the same slot allocation. For example, taskExecutor1 registers and the RM assigns allocationID1 to its slot1, but from taskExecutor1's side the registration times out and it registers again. The RM then fails allocationID1 and assigns slot2 on taskExecutor2 to it, but taskExecutor1 has already accepted allocationID1. So taskExecutor1 and taskExecutor2 both offer a slot to the job master with allocationID1. Currently the slot pool just accepts all slot offers, and this may cause one slot to leak.
[jira] [Created] (FLINK-8938) Not remove job graph during job master failover
shuai.xu created FLINK-8938: Summary: Not remove job graph during job master failover Key: FLINK-8938 URL: https://issues.apache.org/jira/browse/FLINK-8938 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu
When the job master fails over, the new dispatcher should not delete the job graph if it fails to start the job manager runner; otherwise the other dispatchers cannot recover it.
[jira] [Created] (FLINK-8442) Should recovery the input split when an execution failover with FailoverRegion
shuai.xu created FLINK-8442: Summary: Should recovery the input split when an execution failover with FailoverRegion Key: FLINK-8442 URL: https://issues.apache.org/jira/browse/FLINK-8442 Project: Flink Issue Type: Bug Components: JobManager, Scheduler Affects Versions: 1.4.0 Reporter: shuai.xu
FLIP-1 makes it possible to restart only the executions in a FailoverRegion when a task fails. But currently the input splits are assigned only when an ExecutionJobVertex is initialized, so when an execution restarts, the input splits it has already read can no longer be obtained from the job master. We need to recover the input splits so that they can be consumed again when the task restarts.
[jira] [Created] (FLINK-8434) The new yarn resource manager should take over the running task managers after failover
shuai.xu created FLINK-8434: Summary: The new yarn resource manager should take over the running task managers after failover Key: FLINK-8434 URL: https://issues.apache.org/jira/browse/FLINK-8434 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu
The app master, which contains the job master and the YARN resource manager, may fail over while running on YARN. The new resource manager should take over the running task managers after it starts. But currently the YarnResourceManager does not record the running containers in workerNodeMap, so when task managers register with it, it rejects them.
[jira] [Created] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
shuai.xu created FLINK-8399: Summary: Use independent configurations for the different timeouts in slot manager Key: FLINK-8399 URL: https://issues.apache.org/jira/browse/FLINK-8399 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu
There are three parameters in the slot manager that set the timeouts for a slot request to a task manager, for discarding a slot request, and for releasing a task manager. But currently they all take the value of AkkaOptions.ASK_TIMEOUT; independent configurations are needed for them.
[jira] [Created] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress
shuai.xu created FLINK-8289: Summary: The RestServerEndpoint should return the address with real ip when getRestAdddress Key: FLINK-8289 URL: https://issues.apache.org/jira/browse/FLINK-8289 Project: Flink Issue Type: Bug Reporter: shuai.xu
Currently RestServerEndpoint.getRestAddress returns an address equal to the value of the config option rest.address. The default is 127.0.0.1:9067, but this address cannot be accessed from another machine. The IPs of the Dispatcher and the JobMaster are usually assigned dynamically, so users configure the option to 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067. This address is registered with YARN or Mesos, but it cannot be accessed from another machine either. So getRestAddress needs to return the real ip:port so that users can access the web monitor from anywhere.
[jira] [Created] (FLINK-8288) Register the web interface url to yarn for yarn job mode
shuai.xu created FLINK-8288: Summary: Register the web interface url to yarn for yarn job mode Key: FLINK-8288 URL: https://issues.apache.org/jira/browse/FLINK-8288 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu
In FLIP-6 job mode, the resource manager is created before the web monitor, so the web interface URL is not set in the resource manager, and the resource manager cannot register the URL with YARN.
[jira] [Created] (FLINK-8266) Add network memory to ResourceProfile
shuai.xu created FLINK-8266: Summary: Add network memory to ResourceProfile Key: FLINK-8266 URL: https://issues.apache.org/jira/browse/FLINK-8266 Project: Flink Issue Type: Improvement Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu
On the task manager side, the network buffer pool should be allocated according to the number of input channels and output subpartitions, but when a worker is allocated, the resource profile does not contain any information about this memory. So I suggest adding a network memory field to ResourceProfile. The job master should calculate it when scheduling a task, and the resource manager can then allocate a container according to the resource profile.
[jira] [Created] (FLINK-8224) Should shudownApplication when job terminated in job mode
shuai.xu created FLINK-8224: Summary: Should shudownApplication when job terminated in job mode Key: FLINK-8224 URL: https://issues.apache.org/jira/browse/FLINK-8224 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu
In job mode, one job is one application. When the job finishes, it should tell the resource manager to shut down the application; otherwise the resource manager cannot set the application status. For example, if the YARN resource manager does not report the application as finished to the YARN master, YARN will consider the application to be still running.
[jira] [Created] (FLINK-7970) SlotPool support batch allocating slots
shuai.xu created FLINK-7970: Summary: SlotPool support batch allocating slots Key: FLINK-7970 URL: https://issues.apache.org/jira/browse/FLINK-7970 Project: Flink Issue Type: Improvement Components: JobManager Reporter: shuai.xu Assignee: shuai.xu
Currently all slots are allocated one by one, from execution to slot pool. If a batch of slots is requested from the slot pool at once, the slot pool can assign its cached slots according to the global slot requests of all executions and so make an optimal matching between slots and executions. This is especially useful for failover: for example, it can assign a slot to the execution whose state is located on the same machine.
[jira] [Created] (FLINK-7969) Resource manager support batch request slots
shuai.xu created FLINK-7969: Summary: Resource manager support batch request slots Key: FLINK-7969 URL: https://issues.apache.org/jira/browse/FLINK-7969 Project: Flink Issue Type: Improvement Components: ResourceManager Reporter: shuai.xu
Currently the resource manager only supports requesting slots one by one. It would be better to let it allocate slots in batches, so that it can make a global decision based on all the resource requests. For example, it can decide how many slots should be put into one task manager.
[jira] [Created] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
shuai.xu created FLINK-7928: Summary: Extend the filed in ResourceProfile for precisely calculating the resource of a task manager Key: FLINK-7928 URL: https://issues.apache.org/jira/browse/FLINK-7928 Project: Flink Issue Type: Improvement Components: JobManager, ResourceManager Reporter: shuai.xu Assignee: shuai.xu
ResourceProfile records all the resource requirements for a slot. It is generated by the JobMaster and then passed to the ResourceManager with the slot request. A task in the slot needs three parts of resource:
1. The resource for the operators. This is specified by the user-defined ResourceSpec.
2. The resource for the operators to communicate with their upstreams, for example the resource for buffer pools and so on.
3. The resource for the operators to communicate with their downstreams. Same as above.
So ResourceProfile should contain three parts of resource: the first part comes from ResourceSpec, and the other two parts are generated by the JobMaster.
[jira] [Created] (FLINK-7878) Extend the resource type user can define in ResourceSpec
shuai.xu created FLINK-7878: Summary: Extend the resource type user can define in ResourceSpec Key: FLINK-7878 URL: https://issues.apache.org/jira/browse/FLINK-7878 Project: Flink Issue Type: Bug Components: DataSet API, DataStream API Reporter: shuai.xu
Currently Flink only lets users define how much CPU and MEM an operator uses, but the resources in a cluster are varied. For example, an application for image processing may need GPUs, and others may need FPGAs. CPU and MEM alone are not enough, and the number of resource types keeps growing, so we need to make ResourceSpec extensible.
[jira] [Created] (FLINK-7871) SlotPool should release its unused slot to RM
shuai.xu created FLINK-7871: Summary: SlotPool should release its unused slot to RM Key: FLINK-7871 URL: https://issues.apache.org/jira/browse/FLINK-7871 Project: Flink Issue Type: Bug Reporter: shuai.xu
As described in the design wiki https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077: "The SlotPool releases slots that are unused to the ResourceManager. Slots count as unused if they are not used when the job is fully running (fully recovered)." But currently the slot pool keeps the slots once they are offered to it until the job finishes.
[jira] [Created] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.
shuai.xu created FLINK-7870: Summary: SlotPool should cancel the slot request to RM if not need any more. Key: FLINK-7870 URL: https://issues.apache.org/jira/browse/FLINK-7870 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu
1. The SlotPool requests slots from the RM if it does not have enough slots.
2. If a slot request is not fulfilled within a certain time, the SlotPool treats the request as timed out and sends a new slot request by triggering a failover in the JobMaster. The previous request is no longer needed, but the RM does not know that.
3. This may cause the RM to request many more resources than the job really needs.
For example:
1. A job needs 100 slots. The RM requests 100 containers from YARN.
2. But YARN is busy and has no resources for the job.
3. The job fails over because the resource request is not fulfilled in time.
4. It asks for 100 slots again, so the RM now requests 200 containers from YARN.
5. If the failover happens several times, the number of requested containers keeps growing.
6. Once YARN has resources, it finds that the job appears to need thousands of containers.
This is a waste of resources.
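The growth described in the example is simple arithmetic, sketched below in plain Java. ContainerRequestTally is a hypothetical helper, and the assumption is that each failover re-requests the full number of slots, as in the example.

```java
final class ContainerRequestTally {
    /**
     * Number of container requests outstanding at the RM after the initial
     * request plus the given number of failovers.
     */
    static int outstandingRequests(int slotsNeeded, int failovers, boolean cancelOnTimeout) {
        int outstanding = 0;
        for (int attempt = 0; attempt <= failovers; attempt++) {
            if (cancelOnTimeout) {
                outstanding = 0; // the timed-out requests are withdrawn first
            }
            outstanding += slotsNeeded;
        }
        return outstanding;
    }
}
```

For a job that needs 100 slots, one failover without cancellation leaves 200 requests outstanding and nine failovers leave 1000, while cancelling on timeout keeps the number at 100.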
[jira] [Created] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes
shuai.xu created FLINK-5869: Summary: ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes Key: FLINK-5869 URL: https://issues.apache.org/jira/browse/FLINK-5869 Project: Flink Issue Type: Sub-task Components: JobManager Reporter: shuai.xu Assignee: shuai.xu
The execution graph does not manage the failover of executions. It only cares about the state of the whole job, which is CREATED, RUNNING, FAILED, FINISHED, or SUSPEND. On an execution failure, it notifies the FailoverCoordinator to do the failover. It only records the finished job vertexes and changes to FINISHED after all vertexes have finished. It changes to a final failed state if the restart strategy fails or it meets an unrecoverable exception.
[jira] [Created] (FLINK-5868) Implement a new RestartStrategy that works for the FailoverRegion.
shuai.xu created FLINK-5868: --- Summary: Implement a new RestartStrategy that works for the FailoverRegion. Key: FLINK-5868 URL: https://issues.apache.org/jira/browse/FLINK-5868 Project: Flink Issue Type: Sub-task Components: JobManager Reporter: shuai.xu Assignee: shuai.xu Will first implement a restart strategy based on failure rate. For simplicity, the new restart strategy only counts the failovers of FailoverRegions. A global failure of the execution graph causes several FailoverRegions to fail over, and each of those failovers is simply added to the total count. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
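A minimal sketch of the counting rule proposed in FLINK-5868, assuming a sliding time window. The class RegionFailureRateCounter and its parameters (maxFailuresPerInterval, intervalMillis) are hypothetical names chosen for this illustration and are not taken from the Flink code base. Timestamps are passed in explicitly so that the behavior is deterministic.

```java
import java.util.ArrayDeque;

/** Hypothetical failure-rate counter that counts region failovers within a sliding window. */
class RegionFailureRateCounter {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final ArrayDeque<Long> failoverTimestamps = new ArrayDeque<>();

    RegionFailureRateCounter(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records the failover of a single region. */
    void recordRegionFailover(long nowMillis) {
        failoverTimestamps.addLast(nowMillis);
    }

    /** A global failure is not special-cased: every affected region counts once. */
    void recordGlobalFailure(int affectedRegions, long nowMillis) {
        for (int i = 0; i < affectedRegions; i++) {
            recordRegionFailover(nowMillis);
        }
    }

    /** Returns true while the number of failovers in the window does not exceed the limit. */
    boolean canRestart(long nowMillis) {
        // Drop failovers that fell out of the window.
        while (!failoverTimestamps.isEmpty()
                && nowMillis - failoverTimestamps.peekFirst() > intervalMillis) {
            failoverTimestamps.pollFirst();
        }
        return failoverTimestamps.size() <= maxFailuresPerInterval;
    }
}
```

The design choice shown here is that a global failure touching N regions contributes N entries to the same window, which matches the "just count the total number" simplification in the issue.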
[jira] [Created] (FLINK-5866) The implementation of FailoverRegion.
shuai.xu created FLINK-5866: --- Summary: The implementation of FailoverRegion. Key: FLINK-5866 URL: https://issues.apache.org/jira/browse/FLINK-5866 Project: Flink Issue Type: Sub-task Components: JobManager Reporter: shuai.xu Assignee: shuai.xu In FLIP-1, a FailoverRegion manages the failover of a minimal set of pipelined connected executions. One job graph may have several FailoverRegions, and they are calculated statically after the job graph is attached. A FailoverRegion has several states: CREATED, CANCELING, CANCELED, RUNNING. It transitions between these states according to the states of the ExecutionVertexes it contains and drives the failover during the state transitions. A FailoverRegion may have preceding and succeeding FailoverRegions. It waits for its preceding regions and notifies its succeeding regions when it finishes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5856) Need return redundant containers to yarn for yarn mode
shuai.xu created FLINK-5856: --- Summary: Need return redundant containers to yarn for yarn mode Key: FLINK-5856 URL: https://issues.apache.org/jira/browse/FLINK-5856 Project: Flink Issue Type: Bug Components: YARN Reporter: shuai.xu Assignee: shuai.xu In Flink on YARN mode, the RM requests containers from YARN according to the requirements of the JM. But the AMRMClientAsync used for YARN does not guarantee that the number of containers returned exactly equals the number requested. So the Flink RM needs to record the number of containers it requested and return the redundant ones to YARN. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
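The bookkeeping proposed in FLINK-5856 might look roughly like the sketch below. ContainerAccounting is a hypothetical class, and containers are represented by plain strings instead of YARN's container types so that the example stays self-contained. In a real YARN application the redundant containers would presumably be handed back through the YARN client (for example via AMRMClientAsync's releaseAssignedContainer), which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical bookkeeping: remembers how many containers were requested and flags the surplus. */
class ContainerAccounting {
    private int pendingRequests;

    /** Called whenever the Flink RM asks YARN for more containers. */
    void onContainersRequested(int count) {
        pendingRequests += count;
    }

    /**
     * Called when YARN hands over containers. Containers that satisfy a pending
     * request are kept; the rest are returned to the caller as redundant.
     */
    List<String> onContainersAllocated(List<String> allocatedContainerIds) {
        List<String> redundant = new ArrayList<>();
        for (String containerId : allocatedContainerIds) {
            if (pendingRequests > 0) {
                pendingRequests--; // this container satisfies an outstanding request
            } else {
                redundant.add(containerId); // nobody asked for this one
            }
        }
        return redundant;
    }

    int pendingRequests() {
        return pendingRequests;
    }
}
```

The caller is expected to release every container in the returned list instead of starting a TaskManager on it.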
[jira] [Created] (FLINK-5791) Resource should be strictly matched when allocating for yarn
shuai.xu created FLINK-5791: --- Summary: Resource should be strictly matched when allocating for yarn Key: FLINK-5791 URL: https://issues.apache.org/jira/browse/FLINK-5791 Project: Flink Issue Type: Improvement Components: YARN Reporter: shuai.xu Assignee: shuai.xu For YARN mode, resources should be assigned exactly as requested to avoid wasting resources and to avoid OOM. 1. YarnResourceManager requests containers according to the ResourceProfile in the slot request from the JM. 2. The RM passes the ResourceProfile to the TM for initializing its slots. 3. The RM should strictly match the slots offered by the TM against the SlotRequest from the JM. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
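Step 3 above, strict matching, can be sketched as an equality check between the requested profile and each offered slot. SketchProfile and StrictSlotMatcher are hypothetical types invented for this illustration; Flink's real ResourceProfile has more fields, and the two dimensions used here (cpuCores, memoryMb) are assumptions.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified resource profile with two dimensions. */
final class SketchProfile {
    final int cpuCores;
    final int memoryMb;

    SketchProfile(int cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof SketchProfile)) {
            return false;
        }
        SketchProfile that = (SketchProfile) other;
        return cpuCores == that.cpuCores && memoryMb == that.memoryMb;
    }

    @Override
    public int hashCode() {
        return Objects.hash(cpuCores, memoryMb);
    }
}

/** Hypothetical matcher: a slot matches only if its profile equals the requested one. */
final class StrictSlotMatcher {
    private StrictSlotMatcher() {}

    /** Returns the index of the first exactly matching slot, or -1 if there is none. */
    static int findExactMatch(SketchProfile requested, List<SketchProfile> offeredSlots) {
        for (int i = 0; i < offeredSlots.size(); i++) {
            if (offeredSlots.get(i).equals(requested)) {
                return i;
            }
        }
        return -1;
    }
}
```

A larger slot is deliberately not accepted for a smaller request in this sketch, since handing out an oversized slot is the kind of waste the issue wants to avoid.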
[jira] [Created] (FLINK-5190) ZooKeeperLeaderRetrievalService should not close the zk client when stop
shuai.xu created FLINK-5190: --- Summary: ZooKeeperLeaderRetrievalService should not close the zk client when stop Key: FLINK-5190 URL: https://issues.apache.org/jira/browse/FLINK-5190 Project: Flink Issue Type: Bug Components: Core Reporter: shuai.xu Assignee: shuai.xu The zk client is created outside of ZooKeeperLeaderRetrievalService and passed to it. When ZooKeeperLeaderRetrievalService stops, it should not close the zk client, as others may still be using it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5171) Wrong use of Preconditions.checkState in TaskManagerRunner
shuai.xu created FLINK-5171: --- Summary: Wrong use of Preconditions.checkState in TaskManagerRunner Key: FLINK-5171 URL: https://issues.apache.org/jira/browse/FLINK-5171 Project: Flink Issue Type: Bug Components: Core Reporter: shuai.xu Assignee: shuai.xu Preconditions.checkState checks that its first parameter is true and throws an exception if it is not. But TaskManagerRunner throws an exception when the rpc port is valid. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
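The kind of mistake described in FLINK-5171 can be sketched as follows. To keep the example self-contained, checkState below is a local stand-in that follows the contract stated in the issue (throw if the condition is false); it is not Flink's Preconditions class. The isValidPort range and both check methods are assumptions for illustration and are not copied from TaskManagerRunner.

```java
/** Hypothetical illustration of an inverted checkState condition. */
class CheckStateSketch {

    /** Local stand-in: throws if the condition is false. */
    static void checkState(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalStateException(errorMessage);
        }
    }

    /** Assumed validity rule for this sketch only. */
    static boolean isValidPort(int port) {
        return port >= 0 && port <= 65535;
    }

    /** Inverted condition: throws exactly when the port is valid. */
    static void invertedCheck(int rpcPort) {
        checkState(!isValidPort(rpcPort), "Invalid rpc port: " + rpcPort);
    }

    /** Intended condition: throws only when the port is invalid. */
    static void intendedCheck(int rpcPort) {
        checkState(isValidPort(rpcPort), "Invalid rpc port: " + rpcPort);
    }

    public static void main(String[] args) {
        intendedCheck(6123); // passes silently
        try {
            invertedCheck(6123);
        } catch (IllegalStateException e) {
            System.out.println("inverted check rejected a valid port: " + e.getMessage());
        }
    }
}
```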
[jira] [Created] (FLINK-5170) getAkkaConfig will use localhost if hostname is specified
shuai.xu created FLINK-5170: --- Summary: getAkkaConfig will use localhost if hostname is specified Key: FLINK-5170 URL: https://issues.apache.org/jira/browse/FLINK-5170 Project: Flink Issue Type: Bug Components: Core Reporter: shuai.xu Assignee: shuai.xu In AkkaUtil.scala: def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = { getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None) } The null check is inverted, so when hostname is specified, it uses None. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5074) Implement a RunningJobRegistry based on Zookeeper
shuai.xu created FLINK-5074: --- Summary: Implement a RunningJobRegistry based on Zookeeper Key: FLINK-5074 URL: https://issues.apache.org/jira/browse/FLINK-5074 Project: Flink Issue Type: Task Components: Cluster Management Reporter: shuai.xu For FLIP-6, ZookeeperHaServices has been implemented, but it does not support getRunningJobsRegistry. So a ZK-based running job registry needs to be implemented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4504) Support user to decide whether the result of an operator is presistent
shuai.xu created FLINK-4504: --- Summary: Support user to decide whether the result of an operator is presistent Key: FLINK-4504 URL: https://issues.apache.org/jira/browse/FLINK-4504 Project: Flink Issue Type: Sub-task Components: DataSet API Reporter: shuai.xu Provide an API that lets users decide whether the result of an operator should be pipelined, spilled to a local file, or persisted to a distributed file system. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4444) Add a DFSInputChannel and DFSSubPartition
shuai.xu created FLINK-4444: --- Summary: Add a DFSInputChannel and DFSSubPartition Key: FLINK-4444 URL: https://issues.apache.org/jira/browse/FLINK-4444 Project: Flink Issue Type: Sub-task Components: Batch Connectors and Input/Output Formats Reporter: shuai.xu Add a new ResultPartitionType and ResultPartitionLocation type for DFS. Add DFSSubpartition and DFSInputChannel for writing to and reading from DFS. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4419) Batch improvement for supporting dfs as a ResultPartitionType
shuai.xu created FLINK-4419: --- Summary: Batch improvement for supporting dfs as a ResultPartitionType Key: FLINK-4419 URL: https://issues.apache.org/jira/browse/FLINK-4419 Project: Flink Issue Type: Improvement Components: Batch Connectors and Input/Output Formats Reporter: shuai.xu This is the root issue to track an improvement for batch, which will enable DFS as a ResultPartitionType, so that upstream nodes can exit completely after they finish and need not be restarted if downstream nodes fail. The full design is shown in (https://docs.google.com/document/d/15HtCtc9Gk8SyHsAezM7Od1opAHgnxLeHm3VX7A8fa-4/edit#). -- This message was sent by Atlassian JIRA (v6.3.4#6332)