[jira] [Created] (FLINK-16140) Translate "Event Processing (CEP)" page into Chinese

2020-02-18 Thread shuai.xu (Jira)
shuai.xu created FLINK-16140:


 Summary: Translate "Event Processing (CEP)" page into Chinese
 Key: FLINK-16140
 URL: https://issues.apache.org/jira/browse/FLINK-16140
 Project: Flink
  Issue Type: Task
  Components: chinese-translation, Documentation
Affects Versions: 1.10.0
Reporter: shuai.xu


Translate the internal page 
"[https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html]" 
into Chinese.

The doc is located in "flink/docs/dev/libs/cep.zh.md".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16011) Normalize the within usage in Pattern

2020-02-11 Thread shuai.xu (Jira)
shuai.xu created FLINK-16011:


 Summary: Normalize the within usage in Pattern
 Key: FLINK-16011
 URL: https://issues.apache.org/jira/browse/FLINK-16011
 Project: Flink
  Issue Type: Improvement
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


In CEP, we can use Pattern.within() to set a window in which the pattern should 
be matched. However, the usage of within() is ambiguous and confusing to users.

For example:
 # Pattern.begin("a").within(t1).followedBy("b").within(t2) will use the 
minimum of t1 and t2 as the window time for the whole pattern.
 # Pattern.begin("a").followedBy("b").within(t2) will use t2 as the window time.
 # But Pattern.begin("a").within(t1).followedBy("b") will have no window time.
 # While 
Pattern.begin("a").notFollowedBy("not").within(t1).followedBy("b").within(t2) 
will use t2 as the window time.

So I propose to normalize the usage of within() and apply strict checking when 
compiling the pattern. 

For example, we could allow within() only at the end of the pattern and report 
an error at compile time if the user sets it anywhere else.
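
A minimal sketch of such a strict check, assuming a pattern can be modeled as an ordered list of stages that each optionally carry a within() time. `Stage` and `WithinPlacementCheck` are hypothetical names used only for illustration; they are not Flink CEP classes, and the real check would live in the pattern compiler.

```java
import java.util.List;

// Hypothetical model of one pattern stage; not a Flink CEP class.
final class Stage {
    final String name;
    final Long withinMillis; // null means within() was not called on this stage

    Stage(String name, Long withinMillis) {
        this.name = name;
        this.withinMillis = withinMillis;
    }
}

final class WithinPlacementCheck {
    // Throws if within() was set on any stage other than the last one.
    // Returns the window of the whole pattern, or null if none was set.
    static Long validate(List<Stage> stages) {
        for (int i = 0; i < stages.size() - 1; i++) {
            Stage s = stages.get(i);
            if (s.withinMillis != null) {
                throw new IllegalArgumentException(
                    "within() is only allowed at the end of the pattern, "
                        + "but was set on stage '" + s.name + "'");
            }
        }
        return stages.isEmpty() ? null : stages.get(stages.size() - 1).withinMillis;
    }
}
```

With this rule, the first, third and fourth examples above would be rejected, and only the form with a single trailing within() would be accepted.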





[jira] [Created] (FLINK-16010) Support notFollowedBy with interval as the last part of a Pattern

2020-02-11 Thread shuai.xu (Jira)
shuai.xu created FLINK-16010:


 Summary: Support notFollowedBy with interval as the last part of a 
Pattern
 Key: FLINK-16010
 URL: https://issues.apache.org/jira/browse/FLINK-16010
 Project: Flink
  Issue Type: New Feature
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


Currently, Pattern.begin("a").notFollowedBy("b") is not allowed in CEP, but it 
is useful in many applications. For example, operators may want to find the 
users who created an order but didn't pay within 10 minutes.

So I propose allowing notFollowedBy() with an interval to be the last part of 
a Pattern. 

For example,  Pattern.begin("a").notFollowedBy("b").within(Time.minutes(10)) 
will be valid in the future.
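
To make the intended semantics concrete, here is a small plain-Java sketch of the "order created but not paid within 10 minutes" case. It does not use Flink CEP; `OrderEvent` and `UnpaidOrders` are hypothetical names, and the sketch assumes one open order per user, events sorted by timestamp, and an explicit "now" standing in for the watermark.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type for illustration; not part of Flink CEP.
final class OrderEvent {
    final String user;
    final String type; // "create" or "pay"
    final long timestampMillis;

    OrderEvent(String user, String type, long timestampMillis) {
        this.user = user;
        this.type = type;
        this.timestampMillis = timestampMillis;
    }
}

final class UnpaidOrders {
    // Returns users whose "create" event was not followed by a "pay" event
    // within windowMillis, judged at time nowMillis.
    static List<String> find(List<OrderEvent> events, long windowMillis, long nowMillis) {
        Map<String, Long> pending = new LinkedHashMap<>(); // user -> creation time
        for (OrderEvent e : events) {
            if (e.type.equals("create")) {
                pending.put(e.user, e.timestampMillis);
            } else if (e.type.equals("pay")) {
                Long created = pending.get(e.user);
                // Only a payment inside the window cancels the pending order.
                if (created != null && e.timestampMillis - created <= windowMillis) {
                    pending.remove(e.user);
                }
            }
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : pending.entrySet()) {
            // The window must have fully elapsed before the absence is reported.
            if (nowMillis - entry.getValue() > windowMillis) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

The point of the sketch is that a trailing notFollowedBy() can only be decided once the interval has elapsed, which is why the proposal ties it to within().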

The discussion on the dev mailing list is at 
[https://lists.apache.org/thread.html/rc505728048663d672ad379578ac67d3219f6076986c80a2362802ebb%40%3Cdev.flink.apache.org%3E]
 





[jira] [Created] (FLINK-15980) The notFollowedBy in the end of GroupPattern may be ignored

2020-02-10 Thread shuai.xu (Jira)
shuai.xu created FLINK-15980:


 Summary: The notFollowedBy in the end of GroupPattern may be 
ignored
 Key: FLINK-15980
 URL: https://issues.apache.org/jira/browse/FLINK-15980
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


If we write a Pattern like this:
Pattern group = Pattern.begin("A").notFollowedBy("B");
Pattern pattern = Pattern.begin(group).followedBy("C");
Here notFollowedBy is the last part of a GroupPattern.

This pattern compiles normally, but the notFollowedBy("B") actually has no 
effect.





[jira] [Created] (FLINK-15964) Getting previous stage in notFollowedBy may throw exception

2020-02-09 Thread shuai.xu (Jira)
shuai.xu created FLINK-15964:


 Summary: Getting previous stage in notFollowedBy may throw 
exception
 Key: FLINK-15964
 URL: https://issues.apache.org/jira/browse/FLINK-15964
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


In a notFollowedBy() condition, an exception may be thrown when trying to get a 
value from a previous stage for comparison.

For example:

Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
    .notFollowedBy("not").where(new IterativeCondition<Event>() {
        private static final long serialVersionUID = -4702359359303151881L;

        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            // Compare against the event already matched for the "start" stage.
            return value.getName().equals(ctx.getEventsForPattern("start").iterator().next().getName());
        }
    })
    .followedBy("middle").where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            return value.getName().equals("b");
        }
    });

with inputs:
Event a = new Event(40, "a", 1.0);
Event b1 = new Event(41, "a", 2.0);
Event b2 = new Event(43, "b", 3.0);

It will throw 

org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
 at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:698)
 at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:628)
 at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:292)
 at org.apache.flink.cep.nfa.NFA.process(NFA.java:228)
 at org.apache.flink.cep.utils.NFATestHarness.consumeRecord(NFATestHarness.java:107)
 at org.apache.flink.cep.utils.NFATestHarness.feedRecord(NFATestHarness.java:84)
 at org.apache.flink.cep.utils.NFATestHarness.feedRecords(NFATestHarness.java:77)
 at org.apache.flink.cep.nfa.NFAITCase.testEndWithNotFollow(NFAITCase.java:2914)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
 at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
 at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
 at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.util.NoSuchElementException
 at java.util.Collections$EmptyIterator.next(Collections.java:4189)
 at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2884)
 at org.apache.flink.cep.nfa.NFAITCase$154.filter(NFAITCase.java:2879)
 at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:752)
 at org.apache.flink.cep.nfa.NFA.findFinalStateAfterProceed(NFA.java:688)
 ... 33 more
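
The root cause in the trace is a NoSuchElementException from calling next() on an empty iterator, which suggests the lookup of the "start" events returned nothing inside the condition. The sketch below shows only the generic failure mode and a defensive guard on the caller's side; `PreviousStageLookup` is a hypothetical helper, and this is not a fix for the NFA itself.

```java
import java.util.Iterator;

final class PreviousStageLookup {
    // Compares 'name' with the first element of 'previous', returning false
    // instead of throwing NoSuchElementException when 'previous' is empty.
    static boolean matchesFirst(String name, Iterable<String> previous) {
        Iterator<String> it = previous.iterator();
        return it.hasNext() && name.equals(it.next());
    }
}
```

A guard like this hides the exception but not the underlying problem: the condition still cannot see the event matched by the previous stage.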





[jira] [Created] (FLINK-15873) Matched result may not be output if existing earlier partial matches

2020-02-03 Thread shuai.xu (Jira)
shuai.xu created FLINK-15873:


 Summary: Matched result may not be output if existing earlier 
partial matches
 Key: FLINK-15873
 URL: https://issues.apache.org/jira/browse/FLINK-15873
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: shuai.xu


When running some CEP jobs with a skip strategy, I found that a matched result 
is not returned if there is an earlier partial match.

I think this is due to a bug in processMatchesAccordingToSkipStrategy() in the 
NFA class. It should return the matched result without checking whether there 
are partial matches.





[jira] [Created] (FLINK-15627) Correct the wrong naming of compareMaps in NFATestUtilities

2020-01-17 Thread shuai.xu (Jira)
shuai.xu created FLINK-15627:


 Summary: Correct the wrong naming of compareMaps in 
NFATestUtilities
 Key: FLINK-15627
 URL: https://issues.apache.org/jira/browse/FLINK-15627
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.1
Reporter: shuai.xu


The compareMaps method in NFATestUtilities actually compares two lists, so 
renaming it to compareLists would be better.





[jira] [Created] (FLINK-12760) Implement ExecutionGraph to InputsLocationsRetriever Adapter

2019-06-06 Thread shuai.xu (JIRA)
shuai.xu created FLINK-12760:


 Summary: Implement ExecutionGraph to InputsLocationsRetriever 
Adapter
 Key: FLINK-12760
 URL: https://issues.apache.org/jira/browse/FLINK-12760
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: shuai.xu
Assignee: shuai.xu
 Fix For: 1.9.0


Implement an [adapter|https://en.wikipedia.org/wiki/Adapter_pattern], which 
adapts the ExecutionGraph to the InputsLocationsRetriever interface.

*Acceptance criteria*
 * The adapter always reflects an up to date view of the ExecutionGraph state



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed

2019-03-08 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11861:


 Summary: JobMasterTriggerSavepointIT case is not executed
 Key: FLINK-11861
 URL: https://issues.apache.org/jira/browse/FLINK-11861
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.7.2
Reporter: shuai.xu


The 
[JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java]
 will not be executed because the class name does not follow the ***ITCase naming style.
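
As an illustration of the naming rule, the following sketch checks a class name against the `ITCase` suffix mentioned above. `TestNameCheck` is a hypothetical helper; the actual include patterns of the build are not shown in this issue and may differ.

```java
final class TestNameCheck {
    // Assumed convention, taken from the issue text: integration test classes
    // are only picked up when their simple name ends in "ITCase".
    static boolean isPickedUpAsITCase(String simpleClassName) {
        return simpleClassName.endsWith("ITCase");
    }
}
```

Under this assumption, `JobMasterTriggerSavepointIT` is skipped, while a class named `JobMasterTriggerSavepointITCase` would be run.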





[jira] [Created] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-01-16 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11375:


 Summary: Concurrent modification to slot pool due to 
SlotSharingManager releaseSlot directly 
 Key: FLINK-11375
 URL: https://issues.apache.org/jira/browse/FLINK-11375
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.7.1
Reporter: shuai.xu


In SlotPool, AvailableSlots uses no locking, so all access to it should happen 
in the main thread of SlotPool. For that reason all the public methods are 
called through SlotPoolGateway, except releaseSlot, which is called directly by 
SlotSharingManager. This may cause a ConcurrentModificationException.
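
The single-threaded access rule can be sketched in plain Java as follows. `SingleThreadedSlots` is a hypothetical stand-in, not the SlotPool implementation: it assumes an unsynchronized map that is only ever touched by one executor thread, so every caller, including a direct releaseSlot, has to go through that executor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a component whose state is not thread safe.
final class SingleThreadedSlots {
    // Unsynchronized state; must only be accessed on mainThread.
    private final Map<String, String> availableSlots = new HashMap<>();
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> addSlot(String slotId, String owner) {
        return CompletableFuture.runAsync(() -> availableSlots.put(slotId, owner), mainThread);
    }

    // Even "internal" callers release through the main thread instead of
    // mutating the map directly.
    CompletableFuture<Boolean> releaseSlot(String slotId) {
        return CompletableFuture.supplyAsync(() -> availableSlots.remove(slotId) != null, mainThread);
    }

    CompletableFuture<Integer> size() {
        return CompletableFuture.supplyAsync(availableSlots::size, mainThread);
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```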

 





[jira] [Created] (FLINK-11299) Decide the result partition type dynamically

2019-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11299:


 Summary: Decide the result partition type dynamically 
 Key: FLINK-11299
 URL: https://issues.apache.org/jira/browse/FLINK-11299
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu


Currently, the result partition type is decided when the JobGraph is compiled 
at the client. That is to say, whether a task outputs its result to its 
consumers as PIPELINED or BLOCKING is decided at the client and cannot be 
changed afterwards. However, in batch jobs it is common that a task is 
configured as PIPELINED but its consumers cannot be started due to a lack of 
resources, which currently leads to a failover. So we propose to support 
deciding the result partition type dynamically at runtime, according to the 
resources of the cluster and other factors, to increase the success rate of 
batch jobs.





[jira] [Created] (FLINK-11298) Scheduling job in the unit of concurrent groups

2019-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11298:


 Summary: Scheduling job in the unit of concurrent groups
 Key: FLINK-11298
 URL: https://issues.apache.org/jira/browse/FLINK-11298
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu


Currently flink only supports two scheduling modes: scheduling all tasks EAGER 
for streaming jobs and scheduling all tasks LAZY_FROM_SOURCE for batch jobs. 
This is not flexible enough for the various requirements of different jobs, 
such as FLINK-10240. We propose a new ConcurrentSchedulingGroup based 
scheduling strategy, which first splits a job into several concurrent groups 
and then schedules it in units of concurrent groups. This strategy will support 
not only the existing EAGER and LAZY_FROM_SOURCE modes but also other 
situations, such as the Build/Probe case in FLINK-10240.





[jira] [Created] (FLINK-11059) JobMaster may continue using an invalid slot if releasing idle slot meet a timeout

2018-12-03 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11059:


 Summary: JobMaster may continue using an invalid slot if releasing 
idle slot meet a timeout
 Key: FLINK-11059
 URL: https://issues.apache.org/jira/browse/FLINK-11059
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.7.0
Reporter: shuai.xu
Assignee: shuai.xu


When the job master releases an idle slot to the task executor, the call may 
time out. In that case the task executor may already have released the slot, 
but the job master will add the slot back to the available slots, so the slot 
may be used again. The job master will then continue deploying tasks to the 
slot, but the task executor does not recognize it.





[jira] [Created] (FLINK-9932) If task executor offer slot to job master timeout the first time, the slot will leak

2018-07-24 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9932:
---

 Summary: If task executor offer slot to job master timeout the 
first time, the slot will leak
 Key: FLINK-9932
 URL: https://issues.apache.org/jira/browse/FLINK-9932
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


When the task executor offers a slot to the job master, it first marks the 
slot as active.

If the offer slot call times out, the task executor will try to call 
offerSlotsToJobManager again,

but it will only offer slots in the ALLOCATED state. As the slot has already 
been marked ACTIVE, it will never be offered again, and this will cause a slot 
leak.
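
The leak can be reproduced with a tiny state model. `SlotOfferModel` is a hypothetical class written for this illustration, not the TaskExecutor code; the `onOfferTimeout` method shows one possible remedy and is an assumption, not necessarily the fix that was applied.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SlotOfferModel {
    enum State { ALLOCATED, ACTIVE }

    private final Map<String, State> slots = new LinkedHashMap<>();

    void allocate(String slotId) {
        slots.put(slotId, State.ALLOCATED);
    }

    // Mirrors the behaviour described in the issue: only ALLOCATED slots are
    // offered, and each one is marked ACTIVE before the offer is confirmed.
    List<String> offerSlots() {
        List<String> offered = new ArrayList<>();
        for (Map.Entry<String, State> e : slots.entrySet()) {
            if (e.getValue() == State.ALLOCATED) {
                offered.add(e.getKey());
                e.setValue(State.ACTIVE);
            }
        }
        return offered;
    }

    // Assumed remedy: put the slots of a timed-out offer back into ALLOCATED
    // so that the retry offers them again.
    void onOfferTimeout(List<String> offered) {
        for (String slotId : offered) {
            slots.put(slotId, State.ALLOCATED);
        }
    }
}
```

Without the revert, a second call to `offerSlots()` after a timeout returns an empty list, which is the leak described above.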





[jira] [Created] (FLINK-9884) Slot request may not be removed when it has already be assigned in slot manager

2018-07-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9884:
---

 Summary: Slot request may not be removed when it has already be 
assigned in slot manager
 Key: FLINK-9884
 URL: https://issues.apache.org/jira/browse/FLINK-9884
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


When a task executor reports slotA with allocationId1, it may happen that the 
slot manager has recorded slotA as assigned to allocationId2, while the slot 
request with allocationId1 is not assigned. The slot manager will then update 
itself so that slotA is assigned to allocationId1, but it does not clear the 
slot request with allocationId1.

For example:
 # There is one free slot in the slot manager.
 # Two slot requests arrive, with allocationId1 and allocationId2.
 # The slot is assigned to allocationId1, but the requestSlot call times out.
 # SlotManager assigns the slot to allocationId2 and inserts a slot request 
with allocationId1.
 # The second requestSlot call to the task executor returns a 
SlotOccupiedException.
 # SlotManager updates the slot to allocationId1, but the slot request is left 
behind.





[jira] [Created] (FLINK-9828) Resource manager should recover slot resource status after failover

2018-07-11 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9828:
---

 Summary: Resource manager should recover slot resource status 
after failover
 Key: FLINK-9828
 URL: https://issues.apache.org/jira/browse/FLINK-9828
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu


After a resource manager failover, task executors will report their slot 
allocation status to the RM. But the report does not contain the resources, so 
the RM only knows that the slots are occupied but cannot know how much resource 
is used.





[jira] [Created] (FLINK-9827) ResourceManager may receive outdate report of slots status from task manager

2018-07-11 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9827:
---

 Summary: ResourceManager may receive outdate report of slots 
status from task manager
 Key: FLINK-9827
 URL: https://issues.apache.org/jira/browse/FLINK-9827
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


TaskExecutor reports its slot status to the resource manager in the heartbeat, 
but this happens in a different thread from the main rpc thread. So it may 
happen that the rm requests a slot from the task executor but then receives a 
heartbeat saying the slot is not assigned. This will cause the slot to be freed 
and assigned again.





[jira] [Created] (FLINK-9826) Implement FLIP-6 YARN Resource Manager for SESSION mode

2018-07-11 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9826:
---

 Summary: Implement FLIP-6 YARN Resource Manager for SESSION mode
 Key: FLINK-9826
 URL: https://issues.apache.org/jira/browse/FLINK-9826
 Project: Flink
  Issue Type: New Feature
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


The Flink YARN Session Resource Manager communicates with YARN's Resource 
Manager to acquire and release containers. It will ask for N containers from 
YARN according to the config.

It is also responsible for notifying the JobManager eagerly about container 
failures.





[jira] [Created] (FLINK-9632) SlotPool should notify the call when allocateSlot meet an exception

2018-06-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9632:
---

 Summary: SlotPool should notify the call when allocateSlot meet an 
exception
 Key: FLINK-9632
 URL: https://issues.apache.org/jira/browse/FLINK-9632
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


In SlotPool, allocateSlot() returns a CompletableFuture, but this future is 
only completed when slotAndLocalityFuture returns a LogicSlot. If 
slotAndLocalityFuture is completed exceptionally, the returned future will 
never be completed, so the caller will never learn about the failure. 
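
The difference between forwarding only the success case and forwarding both outcomes can be shown with plain `CompletableFuture` calls. `FutureForwarding` and its method names are hypothetical and only model the shape of the problem; they are not taken from SlotPool.

```java
import java.util.concurrent.CompletableFuture;

final class FutureForwarding {
    // Problematic shape: the result is completed only on success, so a failure
    // of 'source' leaves the returned future pending forever.
    static CompletableFuture<String> forwardSuccessOnly(CompletableFuture<String> source) {
        CompletableFuture<String> result = new CompletableFuture<>();
        source.thenAccept(result::complete);
        return result;
    }

    // Forwards both outcomes, so the caller is notified of exceptions too.
    static CompletableFuture<String> forwardBoth(CompletableFuture<String> source) {
        CompletableFuture<String> result = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```

When `source` fails, the future from `forwardSuccessOnly` never reports done, while the one from `forwardBoth` completes exceptionally.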





[jira] [Created] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id

2018-05-03 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9293:
---

 Summary: SlotPool should check slot id when accepting a slot offer 
with existing allocation id
 Key: FLINK-9293
 URL: https://issues.apache.org/jira/browse/FLINK-9293
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


For flip-6, two or more slots may be assigned to the same slot allocation. 
For example, taskExecutor1 registers and allocationID1 is assigned to its 
slot1, but from taskExecutor1's side the registration times out and it 
registers again. RM will fail allocationID1 and assign slot2 on taskExecutor2 
to it, but taskExecutor1 has already accepted allocationID1. 

So taskExecutor1 and taskExecutor2 both offer a slot to the jobmaster with 
allocationID1. Currently the slot pool just accepts all slot offers, and this 
may cause a slot leak.
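
A minimal sketch of the proposed check, assuming the slot pool keeps a map from allocation id to the slot that fulfilled it. `OfferCheck` and the string ids are hypothetical and stand in for the real SlotPool types.

```java
import java.util.HashMap;
import java.util.Map;

final class OfferCheck {
    // allocationId -> slotId that was accepted for it
    private final Map<String, String> slotByAllocation = new HashMap<>();

    // Accepts an offer if the allocation id is new or repeats the same slot;
    // rejects it when the allocation id is already bound to a different slot.
    boolean offer(String allocationId, String slotId) {
        String existing = slotByAllocation.putIfAbsent(allocationId, slotId);
        return existing == null || existing.equals(slotId);
    }
}
```

With this check, the second offer for allocationID1 coming from a different slot is rejected, so the offering task executor can free that slot instead of leaking it.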





[jira] [Created] (FLINK-8938) Not remove job graph during job master failover

2018-03-13 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8938:
---

 Summary: Not  remove job graph during job master failover
 Key: FLINK-8938
 URL: https://issues.apache.org/jira/browse/FLINK-8938
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


When the job master fails over, the new dispatcher should not delete the job 
graph if it fails to start the job manager runner, or else the other 
dispatchers cannot recover it.





[jira] [Created] (FLINK-8442) Should recovery the input split when an execution failover with FailoverRegion

2018-01-17 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8442:
---

 Summary: Should recovery the input split when an execution 
failover with FailoverRegion
 Key: FLINK-8442
 URL: https://issues.apache.org/jira/browse/FLINK-8442
 Project: Flink
  Issue Type: Bug
  Components: JobManager, Scheduler
Affects Versions: 1.4.0
Reporter: shuai.xu


flip-1 enables restarting only the executions in a FailoverRegion when a task 
fails. But currently the input splits are assigned only when an 
ExecutionJobVertex is initializing, so when an execution restarts, the input 
splits it has already read may no longer be available from the job master. We 
need to recover the input splits so they can be consumed again when the task 
restarts.





[jira] [Created] (FLINK-8434) The new yarn resource manager should take over the running task managers after failover

2018-01-15 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8434:
---

 Summary: The new yarn resource manager should take over the 
running task managers after failover
 Key: FLINK-8434
 URL: https://issues.apache.org/jira/browse/FLINK-8434
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


The app master, which contains the job master and the yarn resource manager, 
may fail over while running on yarn. The new resource manager should take over 
the running task managers after it has started. But currently the 
YarnResourceManager does not record the running containers in workerNodeMap, 
so when task managers register with it, it will reject them.





[jira] [Created] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8399:
---

 Summary: Use independent configurations for the different timeouts 
in slot manager
 Key: FLINK-8399
 URL: https://issues.apache.org/jira/browse/FLINK-8399
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


There are three parameters in the slot manager that set the timeouts for a 
slot request to a task manager, for a slot request to be discarded, and for a 
task manager to be released. But currently they all come from the value of 
AkkaOptions.ASK_TIMEOUT; we need to use independent configurations for them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8289:
---

 Summary: The RestServerEndpoint should return the address with 
real ip when getRestAdddress
 Key: FLINK-8289
 URL: https://issues.apache.org/jira/browse/FLINK-8289
 Project: Flink
  Issue Type: Bug
Reporter: shuai.xu


Currently RestServerEndpoint.getRestAddress returns an address equal to the 
value of the config rest.address. The default is 127.0.0.1:9067, but this 
address cannot be accessed from another machine. The IPs of the Dispatcher and 
JobMaster are usually dynamic, so users will configure it to 0.0.0.0, and 
getRestAddress will return 0.0.0.0:9067. This address will be registered to 
YARN or Mesos, but it cannot be accessed from another machine either. 
So it needs to return the real ip:port so that users can access the web monitor 
from anywhere.
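
One way to picture the requested behaviour is a helper that refuses to advertise a wildcard or loopback bind address. This is a sketch under assumptions: `AdvertisedAddress` is a hypothetical class, and the externally reachable host is passed in by the caller rather than discovered, since how Flink would determine it is not described here.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class AdvertisedAddress {
    // Returns bindAddress:port unless bindAddress is a wildcard (0.0.0.0) or
    // loopback (127.0.0.1) address, in which case externalHost is advertised.
    // bindAddress is expected to be an IP literal, so no DNS lookup happens.
    static String choose(String bindAddress, String externalHost, int port) {
        InetAddress addr;
        try {
            addr = InetAddress.getByName(bindAddress);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Invalid bind address: " + bindAddress, e);
        }
        boolean unreachable = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
        return (unreachable ? externalHost : bindAddress) + ":" + port;
    }
}
```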





[jira] [Created] (FLINK-8288) Register the web interface url to yarn for yarn job mode

2017-12-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8288:
---

 Summary: Register the web interface url to yarn for yarn job mode
 Key: FLINK-8288
 URL: https://issues.apache.org/jira/browse/FLINK-8288
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


For flip-6 job mode, the resource manager is created before the web monitor, so 
the web interface url is not set to resource manager, and the resource manager 
can not register the url to yarn.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8266) Add network memory to ResourceProfile

2017-12-15 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8266:
---

 Summary: Add network memory to ResourceProfile
 Key: FLINK-8266
 URL: https://issues.apache.org/jira/browse/FLINK-8266
 Project: Flink
  Issue Type: Improvement
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


On the task manager side, the network buffer pool should be allocated according 
to the number of input channels and output sub partitions, but when allocating 
a worker, the resource profile doesn't contain any information about this 
memory. 
So I suggest adding a network memory field to ResourceProfile. The job master 
should calculate it when scheduling a task, and then the resource manager can 
allocate a container with the resource profile.





[jira] [Created] (FLINK-8224) Should shudownApplication when job terminated in job mode

2017-12-08 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8224:
---

 Summary: Should shudownApplication when job terminated in job mode
 Key: FLINK-8224
 URL: https://issues.apache.org/jira/browse/FLINK-8224
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


For job mode, one job is an application. When the job finishes, it should tell 
the resource manager to shut down the application, otherwise the resource 
manager cannot set the application status. For example, if the yarn resource 
manager doesn't report the application as finished to the yarn master, yarn 
will consider the application as still running.





[jira] [Created] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7970:
---

 Summary: SlotPool support batch allocating slots
 Key: FLINK-7970
 URL: https://issues.apache.org/jira/browse/FLINK-7970
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu
Assignee: shuai.xu


Currently all slot allocation goes one by one from execution to slot pool. If 
a number of slots are requested from the slot pool in a batch, the slot pool 
can assign its cached slots according to the global slot requests of all 
executions, so it can make an optimal matching between slots and executions. 
This is especially useful for failover. For example, it can assign a slot to 
the execution whose state is located on the same machine when failing over.






[jira] [Created] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7969:
---

 Summary: Resource manager support batch request slots
 Key: FLINK-7969
 URL: https://issues.apache.org/jira/browse/FLINK-7969
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Reporter: shuai.xu


Currently the resource manager only supports requesting slots one by one. It 
would be better to make it support batch allocating slots, so that it can make 
a global decision with all the resource requests. For example, it can decide 
how many slots should be put into one task manager.





[jira] [Created] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager

2017-10-25 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7928:
---

 Summary: Extend the filed in ResourceProfile for precisely 
calculating the resource of a task manager
 Key: FLINK-7928
 URL: https://issues.apache.org/jira/browse/FLINK-7928
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, ResourceManager
Reporter: shuai.xu
Assignee: shuai.xu


ResourceProfile records all the resource requirements for a slot. It is 
generated by JobMaster and then passed to ResourceManager with the slot 
request. 
A task in the slot needs three parts of resource: 
1. The resource for the operators. This is specified by the user-defined 
ResourceSpec.
2. The resource for the operators to communicate with their upstreams, for 
example the resource for buffer pools and so on.
3. The resource for the operators to communicate with their downstreams. Same 
as above.
So ResourceProfile should contain three parts of resource: the first part comes 
from ResourceSpec, and the other two parts are generated by Job Master.
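
The three-part split can be sketched as a small value class. `SlotResourceSketch` and its field names are hypothetical, and memory in megabytes is used only as an example unit; the actual ResourceProfile fields are not specified in this issue.

```java
// Hypothetical three-part resource description for one slot.
final class SlotResourceSketch {
    final double operatorMemoryMb;          // from the user-defined ResourceSpec
    final double upstreamNetworkMemoryMb;   // computed by the job master
    final double downstreamNetworkMemoryMb; // computed by the job master

    SlotResourceSketch(double operatorMemoryMb,
                       double upstreamNetworkMemoryMb,
                       double downstreamNetworkMemoryMb) {
        this.operatorMemoryMb = operatorMemoryMb;
        this.upstreamNetworkMemoryMb = upstreamNetworkMemoryMb;
        this.downstreamNetworkMemoryMb = downstreamNetworkMemoryMb;
    }

    // Total memory the resource manager would have to provide for the slot.
    double totalMemoryMb() {
        return operatorMemoryMb + upstreamNetworkMemoryMb + downstreamNetworkMemoryMb;
    }
}
```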





[jira] [Created] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-19 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7878:
---

 Summary: Extend the resource type user can define in ResourceSpec
 Key: FLINK-7878
 URL: https://issues.apache.org/jira/browse/FLINK-7878
 Project: Flink
  Issue Type: Bug
  Components: DataSet API, DataStream API
Reporter: shuai.xu


Currently, flink only lets users define how much CPU and MEM an operator uses, 
but the resources in a cluster are varied. For example, an application for 
image processing may need GPU, and some others may need FPGA. 
CPU and MEM alone are not enough, and the number of resource types keeps 
growing, so we need to make the ResourceSpec extensible.
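
One possible shape for an extensible spec is to keep CPU and memory as fixed fields and store any further resource types in a map keyed by name. `ExtensibleResourceSpec` is a hypothetical class for illustration and does not reflect the actual ResourceSpec API.

```java
import java.util.HashMap;
import java.util.Map;

final class ExtensibleResourceSpec {
    final double cpuCores;
    final int memoryMb;
    // Additional resource types, e.g. "GPU" or "FPGA", keyed by name.
    private final Map<String, Double> extended = new HashMap<>();

    ExtensibleResourceSpec(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // Adds or overwrites an extended resource and returns this for chaining.
    ExtensibleResourceSpec with(String name, double amount) {
        extended.put(name, amount);
        return this;
    }

    // Returns the requested amount, or 0.0 if the resource was never set.
    double get(String name) {
        Double amount = extended.get(name);
        return amount == null ? 0.0 : amount;
    }
}
```

New resource types can then be added without changing the class, for example `new ExtensibleResourceSpec(1.0, 1024).with("GPU", 2.0)`.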





[jira] [Created] (FLINK-7871) SlotPool should release its unused slot to RM

2017-10-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7871:
---

 Summary: SlotPool should release its unused slot to RM
 Key: FLINK-7871
 URL: https://issues.apache.org/jira/browse/FLINK-7871
 Project: Flink
  Issue Type: Bug
Reporter: shuai.xu


As described in design wiki 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, 
_*The SlotPool releases slots that are unused to the ResourceManager. Slots 
count as unused if they are not used when the job is fully running (fully 
recovered).*_
But currently the slot pool keeps the slots once they are offered to it until 
the job finishes.





[jira] [Created] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.

2017-10-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7870:
---

 Summary: SlotPool should cancel the slot request to RM if not need 
any more.
 Key: FLINK-7870
 URL: https://issues.apache.org/jira/browse/FLINK-7870
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


1. The SlotPool requests slots from the RM when it does not have enough.
2. If a slot request is not fulfilled within a certain time, the SlotPool 
treats the request as timed out and triggers a failover in the JobMaster, which 
sends a new slot request. The previous request is no longer needed, but the RM 
does not know that.
3. This may cause the RM to request far more resources than the job really needs.
For example:
1. A job needs 100 slots. The RM requests 100 containers from YARN.
2. YARN is busy and has no resources for the job.
3. The job fails over because the resource request is not fulfilled in time.
4. It asks for 100 slots again, so the RM now requests 200 containers from YARN.
5. If the failover happens several times, the number of requested containers 
keeps growing.
6. When YARN has resources again, it finds that the job apparently needs 
thousands of containers. This is a waste of resources.
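
The arithmetic of the example can be sketched as follows. This is a 
hypothetical model written only for illustration (PendingRequestModel and its 
method are not Flink or YARN code); it compares the number of outstanding 
container requests with and without cancelling timed-out requests.

```java
// Hypothetical model of the example above; not Flink or YARN code.
final class PendingRequestModel {

    // Containers outstanding at YARN after the initial request plus the given
    // number of failovers, each asking for slotsPerAttempt slots.
    static int pendingContainers(int slotsPerAttempt, int failovers, boolean cancelOnTimeout) {
        int pending = 0;
        for (int attempt = 0; attempt <= failovers; attempt++) {
            if (cancelOnTimeout) {
                // The timed-out requests are withdrawn before asking again.
                pending = 0;
            }
            pending += slotsPerAttempt;
        }
        return pending;
    }
}
```

For a job needing 100 slots, pendingContainers(100, 9, false) gives 1000 
outstanding containers after nine failovers, while pendingContainers(100, 9, 
true) stays at 100.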





[jira] [Created] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes

2017-02-21 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5869:
---

 Summary: ExecutionGraph use FailoverCoordinator to manage the 
failover of execution vertexes
 Key: FLINK-5869
 URL: https://issues.apache.org/jira/browse/FLINK-5869
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: shuai.xu
Assignee: shuai.xu


The execution graph does not manage the failover of executions. It only tracks 
the state of the whole job, which is CREATED, RUNNING, FAILED, FINISHED, or 
SUSPEND. 
When an execution fails, it notifies the FailoverCoordinator to do the failover.
It only records the finished job vertices and changes to FINISHED after all 
vertices have finished.
It changes to the final FAILED state if the restart strategy fails or an 
unrecoverable exception occurs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5868) Implement a new RestartStrategy that works for the FailoverRegion.

2017-02-21 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5868:
---

 Summary: Implement a new RestartStrategy that works for the 
FailoverRegion.
 Key: FLINK-5868
 URL: https://issues.apache.org/jira/browse/FLINK-5868
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: shuai.xu
Assignee: shuai.xu


We will first implement a restart strategy based on failure rate. 
For simplicity, the new restart strategy only counts the failovers of 
FailoverRegions. A global failure of the execution graph causes several 
FailoverRegions to fail over, and it simply counts the total number.
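
A minimal sketch of such counting, assuming a sliding time window, could look 
like the Java below. RegionFailureRateTracker and its parameters are 
hypothetical names chosen for this illustration and do not describe the actual 
implementation. A single region failover is recorded with a count of 1, and a 
global failure is recorded with the number of regions it affects.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of failure-rate counting per FailoverRegion failover.
final class RegionFailureRateTracker {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    // One timestamp per counted region failover, oldest first.
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    RegionFailureRateTracker(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    // Records that regionCount regions failed over at nowMillis. A global
    // failure passes the number of regions it restarts.
    void recordFailover(int regionCount, long nowMillis) {
        for (int i = 0; i < regionCount; i++) {
            failureTimestamps.addLast(nowMillis);
        }
    }

    // Drops entries older than the interval, then checks the remaining count.
    boolean canRestart(long nowMillis) {
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() > intervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }
}
```

Timestamps are passed in explicitly so the sketch stays deterministic; a real 
strategy would more likely read a clock.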





[jira] [Created] (FLINK-5866) The implementation of FailoverRegion.

2017-02-21 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5866:
---

 Summary: The implementation of FailoverRegion.
 Key: FLINK-5866
 URL: https://issues.apache.org/jira/browse/FLINK-5866
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: shuai.xu
Assignee: shuai.xu


In FLIP-1, a FailoverRegion manages the failover of a minimal set of pipelined 
connected executions.
One job graph may have several FailoverRegions, and they are calculated 
statically after the job graph is attached.
A FailoverRegion has several states: CREATED, CANCELING, CANCELED, and RUNNING. 
It changes its state according to the states of the ExecutionVertexes it 
contains and drives the failover during state transitions.
A FailoverRegion may have preceding and succeeding FailoverRegions. It 
waits for its preceding ones and notifies its succeeding ones when it finishes.
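
The four states can be pictured as a small state machine. The Java sketch below 
is hypothetical: the state names come from the description above, but the 
allowed transitions (CREATED to RUNNING, RUNNING to CANCELING, CANCELING to 
CANCELED, CANCELED back to CREATED) are an assumption made for illustration and 
are not specified in this issue.

```java
import java.util.EnumSet;
import java.util.Set;

// State names are from the issue text; the transition table is an assumption.
enum RegionState {
    CREATED, RUNNING, CANCELING, CANCELED;

    Set<RegionState> allowedNext() {
        switch (this) {
            case CREATED:   return EnumSet.of(RUNNING);
            case RUNNING:   return EnumSet.of(CANCELING);  // a contained vertex failed
            case CANCELING: return EnumSet.of(CANCELED);   // all vertices are canceled
            case CANCELED:  return EnumSet.of(CREATED);    // reset before restarting
            default:        throw new IllegalStateException("Unknown state " + this);
        }
    }
}

// Hypothetical holder that rejects transitions not listed in allowedNext().
final class RegionStateMachine {
    private RegionState state = RegionState.CREATED;

    RegionState getState() { return state; }

    // Returns true and updates the state only if the transition is allowed.
    boolean transitionTo(RegionState next) {
        if (state.allowedNext().contains(next)) {
            state = next;
            return true;
        }
        return false;
    }
}
```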





[jira] [Created] (FLINK-5856) Need return redundant containers to yarn for yarn mode

2017-02-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5856:
---

 Summary: Need return redundant containers to yarn for yarn mode
 Key: FLINK-5856
 URL: https://issues.apache.org/jira/browse/FLINK-5856
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: shuai.xu
Assignee: shuai.xu


In Flink-on-YARN mode, the RM requests containers from YARN according to the 
requirements of the JM. But the AMRMClientAsync used for YARN does not guarantee 
that the number of containers returned exactly equals the number requested. 
So the Flink RM needs to record the number it requested and return the redundant 
containers to YARN.
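
The bookkeeping could be sketched roughly as below. ContainerRequestTracker is 
a hypothetical helper that uses plain strings as container IDs; it is not the 
YarnResourceManager code. It keeps a counter of outstanding requests and splits 
each allocated batch into containers to keep and containers to hand back.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bookkeeping sketch; container IDs are plain strings here.
final class ContainerRequestTracker {
    private int pendingRequests;
    private final List<String> accepted = new ArrayList<>();

    // Called whenever the RM asks YARN for more containers.
    void onContainersRequested(int count) {
        pendingRequests += count;
    }

    // Keeps as many containers as are still pending and returns the surplus,
    // which the caller should give back to YARN.
    List<String> onContainersAllocated(List<String> allocated) {
        int toKeep = Math.min(pendingRequests, allocated.size());
        pendingRequests -= toKeep;
        accepted.addAll(allocated.subList(0, toKeep));
        return new ArrayList<>(allocated.subList(toKeep, allocated.size()));
    }

    int getPendingRequests() { return pendingRequests; }

    List<String> getAccepted() { return accepted; }
}
```

In a real resource manager, each surplus container would then be released 
through the YARN client, for example with 
AMRMClientAsync.releaseAssignedContainer.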





[jira] [Created] (FLINK-5791) Resource should be strictly matched when allocating for yarn

2017-02-13 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5791:
---

 Summary: Resource should be strictly matched when allocating for 
yarn
 Key: FLINK-5791
 URL: https://issues.apache.org/jira/browse/FLINK-5791
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Reporter: shuai.xu
Assignee: shuai.xu


In YARN mode, resources should be assigned exactly as requested to avoid wasted 
resources and OOM errors.
1. YarnResourceManager requests containers according to the ResourceProfile in 
the slot request from the JM.
2. The RM passes the ResourceProfile to the TM for initializing its slots.
3. The RM should strictly match the slots offered by the TM against the 
SlotRequest from the JM.
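
The difference between strict and loose matching can be sketched as follows. 
SimpleProfile is a hypothetical stand-in with only CPU and memory; it is not 
Flink's ResourceProfile, and the two method names are made up for this 
illustration.

```java
// Hypothetical stand-in for a resource profile with only CPU and memory.
final class SimpleProfile {
    private final double cpuCores;
    private final int memoryMb;

    SimpleProfile(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // Loose matching: any slot at least as large as the request is accepted,
    // so a small request can occupy a much larger slot.
    boolean matchesLoosely(SimpleProfile requested) {
        return cpuCores >= requested.cpuCores && memoryMb >= requested.memoryMb;
    }

    // Strict matching: the slot must offer exactly what was requested.
    boolean matchesStrictly(SimpleProfile requested) {
        return Double.compare(cpuCores, requested.cpuCores) == 0
                && memoryMb == requested.memoryMb;
    }
}
```

With loose matching, a request for 1 core and 1024 MB could be placed in a 
4-core, 8192 MB slot, which wastes resources; strict matching rejects that 
pairing.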





[jira] [Created] (FLINK-5190) ZooKeeperLeaderRetrievalService should not close the zk client when stop

2016-11-29 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5190:
---

 Summary: ZooKeeperLeaderRetrievalService should not close the zk 
client when stop
 Key: FLINK-5190
 URL: https://issues.apache.org/jira/browse/FLINK-5190
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: shuai.xu
Assignee: shuai.xu


The ZK client is created outside of ZooKeeperLeaderRetrievalService and passed 
to it. When ZooKeeperLeaderRetrievalService stops, it should not close the ZK 
client, because others outside the service may still be using it.
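
The ownership rule can be illustrated with a small Java sketch. Both classes 
are hypothetical: FakeClient stands in for the shared ZK client and 
BorrowingRetrievalService for the retrieval service; neither is a real 
ZooKeeper, Curator, or Flink class.

```java
// Hypothetical stand-in for a shared client that some other component owns.
final class FakeClient implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() { closed = true; }

    boolean isClosed() { return closed; }
}

// Hypothetical service that borrows the client instead of owning it.
final class BorrowingRetrievalService {
    private final FakeClient sharedClient; // created and owned by the caller
    private boolean running;

    BorrowingRetrievalService(FakeClient sharedClient) {
        this.sharedClient = sharedClient;
    }

    void start() { running = true; }

    // Stops only the service's own work; the borrowed client is left open
    // because whoever created it is responsible for closing it.
    void stop() { running = false; }

    boolean isRunning() { return running; }

    boolean isClientUsable() { return !sharedClient.isClosed(); }
}
```

The component that created the client closes it once every user has stopped.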



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5171) Wrong use of Preconditions.checkState in TaskManagerRunner

2016-11-28 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5171:
---

 Summary: Wrong use of Preconditions.checkState in TaskManagerRunner
 Key: FLINK-5171
 URL: https://issues.apache.org/jira/browse/FLINK-5171
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: shuai.xu
Assignee: shuai.xu


Preconditions.checkState checks that its first parameter is true and throws an 
exception if it is not. But TaskManagerRunner uses it in a way that throws an 
exception when the RPC port is valid.
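
The kind of mistake described here can be sketched in Java as below. The 
checkState method is a local stand-in with the contract described above, and 
both port checks are hypothetical illustrations, not the actual 
TaskManagerRunner code.

```java
// Hypothetical illustration of an inverted checkState condition.
final class CheckStateDemo {

    // Local stand-in with the contract described above: throws if the
    // condition is false, does nothing if it is true.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Inverted usage: the condition describes the INVALID case, so the check
    // throws for every valid port.
    static void buggyPortCheck(int rpcPort) {
        checkState(rpcPort < 0 || rpcPort > 65535, "Invalid RPC port: " + rpcPort);
    }

    // Intended usage: the condition describes the VALID case.
    static void correctPortCheck(int rpcPort) {
        checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid RPC port: " + rpcPort);
    }
}
```

The condition passed to checkState should always state what must hold, not what 
must be rejected.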





[jira] [Created] (FLINK-5170) getAkkaConfig will use localhost if hostname is specified

2016-11-28 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5170:
---

 Summary: getAkkaConfig will use localhost if hostname is specified 
 Key: FLINK-5170
 URL: https://issues.apache.org/jira/browse/FLINK-5170
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: shuai.xu
Assignee: shuai.xu


In AkkaUtil.scala: 
  def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
    getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None)
  }
When a hostname is specified, the condition hostname == null is false, so None 
is used.





[jira] [Created] (FLINK-5074) Implement a RunningJobRegistry based on Zookeeper

2016-11-15 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5074:
---

 Summary: Implement a RunningJobRegistry based on Zookeeper 
 Key: FLINK-5074
 URL: https://issues.apache.org/jira/browse/FLINK-5074
 Project: Flink
  Issue Type: Task
  Components: Cluster Management
Reporter: shuai.xu


FLIP-6 has implemented ZookeeperHaServices, but ZookeeperHaServices 
does not support getRunningJobsRegistry. So we need to implement a ZK-based 
running job registry.





[jira] [Created] (FLINK-4504) Support user to decide whether the result of an operator is presistent

2016-08-26 Thread shuai.xu (JIRA)
shuai.xu created FLINK-4504:
---

 Summary: Support user to decide whether the result of an operator 
is presistent
 Key: FLINK-4504
 URL: https://issues.apache.org/jira/browse/FLINK-4504
 Project: Flink
  Issue Type: Sub-task
  Components: DataSet API
Reporter: shuai.xu


Provide an API that lets users decide whether the result of an operator should 
be pipelined, spilled to a local file, or persisted to a distributed file system.





[jira] [Created] (FLINK-4444) Add a DFSInputChannel and DFSSubPartition

2016-08-22 Thread shuai.xu (JIRA)
shuai.xu created FLINK-4444:
---

 Summary: Add a DFSInputChannel and DFSSubPartition
 Key: FLINK-4444
 URL: https://issues.apache.org/jira/browse/FLINK-4444
 Project: Flink
  Issue Type: Sub-task
  Components: Batch Connectors and Input/Output Formats
Reporter: shuai.xu


Add a new ResultPartitionType and ResultPartitionLocation type for DFS.
Add DFSSubpartition and DFSInputChannel for writing to and reading from DFS.





[jira] [Created] (FLINK-4419) Batch improvement for supporting dfs as a ResultPartitionType

2016-08-17 Thread shuai.xu (JIRA)
shuai.xu created FLINK-4419:
---

 Summary: Batch improvement for supporting dfs as a 
ResultPartitionType
 Key: FLINK-4419
 URL: https://issues.apache.org/jira/browse/FLINK-4419
 Project: Flink
  Issue Type: Improvement
  Components: Batch Connectors and Input/Output Formats
Reporter: shuai.xu


This is the root issue to track an improvement for batch, which will enable DFS 
as a ResultPartitionType, so that upstream nodes can exit completely after 
they finish and need not be restarted if downstream nodes fail.
The full design is shown in 
(https://docs.google.com/document/d/15HtCtc9Gk8SyHsAezM7Od1opAHgnxLeHm3VX7A8fa-4/edit#).


