[jira] [Closed] (FLINK-12835) Time conversion is wrong in ManualClock

2019-06-14 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang closed FLINK-12835.
--

> Time conversion is wrong in ManualClock
> ---
>
> Key: FLINK-12835
> URL: https://issues.apache.org/jira/browse/FLINK-12835
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> `currentTime` stored in ManualClock is in nanoseconds; when it is converted 
> to milliseconds it is divided by 1_000L rather than 1_000_000L.
> The test logic of `SlotPoolTest` using `ManualClock` should also be refined.
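
A minimal sketch of the intended conversion, assuming the clock keeps a single 
nanosecond counter as described (names are modeled on Flink's ManualClock but 
simplified):

{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the fix: the clock stores nanoseconds, so the millisecond view
// must divide by 1_000_000L, not 1_000L (the reported bug).
public class ManualClock {
    private final AtomicLong currentTime = new AtomicLong(0L); // nanoseconds

    public long absoluteTimeMillis() {
        return currentTime.get() / 1_000_000L; // previously divided by 1_000L
    }

    public long relativeTimeNanos() {
        return currentTime.get();
    }

    public void advanceTime(long duration, TimeUnit timeUnit) {
        currentTime.addAndGet(timeUnit.toNanos(duration));
    }
}
{code}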



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12229) Implement Lazy Scheduling Strategy

2019-06-13 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang closed FLINK-12229.
--

> Implement Lazy Scheduling Strategy
> --
>
> Key: FLINK-12229
> URL: https://issues.apache.org/jira/browse/FLINK-12229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the 
> input data are available.
> Acceptance Criteria:
>  * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)
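
A minimal sketch of the strategy under the stated semantics (all types here 
are simplified stand-ins, not Flink's actual scheduler interfaces):

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: a vertex is deployed once every blocking input partition it
// consumes has finished; source vertices (no inputs) are deployed up front.
class LazyFromSourcesSchedulingStrategySketch {
    interface SchedulerOperations { void allocateSlotsAndDeploy(String vertexId); }

    private final Map<String, Set<String>> inputPartitionsByVertex; // vertexId -> partitionIds
    private final Set<String> finishedPartitions = new HashSet<>();
    private final Set<String> deployed = new HashSet<>();
    private final SchedulerOperations schedulerOperations;

    LazyFromSourcesSchedulingStrategySketch(
            Map<String, Set<String>> inputPartitionsByVertex,
            SchedulerOperations schedulerOperations) {
        this.inputPartitionsByVertex = new HashMap<>(inputPartitionsByVertex);
        this.schedulerOperations = schedulerOperations;
    }

    void startScheduling() {
        maybeDeploy(); // sources have no inputs and are deployed immediately
    }

    void onPartitionConsumable(String partitionId) {
        finishedPartitions.add(partitionId);
        maybeDeploy();
    }

    private void maybeDeploy() {
        inputPartitionsByVertex.forEach((vertexId, inputs) -> {
            if (!deployed.contains(vertexId) && finishedPartitions.containsAll(inputs)) {
                deployed.add(vertexId);
                schedulerOperations.allocateSlotsAndDeploy(vertexId);
            }
        });
    }
}
{code}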



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12835) Time conversion is wrong in ManualClock

2019-06-13 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12835:
---
Component/s: (was: Runtime / Coordination)
 Tests

> Time conversion is wrong in ManualClock
> ---
>
> Key: FLINK-12835
> URL: https://issues.apache.org/jira/browse/FLINK-12835
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
> Fix For: 1.9.0
>
>
> `currentTime` stored in ManualClock is in nanoseconds; when it is converted 
> to milliseconds it is divided by 1_000L rather than 1_000_000L.
> The test logic of `SlotPoolTest` using `ManualClock` should also be refined.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12835) Time conversion is wrong in ManualClock

2019-06-13 Thread BoWang (JIRA)
BoWang created FLINK-12835:
--

 Summary: Time conversion is wrong in ManualClock
 Key: FLINK-12835
 URL: https://issues.apache.org/jira/browse/FLINK-12835
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: BoWang
Assignee: BoWang
 Fix For: 1.9.0


`currentTime` stored in ManualClock is in nanoseconds; when it is converted to 
milliseconds it is divided by 1_000L rather than 1_000_000L.

The test logic of `SlotPoolTest` using `ManualClock` should also be refined.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12669) Implement FixedDelayRestartBackoffTimeStrategy

2019-06-11 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860768#comment-16860768
 ] 

BoWang commented on FLINK-12669:


Hi, [~aitozi]. What is the progress on this ticket? If you are busy with 
other tasks, I could help share the workload.

> Implement FixedDelayRestartBackoffTimeStrategy
> --
>
> Key: FLINK-12669
> URL: https://issues.apache.org/jira/browse/FLINK-12669
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Aitozi
>Priority: Major
> Fix For: 1.9.0
>
>
> We should implement a {{FixedDelayRestartBackoffTimeStrategy}} similar to the 
> {{FixedDelayRestartStrategy}}, which allows configuring the number of allowed 
> restarts and the fixed delay in between restart attempts.
> In order to be backwards compatible, we should respect the configuration 
> values used to configure the {{FixedDelayRestartStrategy}} (see the 
> documentation for more information: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html).
>  Additionally, we should also respect the 
> {{FixedDelayRestartStrategyConfiguration}}.
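
A hedged sketch of the described semantics (method names mirror the ticket's 
intent; the final Flink interface may differ):

{code:java}
// Fixed backoff between restart attempts, and at most maxAttempts restarts.
class FixedDelayRestartBackoffTimeStrategySketch {
    private final int maxAttempts;
    private final long backoffTimeMillis;
    private int attempts;

    FixedDelayRestartBackoffTimeStrategySketch(int maxAttempts, long backoffTimeMillis) {
        this.maxAttempts = maxAttempts;
        this.backoffTimeMillis = backoffTimeMillis;
    }

    boolean canRestart() {
        return attempts < maxAttempts;
    }

    long getBackoffTime() {
        return backoffTimeMillis; // fixed delay, independent of the attempt count
    }

    void notifyFailure(Throwable cause) {
        attempts++;
    }
}
{code}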



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-06-04 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856241#comment-16856241
 ] 

BoWang commented on FLINK-12608:


OK, thanks [~rmetzger].

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in 
> [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], 
> we need to add getVertexOrThrow and getResultPartitionOrThrow to SchedulingTopology.
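
For illustration, a hedged sketch of what such accessors could look like, 
assuming Optional-returning lookups (all types here are hypothetical 
stand-ins for the real SchedulingTopology interfaces):

{code:java}
import java.util.Optional;

interface SchedulingExecutionVertexSketch {}
interface SchedulingResultPartitionSketch {}

interface SchedulingTopologySketch {
    Optional<SchedulingExecutionVertexSketch> getVertex(String executionVertexId);
    Optional<SchedulingResultPartitionSketch> getResultPartition(String partitionId);

    // Fail fast with a descriptive exception instead of making every caller
    // unwrap an Optional and handle the missing case itself.
    default SchedulingExecutionVertexSketch getVertexOrThrow(String executionVertexId) {
        return getVertex(executionVertexId).orElseThrow(() ->
                new IllegalArgumentException("Cannot find vertex: " + executionVertexId));
    }

    default SchedulingResultPartitionSketch getResultPartitionOrThrow(String partitionId) {
        return getResultPartition(partitionId).orElseThrow(() ->
                new IllegalArgumentException("Cannot find result partition: " + partitionId));
    }
}
{code}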



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-06-04 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Affects Version/s: 1.9.0

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in 
> [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], 
> we need to add getVertexOrThrow and getResultPartitionOrThrow to SchedulingTopology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-06-04 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Fix Version/s: 1.9.0

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in 
> [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], 
> we need to add getVertexOrThrow and getResultPartitionOrThrow to SchedulingTopology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-05-29 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Description: As discussed in 
[PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], need 
to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology.  (was: 
As discussed in 
[PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology.)

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>
> As discussed in 
> [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], 
> need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-05-29 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Description: As discussed in 
[PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology.  
(was: As discussed in 
[PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
need to add getVertex)

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>
> As discussed in 
> [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
> need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-05-29 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Description: As discussed in 
[PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
need to add 

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>
> As discussed in 
> [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
> need to add 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-05-29 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Description: As discussed in 
[PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
need to add getVertex  (was: As discussed in 
[PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
need to add )

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>
> As discussed in 
> [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] 
> need to add getVertex



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12670) Implement FailureRateRestartBackoffTimeStrategy

2019-05-29 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-12670:
--

Assignee: BoWang

> Implement FailureRateRestartBackoffTimeStrategy
> ---
>
> Key: FLINK-12670
> URL: https://issues.apache.org/jira/browse/FLINK-12670
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Major
> Fix For: 1.9.0
>
>
> We should implement a {{FailureRateRestartBackoffTimeStrategy}} similar to 
> the {{FailureRateRestartStrategy}}, which allows configuring the maximum 
> number of failures within a time interval and the delay in between restart 
> attempts.
> In order to be backwards compatible, we should respect the configuration 
> values used to configure the {{FailureRateRestartStrategy}} (see the 
> documentation for more information: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html).
>  Additionally, we should also respect the 
> {{FailureRateRestartStrategyConfiguration}}.
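
A hedged sketch of the failure-rate semantics (a sliding window of failure 
timestamps; names are illustrative, not Flink's final API):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Allow a restart only while the number of failures inside the sliding time
// window does not exceed the configured maximum.
class FailureRateRestartBackoffTimeStrategySketch {
    private final int maxFailuresPerInterval;
    private final long failuresIntervalMillis;
    private final long backoffTimeMillis;
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateRestartBackoffTimeStrategySketch(
            int maxFailuresPerInterval, long failuresIntervalMillis, long backoffTimeMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMillis = failuresIntervalMillis;
        this.backoffTimeMillis = backoffTimeMillis;
    }

    void notifyFailure(Throwable cause) {
        failureTimestamps.addLast(System.currentTimeMillis());
    }

    boolean canRestart() {
        long windowStart = System.currentTimeMillis() - failuresIntervalMillis;
        while (!failureTimestamps.isEmpty() && failureTimestamps.peekFirst() < windowStart) {
            failureTimestamps.removeFirst(); // drop failures that left the window
        }
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }

    long getBackoffTime() {
        return backoffTimeMillis;
    }
}
{code}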



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-05-24 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Summary: Add 
getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
 to SchedulingTopology  (was: Add getXXXOrThrow(ExecutionVertexID) to 
SchedulingTopology)

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getXXXOrThrow(ExecutionVertexID) to SchedulingTopology

2019-05-24 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Summary: Add getXXXOrThrow(ExecutionVertexID) to SchedulingTopology  (was: 
Add getVertexOrThrow(ExecutionVertexID) to SchedulingTopology)

> Add getXXXOrThrow(ExecutionVertexID) to SchedulingTopology
> --
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12608) Add getVertexOrThrow(ExecutionVertexID) to SchedulingTopology

2019-05-23 Thread BoWang (JIRA)
BoWang created FLINK-12608:
--

 Summary: Add getVertexOrThrow(ExecutionVertexID) to 
SchedulingTopology
 Key: FLINK-12608
 URL: https://issues.apache.org/jira/browse/FLINK-12608
 Project: Flink
  Issue Type: Sub-task
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11309) Make SpillableSubpartition repeatably read to enable

2019-05-12 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang closed FLINK-11309.
--
Resolution: Fixed

> Make SpillableSubpartition repeatably read to enable
> 
>
> Key: FLINK-11309
> URL: https://issues.apache.org/jira/browse/FLINK-11309
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Affects Versions: 1.6.2, 1.7.0, 1.7.1
>Reporter: BoWang
>Assignee: BoWang
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Hi all,
> When running the batch WordCount example, I configured the job execution 
> mode as *BATCH_FORCED* and the failover-strategy as *region*, and I manually 
> injected some errors to let the execution fail in different phases. In some 
> cases, the job could recover from the failover and succeed, but in other 
> cases, the job retried several times and failed.
> Example:
>  # If the failure occurred before the task read data, e.g., it failed before 
> *invokable.invoke()* in Task.java, failover could succeed.
>  # If the failure occurred after the task had read data, failover did not 
> work.
>  
> Problem diagnosis:
> Running the example described before, each ExecutionVertex is defined as a 
> restart region, and the ResultPartitionType between executions is 
> *BLOCKING*. Thus, *SpillableSubpartition* and *SpillableSubpartitionView* 
> are used to write/read shuffle data, and a data block is described as a 
> *BufferConsumer* stored in a list called *buffers*; when a task requests 
> input data from the *SpillableSubpartitionView*, the *BufferConsumer* is 
> REMOVED from buffers. Thus, when a failure occurred after data had been 
> read, some *BufferConsumers* had already been released, so although the 
> tasks were retried, the input data was incomplete.
>  
> Fix Proposal:
>  # *BufferConsumer* should not be removed from buffers until the 
> *ExecutionVertex* terminates.
>  # *SpillableSubpartition* should not be released until the 
> *ExecutionVertex* terminates.
>  # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, 
> each of which corresponds to an *Execution*.
>  Design doc: 
> https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing
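
A minimal sketch of fix proposals 1 and 3 (all types are simplified stand-ins 
for the real SpillableSubpartition classes):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Buffers are kept until the producing vertex terminates, and every consuming
// Execution attempt gets its own view with an independent read cursor, so a
// retried task can re-read the data from the beginning.
class RepeatablyReadableSubpartitionSketch {
    private final List<byte[]> buffers = new ArrayList<>(); // never removed on read

    void add(byte[] dataBlock) {
        buffers.add(dataBlock);
    }

    // One view per Execution; a failover retry simply creates a fresh view.
    View createReadView() {
        return new View();
    }

    // Called only once the producing ExecutionVertex terminates.
    void release() {
        buffers.clear();
    }

    class View {
        private int nextIndex; // per-view cursor instead of removing from buffers

        byte[] pollNext() {
            return nextIndex < buffers.size() ? buffers.get(nextIndex++) : null;
        }
    }
}
{code}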



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12414) Implement ExecutionGraph to SchedulingTopology Adapter

2019-05-10 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837059#comment-16837059
 ] 

BoWang commented on FLINK-12414:


Hi, [~till.rohrmann] [~gjy]

In the adapter, how should ResultPartitionState#RELEASED be mapped to a state 
in the ExecutionGraph? It seems not easy for the ExecutionGraph to know whether 
a partition is released, since only the TM knows the state of the ResultPartition.

> Implement ExecutionGraph to SchedulingTopology Adapter
> --
>
> Key: FLINK-12414
> URL: https://issues.apache.org/jira/browse/FLINK-12414
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>
> Implement an [adapter|https://en.wikipedia.org/wiki/Adapter_pattern], which 
> adapts the ExecutionGraph to the SchedulingTopology interface.
> *Acceptance criteria*
> * The adapter always reflects an up to date view of the ExecutionGraph state
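
A minimal sketch of such an adapter (all types are simplified stand-ins; the 
point is that queries delegate to the live graph rather than a copy, so the 
view stays current):

{code:java}
import java.util.ArrayList;
import java.util.List;

interface ExecutionVertexSketch { String getId(); String getExecutionState(); }
interface ExecutionGraphSketch { Iterable<ExecutionVertexSketch> getAllExecutionVertices(); }

interface SchedulingExecutionVertexView { String getId(); String getState(); }
interface SchedulingTopologyView { Iterable<SchedulingExecutionVertexView> getVertices(); }

class ExecutionGraphToSchedulingTopologyAdapter implements SchedulingTopologyView {
    private final ExecutionGraphSketch executionGraph; // the adaptee

    ExecutionGraphToSchedulingTopologyAdapter(ExecutionGraphSketch executionGraph) {
        this.executionGraph = executionGraph;
    }

    @Override
    public Iterable<SchedulingExecutionVertexView> getVertices() {
        // Re-read the graph on every call so the adapter never serves stale state.
        List<SchedulingExecutionVertexView> vertices = new ArrayList<>();
        for (ExecutionVertexSketch ev : executionGraph.getAllExecutionVertices()) {
            vertices.add(new SchedulingExecutionVertexView() {
                @Override public String getId() { return ev.getId(); }
                @Override public String getState() { return ev.getExecutionState(); }
            });
        }
        return vertices;
    }
}
{code}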



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12414) Implement ExecutionGraph to SchedulingTopology Adapter

2019-05-08 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-12414:
--

Assignee: BoWang  (was: Gary Yao)

> Implement ExecutionGraph to SchedulingTopology Adapter
> --
>
> Key: FLINK-12414
> URL: https://issues.apache.org/jira/browse/FLINK-12414
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>
> Implement an [adapter|https://en.wikipedia.org/wiki/Adapter_pattern], which 
> adapts the ExecutionGraph to the SchedulingTopology interface.
> *Acceptance criteria*
> * The adapter always reflects an up to date view of the ExecutionGraph state



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12229) Implement Lazy Scheduling Strategy

2019-05-01 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830880#comment-16830880
 ] 

BoWang commented on FLINK-12229:


Thanks [~till.rohrmann].

The advantage of only looking at the input result partitions is obvious, but I 
am wondering whether there may be some negative effects. 1) Once any result 
partition finishes, the consumer vertex will be scheduled and the rest of the 
result partition infos would be sent to the TM separately. That would mean a 
lot of additional network communication for updating partition info if the 
number of input partitions is huge, whereas when looking at the 
IntermediateDataSet all the partition infos are composed in the 
`TaskDeploymentDescriptor`. 2) There may be a resource deadlock, e.g., 
consider a job with map-reduce-join job vertices. Both the reduce and join job 
vertices have ANY input constraints, so parts of their tasks could be 
scheduled but cannot finish until all the input result partitions are ready. 
When the free resources of the cluster are not enough, the running join 
vertices wait for all their input while some of the reduce vertices wait for 
resources to be scheduled.

Making SchedulingIntermediateDataSet part of 
LazyFromSourcesSchedulingStrategy would work; I will do it like this.

 

> Implement Lazy Scheduling Strategy
> --
>
> Key: FLINK-12229
> URL: https://issues.apache.org/jira/browse/FLINK-12229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the 
> input data are available.
> Acceptance Criteria:
>  * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12229) Implement Lazy Scheduling Strategy

2019-04-30 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830129#comment-16830129
 ] 

BoWang edited comment on FLINK-12229 at 4/30/19 9:48 AM:
-

Hi, [~till.rohrmann] [~gjy] [~tiemsn]
 In the original scheduler, the consumer vertex is scheduled when ANY/ALL of 
the IntermediateDataSets are consumable, and an IntermediateDataSet is 
consumable when all of its result partitions are finished for the BLOCKING 
ResultType. Shall we be consistent with this logic in the new scheduler?

Another question is that when I implemented the lazy strategy, I found that on 
each producer vertex state change or partition consumable notification, all 
the input partitions of the vertex are checked to decide whether it should be 
scheduled. With n producer vertices and n consumer vertices, the partitions 
would be checked O(n^2) times, which I think is very inefficient. If we add a 
SchedulingIntermediateDataSet and react to vertex state change notifications, 
relying on the counter of the SchedulingIntermediateDataSet, only O(n) 
partition checks are needed (this is what I did in [GitHub Pull Request 
#8309|https://github.com/apache/flink/pull/8309]). Another option is to 
maintain some member variables in LazyFromSourcesSchedulingStrategy to do the 
same thing as SchedulingIntermediateDataSet.

What do you think?


was (Author: eaglewatcher):
Hi, [~till.rohrmann] [~gjy] [~tiemsn]
In the origin scheduler, the consumer vertex is scheduled when ANY/ALL the 
IntermediateDataSet is consumable, and IntermediateDataSet is consumable when 
all the result partitions are finished for BLOCKING ResultType. Shall we be 
consistent with this logic in the new scheduler?

Another question is that when I implemented Lazy strategy, I found that each 
time the producer vertex state change or partition consumable notification, all 
the input partitions of the vertex will be checked to decide whether it should 
be scheduled. With n producer vertices and n consumer vertices, the partitions 
would be checked O(n^2) times. I think it is very inefficient. If we add 
SchedulingIntermediateDataSet and react to vertex state change notification, 
relying on the counter of the SchedulingIntermediateDataSet, it needs only O(n) 
partition check times (This is what I did in [GitHub Pull Request 
#8309|https://github.com/apache/flink/pull/8309]). Another option is to 
maintain some member variables in LazyFromSourcesSchedulingStrategy to do the 
same thing as SchedulingIntermediateDataSet.

What do you think?
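
A minimal sketch of the counter-based check described above (names are 
illustrative, not Flink's final interfaces):

{code:java}
// Each BLOCKING IntermediateDataSet counts its finished producer partitions,
// so a vertex-finished notification costs O(1) and scheduling a job with n
// producers needs only O(n) checks overall instead of O(n^2).
class SchedulingIntermediateDataSetSketch {
    private final int totalPartitions;
    private int finishedPartitions;

    SchedulingIntermediateDataSetSketch(int totalPartitions) {
        this.totalPartitions = totalPartitions;
    }

    // Called once per producer partition that reaches FINISHED.
    void markPartitionFinished() {
        finishedPartitions++;
    }

    // A BLOCKING data set is consumable only when all producers have finished.
    boolean isConsumable() {
        return finishedPartitions == totalPartitions;
    }
}
{code}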

> Implement Lazy Scheduling Strategy
> --
>
> Key: FLINK-12229
> URL: https://issues.apache.org/jira/browse/FLINK-12229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the 
> input data are available.
> Acceptance Criteria:
>  * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12229) Implement Lazy Scheduling Strategy

2019-04-30 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830129#comment-16830129
 ] 

BoWang commented on FLINK-12229:


Hi, [~till.rohrmann] [~gjy] [~tiemsn]
In the original scheduler, the consumer vertex is scheduled when ANY/ALL of 
the IntermediateDataSets are consumable, and an IntermediateDataSet is 
consumable when all of its result partitions are finished for the BLOCKING 
ResultType. Shall we be consistent with this logic in the new scheduler?

Another question is that when I implemented the lazy strategy, I found that on 
each producer vertex state change or partition consumable notification, all 
the input partitions of the vertex are checked to decide whether it should be 
scheduled. With n producer vertices and n consumer vertices, the partitions 
would be checked O(n^2) times, which I think is very inefficient. If we add a 
SchedulingIntermediateDataSet and react to vertex state change notifications, 
relying on the counter of the SchedulingIntermediateDataSet, only O(n) 
partition checks are needed (this is what I did in [GitHub Pull Request 
#8309|https://github.com/apache/flink/pull/8309]). Another option is to 
maintain some member variables in LazyFromSourcesSchedulingStrategy to do the 
same thing as SchedulingIntermediateDataSet.

What do you think?

> Implement Lazy Scheduling Strategy
> --
>
> Key: FLINK-12229
> URL: https://issues.apache.org/jira/browse/FLINK-12229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the 
> input data are available.
> Acceptance Criteria:
>  * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12229) Implement Lazy Scheduling Strategy

2019-04-18 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12229:
---
Description: 
Implement a {{SchedulingStrategy}} that covers the functionality of 
{{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the 
input data are available.

Acceptance Criteria:
 * New strategy is tested in isolation using test implementations (i.e., 
without having to submit a job)

  was:
Implement a {{SchedulingStrategy}} that covers the functionality of 
{{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., all vertices are scheduled at once.

Acceptance Criteria:
* New strategy is tested in isolation using test implementations (i.e., without 
having to submit a job)



> Implement Lazy Scheduling Strategy
> --
>
> Key: FLINK-12229
> URL: https://issues.apache.org/jira/browse/FLINK-12229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the 
> input data are available.
> Acceptance Criteria:
>  * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12229) Implement Lazy Scheduling Strategy

2019-04-18 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-12229:
--

Assignee: BoWang  (was: Gary Yao)

> Implement Lazy Scheduling Strategy
> --
>
> Key: FLINK-12229
> URL: https://issues.apache.org/jira/browse/FLINK-12229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., all vertices are scheduled at once.
> Acceptance Criteria:
> * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2019-04-17 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819998#comment-16819998
 ] 

BoWang commented on FLINK-10644:


Thanks [~StephanEwen] for the comments. We will not approach the 
scheduler-related sub-tasks before the redesign work finishes, and will only 
try to implement some sub-features that are independent of the scheduler.

> Batch Job: Speculative execution
> 
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: JIN SUN
>Assignee: BoWang
>Priority: Major
>
> Stragglers/outliers are tasks that run slower than most of the tasks in a 
> Batch Job. This impacts job latency, as such a straggler will typically be 
> on the critical path of the job and become the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, 
> software mis-configuration, or noisy neighbors. It's hard for the JM to 
> predict the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and 
> Spark have *_speculative execution_*. Speculative execution is a health-check 
> procedure that checks for tasks to be speculated, i.e., tasks running slower 
> in an ExecutionJobVertex than the median of all successfully completed tasks 
> in that EJV. Such slow tasks will be re-submitted to another TM. This will 
> not stop the slow tasks, but will run a new copy in parallel and kill the 
> others once one of them completes.
> This JIRA is an umbrella to apply this kind of idea in FLINK. Details will 
> be appended later.
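
As a rough illustration of the detection rule described above (with a slow 
ratio such as the 2.0 mentioned elsewhere in this thread), a hedged sketch; 
names and the threshold are illustrative:

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// A running task is a speculation candidate when its running time exceeds
// SLOW_RATIO times the median runtime of the successfully completed tasks
// in the same ExecutionJobVertex.
class StragglerDetectorSketch {
    private static final double SLOW_RATIO = 2.0;

    static boolean shouldSpeculate(long runningTimeMillis, List<Long> finishedRuntimesMillis) {
        if (finishedRuntimesMillis.isEmpty()) {
            return false; // no baseline to compare against yet
        }
        List<Long> sorted = new ArrayList<>(finishedRuntimesMillis);
        Collections.sort(sorted);
        long median = sorted.get(sorted.size() / 2);
        return runningTimeMillis > SLOW_RATIO * median;
    }
}
{code}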



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12221) Distinguish long tail task based on IOMetrics

2019-04-17 Thread BoWang (JIRA)
BoWang created FLINK-12221:
--

 Summary: Distinguish long tail task based on IOMetrics
 Key: FLINK-12221
 URL: https://issues.apache.org/jira/browse/FLINK-12221
 Project: Flink
  Issue Type: Sub-task
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12220) Speculatively schedule execution for long tail task

2019-04-17 Thread BoWang (JIRA)
BoWang created FLINK-12220:
--

 Summary: Speculatively schedule execution for long tail task
 Key: FLINK-12220
 URL: https://issues.apache.org/jira/browse/FLINK-12220
 Project: Flink
  Issue Type: Sub-task
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-04-17 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819782#comment-16819782
 ] 

BoWang commented on FLINK-12070:


Thanks [~kevin.cyj] for the evaluation. Some small questions:
 # Shall we add some background disk I/O pressure to emulate the real 
production environment, since clusters are often shared by many applications?
 # Should the OS disk cache size be carefully tuned?

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Major
>
> In order to avoid writing produced results multiple times for multiple 
> consumers, and in order to speed up batch recoveries, we should make 
> blocking result partitions consumable multiple times. At the moment a 
> blocking result partition will be released once its consumers have processed 
> all data. Instead, the result partition should be released once the next 
> blocking result has been produced and all consumers of the blocking result 
> partition have terminated. Moreover, blocking results should not hold on to 
> slot resources like network buffers or memory, as is currently the case with 
> {{SpillableSubpartitions}}.
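
A hedged sketch of the release condition described above (names are 
illustrative): release only when the next blocking result exists and no 
consumer is still active.

{code:java}
// Tracks the two conditions under which a blocking result partition may be
// released: the successor blocking result has been produced, and all
// consumers of this partition have terminated.
class BlockingResultPartitionReleaseSketch {
    private boolean successorProduced;
    private int activeConsumers;

    synchronized void onConsumerStarted() {
        activeConsumers++;
    }

    synchronized void onConsumerTerminated() {
        activeConsumers--;
        maybeRelease();
    }

    synchronized void onNextBlockingResultProduced() {
        successorProduced = true;
        maybeRelease();
    }

    private void maybeRelease() {
        if (successorProduced && activeConsumers == 0) {
            // free spilled files and buffers here; until then the partition
            // stays readable for repeated consumption and batch recovery
        }
    }
}
{code}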



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12218) Report more record information in IOMetrics for Speculative Schedule

2019-04-17 Thread BoWang (JIRA)
BoWang created FLINK-12218:
--

 Summary: Report more record information in IOMetrics for 
Speculative Schedule
 Key: FLINK-12218
 URL: https://issues.apache.org/jira/browse/FLINK-12218
 Project: Flink
  Issue Type: Sub-task
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11309) Make SpillableSubpartition repeatably read to enable

2019-04-17 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11309:
---
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-10644

> Make SpillableSubpartition repeatably read to enable
> 
>
> Key: FLINK-11309
> URL: https://issues.apache.org/jira/browse/FLINK-11309
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Affects Versions: 1.6.2, 1.7.0, 1.7.1
>Reporter: BoWang
>Assignee: BoWang
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Hi all,
> When running the batch WordCount example, I configured the job execution 
> mode as *BATCH_FORCED* and the failover-strategy as *region*, and I manually 
> injected some errors to let the execution fail in different phases. In some 
> cases, the job could recover from the failover and succeed, but in other 
> cases, the job retried several times and failed.
> Example:
>  # If the failure occurred before the task read data, e.g., it failed before 
> *invokable.invoke()* in Task.java, failover could succeed.
>  # If the failure occurred after the task had read data, failover did not 
> work.
>  
> Problem diagnosis:
> Running the example described before, each ExecutionVertex is defined as a 
> restart region, and the ResultPartitionType between executions is 
> *BLOCKING*. Thus, *SpillableSubpartition* and *SpillableSubpartitionView* 
> are used to write/read shuffle data, and a data block is described as a 
> *BufferConsumer* stored in a list called *buffers*; when a task requests 
> input data from the *SpillableSubpartitionView*, the *BufferConsumer* is 
> REMOVED from buffers. Thus, when a failure occurred after data had been 
> read, some *BufferConsumers* had already been released, so although the 
> tasks were retried, the input data was incomplete.
>  
> Fix Proposal:
>  # *BufferConsumer* should not be removed from buffers until the 
> *ExecutionVertex* terminates.
>  # *SpillableSubpartition* should not be released until the 
> *ExecutionVertex* terminates.
>  # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, 
> each of which corresponds to an *Execution*.
>  Design doc: 
> https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution

2019-04-17 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819774#comment-16819774
 ] 

BoWang edited comment on FLINK-10644 at 4/17/19 6:49 AM:
-

Thanks [~greghogan] for the comments.

1) We use several rules to identify the long-tail tasks; the processing slow 
ratio is one of the terms considered, and its default value is 2.0 according 
to production experience.

2) Currently the shuffle data indeed cannot be consumed multiple times; we are 
working on fixing this in FLINK-12070, so that all the tasks of the job can be 
speculatively executed.


was (Author: eaglewatcher):
Thanks [~greghogan] for the comments.

1) we use several rules to judge the long tail tasks, and the process slow 
ratio one of the concerned terms, the default value of which is 2.0 according 
to the production experience.

2) Currently the shuffle data indeed can not be consumed multiple times, we are 
working on this to fix in 
[FLINK-12070|https://issues.apache.org/jira/browse/FLINK-12070], and all the 
tasks of the job could be speculatively executed.

> Batch Job: Speculative execution
> 
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: JIN SUN
>Assignee: BoWang
>Priority: Major
>
> Stragglers/outliers are tasks that run slower than most of the tasks in a 
> Batch Job. This impacts job latency, as such a straggler will typically be 
> on the critical path of the job and become the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, 
> software mis-configuration, or noisy neighbors. It's hard for the JM to 
> predict the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and 
> Spark have *_speculative execution_*. Speculative execution is a health-check 
> procedure that checks for tasks to be speculated, i.e., tasks running slower 
> in an ExecutionJobVertex than the median of all successfully completed tasks 
> in that EJV. Such slow tasks will be re-submitted to another TM. This will 
> not stop the slow tasks, but will run a new copy in parallel and kill the 
> others once one of them completes.
> This JIRA is an umbrella to apply this kind of idea in FLINK. Details will 
> be appended later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10644) Batch Job: Speculative execution

2019-04-17 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-10644:
--

Assignee: BoWang  (was: ryantaocer)

> Batch Job: Speculative execution
> 
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: JIN SUN
>Assignee: BoWang
>Priority: Major
>
> Stragglers/outliers are tasks that run slower than most of the tasks in a 
> Batch Job. This impacts job latency, as such a straggler will typically be 
> on the critical path of the job and become the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, 
> software mis-configuration, or noisy neighbors. It's hard for the JM to 
> predict the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and 
> Spark have *_speculative execution_*. Speculative execution is a health-check 
> procedure that checks for tasks to be speculated, i.e., tasks running slower 
> in an ExecutionJobVertex than the median of all successfully completed tasks 
> in that EJV. Such slow tasks will be re-submitted to another TM. This will 
> not stop the slow tasks, but will run a new copy in parallel and kill the 
> others once one of them completes.
> This JIRA is an umbrella to apply this kind of idea in FLINK. Details will 
> be appended later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2019-04-17 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819774#comment-16819774
 ] 

BoWang commented on FLINK-10644:


Thanks [~greghogan] for the comments.

1) We use several rules to identify the long-tail tasks; the processing slow 
ratio is one of the terms considered, and its default value is 2.0 according 
to production experience.

2) Currently the shuffle data indeed cannot be consumed multiple times; we are 
working on fixing this in 
[FLINK-12070|https://issues.apache.org/jira/browse/FLINK-12070], so that all 
the tasks of the job can be speculatively executed.

> Batch Job: Speculative execution
> 
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: JIN SUN
>Assignee: ryantaocer
>Priority: Major
>
> Stragglers/outliers are tasks that run slower than most of the tasks in a 
> Batch Job. This impacts job latency, as such a straggler will typically be 
> on the critical path of the job and become the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, 
> software mis-configuration, or noisy neighbors. It's hard for the JM to 
> predict the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and 
> Spark have *_speculative execution_*. Speculative execution is a health-check 
> procedure that checks for tasks to be speculated, i.e., tasks running slower 
> in an ExecutionJobVertex than the median of all successfully completed tasks 
> in that EJV. Such slow tasks will be re-submitted to another TM. This will 
> not stop the slow tasks, but will run a new copy in parallel and kill the 
> others once one of them completes.
> This JIRA is an umbrella to apply this kind of idea in FLINK. Details will 
> be appended later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12136) Increase parallelism of tasks with large input data size

2019-04-09 Thread BoWang (JIRA)
BoWang created FLINK-12136:
--

 Summary: Increase parallelism of tasks with large input data size
 Key: FLINK-12136
 URL: https://issues.apache.org/jira/browse/FLINK-12136
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12136) Increase parallelism of tasks with too large input data size

2019-04-09 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12136:
---
Summary: Increase parallelism of tasks with too large input data size  
(was: Increase parallelism of tasks with large input data size)

> Increase parallelism of tasks with too large input data size
> 
>
> Key: FLINK-12136
> URL: https://issues.apache.org/jira/browse/FLINK-12136
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: BoWang
>Assignee: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12135) Decrease parallelism of tasks with too small input data size

2019-04-09 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813065#comment-16813065
 ] 

BoWang edited comment on FLINK-12135 at 4/9/19 7:13 AM:


As the first step, we only merge partitions, i.e., only decrease parallelism.


was (Author: eaglewatcher):
Due to the high cost of splitting, in the first step we only merge partitions, 
i.e., only decrease parallelism

> Decrease parallelism of tasks with too small input data size
> 
>
> Key: FLINK-12135
> URL: https://issues.apache.org/jira/browse/FLINK-12135
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: BoWang
>Assignee: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12135) Decrease parallelism of tasks with too small input data size

2019-04-09 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813065#comment-16813065
 ] 

BoWang commented on FLINK-12135:


Due to the high cost of splitting, in the first step we only merge partitions, 
i.e., we only decrease parallelism.

> Decrease parallelism of tasks with too small input data size
> 
>
> Key: FLINK-12135
> URL: https://issues.apache.org/jira/browse/FLINK-12135
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: BoWang
>Assignee: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12134) Decrease parallelism of tasks with too small input data size

2019-04-09 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang closed FLINK-12134.
--
Resolution: Duplicate

> Decrease parallelism of tasks with too small input data size
> 
>
> Key: FLINK-12134
> URL: https://issues.apache.org/jira/browse/FLINK-12134
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: BoWang
>Assignee: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12135) Decrease parallelism of tasks with too small input data size

2019-04-09 Thread BoWang (JIRA)
BoWang created FLINK-12135:
--

 Summary: Decrease parallelism of tasks with too small input data 
size
 Key: FLINK-12135
 URL: https://issues.apache.org/jira/browse/FLINK-12135
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12134) Decrease parallelism of tasks with too small input data size

2019-04-09 Thread BoWang (JIRA)
BoWang created FLINK-12134:
--

 Summary: Decrease parallelism of tasks with too small input data 
size
 Key: FLINK-12134
 URL: https://issues.apache.org/jira/browse/FLINK-12134
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-04-03 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-12070:
--

Assignee: BoWang

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Major
>
> In order to avoid writing produced results multiple times for multiple 
> consumers, and in order to speed up batch recoveries, we should make 
> blocking result partitions consumable multiple times. At the moment a 
> blocking result partition will be released once its consumers have processed 
> all data. Instead, the result partition should be released once the next 
> blocking result has been produced and all consumers of the blocking result 
> partition have terminated. Moreover, blocking results should not hold on to 
> slot resources like network buffers or memory, as is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11309) Make SpillableSubpartition repeatably read to enable

2019-04-03 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11309:
---
Description: 
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and I manually 
injected some errors to let the execution fail in different phases. In some 
cases, the job could recover from the failover and succeed, but in other 
cases, the job retried several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., it failed before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described before, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING*. 
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and a data block is described as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from the 
*SpillableSubpartitionView*, the *BufferConsumer* is REMOVED from buffers. 
Thus, when a failure occurred after data had been read, some *BufferConsumers* 
had already been released, so although the tasks were retried, the input data 
was incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until the 
*ExecutionVertex* terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, 
each of which corresponds to an *Execution*.

 Design doc: 
https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing

  was:
Hi all,

When running the batch WordCount example,  I configured the job execution mode 
as *BATCH_FORCED*, and failover-strategy as *region*, I manually injected some 
errors to let the execution fail in different phases. In some cases, the job 
could recovery from failover and became succeed, but in some cases, the job 
retried several times and failed.

Example:
 # If the failure occurred before task read data, e.g., failed before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after task having read data, failover did not work.

 

Problem diagnose:

Running the example described before, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and data block is described as *BufferConsumer* stored 
in a list called *buffers,* when task requires input data from 
*SpillableSubpartitionView,* *BufferConsumer* is REMOVED from buffers. Thus, 
when failures occurred after having read data, some *BufferConsumers* have 
already released, although tasks retried, the input data is incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* could creates multi *SpillableSubpartitionViews*, 
each of which is corresponding to a *Execution*.

 


> Make SpillableSubpartition repeatably read to enable
> 
>
> Key: FLINK-11309
> URL: https://issues.apache.org/jira/browse/FLINK-11309
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Operators
>Affects Versions: 1.6.2, 1.7.0, 1.7.1
>Reporter: BoWang
>Assignee: BoWang
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Hi all,
> When running the batch WordCount example, I configured the job execution 
> mode as *BATCH_FORCED* and the failover-strategy as *region*, and I manually 
> injected some errors to let the execution fail in different phases. In some 
> cases, the job could recover from the failover and succeed, but in other 
> cases, the job retried several times and failed.
> Example:
>  # If the failure occurred before the task read data, e.g., it failed before 
> *invokable.invoke()* in Task.java, failover could succeed.
>  # If the failure occurred after the task had read data, failover did not 
> work.
>  
> Problem diagnosis:
> Running the example described before, each ExecutionVertex is defined as a 
> restart region, and the ResultPartitionType between executions is 
> *BLOCKING*. Thus, *SpillableSubpartition* and *SpillableSubpartitionView* 
> are used to write/read shuffle data, and a data block is described as a 
> *BufferConsumer* stored in a list called *buffers*; when a task requests 
> input data from the *SpillableSubpartitionView*, the *BufferConsumer* is 
> REMOVED from buffers. Thus, when a failure occurred after data had been 
> read, some *BufferConsumers* had already been released, so although the 
> tasks were retried, the input data was incomplete.
>  
> Fix Proposal:
>  # *BufferConsumer* should not be removed from buffers until the 
> *ExecutionVertex* terminates.
>  # *SpillableSubpartition* should not be released until the 
> *ExecutionVertex* terminates.
>  # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, 
> each of which corresponds to an *Execution*.
>  Design doc: 
> https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-04-02 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16808312#comment-16808312
 ] 

BoWang commented on FLINK-12070:


Totally agree with this proposal.

I would like to take this Jira, and the [design 
doc|https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing]
 is ready. Would you mind having a look at the design, [~till.rohrmann]?

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Priority: Major
>
> In order to avoid writing produced results multiple times for multiple 
> consumers, and in order to speed up batch recoveries, we should make 
> blocking result partitions consumable multiple times. At the moment a 
> blocking result partition will be released once its consumers have processed 
> all data. Instead, the result partition should be released once the next 
> blocking result has been produced and all consumers of the blocking result 
> partition have terminated. Moreover, blocking results should not hold on to 
> slot resources like network buffers or memory, as is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12002) Adaptive Parallelism of Job Vertex Execution

2019-04-01 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16806500#comment-16806500
 ] 

BoWang commented on FLINK-12002:


Hi all, the design doc is ready; any comments are appreciated: 
https://docs.google.com/document/d/1ZxnoJ3SOxUk1PL2xC1t-kepq28Pg20IL6eVKUUOWSKY/edit?usp=sharing

> Adaptive Parallelism of Job Vertex Execution
> 
>
> Key: FLINK-12002
> URL: https://issues.apache.org/jira/browse/FLINK-12002
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: ryantaocer
>Assignee: BoWang
>Priority: Major
>
> In Flink the parallelism of a job is a pre-specified parameter, which is 
> usually an empirical value and thus might not be optimal for both 
> performance and resource usage, depending on the amount of data processed 
> in each task.
> Furthermore, a fixed parallelism cannot scale to the varying data sizes 
> common in production clusters, where we may not often change configurations.
> We propose to determine the job parallelism adaptively from the actual total 
> input data size and an ideal data size processed by each task. The ideal 
> size is pre-specified according to the properties of the operator, such as 
> the preparation overhead compared with the data processing time.
> Our basic idea of "split and merge" is to dispatch the data evenly across 
> Reducers by splitting and/or merging the data buckets produced by Map. The 
> data density skew problem is not covered. This kind of parallelism 
> adjustment has no data correctness issue, since it doesn't break the 
> condition that data with the same key is processed by a single task. We 
> determine the proper parallelism of Reduce during scheduling, before it 
> actually runs and after its input is ready (though not necessarily the 
> total input data). In such a context, adaptive parallelism is a better 
> name. We think this scheduling improvement can benefit both batch and 
> stream as long as we can obtain some clues about the input data.
>  Design doc: 
> https://docs.google.com/document/d/1ZxnoJ3SOxUk1PL2xC1t-kepq28Pg20IL6eVKUUOWSKY/edit?usp=sharing
>  
>  
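
For illustration, a minimal sketch of the "merge" half of split-and-merge, with hypothetical names (the actual algorithm lives in the design doc above); splitting oversized buckets is omitted:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Packs consecutive Map output buckets into Reduce tasks so that each task
// processes roughly idealBytesPerTask. Whole buckets stay together, so data
// with the same key remains in a single task.
public final class AdaptiveParallelism {

    static List<List<Integer>> mergeBuckets(long[] bucketBytes, long idealBytesPerTask) {
        List<List<Integer>> assignments = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int bucket = 0; bucket < bucketBytes.length; bucket++) {
            if (currentBytes > 0 && currentBytes + bucketBytes[bucket] > idealBytesPerTask) {
                assignments.add(current);          // close the current Reduce task
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(bucket);
            currentBytes += bucketBytes[bucket];
        }
        if (!current.isEmpty()) {
            assignments.add(current);
        }
        return assignments;                        // size() == adaptive parallelism
    }

    public static void main(String[] args) {
        long[] buckets = {64, 32, 128, 8, 8, 8, 200};
        System.out.println(mergeBuckets(buckets, 128)); // [[0, 1], [2], [3, 4, 5], [6]]
    }
}
{code}

With the sample sizes in main(), seven Map buckets collapse into four Reduce tasks, so the adaptive parallelism would be 4.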



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12002) Adaptive Parallelism of Job Vertex Execution

2019-04-01 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12002:
---
Description: 
In Flink, the parallelism of a job is a pre-specified parameter, which is usually 
an empirical value and thus might not be optimal for either performance or 
resource usage, depending on the amount of data processed in each task.

Furthermore, a fixed parallelism cannot scale to the varying data sizes that are 
common in production clusters, where configurations are not changed often. 

We propose to determine the job parallelism adaptively from the actual total input 
data size and an ideal data size processed by each task. The ideal size is 
pre-specified according to the properties of the operator, such as the 
preparation overhead compared with the data processing time.

Our basic idea of "split and merge" is to dispatch the data evenly across 
Reducers by splitting and/or merging the data buckets produced by Map. The 
data-skew problem is not covered. This kind of parallelism adjustment has no 
data correctness issue since it doesn't break the condition that data with the 
same key is processed by a single task. We determine the proper parallelism of 
Reduce during scheduling, before it actually runs and after its input is ready, 
though not necessarily the total input data. In this context, adaptive 
parallelism is a better name. We think this scheduling improvement can benefit 
both batch and streaming as long as we can obtain some clues about the input data.

 Design doc: 
https://docs.google.com/document/d/1ZxnoJ3SOxUk1PL2xC1t-kepq28Pg20IL6eVKUUOWSKY/edit?usp=sharing

 

 

  was:
In Flink, the parallelism of a job is a pre-specified parameter, which is usually 
an empirical value and thus might not be optimal for either performance or 
resource usage, depending on the amount of data processed in each task.

Furthermore, a fixed parallelism cannot scale to the varying data sizes that are 
common in production clusters, where configurations are not changed often. 

We propose to determine the job parallelism adaptively from the actual total input 
data size and an ideal data size processed by each task. The ideal size is 
pre-specified according to the properties of the operator, such as the 
preparation overhead compared with the data processing time.

Our basic idea of "split and merge" is to dispatch the data evenly across 
Reducers by splitting and/or merging the data buckets produced by Map. The 
data-skew problem is not covered. This kind of parallelism adjustment has no 
data correctness issue since it doesn't break the condition that data with the 
same key is processed by a single task. We determine the proper parallelism of 
Reduce during scheduling, before it actually runs and after its input is ready, 
though not necessarily the total input data. In this context, adaptive 
parallelism is a better name. We think this scheduling improvement can benefit 
both batch and streaming as long as we can obtain some clues about the input data.

 

detailed design doc coming soon.

 

 


> Adaptive Parallelism of Job Vertex Execution
> 
>
> Key: FLINK-12002
> URL: https://issues.apache.org/jira/browse/FLINK-12002
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: ryantaocer
>Assignee: BoWang
>Priority: Major
>
> In Flink, the parallelism of a job is a pre-specified parameter, which is 
> usually an empirical value and thus might not be optimal for either performance 
> or resource usage, depending on the amount of data processed in each task.
> Furthermore, a fixed parallelism cannot scale to the varying data sizes that are 
> common in production clusters, where configurations are not changed often. 
> We propose to determine the job parallelism adaptively from the actual total 
> input data size and an ideal data size processed by each task. The ideal size 
> is pre-specified according to the properties of the operator, such as the 
> preparation overhead compared with the data processing time.
> Our basic idea of "split and merge" is to dispatch the data evenly across 
> Reducers by splitting and/or merging the data buckets produced by Map. The 
> data-skew problem is not covered. This kind of parallelism adjustment has no 
> data correctness issue since it doesn't break the condition that data with the 
> same key is processed by a single task. We determine the proper parallelism of 
> Reduce during scheduling, before it actually runs and after its input is ready, 
> though not necessarily the total input data. In this context, adaptive 
> parallelism is a better name. We think this scheduling improvement can benefit 
> both batch and streaming as long as we can obtain some clues about the input data.
>  Design doc: 
> 

[jira] [Updated] (FLINK-12002) Adaptive Parallelism of Job Vertex Execution

2019-03-25 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12002:
---
Component/s: Runtime / Operators

> Adaptive Parallelism of Job Vertex Execution
> 
>
> Key: FLINK-12002
> URL: https://issues.apache.org/jira/browse/FLINK-12002
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Operators
>Reporter: ryantaocer
>Assignee: BoWang
>Priority: Major
>
> In Flink, the parallelism of a job is a pre-specified parameter, which is 
> usually an empirical value and thus might not be optimal for either performance 
> or resource usage, depending on the amount of data processed in each task.
> Furthermore, a fixed parallelism cannot scale to the varying data sizes that are 
> common in production clusters, where configurations are not changed often. 
> We propose to determine the job parallelism adaptively from the actual total 
> input data size and an ideal data size processed by each task. The ideal size 
> is pre-specified according to the properties of the operator, such as the 
> preparation overhead compared with the data processing time.
> Our basic idea of "split and merge" is to dispatch the data evenly across 
> Reducers by splitting and/or merging the data buckets produced by Map. The 
> data-skew problem is not covered. This kind of parallelism adjustment has no 
> data correctness issue since it doesn't break the condition that data with the 
> same key is processed by a single task. We determine the proper parallelism of 
> Reduce during scheduling, before it actually runs and after its input is ready, 
> though not necessarily the total input data. In this context, adaptive 
> parallelism is a better name. We think this scheduling improvement can benefit 
> both batch and streaming as long as we can obtain some clues about the input data.
>  
> detailed design doc coming soon.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-02-01 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-11375:
--

Assignee: BoWang

> Concurrent modification to slot pool due to SlotSharingManager releaseSlot 
> directly 
> 
>
> Key: FLINK-11375
> URL: https://issues.apache.org/jira/browse/FLINK-11375
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.7.1
>Reporter: shuai.xu
>Assignee: BoWang
>Priority: Major
> Fix For: 1.8.0
>
>
> In SlotPool, the AvailableSlots is lock-free, so all access to it should happen 
> in the main thread of the SlotPool; hence all the public methods are called 
> through the SlotPoolGateway, except for releaseSlot, which is called directly by 
> SlotSharingManager. This may cause a ConcurrentModificationException.
>  2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: 
> BlinkStoreScanTableSource 
> feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> 
> SourceConversion(table:[_DataStreamTable_12, source: 
> [BlinkStoreScanTableSource 
> feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], 
> fields:(f0)) -> correlate: 
> table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), 
> select: 
> item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s
>  (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to 
> FINISHED.
>  2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices 
> meet exception, need to fail global execution graph
>  java.lang.reflect.UndeclaredThrowableException
>  at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
>  at 
> org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
>  at 
> org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
>  at 
> org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
>  at 
> org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  at java.lang.Thread.run(Thread.java:834)
>  Caused by: java.util.concurrent.ExecutionException: 
> java.util.ConcurrentModificationException
>  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
>  ... 23 more
>  Caused by: java.util.ConcurrentModificationException
>  at 
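
For illustration, a minimal sketch of the main-thread confinement this issue asks for, with hypothetical names rather than the real SlotPool classes:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// A lock-free component funnels every mutation through one main-thread
// executor, so a caller such as SlotSharingManager must submit releaseSlot()
// instead of mutating shared state directly from its own thread.
final class MainThreadConfinedPool {

    private final Executor mainThreadExecutor = Executors.newSingleThreadExecutor();
    private final java.util.Set<String> availableSlots = new java.util.HashSet<>(); // no locks

    /** Safe entry point: runs on the single main thread, like a gateway RPC. */
    CompletableFuture<Void> releaseSlot(String slotId) {
        return CompletableFuture.runAsync(() -> availableSlots.remove(slotId), mainThreadExecutor);
    }

    // An unsafe direct call like availableSlots.remove(slotId) from another
    // thread is exactly what can race with iteration on the main thread and
    // surface as a ConcurrentModificationException.
}
{code}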

[jira] [Commented] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-02-01 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16758108#comment-16758108
 ] 

BoWang commented on FLINK-11375:


Hi, has anyone fixed this? If not, I would like to take it.

> Concurrent modification to slot pool due to SlotSharingManager releaseSlot 
> directly 
> 
>
> Key: FLINK-11375
> URL: https://issues.apache.org/jira/browse/FLINK-11375
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.7.1
>Reporter: shuai.xu
>Priority: Major
> Fix For: 1.8.0
>
>
> In SlotPool, the AvailableSlots is lock-free, so all access to it should happen 
> in the main thread of the SlotPool; hence all the public methods are called 
> through the SlotPoolGateway, except for releaseSlot, which is called directly by 
> SlotSharingManager. This may cause a ConcurrentModificationException.
>  2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: 
> BlinkStoreScanTableSource 
> feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> 
> SourceConversion(table:[_DataStreamTable_12, source: 
> [BlinkStoreScanTableSource 
> feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], 
> fields:(f0)) -> correlate: 
> table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), 
> select: 
> item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s
>  (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to 
> FINISHED.
>  2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices 
> meet exception, need to fail global execution graph
>  java.lang.reflect.UndeclaredThrowableException
>  at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
>  at 
> org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
>  at 
> org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
>  at 
> org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
>  at 
> org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  at java.lang.Thread.run(Thread.java:834)
>  Caused by: java.util.concurrent.ExecutionException: 
> java.util.ConcurrentModificationException
>  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
>  ... 23 more
>  Caused by: java.util.ConcurrentModificationException
>  at 

[jira] [Closed] (FLINK-11071) Dynamic proxy classes cannot be resolved when deserializing job graph

2019-01-31 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang closed FLINK-11071.
--

> Dynamic proxy classes cannot be resolved when deserializing job graph
> -
>
> Key: FLINK-11071
> URL: https://issues.apache.org/jira/browse/FLINK-11071
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2, 1.7.0, 1.8.0
>Reporter: Oleg Zhukov
>Assignee: BoWang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
> Attachments: SocketWindowWordCount.java
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> It turns out to be impossible to use Java dynamic proxy objects in the job 
> definition (for example, as a MapFunction implementation).
> During deserialization of the job graph, the default implementation of  
> ObjectInputStream.resolveProxyClass(..) is used, which is not using the 
> custom class loader (to look into the submitted jar) and therefore throws 
> ClassNotFoundException.
> Looks like in order to address this, 
> InstantiationUtil.ClassLoaderObjectInputStream needs to provide custom 
> implementation of resolveProxyClass(..) method as well (in addition to 
> resolveClass(..)).
> In order to reproduce the issue, run the attached SocketWindowWordCount Flink 
> app. It's a slight variation of the canonical [SocketWindowWordCount 
>  
> example|https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html]
>  with a dynamic proxy implementation of the flat map transformation.
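
For illustration, a minimal sketch of the fix direction described above; the class name is hypothetical and this is not the actual InstantiationUtil.ClassLoaderObjectInputStream code:

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.lang.reflect.Proxy;

// Resolves both plain and dynamic proxy classes against a user-provided
// class loader, e.g. the one holding the submitted job jar.
public class ClassLoaderAwareObjectInputStream extends ObjectInputStream {

    private final ClassLoader classLoader;

    public ClassLoaderAwareObjectInputStream(InputStream in, ClassLoader classLoader)
            throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        // Look the class up in the custom loader instead of the default one.
        return Class.forName(desc.getName(), false, classLoader);
    }

    @Override
    protected Class<?> resolveProxyClass(String[] interfaces)
            throws IOException, ClassNotFoundException {
        // Resolve every proxy interface through the custom loader, then
        // build the proxy class in that loader as well.
        Class<?>[] resolved = new Class<?>[interfaces.length];
        for (int i = 0; i < interfaces.length; i++) {
            resolved[i] = Class.forName(interfaces[i], false, classLoader);
        }
        return Proxy.getProxyClass(classLoader, resolved);
    }
}
{code}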



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11411) Number of failover regions of RestartPipelinedRegionStrategy not shown in log due to incorrect parameter

2019-01-22 Thread BoWang (JIRA)
BoWang created FLINK-11411:
--

 Summary: Number of failover regions of RestartPipelinedRegionStrategy 
not shown in log due to incorrect parameter
 Key: FLINK-11411
 URL: https://issues.apache.org/jira/browse/FLINK-11411
 Project: Flink
  Issue Type: Improvement
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard

2019-01-16 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744769#comment-16744769
 ] 

BoWang edited comment on FLINK-11344 at 1/17/19 7:55 AM:
-

[~hailong wang] Thanks for the comment.

The execution attempt list indeed becomes a bit longer if failures occur, but I 
think constant restarting is rare, so a slightly longer list should be tolerable.

Displaying failed attempts could give us a glimpse of the runtime information 
without querying the log, including the running time, which TaskManager the 
attempt ran on, etc. Based on this simple information we can take action; e.g., 
by *orderBy host* on the dashboard we can find a bad host on which many 
attempts fail and react accordingly, e.g., add the machine to a blacklist.

Thus, I think displaying all the execution attempts does more good than harm.


was (Author: eaglewatcher):
[~hailong wang] Thanks for the comment.

The execution attempt list will indeed become a bit longer if failures occur, 
but I think constant restarting is rare, so a slightly longer list should be tolerable.

Displaying failed attempts could give us a glimpse of the runtime information 
without querying the log, including the running time, which TaskManager the 
attempt ran on, etc. Based on this simple information we can take action; e.g., 
by *orderBy host* on the dashboard we can find a bad host on which many 
attempts fail and react accordingly, e.g., add the machine to a blacklist.

Thus, I think displaying all the execution attempts does more good than harm.

> Display All Execution Attempt Information on Flink Web Dashboard
> 
>
> Key: FLINK-11344
> URL: https://issues.apache.org/jira/browse/FLINK-11344
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, only one Execution Attempt of each sub-task is shown in the web 
> dashboard; thus, only the successful Attempt is shown when failover occurs. This 
> makes it inconvenient to rapidly locate the failure reasons of failed Attempts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard

2019-01-16 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744769#comment-16744769
 ] 

BoWang commented on FLINK-11344:


[~hailong wang] Thanks for the comment.

The execution attempt list will indeed become a bit longer if failures occur, 
but I think constant restarting is rare, so a slightly longer list should be tolerable.

Displaying failed attempts could give us a glimpse of the runtime information 
without querying the log, including the running time, which TaskManager the 
attempt ran on, etc. Based on this simple information we can take action; e.g., 
by *orderBy host* on the dashboard we can find a bad host on which many 
attempts fail and react accordingly, e.g., add the machine to a blacklist.

Thus, I think displaying all the execution attempts does more good than harm.

> Display All Execution Attempt Information on Flink Web Dashboard
> 
>
> Key: FLINK-11344
> URL: https://issues.apache.org/jira/browse/FLINK-11344
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, only one Execution Attempt of each sub-task is shown in the web 
> dashboard; thus, only the successful Attempt is shown when failover occurs. This 
> makes it inconvenient to rapidly locate the failure reasons of failed Attempts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard

2019-01-16 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11344:
---
Summary: Display All Execution Attempt Information on Flink Web Dashboard  
(was: Show All Execution Attempt Information on Flink Web Dashboard)

> Display All Execution Attempt Information on Flink Web Dashboard
> 
>
> Key: FLINK-11344
> URL: https://issues.apache.org/jira/browse/FLINK-11344
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>
> Currently, only one Execution Attempt of each sub-task is shown in the web 
> dashboard; thus, only the successful Attempt is shown when failover occurs. This 
> makes it inconvenient to rapidly locate the failure reasons of failed Attempts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11344) Show All Execution Attempt Information on Flink Web Dashboard

2019-01-16 Thread BoWang (JIRA)
BoWang created FLINK-11344:
--

 Summary: Show All Execution Attempt Information on Flink Web 
Dashboard
 Key: FLINK-11344
 URL: https://issues.apache.org/jira/browse/FLINK-11344
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: BoWang
Assignee: BoWang


Currently, only one Execution Attempt of each sub-task is shown in the web 
dashboard; thus, only the successful Attempt is shown when failover occurs. This 
makes it inconvenient to rapidly locate the failure reasons of failed Attempts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenarios

2019-01-14 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11309:
---
Description: 
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some 
errors to let the execution fail in different phases. In some cases the job 
could recover from the failure and succeed, but in other cases the job retried 
several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from 
*SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
when a failure occurs after data has been read, some *BufferConsumers* have 
already been released, so although the tasks retried, their input data was 
incomplete.

 

Fix Proposal (sketched below):
 # *BufferConsumer* should not be removed from buffers until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, 
each corresponding to an *Execution*.
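
For illustration, a minimal sketch of proposals 1-3, with made-up names rather than the real SpillableSubpartition classes:

{code:java}
import java.util.ArrayList;
import java.util.List;

// The subpartition keeps its buffers until the producing vertex terminates
// and hands each (re)started consumer its own view with a private read
// index, instead of removing buffers as they are read.
final class RetainingSubpartition {

    private final List<byte[]> buffers = new ArrayList<>(); // stays intact across failovers

    void add(byte[] buffer) {
        buffers.add(buffer);
    }

    /** One view per consuming Execution (attempt); creating it never drains buffers. */
    View createView() {
        return new View();
    }

    /** Called only when the ExecutionVertex terminates, not when a view finishes. */
    void release() {
        buffers.clear();
    }

    final class View {
        private int readIndex; // private cursor; other views are unaffected

        byte[] pollNext() {
            return readIndex < buffers.size() ? buffers.get(readIndex++) : null;
        }
    }
}
{code}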

 

  was:
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and manual injected some 
errors to let the execution fail in different phases. In some cases the job 
could recover from the failure and succeed, but in other cases the job retried 
several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from 
*SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
when a failure occurs after data has been read, some *BufferConsumers* have 
already been released, so although the tasks retried, their input data was 
incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, 
each corresponding to an *Execution*.

 


> Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenarios 
> -
>
> Key: FLINK-11309
> URL: https://issues.apache.org/jira/browse/FLINK-11309
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2, 1.7.0, 1.7.1
>Reporter: BoWang
>Assignee: BoWang
>Priority: Critical
>
> Hi all,
> When running the batch WordCount example, I configured the job execution 
> mode as *BATCH_FORCED* and the failover-strategy as *region*, and manually 
> injected some errors to let the execution fail in different phases. In some 
> cases the job could recover from the failure and succeed, but in other cases 
> the job retried several times and failed.
> Example:
>  # If the failure occurred before the task read data, e.g., before 
> *invokable.invoke()* in Task.java, failover could succeed.
>  # If the failure occurred after the task had read data, failover did not work.
>  
> Problem diagnosis:
> Running the example described above, each ExecutionVertex is defined as a 
> restart region, and the ResultPartitionType between executions is *BLOCKING.* 
>  Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
> write/read shuffle data, and each data block is represented as a *BufferConsumer* 
> stored in a list called *buffers*; when a task requests input data from 
> *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
> when a failure occurs after data has been read, some *BufferConsumers* have 
> already been released, so although the tasks retried, their input data was incomplete.

[jira] [Updated] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenarios

2019-01-12 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11309:
---
Description: 
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some 
errors to let the execution fail in different phases. In some cases the job 
could recover from the failure and succeed, but in other cases the job retried 
several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from 
*SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
when a failure occurs after data has been read, some *BufferConsumers* have 
already been released, so although the tasks retried, their input data was 
incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, 
each corresponding to an *Execution*.

 

  was:
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some 
errors to let the execution fail in different phases. In some cases the job 
could recover from the failure and succeed, but in other cases the job retried 
several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from 
*SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
when a failure occurs after data has been read, some *BufferConsumers* have 
already been released, so although the tasks retried, their input data was 
incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # Each *SpillableSubpartition* contains multiple *SpillableSubpartitionViews*, 
each corresponding to an *Execution*.

 


> Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenarios 
> -
>
> Key: FLINK-11309
> URL: https://issues.apache.org/jira/browse/FLINK-11309
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2, 1.7.0, 1.7.1
>Reporter: BoWang
>Assignee: BoWang
>Priority: Critical
>
> Hi all,
> When running the batch WordCount example, I configured the job execution 
> mode as *BATCH_FORCED* and the failover-strategy as *region*, and manually 
> injected some errors to let the execution fail in different phases. In some 
> cases the job could recover from the failure and succeed, but in other cases 
> the job retried several times and failed.
> Example:
>  # If the failure occurred before the task read data, e.g., before 
> *invokable.invoke()* in Task.java, failover could succeed.
>  # If the failure occurred after the task had read data, failover did not work.
>  
> Problem diagnosis:
> Running the example described above, each ExecutionVertex is defined as a 
> restart region, and the ResultPartitionType between executions is *BLOCKING.* 
>  Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
> write/read shuffle data, and each data block is represented as a *BufferConsumer* 
> stored in a list called *buffers*; when a task requests input data from 
> *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
> when a failure occurs after data has been read, some *BufferConsumers* have 
> already been released, so although the tasks retried, their input data was incomplete.
>  

[jira] [Updated] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenarios

2019-01-12 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11309:
---
Description: 
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some 
errors to let the execution fail in different phases. In some cases the job 
could recover from the failure and succeed, but in other cases the job retried 
several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from 
*SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
when a failure occurs after data has been read, some *BufferConsumers* have 
already been released, so although the tasks retried, their input data was 
incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # Each *SpillableSubpartition* contains multiple *SpillableSubpartitionViews*, 
each corresponding to an *Execution*.

 

  was:
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some 
errors to let the execution fail in different phases. In some cases the job 
could recover from the failure and succeed, but in other cases the job retried 
several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from 
*SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
when a failure occurs after data has been read, some *BufferConsumers* have 
already been released, so although the tasks retried, their input data was 
incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # Each *SpillableSubpartition* contains multiple *SpillableSubpartitionView*s, 
each corresponding to an *Execution*.

 


> Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenarios 
> -
>
> Key: FLINK-11309
> URL: https://issues.apache.org/jira/browse/FLINK-11309
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2, 1.7.0, 1.7.1
>Reporter: BoWang
>Assignee: BoWang
>Priority: Critical
>
> Hi all,
> When running the batch WordCount example, I configured the job execution 
> mode as *BATCH_FORCED* and the failover-strategy as *region*, and manually 
> injected some errors to let the execution fail in different phases. In some 
> cases the job could recover from the failure and succeed, but in other cases 
> the job retried several times and failed.
> Example:
>  # If the failure occurred before the task read data, e.g., before 
> *invokable.invoke()* in Task.java, failover could succeed.
>  # If the failure occurred after the task had read data, failover did not work.
>  
> Problem diagnosis:
> Running the example described above, each ExecutionVertex is defined as a 
> restart region, and the ResultPartitionType between executions is *BLOCKING.* 
>  Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
> write/read shuffle data, and each data block is represented as a *BufferConsumer* 
> stored in a list called *buffers*; when a task requests input data from 
> *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
> when a failure occurs after data has been read, some *BufferConsumers* have 
> already been released, so although the tasks retried, their input data was incomplete.
>  

[jira] [Created] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenarios

2019-01-12 Thread BoWang (JIRA)
BoWang created FLINK-11309:
--

 Summary: Batch Job Failover Using RestartPipelinedRegionStrategy 
Fails in Some Scenarios 
 Key: FLINK-11309
 URL: https://issues.apache.org/jira/browse/FLINK-11309
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.7.1, 1.7.0, 1.6.2
Reporter: BoWang
Assignee: BoWang


Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some 
errors to let the execution fail in different phases. In some cases the job 
could recover from the failure and succeed, but in other cases the job retried 
several times and failed.

Example:
 # If the failure occurred before the task read data, e.g., before 
*invokable.invoke()* in Task.java, failover could succeed.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

Running the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING.*  
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data, and each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*; when a task requests input data from 
*SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from buffers. Thus, 
when a failure occurs after data has been read, some *BufferConsumers* have 
already been released, so although the tasks retried, their input data was 
incomplete.

 

Fix Proposal:
 # *BufferConsumer* should not be removed from buffers until the *ExecutionVertex* 
terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # Each *SpillableSubpartition* contains multiple *SpillableSubpartitionView*s, 
each corresponding to an *Execution*.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x

2019-01-09 Thread BoWang (JIRA)
BoWang created FLINK-11295:
--

 Summary: Rename configuration options of queryable state from 
query.x to queryable-state.x
 Key: FLINK-11295
 URL: https://issues.apache.org/jira/browse/FLINK-11295
 Project: Flink
  Issue Type: Improvement
Reporter: BoWang
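
For illustration, renames of the shape the summary describes might look as follows; the exact option keys are an assumption based on the query.x to queryable-state.x pattern, not taken from the issue:

{code}
# Illustrative only: assumed old and new key names, not verified against the code.
query.server.port: 9067   =>  queryable-state.server.port: 9067
query.proxy.ports: 9069   =>  queryable-state.proxy.ports: 9069
{code}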






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x

2019-01-09 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11295:
---
Component/s: Configuration

> Rename configuration options of queryable state from query.x to 
> queryable-state.x
> -
>
> Key: FLINK-11295
> URL: https://issues.apache.org/jira/browse/FLINK-11295
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x

2019-01-09 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-11295:
--

Assignee: BoWang

> Rename configuration options of queryable state from query.x to 
> queryable-state.x
> -
>
> Key: FLINK-11295
> URL: https://issues.apache.org/jira/browse/FLINK-11295
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: BoWang
>Assignee: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11071) Dynamic proxy classes cannot be resolved when deserializing job graph

2019-01-08 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736888#comment-16736888
 ] 

BoWang commented on FLINK-11071:


Hi, has this problem been fixed yet? If not, I would like to fix it.

> Dynamic proxy classes cannot be resolved when deserializing job graph
> -
>
> Key: FLINK-11071
> URL: https://issues.apache.org/jira/browse/FLINK-11071
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2, 1.7.0, 1.8.0
>Reporter: Oleg Zhukov
>Priority: Major
> Attachments: SocketWindowWordCount.java
>
>
> It turns out to be impossible to use Java dynamic proxy objects in the job 
> definition (for example, as a MapFunction implementation).
> During deserialization of the job graph, the default implementation of  
> ObjectInputStream.resolveProxyClass(..) is used, which is not using the 
> custom class loader (to look into the submitted jar) and therefore throws 
> ClassNotFoundException.
> Looks like in order to address this, 
> InstantiationUtil.ClassLoaderObjectInputStream needs to provide custom 
> implementation of resolveProxyClass(..) method as well (in addition to 
> resolveClass(..)).
> In order to reproduce the issue, run the attached SocketWindowWordCount Flink 
> app. It's a slight variation of the canonical [SocketWindowWordCount 
>  
> example|https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html]
>  with a dynamic proxy implementation of the flat map transformation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11071) Dynamic proxy classes cannot be resolved when deserializing job graph

2019-01-08 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-11071:
--

Assignee: BoWang

> Dynamic proxy classes cannot be resolved when deserializing job graph
> -
>
> Key: FLINK-11071
> URL: https://issues.apache.org/jira/browse/FLINK-11071
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2, 1.7.0, 1.8.0
>Reporter: Oleg Zhukov
>Assignee: BoWang
>Priority: Major
> Attachments: SocketWindowWordCount.java
>
>
> It turns out to be impossible to use Java dynamic proxy objects in the job 
> definition (for example, as a MapFunction implementation).
> During deserialization of the job graph, the default implementation of  
> ObjectInputStream.resolveProxyClass(..) is used, which is not using the 
> custom class loader (to look into the submitted jar) and therefore throws 
> ClassNotFoundException.
> Looks like in order to address this, 
> InstantiationUtil.ClassLoaderObjectInputStream needs to provide custom 
> implementation of resolveProxyClass(..) method as well (in addition to 
> resolveClass(..)).
> In order to reproduce the issue, run the attached SocketWindowWordCount Flink 
> app. It's a slight variation of the canonical [SocketWindowWordCount 
>  
> example|https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html]
>  with a dynamic proxy implementation of the flat map transformation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11281) Two tests in StateBackendMigrationTestBase fail

2019-01-07 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang closed FLINK-11281.
--
Resolution: Duplicate

> Two tests in StateBackendMigrationTestBase fail
> 
>
> Key: FLINK-11281
> URL: https://issues.apache.org/jira/browse/FLINK-11281
> Project: Flink
>  Issue Type: Bug
>Reporter: BoWang
>Priority: Blocker
> Attachments: fail.png
>
>
> Hi, all.
> I found that Flink builds get two stable failures in StateBackendMigrationTestBase 
> on recent PRs, which also occur in my development environment. The detailed 
> errors are as follows.
> These tests currently don't pass because the ListSerializer doesn't yet respect 
> the reconfigured case.
> java.lang.UnsupportedOperationException: The serializer should have been 
> reconfigured as a new instance; shouldn't be used.
> at 
> org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:160)
>  at 
> org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:154)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.serializeKeyGroupAndKey(RocksDBSerializedCompositeKeyBuilder.java:159)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.setKeyAndKeyGroup(RocksDBSerializedCompositeKeyBuilder.java:96)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:383)
>  at 
> org.apache.flink.runtime.state.StateBackendMigrationTestBase.testKeySerializerUpgrade(StateBackendMigrationTestBase.java:401)
>  at 
> org.apache.flink.runtime.state.StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11281) Two tests in StateBackendMigrationTestBase fail

2019-01-07 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736815#comment-16736815
 ] 

BoWang commented on FLINK-11281:


OK, this is a duplicate issue; I will close it.

> Two tests in StateBackendMigrationTestBase fail
> 
>
> Key: FLINK-11281
> URL: https://issues.apache.org/jira/browse/FLINK-11281
> Project: Flink
>  Issue Type: Bug
>Reporter: BoWang
>Priority: Blocker
> Attachments: fail.png
>
>
> Hi, all.
> I found that Flink builds get two stable failures in StateBackendMigrationTestBase 
> on recent PRs, which also occur in my development environment. The detailed 
> errors are as follows.
> These tests currently don't pass because the ListSerializer doesn't yet respect 
> the reconfigured case.
> java.lang.UnsupportedOperationException: The serializer should have been 
> reconfigured as a new instance; shouldn't be used.
> at 
> org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:160)
>  at 
> org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:154)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.serializeKeyGroupAndKey(RocksDBSerializedCompositeKeyBuilder.java:159)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.setKeyAndKeyGroup(RocksDBSerializedCompositeKeyBuilder.java:96)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:383)
>  at 
> org.apache.flink.runtime.state.StateBackendMigrationTestBase.testKeySerializerUpgrade(StateBackendMigrationTestBase.java:401)
>  at 
> org.apache.flink.runtime.state.StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11281) Two tests in StateBackendMigrationTestBase fail

2019-01-07 Thread BoWang (JIRA)
BoWang created FLINK-11281:
--

 Summary: Two tests in StateBackendMigrationTestBase fail
 Key: FLINK-11281
 URL: https://issues.apache.org/jira/browse/FLINK-11281
 Project: Flink
  Issue Type: Bug
Reporter: BoWang
 Attachments: fail.png

Hi, all.

I found that Flink builds get two stable failures in StateBackendMigrationTestBase 
on recent PRs, which also occur in my development environment. The detailed 
errors are as follows.

These tests currently don't pass because the ListSerializer doesn't yet respect 
the reconfigured case.

java.lang.UnsupportedOperationException: The serializer should have been 
reconfigured as a new instance; shouldn't be used.

at 
org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:160)
 at 
org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:154)
 at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.serializeKeyGroupAndKey(RocksDBSerializedCompositeKeyBuilder.java:159)
 at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.setKeyAndKeyGroup(RocksDBSerializedCompositeKeyBuilder.java:96)
 at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:383)
 at 
org.apache.flink.runtime.state.StateBackendMigrationTestBase.testKeySerializerUpgrade(StateBackendMigrationTestBase.java:401)
 at 
org.apache.flink.runtime.state.StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration
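
For illustration, a minimal sketch of the invariant these tests check, using made-up types rather than Flink's serializer-snapshot API:

{code:java}
// If restore reports "compatible after reconfiguration", callers must
// continue with the reconfigured serializer instance; writing through the
// old instance is exactly the UnsupportedOperationException shown above.
final class RestoreResult<S> {

    private final S serializerToUse;
    private final boolean wasReconfigured;

    private RestoreResult(S serializerToUse, boolean wasReconfigured) {
        this.serializerToUse = serializerToUse;
        this.wasReconfigured = wasReconfigured;
    }

    static <S> RestoreResult<S> compatibleAsIs(S restoredSerializer) {
        return new RestoreResult<>(restoredSerializer, false);
    }

    static <S> RestoreResult<S> compatibleAfterReconfiguration(S reconfigured) {
        // The pre-reconfiguration instance must not be handed out again;
        // nested serializers (e.g., inside a ListSerializer) need the same
        // treatment, which is the gap the failing tests expose.
        return new RestoreResult<>(reconfigured, true);
    }

    S serializerToUse() {
        return serializerToUse;
    }

    boolean wasReconfigured() {
        return wasReconfigured;
    }
}
{code}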



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10866) Queryable state can prevent cluster from starting

2019-01-03 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-10866:
--

Assignee: BoWang

> Queryable state can prevent cluster from starting
> -
>
> Key: FLINK-10866
> URL: https://issues.apache.org/jira/browse/FLINK-10866
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Critical
> Fix For: 1.8.0
>
>
> The {{KvStateServerImpl}} can currently prevent the {{TaskExecutor}} from 
> starting. 
> Currently, the QS server starts by default on port {{9067}}. If this port is 
> not free, then it fails and stops the whole initialization of the 
> {{TaskExecutor}}. I think the QS server should not stop the {{TaskExecutor}} 
> from starting.
> We should at least change the default port to {{0}} to avoid port conflicts. 
> However, this will break all setups which don't explicitly set the QS port, 
> because it then either needs to be set up explicitly or extracted from the logs.
> Additionally, we should think about whether a QS server startup failure 
> should lead to a {{TaskExecutor}} failure or simply be logged. Both 
> approaches have pros and cons. Currently, a failing QS server will also 
> affect users who don't want to use QS. If we tolerate failures in the QS 
> server, then a user who wants to use QS might run into problems with state 
> not being reachable.
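
For illustration, a minimal sketch of the two mitigations discussed above (try the configured port, fall back to an ephemeral port 0), independent of the actual KvStateServerImpl:

{code:java}
import java.io.IOException;
import java.net.ServerSocket;

final class PortFallback {

    static ServerSocket bindWithFallback(int preferredPort) throws IOException {
        try {
            return new ServerSocket(preferredPort);
        } catch (IOException portInUse) {
            // Port 0 asks the OS for any free port; the chosen port must then
            // be logged or exposed, which is the usability cost noted above.
            return new ServerSocket(0);
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket socket = bindWithFallback(9067)) {
            System.out.println("QS server bound to port " + socket.getLocalPort());
        }
    }
}
{code}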



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10866) Queryable state can prevent cluster from starting

2019-01-03 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16733902#comment-16733902
 ] 

BoWang commented on FLINK-10866:


Hi, has anyone taken this Jira? If not, I would like to take it.

> Queryable state can prevent cluster from starting
> -
>
> Key: FLINK-10866
> URL: https://issues.apache.org/jira/browse/FLINK-10866
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.8.0
>
>
> The {{KvStateServerImpl}} can currently prevent the {{TaskExecutor}} from 
> starting. 
> Currently, the QS server starts by default on port {{9067}}. If this port is 
> not free, then it fails and stops the whole initialization of the 
> {{TaskExecutor}}. I think the QS server should not stop the {{TaskExecutor}} 
> from starting.
> We should at least change the default port to {{0}} to avoid port conflicts. 
> However, this will break all setups which don't explicitly set the QS port, 
> because it then either needs to be set up explicitly or extracted from the logs.
> Additionally, we should think about whether a QS server startup failure 
> should lead to a {{TaskExecutor}} failure or simply be logged. Both 
> approaches have pros and cons. Currently, a failing QS server will also 
> affect users who don't want to use QS. If we tolerate failures in the QS 
> server, then a user who wants to use QS might run into problems with state 
> not being reachable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11232) Empty Start Time of sub-task on web dashboard

2018-12-29 Thread BoWang (JIRA)
BoWang created FLINK-11232:
--

 Summary: Empty Start Time of sub-task on web dashboard
 Key: FLINK-11232
 URL: https://issues.apache.org/jira/browse/FLINK-11232
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.7.1, 1.7.0, 1.6.3, 1.6.2, 1.5.5
Reporter: BoWang
Assignee: BoWang
 Attachments: empty.png





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)