[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2021-05-31 Thread wangwj (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354777#comment-17354777
 ] 

wangwj commented on FLINK-10205:


[~Ryantaocer][~isunjin]
Hi, excuse me.
After reading this issue in detail, I have a question: in a batch job, although 
each InputSplit will be processed exactly once, after this patch is merged a 
task that fails over may not process the same InputSplits it processed before 
the failover. Does this problem still exist?
I am working on speculative execution, so I would like to discuss this with you.
Thanks~


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.1, 1.6.2, 1.7.0
>Reporter: JIN SUN
>Assignee: ryantaocer
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>   Original Estimate: 168h
>  Time Spent: 0.5h
>  Remaining Estimate: 167.5h
>
> Today the DataSource task pulls InputSplits from the JobManager to achieve 
> better performance. However, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt, which can introduce 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> the two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex 
> so that the DataSourceTask will pull splits from there.
>  document:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
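As an editorial illustration, the proposed scheme (the vertex caches the splits it has handed out, and a restarted attempt replays them) might look roughly like the sketch below. All names here are simplified stand-ins, not Flink's actual API, and `String` stands in for `InputSplit`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: the vertex remembers every split it handed out, so a
// failed-and-rerun DataSourceTask sees a deterministic input sequence.
public class SplitReplaySketch {

    // Stand-in for Flink's InputSplitAssigner: hands out splits in order.
    public static class Assigner {
        private final Queue<String> splits;
        public Assigner(List<String> all) { this.splits = new ArrayDeque<>(all); }
        public String getNextInputSplit() { return splits.poll(); } // null when exhausted
    }

    // Stand-in for ExecutionVertex: caches the splits it handed out.
    public static class Vertex {
        private final Assigner assigner;
        private final List<String> handedOut = new ArrayList<>();
        private int replayIndex = 0;
        public Vertex(Assigner assigner) { this.assigner = assigner; }

        // A restarted attempt replays the cached splits before pulling new ones.
        public String getNextInputSplit() {
            if (replayIndex < handedOut.size()) {
                return handedOut.get(replayIndex++);
            }
            String next = assigner.getNextInputSplit();
            if (next != null) {
                handedOut.add(next);
                replayIndex = handedOut.size();
            }
            return next;
        }

        // Called on failover: the next attempt starts replaying from scratch.
        public void resetForNewExecution() { replayIndex = 0; }
    }

    public static void main(String[] args) {
        Vertex v = new Vertex(new Assigner(List.of("s0", "s1", "s2")));
        String a = v.getNextInputSplit(); // "s0"
        String b = v.getNextInputSplit(); // "s1"
        v.resetForNewExecution();         // the task failed and is rerun
        // The rerun sees exactly the same splits, in the same order.
        System.out.println(a.equals(v.getNextInputSplit())); // true
        System.out.println(b.equals(v.getNextInputSplit())); // true
    }
}
```

This makes the input of each attempt deterministic regardless of what other subtasks pull from the shared assigner in the meantime.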



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714953#comment-16714953
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240253142
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -590,6 +604,15 @@ public Execution resetForNewExecution(final long 
timestamp, final long originati
 
this.currentExecution = newExecution;
 
+   synchronized (this.inputSplits){
+   for (InputSplit split: 
this.inputSplits){
 
 Review comment:
   General formatting/style: this line (and others) is missing some whitespace, e.g. 
`split :` and `) {`. Please have another look at the formatting.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714954#comment-16714954
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240247384
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -590,6 +604,15 @@ public Execution resetForNewExecution(final long 
timestamp, final long originati
 
this.currentExecution = newExecution;
 
+   synchronized (this.inputSplits){
+   for (InputSplit split: 
this.inputSplits){
+   if (split != null) {
+   
this.jobVertex.getSplitAssigner().returnInputSplit(split, 
this.getParallelSubtaskIndex());
+   }
+   }
+   this.inputSplits.clear();
 
 Review comment:
   I think in general the code-style in Flink avoids prefixing members with 
`this.` outside of constructors, getters, and setters.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714956#comment-16714956
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240252512
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -590,6 +604,15 @@ public Execution resetForNewExecution(final long 
timestamp, final long originati
 
this.currentExecution = newExecution;
 
+   synchronized (this.inputSplits){
 
 Review comment:
   I wonder if the synchronization on `inputSplits` is even needed? In the 
failover case, `synchronized (priorExecutions)` makes failing exclusive, and 
failing checks that the previous attempt is no longer running before 
manipulating the input splits. Getting the next split should then only be 
called from one attempt at a time, once it's running, and that should be from a 
single thread. Did I miss something?




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714958#comment-16714958
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240246446
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -590,6 +604,15 @@ public Execution resetForNewExecution(final long 
timestamp, final long originati
 
this.currentExecution = newExecution;
 
+   synchronized (this.inputSplits){
+   for (InputSplit split: 
this.inputSplits){
+   if (split != null) {
 
 Review comment:
   If we only add non-null splits, we could drop the check here. Would that 
make more sense?




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714957#comment-16714957
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240246077
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -250,6 +255,15 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
+   this.inputSplits.add(nextInputSplit);
 
 Review comment:
   I would suggest checking for `null` here and only calling `add` for non-null 
splits.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714952#comment-16714952
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240248544
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -590,6 +604,15 @@ public Execution resetForNewExecution(final long 
timestamp, final long originati
 
this.currentExecution = newExecution;
 
+   synchronized (this.inputSplits){
+   for (InputSplit split: 
this.inputSplits){
+   if (split != null) {
+   
this.jobVertex.getSplitAssigner().returnInputSplit(split, 
this.getParallelSubtaskIndex());
 
 Review comment:
   From the usage, it seems like `returnInputSplit` would be better as a bulk 
operation that takes a `Collection`.
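A minimal sketch of that suggestion, assuming a simplified assigner (the class and method names below are illustrative, not Flink's actual API, and `String` stands in for `InputSplit`):

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: instead of returning splits one by one, the assigner
// takes them back in bulk -- one call (and one lock acquisition) per failed
// task rather than one per split.
public class BulkReturnSketch {

    public static class SimpleAssigner {
        private final Deque<String> pending = new ArrayDeque<>();
        public SimpleAssigner(List<String> all) { pending.addAll(all); }

        public synchronized String getNextInputSplit() { return pending.poll(); }

        // Bulk variant of returnInputSplit: take back a whole collection.
        public synchronized void returnInputSplits(Collection<String> splits) {
            for (String s : splits) {
                pending.addFirst(s); // hand back out before untouched splits
            }
        }
    }

    public static void main(String[] args) {
        SimpleAssigner assigner = new SimpleAssigner(List.of("s0", "s1", "s2"));
        String taken = assigner.getNextInputSplit();  // "s0"
        assigner.returnInputSplits(List.of(taken));   // task failed, give it back
        System.out.println(assigner.getNextInputSplit()); // "s0" again
    }
}
```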




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714951#comment-16714951
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240245877
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -105,6 +106,9 @@
/** The current or latest execution attempt of this vertex's task. */
private volatile Execution currentExecution;// this field must 
never be null
 
+   /** input split*/
+   private ArrayList inputSplits;
 
 Review comment:
   It seems like we can initialize this on construction and make the field 
`final`, in particular as it is used with `synchronized`.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714955#comment-16714955
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on a change in pull request #6684: [FLINK-10205] 
Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r240258300
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
 ##
 @@ -38,4 +38,8 @@
 */
InputSplit getNextInputSplit(String host, int taskId);
 
+   /**
+* return the split to assigner if the task fail to process it.
 
 Review comment:
   Please add the parameter description.
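For illustration, a completed Javadoc for this method might read as follows; the wording and the nested `InputSplit` stand-in are hypothetical, not the text that was eventually merged:

```java
// Hedged sketch of the documented method; not the actual Flink interface.
public interface InputSplitAssignerSketch {

    /**
     * Returns an input split to this assigner so that it can be handed out
     * again, e.g. when the task that was processing it failed.
     *
     * @param split  the split that was not successfully processed
     * @param taskId the parallel subtask index returning the split
     */
    void returnInputSplit(InputSplit split, int taskId);

    /** Marker stand-in for org.apache.flink.core.io.InputSplit. */
    interface InputSplit {}
}
```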




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-11-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673434#comment-16673434
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r230445728
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
 
 Review comment:
   Thanks, removed.
   It was intent for support speculative execution, that multiple version of 
the same executionVertex can be run at same time, but i will postpone the 
feature and add it later




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672512#comment-16672512
 ] 

ASF GitHub Bot commented on FLINK-10205:


wenlong88 commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r230257155
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
 
 Review comment:
   There is no need to try to get the input split from the input split history.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672420#comment-16672420
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-435235888
 
 
   Thanks @tillrohrmann and @StefanRRichter. I pushed the implementation of 
returning `InputSplits` to the `InputSplitAssigner`. 
   
   An `InputSplit` will be returned to the `InputSplitAssigner` if the task fails to 
process it. 
   
   





[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669867#comment-16669867
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on issue #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-434629432
 
 
   +1 for exploring a way to return `InputSplits` to the `InputSplitAssigner`. 
The mix of concerns within `Executions` was also one of my main concerns; it 
would be good if we can find a way to avoid that.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669861#comment-16669861
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-434628107
 
 
   Thanks for the explanation @isunjin. Now I understand that you don't think 
it is strictly required for a failed task to process exactly the same 
`InputSplits`, and that this is just a side effect of the current implementation.
   
   So in the end you implemented it this way because the `InputSplitAssigner` 
does not support returning `InputSplits`. Maybe that is something we should 
change. We could, for example, add a new interface that an `InputSplitAssigner` 
needs to implement in order to support fine-grained recovery. Otherwise, such a 
failure will result in a global failover.
   
   My concern is that by storing `InputSplits` in the `Executions` we are 
mixing concerns. For example, assume that three tasks fail and we also lose a 
slot. Then we could only restart two of those tasks and would need to 
distribute the splits of the third `Execution` among the newly started 
`Executions`. It would be much easier to simply return all splits to the 
`InputSplitAssigner` and let the newly started `Executions` pull from there.
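A minimal sketch of the kind of interface described here, with hypothetical names (this is not Flink's actual API): a failed task hands its splits back to the assigner, and any running task may pick them up afterwards.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical interface sketch: an assigner that supports fine-grained
// recovery by taking splits back from failed tasks. All names here are
// illustrative, not Flink's actual API.
interface ReturnableInputSplitAssigner<S> {
    S getNextInputSplit();           // null when no splits remain
    void returnInputSplit(S split);  // called when a task fails mid-split
}

class QueueBackedAssigner implements ReturnableInputSplitAssigner<String> {
    private final Queue<String> pending = new ArrayDeque<>();

    QueueBackedAssigner(List<String> splits) {
        pending.addAll(splits);
    }

    @Override
    public synchronized String getNextInputSplit() {
        return pending.poll();
    }

    @Override
    public synchronized void returnInputSplit(String split) {
        pending.add(split); // a returned split may go to ANY running task
    }
}

public class FineGrainedRecoveryDemo {
    public static void main(String[] args) {
        QueueBackedAssigner assigner =
                new QueueBackedAssigner(List.of("split-0", "split-1"));
        String taken = assigner.getNextInputSplit(); // a task takes split-0 ...
        assigner.returnInputSplit(taken);            // ... fails, and returns it
        // Both splits are eventually handed out again; none is lost.
        System.out.println(assigner.getNextInputSplit()); // split-1
        System.out.println(assigner.getNextInputSplit()); // split-0
    }
}
```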




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659711#comment-16659711
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r227137203
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -104,6 +105,9 @@
/** The current or latest execution attempt of this vertex's task. */
private volatile Execution currentExecution;// this field must 
never be null
 
+   /** Input splits. */
+   private ArrayList<InputSplit> inputSplits;
 
 Review comment:
   good catch




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659655#comment-16659655
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431950619
 
 
   @tillrohrmann 
   
   >The thing I'm questioning is whether the InputSplits of the failed task 
need to be processed by the same (restarted) task or can be given to any 
running task.
   
   Agree. 
   
   I think the failed task's splits **don't** necessarily need to be processed 
by the same task (ExecutionVertex).
   
   > So far I'm not convinced that something would break if we simply return 
the InputSplits to the InputSplitAssigner
   
   Agree. 
   
   I think simply returning the `InputSplits` to the `InputSplitAssigner` would 
work; the point is how to make it work.
   
   Restarting the entire graph calls `ExecutionJobVertex.resetForNewExecution`, 
which creates a new `InputSplitAssigner` and thereby "returns" all 
`InputSplits` to the `InputSplitAssigner`.
   
   My point is that for fine-grained failover we might not want to return all 
`InputSplits`, only the failed ones. However, not every subclass of 
`InputSplitAssigner` currently has the logic to simply take `InputSplits` 
back, e.g. `LocatableInputSplitAssigner` or any other customized 
`InputSplitAssigner`.
   
   Simply returning the `InputSplits` to the `InputSplitAssigner` also implies 
a transaction between the task and the JobManager (maybe more than one); we 
need to make sure the `InputSplits` are returned to the `InputSplitAssigner` 
exactly once. What happens if we have speculative execution, i.e. two tasks 
consume the same set of `InputSplits` but do not fail at the same time? Does 
every `InputSplitAssigner` need to keep a list for deduplication? And what 
happens if the TM dies or has a network issue and the `InputSplit` cannot be 
returned?
   
   Saving the `InputSplits` in the ExecutionVertex is one way to "return" them 
to the `InputSplitAssigner`. The "side effect" of this implementation is that 
the `InputSplits` will be handled by the same task (ExecutionVertex), but it 
seems a simple and safe way to implement returning the `InputSplits` to the 
`InputSplitAssigner` transactionally. 
   
   @tillrohrmann, the above is my understanding; let me know if we are on the 
same page. I would be happy to redo this if you have any other suggestions. 
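The bookkeeping described here (remember the splits an ExecutionVertex consumed, and replay exactly those on restart) can be sketched roughly as follows; class and method names are made up for illustration and are not Flink's API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (hypothetical names): the vertex remembers every split
// it was assigned, so a restarted attempt deterministically re-reads the same
// splits in the same order before pulling new ones from the assigner.
public class VertexSplitTracker<S> {
    private final List<S> consumed = new ArrayList<>();
    private int replayCursor = 0;

    /** Happy path: record a freshly assigned split. */
    public void recordAssigned(S split) {
        consumed.add(split);
    }

    /** After failover: rewind so the new attempt replays from the start. */
    public void resetForNewExecution() {
        replayCursor = 0;
    }

    /** Next split to replay, or null once the recorded splits are exhausted
     *  (at which point the task falls back to the InputSplitAssigner). */
    public S nextReplaySplit() {
        return replayCursor < consumed.size() ? consumed.get(replayCursor++) : null;
    }
}
```

Because the recorded splits never leave the vertex, no distributed "return" transaction with the JobManager is needed, which is the safety argument being made here.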
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659356#comment-16659356
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin removed a comment on issue #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431734239
 
 
   Great discussion, thanks everybody. 
   
   @wenlong88, the scenario you mention is what I am trying to fix. 
[here](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is a concrete example: a simple word-count job produces inconsistent data on 
failover; the job should fail, but it succeeds with zero output.
   
   @tillrohrmann, the **_InputSplitAssigner_** generates a list of 
_**InputSplits**_. The order might not matter, but every split should be 
processed exactly once: if a task fails while processing an _**InputSplit**_, 
that _**InputSplit**_ should be processed again. In the batch scenario, 
however, this might not hold; 
[this](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 repro shows that the current codebase doesn't have this logic and thus has a 
data-inconsistency issue.
   
   It's not a problem in the streaming scenario, as the _**InputSplit**_ is 
treated as a record; e.g. in _**ContinuousFileMonitoringFunction**_ it collects 
_**InputSplits**_, and every _**InputSplit**_ is guaranteed by Flink to be 
processed exactly once. @wenlong88, will this work in your scenario? 
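The failure mode in the repro can be condensed into a toy simulation (this is not Flink code; names and structure are illustrative): if the split a failed task was processing is never handed back, the restarted run silently skips it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy simulation of the bug (not Flink code): an assigner hands out splits,
// a task fails mid-split, and nothing puts the split back.
public class LostSplitDemo {

    static List<String> run(boolean firstTaskFails) {
        Queue<String> assigner = new ArrayDeque<>(List.of("split-0", "split-1"));
        List<String> processed = new ArrayList<>();

        String current = assigner.poll(); // a task picks up split-0 ...
        if (!firstTaskFails) {
            processed.add(current);       // ... and either finishes it ...
        }                                 // ... or fails; split-0 is never returned

        // The restarted task only sees what is still in the assigner.
        for (String s; (s = assigner.poll()) != null; ) {
            processed.add(s);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [split-0, split-1]
        System.out.println(run(true));  // [split-1]  (split-0 silently dropped)
    }
}
```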
   
   
   
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659354#comment-16659354
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann removed a comment on issue #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431790382
 
 
   @isunjin I agree that the current implementation does not work with region 
failover. The thing I'm questioning is whether the `InputSplits` of the failed 
task need to be processed by the same (restarted) task or can be given to any 
running task. So far I'm not convinced that something would break if we simply 
return the `InputSplits` to the `InputSplitAssigner`. I think the `WordCount` 
example should work with this.
   
   Before hotfixing something in a way that might hurt us in the future, I 
would really like to grasp the full picture of why you want to solve the 
problem that way.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659358#comment-16659358
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin removed a comment on issue #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431735667
 
 
   Great discussion, thanks everybody. 
   
   @wenlong88, the scenario you mention is what I am trying to fix. 
[here](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is a concrete example: a simple word-count job produces inconsistent data on 
failover; the job should fail, but it succeeds with zero output.
   
   @tillrohrmann, the **_InputSplitAssigner_** generates a list of 
_**InputSplits**_. The order might not matter, but every split should be 
processed exactly once: if a task fails to process an _**InputSplit**_, that 
_**InputSplit**_ should be processed again. In the batch scenario, however, 
this might not hold. The _**DataSourceTask**_ asks the _InputSplitAssigner_ 
for the next _**InputSplit**_, and depending on the implementation of the 
_InputSplitAssigner_, a failed _**InputSplit**_ might be discarded; 
[this](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 repro shows that _**LocatableInputSplitAssigner**_ discards failed 
_**InputSplits**_ and thus has a data-inconsistency issue.
   
   It's not a problem in the streaming scenario, as the _**InputSplit**_ is 
treated as a record; e.g. in _**ContinuousFileMonitoringFunction**_ it collects 
_**InputSplits**_, and every _**InputSplit**_ is guaranteed by Flink to be 
processed exactly once. @wenlong88, will this work in your scenario? 
   
   @tillrohrmann, the problem here is that this is a bug, so we should hotfix 
it instead of waiting for the new feature to become available.  
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659357#comment-16659357
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin removed a comment on issue #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431734273
 
 
   Great discussion, thanks everybody. 
   
   @wenlong88, the scenario you mention is what I am trying to fix. 
[here](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is a concrete example: a simple word-count job produces inconsistent data on 
failover; the job should fail, but it succeeds with zero output.
   
   @tillrohrmann, the **_InputSplitAssigner_** generates a list of 
_**InputSplits**_. The order might not matter, but every split should be 
processed exactly once: if a task fails while processing an _**InputSplit**_, 
that _**InputSplit**_ should be processed again. In the batch scenario, 
however, this might not hold; 
[this](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 repro shows that the current codebase doesn't have this logic and thus has a 
data-inconsistency issue.
   
   It's not a problem in the streaming scenario, as the _**InputSplit**_ is 
treated as a record; e.g. in _**ContinuousFileMonitoringFunction**_ it collects 
_**InputSplits**_, and every _**InputSplit**_ is guaranteed by Flink to be 
processed exactly once. @wenlong88, will this work in your scenario? 
   
   
   
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658685#comment-16658685
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431757055
 
 
   @isunjin I agree that the current implementation does not work with region 
failover. The thing I'm questioning is whether the `InputSplits` of the failed 
task need to be processed by the same (restarted) task or can be given to any 
running task. So far I'm not convinced that something would break if we simply 
return the `InputSplits` to the `InputSplitAssigner`. I think the `WordCount` 
example should work with this.
   
   Before hotfixing something in a way that might hurt us in the future, I 
would really like to grasp the full picture of why you want to solve the 
problem that way.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658686#comment-16658686
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431757163
 
 
   @isunjin I agree that the current implementation does not work with region 
failover. The thing I'm questioning is whether the `InputSplits` of the failed 
task need to be processed by the same (restarted) task or can be given to any 
running task. So far I'm not convinced that something would break if we simply 
return the `InputSplits` to the `InputSplitAssigner`. I think the `WordCount` 
example should work with this.
   
   Before hotfixing something in a way that might hurt us in the future, I 
would really like to grasp the full picture of why you want to solve the 
problem that way.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658684#comment-16658684
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431756989
 
 
   @isunjin I agree that the current implementation does not work with region 
failover. The thing I'm questioning is whether the `InputSplits` of the failed 
task need to be processed by the same (restarted) task or can be given to any 
running task. So far I'm not convinced that something would break if we simply 
return the `InputSplits` to the `InputSplitAssigner`. I think the `WordCount` 
example should work with this.
   
   Before hotfixing something in a way that might hurt us in the future, I 
would really like to grasp the full picture of why you want to solve the 
problem that way.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658536#comment-16658536
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431735693
 
 
   Great discussion, thanks everybody. 
   
   @wenlong88, the scenario you mention is what I am trying to fix. 
[here](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is a concrete example: a simple word-count job produces inconsistent data on 
failover; the job should fail, but it succeeds with zero output.
   
   @tillrohrmann, the **_InputSplitAssigner_** generates a list of 
_**InputSplits**_. The order might not matter, but every split should be 
processed exactly once: if a task fails to process an _**InputSplit**_, that 
_**InputSplit**_ should be processed again. In the batch scenario, however, 
this might not hold. The _**DataSourceTask**_ asks the _InputSplitAssigner_ 
for the next _**InputSplit**_, and depending on the implementation of the 
_InputSplitAssigner_, a failed _**InputSplit**_ might be discarded; 
[this](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 repro shows that _**LocatableInputSplitAssigner**_ discards failed 
_**InputSplits**_ and thus has a data-inconsistency issue.
   
   It's not a problem in the streaming scenario, as the _**InputSplit**_ is 
treated as a record; e.g. in _**ContinuousFileMonitoringFunction**_ it collects 
_**InputSplits**_, and every _**InputSplit**_ is guaranteed by Flink to be 
processed exactly once. @wenlong88, will this work in your scenario? 
   
   @tillrohrmann, the problem here is that this is a bug, so we should hotfix 
it instead of waiting for the new feature to become available.  
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658535#comment-16658535
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431735667
 
 
   Great discussion, thanks everybody. 
   
   @wenlong88, the scenario you mention is what I try to fix.  
[Here](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is a concrete example: a simple word-count job produces inconsistent data 
during failover; the job should fail, but instead it succeeds with zero output.
   
   @tillrohrmann, **_InputSplitAssigner_** generates a list of _**InputSplit**_s; 
the order might not matter, but every input should be processed exactly once. If 
a task fails to process an _**InputSplit**_, that _**InputSplit**_ should be 
processed again. However, in the batch scenario this might not hold: the 
_**DataSourceTask**_ calls the _InputSplitAssigner_ to obtain an _**InputSplit**_, 
and depending on the implementation of the _InputSplitAssigner_, the failed 
_**InputSplit**_ might be discarded.  
[This](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 repro shows that _**LocatableInputSplitAssigner**_ discards the failed 
_**InputSplit**_ and thus causes a data-inconsistency issue.
   
   It's not a problem in the streaming scenario, because each _**InputSplit**_ is 
treated as a record; e.g., _**ContinuousFileMonitoringFunction**_ collects 
_**InputSplit**_s, and Flink guarantees that every _**InputSplit**_ is processed 
exactly once. @wenlong88, would this work in your scenario? 
   
   @tillrohrmann, the problem here is a bug, so should we hotfix it instead of 
waiting for the new feature to become available?  
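
   The failure mode described above can be modeled in a few lines of plain Java. 
This is a simplified sketch, not Flink's actual `InputSplitAssigner` API: a 
pull-only assigner hands each split out exactly once, so a split whose task 
fails mid-processing is silently lost and the job finishes with missing data.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class LostSplitRepro {

    /** Simplified stand-in for an assigner that hands out each split exactly once. */
    static class PullOnlyAssigner {
        private final Queue<String> unassigned = new ArrayDeque<>();

        PullOnlyAssigner(String... splits) {
            for (String s : splits) {
                unassigned.add(s);
            }
        }

        /** Returns the next split, or null when none are left. */
        String getNextSplit() {
            return unassigned.poll();
        }
    }

    /** Runs a toy job over three splits; the task "fails" while on split-1. */
    static int runJob() {
        PullOnlyAssigner assigner = new PullOnlyAssigner("split-0", "split-1", "split-2");
        int processed = 0;
        String split;
        while ((split = assigner.getNextSplit()) != null) {
            if (split.equals("split-1")) {
                // The task dies here; the restarted attempt simply pulls the
                // *next* split, and split-1 is never handed out again.
                continue;
            }
            processed++;
        }
        return processed; // only 2 of the 3 splits were ever processed
    }

    public static void main(String[] args) {
        System.out.println(runJob());
    }
}
```

   This mirrors the word-count repro: the job "succeeds" even though one split's 
data never reached the output.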
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658516#comment-16658516
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431734239
 
 
   Great discussion, thanks everybody. 
   @wenlong88, the scenario you mention is what I try to fix.  
[Here](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is a concrete example: a simple word-count job produces inconsistent data 
during failover; the job should fail, but instead it succeeds with zero output.
   
   @tillrohrmann, **_InputSplitAssigner_** generates a list of _**InputSplit**_s; 
the order might not matter, but every input should be processed exactly once. If 
a task fails while processing an _**InputSplit**_, that _**InputSplit**_ should be 
processed again. However, in the batch scenario this might not hold.  
[This](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 repro shows that the current codebase does not have this logic and thus has a 
data-inconsistency issue.
   
   It's not a problem in the streaming scenario, because each _**InputSplit**_ is 
treated as a record; e.g., _**ContinuousFileMonitoringFunction**_ collects 
_**InputSplit**_s, and Flink guarantees that every _**InputSplit**_ is processed 
exactly once. @wenlong88, would this work in your scenario? 
   
   
   
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658517#comment-16658517
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431734273
 
 
   Great discussion, thanks everybody. 
   
   @wenlong88, the scenario you mention is what I try to fix.  
[Here](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is a concrete example: a simple word-count job produces inconsistent data 
during failover; the job should fail, but instead it succeeds with zero output.
   
   @tillrohrmann, **_InputSplitAssigner_** generates a list of _**InputSplit**_s; 
the order might not matter, but every input should be processed exactly once. If 
a task fails while processing an _**InputSplit**_, that _**InputSplit**_ should be 
processed again. However, in the batch scenario this might not hold.  
[This](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 repro shows that the current codebase does not have this logic and thus has a 
data-inconsistency issue.
   
   It's not a problem in the streaming scenario, because each _**InputSplit**_ is 
treated as a record; e.g., _**ContinuousFileMonitoringFunction**_ collects 
_**InputSplit**_s, and Flink guarantees that every _**InputSplit**_ is processed 
exactly once. @wenlong88, would this work in your scenario? 
   
   
   
   




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657704#comment-16657704
 ] 

ASF GitHub Bot commented on FLINK-10205:


TisonKun commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431546174
 
 
   FYI there was another 
[discussion(FLINK-10038)](https://issues.apache.org/jira/browse/FLINK-10038) on 
`InputSplit` (creation and) assignment.
   I prefer the idea of moving the whole `InputSplit` assignment into a single 
task. It could get splits checkpointed automatically and simplify the currently 
complex JM. However, new nodes might introduce unexpected features into the 
graph, and besides, we would have to implement some "pull" semantics for split 
nodes.  




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653673#comment-16653673
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796
 
 
   Alright, now the problem is a bit clearer to me. The underlying problem is 
that the `InputSplitAssigner's` semantics in case of a failover are not well 
defined. This is mainly due to the fact that Flink evolved over time.
   
   The general idea of the `InputSplitAssigner` is to lazily assign work to 
sources which have completely consumed their current `InputSplit`. The order in 
which this happens should not affect the correctness of the result.
   
   If you say that in case of a recovery the exact same `InputSplit` assignment 
needs to happen again, then I think it must be because our sources have some 
kind of state. Otherwise, it should not matter which source task completes the 
`InputSplit`, right? If this is correct, then we would run into the same 
problem if a JM failure happens, because we would lose all `InputSplit` 
assignment information which is stored on the JM. So stateful sources with 
`InputSplits` don't work at the moment (in the general case).
   
   If we assume that our sources are stateless, then simply returning the input 
splits to the assigner and letting the next idling task take it should work. In 
your example of the infinite stream which is initialized via the `InputSplits` 
there would be no other task competing for the `InputSplit` of a failed task 
because by definition they never finish their work, right? If multiple tasks 
fail, then the mapping might be different after the recovery, but every task 
would continue consuming from a single `InputSplit`. I think the problem here 
is that you abused the `InputSplitAssigner` for something it is not yet 
intended to do.
   
   The reason why I'm a bit hesitant here is because I think we do not fully 
understand yet what we actually want to have. Moreover, some corner cases are not 
clear to me yet. For example, why would it be OK for a global failover to 
change the mapping but not for a region failover? Another example is how to 
handle the case where we lose a TM and need to downscale. Would that 
effectively be a global failover where we redistribute all `InputSplits`? (I 
would think so.) 
   
   Before starting any concrete implementation steps, I think we should 
properly design this feature to get it right. A very related topic is actually 
the new source interface. Depending on how much we are able to unify batch and 
streaming, the whole `InputSplit` assignment might move into a single task 
(similar to the `ContinuousFileMonitoringFunction`) and the assignment might become 
part of a checkpoint. That way, we would no longer need to take care of this on 
the JM side.
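
   For the stateless batch case described above, "returning the input splits to 
the assigner" can be illustrated with the following sketch. The names are 
illustrative only, not Flink's actual `InputSplitAssigner` API: a failed task's 
unfinished split goes back to the assigner, so the next idling task picks it up 
and no split is lost.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReturningAssignerDemo {

    /** Illustrative assigner that takes failed splits back (hypothetical names). */
    static class ReturningAssigner {
        private final Deque<String> unassigned = new ArrayDeque<>();

        ReturningAssigner(String... splits) {
            for (String s : splits) {
                unassigned.add(s);
            }
        }

        /** Next unassigned split, or null when exhausted. */
        String getNextSplit() {
            return unassigned.poll();
        }

        /** Called on task failure: hand the unfinished split back for reassignment. */
        void returnSplit(String split) {
            unassigned.addFirst(split);
        }
    }

    public static void main(String[] args) {
        ReturningAssigner assigner = new ReturningAssigner("s0", "s1");
        String inFlight = assigner.getNextSplit(); // task 0 starts on s0
        assigner.returnSplit(inFlight);            // task 0 fails; s0 goes back
        // The next idling task now receives s0, so no split is lost.
        System.out.println(assigner.getNextSplit());
    }
}
```

   In this stateless model it indeed does not matter which task completes the 
split; only the set of completed splits matters.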




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653181#comment-16653181
 ] 

ASF GitHub Bot commented on FLINK-10205:


shuai-xu commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r225825945
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -104,6 +105,9 @@
/** The current or latest execution attempt of this vertex's task. */
private volatile Execution currentExecution;// this field must 
never be null
 
+   /** input split*/
+   private ArrayList<InputSplit> inputSplits;
 
 Review comment:
   It seems the inputSplits will never be cleared. In fact, it should be 
cleared during a global failover.
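
   A minimal sketch of the lifecycle this review comment implies (a hypothetical 
class, not the actual patch): splits pulled by a vertex are recorded so a 
restarted attempt replays the same splits, while a global failover clears the 
record entirely.

```java
import java.util.ArrayList;
import java.util.List;

public class VertexSplitHistoryDemo {

    /** Hypothetical per-vertex split record (a sketch, not the actual patch). */
    static class VertexSplitHistory {
        private final List<String> inputSplits = new ArrayList<>();
        private int replayIndex = 0;

        /** Record a split freshly pulled from the assigner. */
        void record(String split) {
            inputSplits.add(split);
        }

        /** On a task/region failover: replay previously recorded splits first. */
        String nextReplayedSplit() {
            return replayIndex < inputSplits.size() ? inputSplits.get(replayIndex++) : null;
        }

        /** On a global failover: forget everything, as the review comment suggests. */
        void clearOnGlobalFailover() {
            inputSplits.clear();
            replayIndex = 0;
        }
    }

    public static void main(String[] args) {
        VertexSplitHistory history = new VertexSplitHistory();
        history.record("split-0");
        history.record("split-1");
        // Region failover: the restarted attempt sees exactly the same splits.
        System.out.println(history.nextReplayedSplit() + "," + history.nextReplayedSplit());
        // Global failover: the record is cleared and assignment starts from scratch.
        history.clearOnGlobalFailover();
        System.out.println(history.nextReplayedSplit());
    }
}
```

   Without the `clearOnGlobalFailover` step, the list would grow across global 
restarts and replay stale assignments, which is the leak the review points out.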




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653159#comment-16653159
 ] 

ASF GitHub Bot commented on FLINK-10205:


wenlong88 commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430529308
 
 
   A real use case: we implement a source function for an infinite-stream job 
that uses an input format to initialize input splits (so that we only visit 
the storage metadata once) and assigns an input split to every task. The input 
splits never finish, and we need to make sure that all tasks process the same 
number of input splits for load balancing.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653143#comment-16653143
 ] 

ASF GitHub Bot commented on FLINK-10205:


wenlong88 commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430524769
 
 
   @tillrohrmann I think in most cases it is OK for the splits of a failed task 
to be taken over by other tasks, and that is more friendly for failover.
   But for some special cases it is not OK, because the assigner can be 
customized, and all input-split assignments should come from the implemented 
assigner. 
   E.g., when the assigner is implemented to make sure each task processes the 
same number of input splits, we should not allow the input splits of a failed 
task to be taken over by another task.
   
   Maybe it is better to add a new interface for the assigner to do fine-grained 
failover?
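
   Such an interface might look like the following sketch. The names are 
hypothetical, not an actual Flink API: the assigner itself decides what happens 
to a failed task's splits, so a count-balancing assigner can pin them to the 
same subtask while a locality-based one could release them to anyone.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class FailoverAwareAssignerDemo {

    /** Hypothetical interface sketching the suggestion above (not a Flink API). */
    interface FailoverAwareSplitAssigner {
        /** Next split for the given subtask, or null if none is available. */
        String getNextInputSplit(int subtask);

        /** Lets the assigner itself decide what to do with a failed task's splits. */
        void onTaskFailure(int subtask, Iterable<String> unfinishedSplits);
    }

    /** A count-balancing assigner that returns failed splits to the same subtask only. */
    static class StickyAssigner implements FailoverAwareSplitAssigner {
        private final Map<Integer, Deque<String>> perSubtask = new HashMap<>();

        void assign(int subtask, String split) {
            perSubtask.computeIfAbsent(subtask, k -> new ArrayDeque<>()).add(split);
        }

        @Override
        public String getNextInputSplit(int subtask) {
            Deque<String> queue = perSubtask.get(subtask);
            return queue == null ? null : queue.poll();
        }

        @Override
        public void onTaskFailure(int subtask, Iterable<String> unfinishedSplits) {
            // Re-queue for the *same* subtask so the per-task split count stays balanced.
            for (String s : unfinishedSplits) {
                perSubtask.computeIfAbsent(subtask, k -> new ArrayDeque<>()).addFirst(s);
            }
        }
    }

    public static void main(String[] args) {
        StickyAssigner assigner = new StickyAssigner();
        assigner.assign(0, "s0");
        String inFlight = assigner.getNextInputSplit(0);
        assigner.onTaskFailure(0, Arrays.asList(inFlight));
        // Subtask 0 gets its own split back; subtask 1 never sees it.
        System.out.println(assigner.getNextInputSplit(0) + "," + assigner.getNextInputSplit(1));
    }
}
```

   The framework would only need to call `onTaskFailure` during region failover; 
everything else stays inside the custom assigner.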




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653131#comment-16653131
 ] 

ASF GitHub Bot commented on FLINK-10205:


shuai-xu commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430522120
 
 
   I opened the same JIRA (https://issues.apache.org/jira/browse/FLINK-8442) long 
ago; Stephan had two proposals then. One is to checkpoint the input splits in 
the job master; the other is to split the source into two tasks, where one task 
manages the input splits. @StephanEwen, is there any update?




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653116#comment-16653116
 ] 

ASF GitHub Bot commented on FLINK-10205:


TisonKun commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430519357
 
 
   @tillrohrmann I would recap what @isunjin and @wenlong88 emphasize:
   
   > @wenlong88: the framework does not know how the input assigner assigns 
splits to subtasks, and we can't reconstruct the assigner in region failover
   > @isunjin: otherwise the logic to keep data consistent is complicated
   
   For the implementation, we can record the input split assigned to the task at 
assignment time, which is the most general way. Conceptually, we can think of an 
input split as processable by any task, but technically @isunjin's 
implementation is a concise one.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653060#comment-16653060
 ] 

ASF GitHub Bot commented on FLINK-10205:


tillrohrmann commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430508466
 
 
   Before moving forward I'd like to understand why it is strictly necessary 
that the failed task reprocesses the same set of input splits. Is it because 
streaming sources can have state which they would use to filter out already 
processed splits? 
   
   In the batch case, this should not be a problem, because it should not matter 
which task processes which input split. If a failure occurs and some other 
task takes over the failed input splits, it would be as if this task had processed 
these input splits from the very beginning.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652868#comment-16652868
 ] 

ASF GitHub Bot commented on FLINK-10205:


wenlong88 commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430467136
 
 
   hi, @tillrohrmann @isunjin , we have encountered the same issue in region 
failover and fixed it with a similar patch. Restricting the splits to the failed 
task is necessary, because the framework does not know how the input assigner 
assigns splits to subtasks, and we cannot reconstruct the assigner during region 
failover, so keeping the assignment unchanged is the most general way to fix the 
issue.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652761#comment-16652761
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430451645
 
 
   @tillrohrmann i think the splits need to be strictly consumed by the failed job; 
otherwise the logic to keep data consistent (making sure each split is consumed 
exactly once) becomes complicated. Streaming is good at this because it internally 
treats a split as a record, and the framework makes sure every split (record) is 
processed exactly once.  




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639654#comment-16639654
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on issue #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-427323755
 
 
   @isunjin Ok, I think I was just connecting a different type of problem with 
the word "corruption", but the case makes sense because the assumption so far 
was probably that there are only global failovers and that the 
`InputSplitAssigner` is reset. I think in this context the implementation is 
ok. I am still not the biggest fan of pulling the `InputSplit` concept into 
`Execution`, but at the same time I see that currently all alternatives that 
came to my mind have their own set of problems. I guess it would be ok for me to 
merge the PR, but I would like to ask @tillrohrmann for a second opinion.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16638685#comment-16638685
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-427127694
 
 
   @StefanRRichter thanks for the comments
   - for the inconsistency issue, 
[this](https://github.com/isunjin/flink/commit/b61b58d963ea11d34e2eb7ec6f4fe4bfed4dca4a)
 is the repro. The logic is simple: we throw an exception in the WordCount 
example and use restartRegion as the failover strategy. The job was expected to 
fail, but it succeeded with an incorrect result. The reason is that on restart 
the task calls requestNextSplit, which returns empty because the splits were 
already drained; since it is empty, the flatMap method never executes and the 
exception is never thrown.
   
   - the goal of the general approach is to preserve the assumption of 
"deterministic behavior" as much as possible, since determinism is crucial for 
failover. The code does not aim to introduce "determinism" only for 
DataSourceTask; right now DataSourceTask is only used in the batch scenario. For 
the streaming scenario, it will work once we treat the splitIndex as state.
   
   - for load balancing, i think the first priority is to make the data 
consistent; we can certainly add more logic later to make it more efficient.   
   
   - Thanks for letting me know about this; however, this is a bug right now and 
it actually blocks me from moving forward. We can refactor this code if we have 
a fundamentally different design. 
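The failure mode described in the first bullet can be sketched as follows. This is an assumed simplification, not Flink's actual code: the `Deque` stands in for the JobManager-side split assigner that all attempts of a source task share:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Rough sketch of why a restarted pull-based source task can silently
// "succeed" with missing data after a region failover.
public class DrainedAssignerDemo {

    // Read loop of a pull-based source task: pull splits until the assigner
    // reports "no more input" (null).
    static int runAttempt(Deque<String> assigner) {
        int processed = 0;
        String split;
        while ((split = assigner.poll()) != null) {
            processed++; // user code (e.g. WordCount's flatMap) would run here
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<String> assigner = new ArrayDeque<>();
        assigner.add("split-0");

        // Attempt 1 pulls the only split, then fails while processing it;
        // the pull itself already removed the split from the shared assigner.
        assigner.poll();

        // Attempt 2 after region failover: the assigner is drained, the loop
        // body never runs, no exception is re-thrown, and the job finishes
        // "successfully" having processed nothing.
        System.out.println("attempt 2 processed " + runAttempt(assigner) + " splits");
    }
}
```

This is exactly why the restarted attempt must replay the splits that its failed predecessor had already pulled, rather than asking the shared assigner again.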
   
   





[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16638449#comment-16638449
 ] 

ASF GitHub Bot commented on FLINK-10205:


StefanRRichter commented on issue #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-427078004
 
 
   @isunjin in the issue/design doc you are talking about potential data 
inconsistency/corruption that this PR is trying to fix. However, I wonder what 
sort of corruption you have in mind that is fixed here. Can you provide a 
concrete example of a problematic case? In my understanding, graph components 
are either connected and need a connected restart or they are independent and 
can recover fine-grained, but then it should also not matter in which order 
splits are reprocessed.
   
   Besides that, I wonder if the general approach is a good fit for the current 
and future architecture of this component. In particular, we pull the concern 
of `InputSplit` down to the level of `Executions`. `Execution` or 
`ExecutionJobVertex` are used in batch and streaming and to me it does not seem 
like a good step to introduce batch-specific code into those classes if we can 
avoid it. Another thing that I question here is if it would not make sense to 
think about a way that allows us also to release the assignment from an input 
split to a certain task, so that another task can pick it up in case that there 
is a longer lasting problem with the original task. Last, we are currently 
thinking about a general redesign of the source interface and how input is 
assigned to the source instances. @aljoscha has a WIP branch to experiment with 
the possible changes here 
https://github.com/aljoscha/flink/tree/refactor-source-interface, but we should 
keep in mind that sources might be split into two operators in the future.  




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632583#comment-16632583
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-425576301
 
 
   @fhueske @StephanEwen could you help to have a look?




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632580#comment-16632580
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221389763
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -704,6 +716,66 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep
 	}
 	}
 
+	private JobGraph createDataSourceJobGraph() {
+		final TextInputFormat inputFormat = new TextInputFormat(new Path("."));
+		final InputFormatVertex producer = new InputFormatVertex("Producer");
+		new TaskConfig(producer.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<>(inputFormat));
+		producer.setInvokableClass(DataSourceTask.class);
+
+		final JobVertex consumer = new JobVertex("Consumer");
+		consumer.setInvokableClass(NoOpInvokable.class);
+		consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(producer, consumer);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Tests that {@link JobMaster#requestNextInputSplit(JobVertexID, ExecutionAttemptID)}
+	 * returns the same result for a different retry.
+	 */
+	@Test
+	public void testRequestNextInputSplitWithDataSourceFailover() throws Exception {
+
+		final JobGraph dataSourceJobGraph = createDataSourceJobGraph();
+		testJobMasterAPIWithMockExecution(dataSourceJobGraph, (tdd, jobMaster) -> {
+			try {
+				final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+				final TaskInformation taskInformation = tdd.getSerializedTaskInformation()
+					.deserializeValue(getClass().getClassLoader());
+				JobVertexID vertexID = taskInformation.getJobVertexId();
+
+				// get the previous split
+				SerializedInputSplit split1 = gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
+
+				// start a new version of this execution
+				ExecutionGraph executionGraph = jobMaster.getExecutionGraph();
+				Execution execution = executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
+				ExecutionVertex executionVertex = execution.getVertex();
+
+				long version = execution.getGlobalModVersion();
+				gateway.updateTaskExecutionState(new TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), ExecutionState.FINISHED)).get();
+				Execution newExecution = executionVertex.resetForNewExecution(System.currentTimeMillis(), version);
+
+				// get the new split
+				SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+				Assert.assertArrayEquals(split1.getInputSplitData(), split2.getInputSplitData());
+
+				// get the new split3
 
 Review comment:
   add more assert



[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630888#comment-16630888
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221036186
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   Ok, I think there shouldn't be too much re-computation overhead, since it 
happens once per attempt. I am fine with either way. Thx.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630841#comment-16630841
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221024426
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   putting null here is to avoid recomputing; otherwise, if another execution 
attempt pulls getNextSplit, we would need to recompute. 




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630816#comment-16630816
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on a change in pull request #6684: [FLINK-10205] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221019265
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
}
}
 
+   public InputSplit getNextInputSplit() {
+   final LogicalSlot slot = this.getAssignedResource();
+   final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
 
 Review comment:
   this is actually a refactoring of the original code (see line 577 of 
JobMaster.java); the logic is kept the same.




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630761#comment-16630761
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221002876
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
 
+   public InputSplit getNextInputSplit(int index, String host) {
+   final int taskId = this.getParallelSubtaskIndex();
+   synchronized (this.inputSplits) {
+   if (index < this.inputSplits.size()) {
+   return this.inputSplits.get(index);
+   } else {
+   final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
 
 Review comment:
   nit: When all input splits are exhausted for a given vertex (nextInputSplit is 
null), you can just return null without adding an extra null element at the end 
of the array list.
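The nit above concerns only how the end-of-input marker is bookkept. A rough sketch of the cached-terminal-null variant, as a hypothetical simplification of the quoted hunk (not the actual Flink classes), shows why caching the null can matter: a later attempt asking past the end is answered from the cache instead of hitting the assigner again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch: cache whatever the assigner returns, including the terminal null,
// so a later attempt replays the same answer by index without another
// assigner call. (The nit's alternative would return null directly and
// consult the assigner again on each out-of-range request.)
public class SplitCache {
    private final List<String> cached = new ArrayList<>();
    private final Iterator<String> assigner; // stand-in for the InputSplitAssigner
    private int assignerCalls = 0;           // instrumentation for the sketch

    SplitCache(List<String> splits) { this.assigner = splits.iterator(); }

    String getNextCachingNull(int index) {
        if (index < cached.size()) {
            return cached.get(index);        // replay, including a cached null
        }
        assignerCalls++;
        String next = assigner.hasNext() ? assigner.next() : null;
        cached.add(next);                    // terminal null is memoized too
        return next;
    }

    int assignerCalls() { return assignerCalls; }

    public static void main(String[] args) {
        SplitCache cache = new SplitCache(Arrays.asList("s0"));
        cache.getNextCachingNull(0); // hands out "s0"
        cache.getNextCachingNull(1); // exhausted: caches the terminal null
        cache.getNextCachingNull(1); // replay: no further assigner call
        System.out.println(cache.assignerCalls()); // prints "2"
    }
}
```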




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630760#comment-16630760
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221007252
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -704,6 +716,66 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep
 	}
 	}
 
+	private JobGraph createDataSourceJobGraph() {
+		final TextInputFormat inputFormat = new TextInputFormat(new Path("."));
+		final InputFormatVertex producer = new InputFormatVertex("Producer");
+		new TaskConfig(producer.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<>(inputFormat));
+		producer.setInvokableClass(DataSourceTask.class);
+
+		final JobVertex consumer = new JobVertex("Consumer");
+		consumer.setInvokableClass(NoOpInvokable.class);
+		consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(producer, consumer);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Tests that {@link JobMaster#requestNextInputSplit(JobVertexID, ExecutionAttemptID)}
+	 * returns the same result for a different retry.
+	 */
+	@Test
+	public void testRequestNextInputSplitWithDataSourceFailover() throws Exception {
+
+		final JobGraph dataSourceJobGraph = createDataSourceJobGraph();
+		testJobMasterAPIWithMockExecution(dataSourceJobGraph, (tdd, jobMaster) -> {
+			try {
+				final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+				final TaskInformation taskInformation = tdd.getSerializedTaskInformation()
+					.deserializeValue(getClass().getClassLoader());
+				JobVertexID vertexID = taskInformation.getJobVertexId();
+
+				// get the previous split
+				SerializedInputSplit split1 = gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
+
+				// start a new version of this execution
+				ExecutionGraph executionGraph = jobMaster.getExecutionGraph();
+				Execution execution = executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
+				ExecutionVertex executionVertex = execution.getVertex();
+
+				long version = execution.getGlobalModVersion();
+				gateway.updateTaskExecutionState(new TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), ExecutionState.FINISHED)).get();
+				Execution newExecution = executionVertex.resetForNewExecution(System.currentTimeMillis(), version);
+
+				// get the new split
+				SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+				Assert.assertArrayEquals(split1.getInputSplitData(), split2.getInputSplitData());
+
+				// get the new split3
 
 Review comment:
   Make sure you cover the case where input splits are exhausted.
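For context, the exhaustion case this comment asks for boils down to: once the assigner runs dry, every retried attempt re-requesting the same sequence number must see the same "no more splits" answer. A standalone sketch in plain Java (a toy model, not the Flink API; class and method names are invented):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model (not Flink code) of the replay behavior the test should also
// cover for exhausted input: the "no more splits" answer (modeled as null)
// is cached per sequence number just like a real split, so a restarted
// attempt re-requesting the same sequence number gets the same answer.
public class ExhaustedSplitsModel {
    private final Iterator<String> assigner;              // stands in for the split assigner
    private final List<String> history = new ArrayList<>();

    public ExhaustedSplitsModel(Iterator<String> assigner) {
        this.assigner = assigner;
    }

    public String requestNextInputSplit(int sequenceNumber) {
        if (sequenceNumber < history.size()) {
            return history.get(sequenceNumber);           // replay for a retried attempt
        }
        history.add(assigner.hasNext() ? assigner.next() : null);
        return history.get(history.size() - 1);
    }

    public static void main(String[] args) {
        ExhaustedSplitsModel vertex = new ExhaustedSplitsModel(List.of("split-0").iterator());
        vertex.requestNextInputSplit(0);                  // consumes the only split
        String end = vertex.requestNextInputSplit(1);     // exhausted -> null
        String retry = vertex.requestNextInputSplit(1);   // new attempt, same sequence number
        System.out.println(end == null && retry == null); // true
    }
}
```

A test along these lines would assert that the "exhausted" reply is itself deterministic across attempts, not just the regular splits.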


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSource task pulls InputSplits from the JobManager to achieve better
> performance; however, when a DataSourceTask fails and reruns, it will not get
> the same splits as its previous run. This can introduce inconsistent
> results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch
> scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask
> deterministic. The proposal is to save all splits into the ExecutionVertex, and the
> DataSourceTask will pull splits from there.

[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630759#comment-16630759
 ] 

ASF GitHub Bot commented on FLINK-10205:


xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221001196
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
}
}
 
+   public InputSplit getNextInputSplit() {
+       final LogicalSlot slot = this.getAssignedResource();
+       final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
 
 Review comment:
   Under which condition will slot be null? And if slot is null, what does 
vertex.getNextInputSplit() return?
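One plausible reading, sketched outside Flink (plain Java; the class and behavior are invented for illustration, not taken from the actual implementation): the hostname is only a locality hint, so a null slot simply means the request carries no preferred host, and the assigner should fall back to handing out any remaining split rather than failing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Sketch (not Flink code) of a locality-aware assigner where host == null
// means "no locality information": any remaining split is acceptable.
public class LocalityFallback {
    private final Deque<String> remaining = new ArrayDeque<>();

    public LocalityFallback(String... splits) {
        for (String s : splits) remaining.add(s);
    }

    public String nextSplit(String host) {
        if (remaining.isEmpty()) return null;        // exhausted
        if (host == null) return remaining.poll();   // no preference: take any split
        Iterator<String> it = remaining.iterator();
        while (it.hasNext()) {
            String s = it.next();
            if (s.startsWith(host)) {                // toy locality match on a name prefix
                it.remove();
                return s;
            }
        }
        return remaining.poll();                     // no local split left: fall back
    }

    public static void main(String[] args) {
        LocalityFallback assigner = new LocalityFallback("hostA-0", "hostB-0");
        System.out.println(assigner.nextSplit(null));     // hostA-0 (no locality preference)
        System.out.println(assigner.nextSplit("hostB"));  // hostB-0 (local match)
    }
}
```

Whether the real code path behaves this way when the slot is not yet assigned is exactly what the review question is probing.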







--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628084#comment-16628084
 ] 

ASF GitHub Bot commented on FLINK-10205:


chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-424544286
 
 
   Nice work. One note: the stored history of splits ('inputSplits') does not have to be the full history. If the list is known to be short enough, it is fine to ship it as is. +1.






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-20 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622387#comment-16622387
 ] 

JIN SUN commented on FLINK-10205:
-

Hi Stephan,

I have the FLIP and the pull request ready for this issue. Could you take a look, or ask somebody to have a look?

https://github.com/apache/flink/pull/6684 


The FLIP is here:

https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing
 


Jin





[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16616958#comment-16616958
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-421858786
 
 
   I have put the document here; note that it covers not only this issue but also other failover improvements:
   
https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612093#comment-16612093
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin opened a new pull request #6684: [FLINK-10205] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684
 
 
   ## What is the purpose of the change
   
   Today the DataSource task pulls InputSplits from the JobManager to achieve better
   performance; however, when a DataSourceTask fails and reruns, it will
   not get the same splits as its previous run. This can introduce
   inconsistent results or even data corruption.
   
   Furthermore, if two executions run at the same time (in the batch
   scenario), these two executions should process the same splits.
   
   We need to fix this issue to make the inputs of a DataSourceTask
   deterministic. The proposal is to save all splits into the ExecutionVertex, and the
   DataSourceTask will pull splits from there.
   
   
   ## Brief change log
   
 - *The JobMaster gets the next input split from the Execution*
 - *The Execution forwards getNextInputSplit, together with the request's sequence number, to the ExecutionVertex*
 - *If the sequence number already exists in the ExecutionVertex, the cached split is returned; otherwise it is calculated and cached*
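The change log above can be sketched in plain Java (illustrative model only, not the real Flink classes; all names here are invented): the ExecutionVertex owns the split history, and every Execution attempt forwards sequence-numbered requests to it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative model of the change log (not real Flink classes): the
// ExecutionVertex owns the issued-split history; each Execution attempt
// forwards (sequenceNumber -> split) lookups to it, so a restarted
// attempt replays the exact sequence its predecessor saw.
public class SplitReplaySketch {

    static class ExecutionVertexModel {
        private final Iterator<String> assigner;          // real code: the input split assigner
        private final List<String> issuedSplits = new ArrayList<>();

        ExecutionVertexModel(Iterator<String> assigner) {
            this.assigner = assigner;
        }

        synchronized String getNextInputSplit(int sequenceNumber) {
            if (sequenceNumber < issuedSplits.size()) {   // seen before: replay from cache
                return issuedSplits.get(sequenceNumber);
            }
            issuedSplits.add(assigner.hasNext() ? assigner.next() : null);
            return issuedSplits.get(issuedSplits.size() - 1);
        }
    }

    static class ExecutionModel {                         // one attempt of the vertex
        private final ExecutionVertexModel vertex;
        private int nextSequenceNumber = 0;

        ExecutionModel(ExecutionVertexModel vertex) {
            this.vertex = vertex;
        }

        String getNextInputSplit() {                      // forwards with its own counter
            return vertex.getNextInputSplit(nextSequenceNumber++);
        }
    }

    public static void main(String[] args) {
        ExecutionVertexModel vertex = new ExecutionVertexModel(List.of("s0", "s1").iterator());
        ExecutionModel attempt1 = new ExecutionModel(vertex);
        String first = attempt1.getNextInputSplit();      // "s0"
        ExecutionModel attempt2 = new ExecutionModel(vertex); // restarted attempt
        String replayed = attempt2.getNextInputSplit();   // same "s0", not "s1"
        System.out.println(first.equals(replayed));       // true
    }
}
```

This also illustrates the design choice being reviewed: the history lives in the vertex, so determinism holds across attempts at the cost of storing every issued split.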
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *covered by existing test*
 - *Added a new test that validates calling getNextInputSplit multiple times with different Execution attempts per ExecutionVertex*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
   






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612045#comment-16612045
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin closed pull request #6657: [FLINK-10205] [JobManager] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 98054e94224..0c0b0dd2ffb 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -7,11 +7,21 @@
 
 
 
+
+metrics.latency.granularity
+"operator"
+Defines the granularity of latency metrics. Accepted values are: single - Track latency without differentiating between sources and subtasks. operator - Track latency while differentiating between sources, but not subtasks. subtask - Track latency while differentiating between sources and subtasks.
+
 
 metrics.latency.history-size
 128
 Defines the number of measured latencies to maintain at each operator.
 
+
+metrics.latency.interval
+0
+Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.
+
 
 metrics.reporter.name.parameter
 (none)
diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index c624fce8954..d0043647227 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -592,7 +592,7 @@ val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
   
 
 
-
+
   Join
   
 Joins two data sets by creating all pairs of elements that are equal on their keys.
@@ -608,7 +608,7 @@ val result = input1.join(input2).where(0).equalTo(1)
 describe whether the join happens through partitioning or broadcasting, and whether it uses
 a sort-based or a hash-based algorithm. Please refer to the Transformations Guide for
-a list of possible hints and an example.
+a list of possible hints and an example.
 If no hint is specified, the system will try to make an estimate of the input sizes and
 pick the best strategy according to those estimates.
 {% highlight scala %}
@@ -700,7 +700,6 @@ val result = in.partitionByRange(0).mapPartition { ... }
 {% endhighlight %}
   
 
-
 
   Custom Partitioning
   
@@ -1615,7 +1614,7 @@ In object-reuse enabled mode, Flink's runtime minimizes the number of object ins

   Emitting Input Objects
   
-You must not emit input objects, except for input objects of MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction, and InputFormat.next(reuse).
+You must not emit input objects, except for input objects of MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction, and InputFormat.next(reuse).
   


diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 7d88a36393c..85c60a67a22 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1638,8 +1638,9 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
 
 ## Latency tracking
 
-Flink allows to track the latency of records traveling through the system. To enable the latency tracking
-a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
+Flink allows to track the latency of records traveling through the system. This feature is disabled by default.
+To enable the latency tracking you must set the `latencyTrackingInterval` to a positive number in either the
+[Flink configuration]({{ site.baseurl }}/ops/config.html#metrics-latency-interval) or `ExecutionConfig`.
 
 At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`.
 The marker contains a timestamp from the time when the record has been emitted at the sources.
@@ -1659,6 +1660,9 @@ latency issues caused by individual machines.
 Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting
 up an automated clock synchronisation service (like NTP) to avoid false latency results.
 
 +Warning Enabling latency 

[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605106#comment-16605106
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6657: [FLINK-10205] [JobManager] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657#issuecomment-418925630
 
 
   I have put the document here; note that it covers not only this issue but also other failover improvements: 
   
[https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing](https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing
 )






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604577#comment-16604577
 ] 

ASF GitHub Bot commented on FLINK-10205:


yanghua commented on issue #6657: [FLINK-10205] [JobManager] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657#issuecomment-418776835
 
 
   @isunjin It seems you don't need wiki edit permission right away. You can write the document in a Google Doc and give others access, and then they can discuss and review it. Once the community feels the design document is OK, it is not too late to edit the wiki. @zentol right?






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604402#comment-16604402
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin commented on issue #6657: [FLINK-10205] [JobManager] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657#issuecomment-418730339
 
 
   I have my document ready, but I don't have permission to edit the wiki. Stephan, could you grant me permission?
   
   
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals 
 
   
   Jin
   
   > On Sep 5, 2018, at 8:47 PM, Chesnay Schepler  
wrote:
   > 
   > In the JIRA a design document was explicitly requested. Is this accessible 
anywhere?
   > 
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub, or mute the thread.
   > 
   
   






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604351#comment-16604351
 ] 

ASF GitHub Bot commented on FLINK-10205:


zentol commented on issue #6657: [FLINK-10205] [JobManager] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657#issuecomment-418717042
 
 
   @isunjin Please fill out the PR template.






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604353#comment-16604353
 ] 

ASF GitHub Bot commented on FLINK-10205:


zentol commented on issue #6657: [FLINK-10205] [JobManager] Batch Job: 
InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657#issuecomment-418717373
 
 
   In the JIRA a design document was explicitly requested. Is this accessible 
anywhere?






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16603867#comment-16603867
 ] 

ASF GitHub Bot commented on FLINK-10205:


isunjin opened a new pull request #6657: [FLINK-10205] [JobManager] Batch 
Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657
 
 
   …Task
   
   Today the DataSource task pulls InputSplits from the JobManager to achieve better
   performance; however, when a DataSourceTask fails and reruns, it will
   not get the same splits as its previous run. This can introduce
   inconsistent results or even data corruption.
   
   Furthermore, if two executions run at the same time (in the batch
   scenario), these two executions should process the same splits.
   
   We need to fix this issue to make the inputs of a DataSourceTask
   deterministic. The proposal is to save all splits into the ExecutionVertex, and the
   DataSourceTask will pull splits from there.
   
   Change-Id: Ifd9ac2639d0a3a919269552dbd3b7f0b689f1c25
   
   
   *Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
 - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
 
 - Name the pull request in the form "[FLINK-] [component] Title of the 
pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
 Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
 - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
 
 - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).
   
 - Each pull request should address only one issue, not mix up code from 
multiple issues.
 
 - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)
   
 - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4-node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - 

[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-26 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593082#comment-16593082
 ] 

JIN SUN commented on FLINK-10205:
-

Great, Stephan. I actually had the same idea. For batch failover there are a 
bunch of things we need to consider and discuss; I can prepare a FLIP. 

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager for better 
> performance; however, when a DataSourceTask fails and reruns, it will not get 
> the same splits as its previous attempt. This can introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch 
> scenario), the two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex, 
> and the DataSourceTask will pull splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
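The idea described in the issue (splits remembered per ExecutionVertex so a restarted attempt replays the same sequence) can be sketched roughly as follows. This is a minimal illustration, not Flink's actual API: `InputSplit`, `SplitSource`, and the class and method names here are simplified stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's InputSplit; illustrative only.
interface InputSplit { int getSplitNumber(); }

/**
 * Sketch of the proposed idea: the vertex remembers the splits it has handed
 * out, so a restarted attempt replays the same sequence instead of pulling
 * fresh (possibly different) splits from the global assigner.
 */
class ExecutionVertexSketch {
    private final List<InputSplit> assignedSplits = new ArrayList<>();
    private int replayCursor = 0;     // next split to hand to a restarted attempt
    private final SplitSource source; // global, possibly non-deterministic source

    ExecutionVertexSketch(SplitSource source) { this.source = source; }

    /** Called by the DataSourceTask each time it asks for its next split. */
    synchronized InputSplit getNextInputSplit() {
        if (replayCursor < assignedSplits.size()) {
            // Failover path: replay a split the previous attempt already got.
            return assignedSplits.get(replayCursor++);
        }
        InputSplit next = source.requestSplit();
        if (next != null) {
            assignedSplits.add(next);
            replayCursor = assignedSplits.size();
        }
        return next;
    }

    /** Called on failover: the new execution replays from the beginning. */
    synchronized void resetForNewExecution() { replayCursor = 0; }

    interface SplitSource { InputSplit requestSplit(); }
}
```

After a reset, the vertex serves exactly the splits the failed attempt received, in the same order, before requesting anything new, which is the determinism property the issue asks for.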


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-25 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592589#comment-16592589
 ] 

Stephan Ewen commented on FLINK-10205:
--

I think that for changes to batch recovery there should be a FLIP / design 
document.
Without a design doc describing the overall approach, it is not possible for 
reviewers to understand the necessity, importance, and implications of the 
individual changes.



[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591489#comment-16591489
 ] 

JIN SUN commented on FLINK-10205:
-

In my opinion we need determinism: when a task reruns, its output should be 
the same as that of the previous attempt. Today's logic might skip the split 
the previous task processed and assign a different one, which leads to 
incorrect results.

Fabian, we didn't restrict the InputSplit assignment; instead, by adding a 
small piece of code in Execution.java and ExecutionVertex.java, we can make it 
deterministic.

I'm preparing the code; we can have a further discussion when it's ready. 



[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591484#comment-16591484
 ] 

Fabian Hueske commented on FLINK-10205:
---

Oh, I didn't notice that this issue was created in the context of FLINK-4256.

Please disregard my previous comment.



[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591472#comment-16591472
 ] 

JIN SUN commented on FLINK-10205:
-

Thanks Vinoyang,

I cannot assign the bug to myself; I've sent an email to Aljoscha Krettek 
asking for permission. 

Jin






[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591251#comment-16591251
 ] 

Fabian Hueske commented on FLINK-10205:
---

Most jobs are implemented in a way that the split assignment does not affect 
the semantics of the job. 
I don't think we should restrict input split assignment just to make the small 
set of jobs with non-deterministic logic behave deterministically.
Also, there are a few more sources of non-determinism (order, partitioning) 
that would need to be removed. The orchestration overhead to make all 
operations behave deterministically is too high.

You can implement a custom 
[InputSplitAssigner|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java]
 if you want deterministic input split assignment.

I would close this issue as "Won't Fix".
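The custom-assigner route mentioned above could look roughly like this: an assigner that partitions splits by task index up front, so every re-execution of subtask *i* receives the identical split sequence. The `Split` interface below is a simplified stand-in for Flink's `org.apache.flink.core.io.InputSplit`, and the class is an illustrative sketch, not Flink's `InputSplitAssigner` contract.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's InputSplit; illustrative only.
interface Split { int getSplitNumber(); }

/**
 * Deterministic assigner sketch: splits are partitioned round-robin by task
 * index at construction time, so any re-execution of subtask i receives the
 * same split sequence, independent of request order across subtasks.
 */
class DeterministicSplitAssigner {
    private final List<List<Split>> perTask = new ArrayList<>();
    private final int[] cursors;

    DeterministicSplitAssigner(List<Split> splits, int parallelism) {
        for (int i = 0; i < parallelism; i++) perTask.add(new ArrayList<>());
        for (int i = 0; i < splits.size(); i++) {
            perTask.get(i % parallelism).add(splits.get(i)); // fixed partitioning
        }
        cursors = new int[parallelism];
    }

    /** Returns the next split for the given subtask, or null when exhausted. */
    Split getNextInputSplit(int taskIndex) {
        List<Split> mine = perTask.get(taskIndex);
        return cursors[taskIndex] < mine.size() ? mine.get(cursors[taskIndex]++) : null;
    }

    /** On failover of one subtask, rewind only that subtask's cursor. */
    void resetTask(int taskIndex) { cursors[taskIndex] = 0; }
}
```

The trade-off Fabian points out still applies: this fixes the split-to-subtask mapping for all jobs, whereas a pull-based assigner can balance load dynamically across slow and fast subtasks.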




[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591218#comment-16591218
 ] 

vinoyang commented on FLINK-10205:
--

[~isunjin] Can you assign this issue to yourself? If not, you may need to apply 
for the Contributor permission first.



[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-23 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591066#comment-16591066
 ] 

JIN SUN commented on FLINK-10205:
-

I would like to work on this issue.
