[jira] [Closed] (FLINK-12835) Time conversion is wrong in ManualClock
[ https://issues.apache.org/jira/browse/FLINK-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang closed FLINK-12835. -- > Time conversion is wrong in ManualClock > --- > > Key: FLINK-12835 > URL: https://issues.apache.org/jira/browse/FLINK-12835 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.9.0 >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.3, 1.9.0, 1.8.1 > > Time Spent: 20m > Remaining Estimate: 0h > > `currentTime` stored in ManualClock is in nanoseconds; when converting to > milliseconds it is divided by 1_000L rather than 1_000_000L. > The test logic of `SlotPoolTest` using `ManualClock` should also be refined. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
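The conversion bug above can be illustrated with a minimal sketch (a hypothetical mini clock class, not Flink's actual `ManualClock`): since the stored time is in nanoseconds, the millisecond accessor must divide by 1_000_000L, while the buggy division by 1_000L would yield microseconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical mini ManualClock: stores the current time in nanoseconds,
// so the millisecond accessor divides by 1_000_000L (the reported bug
// divided by 1_000L, which returns microseconds instead).
public class ManualClockSketch {

    private long currentTimeNanos;

    public void advanceTime(long duration, TimeUnit timeUnit) {
        currentTimeNanos += timeUnit.toNanos(duration);
    }

    public long absoluteTimeMillis() {
        return currentTimeNanos / 1_000_000L; // nanoseconds -> milliseconds
    }

    public long relativeTimeNanos() {
        return currentTimeNanos;
    }
}
```

Advancing such a clock by 2 seconds should make `absoluteTimeMillis()` report 2000, which a `SlotPoolTest`-style test could assert directly.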
[jira] [Closed] (FLINK-12229) Implement Lazy Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang closed FLINK-12229. -- > Implement Lazy Scheduling Strategy > -- > > Key: FLINK-12229 > URL: https://issues.apache.org/jira/browse/FLINK-12229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the > input data are available. > Acceptance Criteria: > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12835) Time conversion is wrong in ManualClock
[ https://issues.apache.org/jira/browse/FLINK-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12835: --- Component/s: (was: Runtime / Coordination) Tests > Time conversion is wrong in ManualClock > --- > > Key: FLINK-12835 > URL: https://issues.apache.org/jira/browse/FLINK-12835 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.9.0 >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Fix For: 1.9.0 > > > `currentTime` stored in ManualClock is in nanoseconds; when converting to > milliseconds it is divided by 1_000L rather than 1_000_000L. > The test logic of `SlotPoolTest` using `ManualClock` should also be refined. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12835) Time conversion is wrong in ManualClock
BoWang created FLINK-12835: -- Summary: Time conversion is wrong in ManualClock Key: FLINK-12835 URL: https://issues.apache.org/jira/browse/FLINK-12835 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.9.0 Reporter: BoWang Assignee: BoWang Fix For: 1.9.0 `currentTime` stored in ManualClock is in nanoseconds; when converting to milliseconds it is divided by 1_000L rather than 1_000_000L. The test logic of `SlotPoolTest` using `ManualClock` should also be refined. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12669) Implement FixedDelayRestartBackoffTimeStrategy
[ https://issues.apache.org/jira/browse/FLINK-12669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860768#comment-16860768 ] BoWang commented on FLINK-12669: Hi, [~aitozi]. What is the progress of this ticket? If you are busy with other tasks, I could take over part of the work. > Implement FixedDelayRestartBackoffTimeStrategy > -- > > Key: FLINK-12669 > URL: https://issues.apache.org/jira/browse/FLINK-12669 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Aitozi >Priority: Major > Fix For: 1.9.0 > > > We should implement a {{FixedDelayRestartBackoffTimeStrategy}} similar to the > {{FixedDelayRestartStrategy}} which allows configuring the number of allowed > restarts and the fixed delay between restart attempts. > In order to be backwards compatible, we should respect the configuration > values used to configure the {{FixedDelayRestartStrategy}} (see the > documentation for more information: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html). > Additionally, we should also respect the > {{FixedDelayRestartStrategyConfiguration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
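A fixed-delay backoff strategy like the one described in this ticket could be sketched as follows. This is an illustrative stand-alone class, assuming a minimal `canRestart()`/`getBackoffTime()`/`notifyFailure()` contract; the names are assumptions, not Flink's actual interface.

```java
// Hedged sketch of a fixed-delay restart backoff strategy: a configurable
// number of restart attempts, each separated by the same constant delay.
public class FixedDelayBackoffSketch {

    private final int maxRestartAttempts;
    private final long backoffTimeMillis;
    private int attemptsSoFar;

    public FixedDelayBackoffSketch(int maxRestartAttempts, long backoffTimeMillis) {
        this.maxRestartAttempts = maxRestartAttempts;
        this.backoffTimeMillis = backoffTimeMillis;
    }

    // A restart is allowed while the attempt budget is not exhausted.
    public boolean canRestart() {
        return attemptsSoFar < maxRestartAttempts;
    }

    // The delay is constant, regardless of how many failures occurred.
    public long getBackoffTime() {
        return backoffTimeMillis;
    }

    // Called once per observed task failure.
    public void notifyFailure() {
        attemptsSoFar++;
    }
}
```

With `maxRestartAttempts = 2`, the third failure makes `canRestart()` return false, mirroring the fixed-delay semantics from the restart-strategies documentation linked above.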
[jira] [Commented] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856241#comment-16856241 ] BoWang commented on FLINK-12608: OK, thanks [~rmetzger]. > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > As discussed in > [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], > need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Affects Version/s: 1.9.0 > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > As discussed in > [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], > need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Fix Version/s: 1.9.0 > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > As discussed in > [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], > need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Description: As discussed in [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology. (was: As discussed in [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology.) > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task >Reporter: BoWang >Assignee: BoWang >Priority: Minor > > As discussed in > [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], > need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Description: As discussed in [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology. (was: As discussed in [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] need to add getVertex) > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task >Reporter: BoWang >Assignee: BoWang >Priority: Minor > > As discussed in > [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] > need to add getVertexOrThrow getResultPartitionOrThrow in SchedulingTopology. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Description: As discussed in [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] need to add > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task >Reporter: BoWang >Assignee: BoWang >Priority: Minor > > As discussed in > [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] > need to add -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Description: As discussed in [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] need to add getVertex (was: As discussed in [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] need to add ) > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task >Reporter: BoWang >Assignee: BoWang >Priority: Minor > > As discussed in > [PR#8309|[https://github.com/apache/flink/pull/8309#discussion_r287190944],] > need to add getVertex -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12670) Implement FailureRateRestartBackoffTimeStrategy
[ https://issues.apache.org/jira/browse/FLINK-12670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-12670: -- Assignee: BoWang > Implement FailureRateRestartBackoffTimeStrategy > --- > > Key: FLINK-12670 > URL: https://issues.apache.org/jira/browse/FLINK-12670 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Major > Fix For: 1.9.0 > > > We should implement a {{FailureRateRestartBackoffTimeStrategy}} similar to > the {{FailureRateRestartStrategy}} which allows to configure the number of > allowed restarts and the fixed delay in between restart attempts. > In order to be backwards compatible, we should respect the configuration > values used to configure the {{FailureRateRestartStrategy}} (see the > documentation for more information: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html). > Additionally, we should also respect the > {{FailureRateRestartStrategyConfiguration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
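The failure-rate variant differs from the fixed-delay one in that it refuses restarts once too many failures fall inside a sliding time interval. A minimal sketch, with illustrative names and an explicit `nowMillis` parameter rather than Flink's actual API:

```java
import java.util.ArrayDeque;

// Hedged sketch of a failure-rate restart strategy: restarts are refused once
// more than maxFailuresPerInterval failures occurred within the sliding interval.
public class FailureRateBackoffSketch {

    private final int maxFailuresPerInterval;
    private final long failuresIntervalMillis;
    private final long backoffTimeMillis;
    private final ArrayDeque<Long> failureTimestamps = new ArrayDeque<>();

    public FailureRateBackoffSketch(
            int maxFailuresPerInterval, long failuresIntervalMillis, long backoffTimeMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMillis = failuresIntervalMillis;
        this.backoffTimeMillis = backoffTimeMillis;
    }

    public void notifyFailure(long nowMillis) {
        failureTimestamps.addLast(nowMillis);
    }

    public boolean canRestart(long nowMillis) {
        // Drop failures that have aged out of the sliding interval.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() > failuresIntervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }

    public long getBackoffTime() {
        return backoffTimeMillis;
    }
}
```

The deque keeps only timestamps inside the interval, so each check is amortized O(1) per failure.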
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Summary: Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology (was: Add getXXXOrThrow(ExecutionVertexID) to SchedulingTopology) > Add > getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) > to SchedulingTopology > --- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task >Reporter: BoWang >Assignee: BoWang >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12608) Add getXXXOrThrow(ExecutionVertexID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12608: --- Summary: Add getXXXOrThrow(ExecutionVertexID) to SchedulingTopology (was: Add getVertexOrThrow(ExecutionVertexID) to SchedulingTopology) > Add getXXXOrThrow(ExecutionVertexID) to SchedulingTopology > -- > > Key: FLINK-12608 > URL: https://issues.apache.org/jira/browse/FLINK-12608 > Project: Flink > Issue Type: Sub-task >Reporter: BoWang >Assignee: BoWang >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12608) Add getVertexOrThrow(ExecutionVertexID) to SchedulingTopology
BoWang created FLINK-12608: -- Summary: Add getVertexOrThrow(ExecutionVertexID) to SchedulingTopology Key: FLINK-12608 URL: https://issues.apache.org/jira/browse/FLINK-12608 Project: Flink Issue Type: Sub-task Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11309) Make SpillableSubpartition repeatably read to enable
[ https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang closed FLINK-11309. -- Resolution: Fixed > Make SpillableSubpartition repeatably read to enable > > > Key: FLINK-11309 > URL: https://issues.apache.org/jira/browse/FLINK-11309 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Affects Versions: 1.6.2, 1.7.0, 1.7.1 >Reporter: BoWang >Assignee: BoWang >Priority: Critical > Labels: pull-request-available > Time Spent: 1h > Remaining Estimate: 0h > > Hi all, > When running the batch WordCount example, I configured the job execution > mode as *BATCH_FORCED* and the failover-strategy as *region*, and manually > injected some errors to let the execution fail in different phases. In some > cases the job could recover from the failover and succeed, but in other > cases the job retried several times and failed. > Example: > # If the failure occurred before the task read data, e.g., failed before > *invokable.invoke()* in Task.java, failover could succeed. > # If the failure occurred after the task had read data, failover did not work. > > Problem diagnosis: > Running the example described above, each ExecutionVertex is defined as a > restart region, and the ResultPartitionType between executions is *BLOCKING*. > Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to > write/read shuffle data, and each data block is described as a *BufferConsumer* > stored in a list called *buffers*; when a task requests input data from the > *SpillableSubpartitionView*, the *BufferConsumer* is REMOVED from buffers. Thus, > when a failure occurs after data has been read, some *BufferConsumers* have > already been released; although the tasks are retried, their input data is > incomplete. > > Fix Proposal: > # *BufferConsumer* should not be removed from buffers until the > *ExecutionVertex* terminates. > # *SpillableSubpartition* should not be released until the *ExecutionVertex* > terminates. > # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, > each of which corresponds to an *Execution*. > Design doc: > https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
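The fix proposal above can be sketched with toy stand-ins for Flink's classes (a `String` plays the role of `BufferConsumer`): buffers stay in the subpartition instead of being removed on read, and each view keeps its own read position, so a restarted consumer can simply be given a fresh view and re-read from the beginning.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the proposal, not Flink's actual code: the buffer list
// is never consumed destructively; each view tracks its own read index.
public class RepeatableSubpartitionSketch {

    private final List<String> buffers = new ArrayList<>(); // stand-in for BufferConsumer

    public void add(String buffer) {
        buffers.add(buffer);
    }

    // One view per consuming Execution; a failover simply creates a new view.
    public View createReadView() {
        return new View();
    }

    public class View {
        private int readIndex;

        // Returns the next buffer, or null when this view has read everything.
        public String pollNext() {
            return readIndex < buffers.size() ? buffers.get(readIndex++) : null;
        }
    }
}
```

After one view has drained the subpartition, a second view created for a retried execution still sees all the data, which is exactly what the destructive removal in the original `SpillableSubpartitionView` prevented.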
[jira] [Commented] (FLINK-12414) Implement ExecutionGraph to SchedulingTopology Adapter
[ https://issues.apache.org/jira/browse/FLINK-12414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837059#comment-16837059 ] BoWang commented on FLINK-12414: Hi, [~till.rohrmann] [~gjy] In the adapter, how do we map ResultPartitionState#RELEASED to a state in the ExecutionGraph? It seems not easy for the ExecutionGraph to know whether a partition has been released, since only the TM knows the state of the ResultPartition. > Implement ExecutionGraph to SchedulingTopology Adapter > -- > > Key: FLINK-12414 > URL: https://issues.apache.org/jira/browse/FLINK-12414 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > > Implement an [adapter|https://en.wikipedia.org/wiki/Adapter_pattern], which > adapts the ExecutionGraph to the SchedulingTopology interface. > *Acceptance criteria* > * The adapter always reflects an up to date view of the ExecutionGraph state -- This message was sent by Atlassian JIRA (v7.6.3#76005)
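The "always reflects an up to date view" acceptance criterion suggests a read-through adapter rather than a snapshot copy. A minimal sketch with hypothetical types (not Flink's real `ExecutionGraph` or `SchedulingTopology`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Illustrative adapter-pattern sketch: the SchedulingTopology-style vertex reads
// through to the live backing state on every call, so it never goes stale.
public class TopologyAdapterSketch {

    public interface SchedulingVertex {
        String getId();
        String getState();
    }

    // Stand-in for the mutable ExecutionGraph state being adapted.
    private final Map<String, String> vertexStates = new HashMap<>();

    public void setVertexState(String id, String state) {
        vertexStates.put(id, state);
    }

    public SchedulingVertex getVertexOrThrow(String id) {
        if (!vertexStates.containsKey(id)) {
            throw new NoSuchElementException("Unknown vertex: " + id);
        }
        // The returned vertex delegates to the live map instead of copying it.
        return new SchedulingVertex() {
            public String getId() { return id; }
            public String getState() { return vertexStates.get(id); }
        };
    }
}
```

Because the adapter delegates rather than copies, a state change in the backing map is immediately visible through a previously obtained `SchedulingVertex`.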
[jira] [Assigned] (FLINK-12414) Implement ExecutionGraph to SchedulingTopology Adapter
[ https://issues.apache.org/jira/browse/FLINK-12414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-12414: -- Assignee: BoWang (was: Gary Yao) > Implement ExecutionGraph to SchedulingTopology Adapter > -- > > Key: FLINK-12414 > URL: https://issues.apache.org/jira/browse/FLINK-12414 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > > Implement an [adapter|https://en.wikipedia.org/wiki/Adapter_pattern], which > adapts the ExecutionGraph to the SchedulingTopology interface. > *Acceptance criteria* > * The adapter always reflects an up to date view of the ExecutionGraph state -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12229) Implement Lazy Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830880#comment-16830880 ] BoWang commented on FLINK-12229: Thanks [~till.rohrmann]. The advantage of only looking at the input result partitions is obvious, but I wonder whether there may be some negative effects. 1) Once any result partition finishes, the consumer vertex will be scheduled and the remaining result partition infos would be sent to the TM separately. That would mean a lot of additional network communication for updating partition info if the number of input partitions is huge, whereas when looking at the IntermediateDataSet all the partition infos are composed into the `TaskDeploymentDescriptor`. 2) There may be a resource deadlock, e.g., consider a job with map-reduce-join job vertices. Both the reduce and join job vertices have ANY input constraints, so some of their tasks could be scheduled but cannot finish until all the input result partitions are ready. When the free resources of the cluster are not enough, the running join tasks wait for all their input while some of the reduce tasks wait for resources to be scheduled. Making SchedulingIntermediateDataSet part of LazyFromSourcesSchedulingStrategy would work; I will do it that way. > Implement Lazy Scheduling Strategy > -- > > Key: FLINK-12229 > URL: https://issues.apache.org/jira/browse/FLINK-12229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the > input data are available. > Acceptance Criteria: > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12229) Implement Lazy Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830129#comment-16830129 ] BoWang edited comment on FLINK-12229 at 4/30/19 9:48 AM: - Hi, [~till.rohrmann] [~gjy] [~tiemsn] In the original scheduler, the consumer vertex is scheduled when ANY/ALL of its IntermediateDataSets are consumable, and an IntermediateDataSet is consumable when all of its result partitions are finished for the BLOCKING ResultType. Shall we be consistent with this logic in the new scheduler? Another question: while implementing the Lazy strategy, I found that on each producer vertex state change or partition-consumable notification, all the input partitions of the consumer vertex are checked to decide whether it should be scheduled. With n producer vertices and n consumer vertices, the partitions would be checked O(n^2) times, which is very inefficient. If we add a SchedulingIntermediateDataSet and react to vertex state change notifications, relying on a counter in the SchedulingIntermediateDataSet, only O(n) partition checks are needed (this is what I did in [GitHub Pull Request #8309|https://github.com/apache/flink/pull/8309]). Another option is to maintain some member variables in LazyFromSourcesSchedulingStrategy to do the same thing as SchedulingIntermediateDataSet. What do you think? was (Author: eaglewatcher): Hi, [~till.rohrmann] [~gjy] [~tiemsn] In the original scheduler, the consumer vertex is scheduled when ANY/ALL of its IntermediateDataSets are consumable, and an IntermediateDataSet is consumable when all of its result partitions are finished for the BLOCKING ResultType. Shall we be consistent with this logic in the new scheduler? Another question: while implementing the Lazy strategy, I found that on each producer vertex state change or partition-consumable notification, all the input partitions of the consumer vertex are checked to decide whether it should be scheduled. With n producer vertices and n consumer vertices, the partitions would be checked O(n^2) times, which is very inefficient. If we add a SchedulingIntermediateDataSet and react to vertex state change notifications, relying on a counter in the SchedulingIntermediateDataSet, only O(n) partition checks are needed (this is what I did in [GitHub Pull Request #8309|https://github.com/apache/flink/pull/8309]). Another option is to maintain some member variables in LazyFromSourcesSchedulingStrategy to do the same thing as SchedulingIntermediateDataSet. What do you think? > Implement Lazy Scheduling Strategy > -- > > Key: FLINK-12229 > URL: https://issues.apache.org/jira/browse/FLINK-12229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the > input data are available. > Acceptance Criteria: > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
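The counter idea from the comment above can be reduced to a few lines: instead of rescanning all input partitions on every producer state change (O(n^2) checks overall), each data set keeps a count of finished partitions and becomes consumable when the count reaches the total, making each partition-finished event O(1). Class and method names here are illustrative.

```java
// Minimal sketch of a SchedulingIntermediateDataSet-style counter for the
// LAZY_FROM_SOURCES case: consumable once all producer partitions finished.
public class DataSetConsumableCounter {

    private final int totalPartitions;
    private int finishedPartitions;

    public DataSetConsumableCounter(int totalPartitions) {
        this.totalPartitions = totalPartitions;
    }

    // Called once when a producer's BLOCKING result partition finishes.
    public void markPartitionFinished() {
        finishedPartitions++;
    }

    public boolean isConsumable() {
        return finishedPartitions == totalPartitions;
    }
}
```

The same counter could equally live as a member variable inside LazyFromSourcesSchedulingStrategy, which is the alternative mentioned in the comment.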
[jira] [Commented] (FLINK-12229) Implement Lazy Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830129#comment-16830129 ] BoWang commented on FLINK-12229: Hi, [~till.rohrmann] [~gjy] [~tiemsn] In the original scheduler, the consumer vertex is scheduled when ANY/ALL of its IntermediateDataSets are consumable, and an IntermediateDataSet is consumable when all of its result partitions are finished for the BLOCKING ResultType. Shall we be consistent with this logic in the new scheduler? Another question: while implementing the Lazy strategy, I found that on each producer vertex state change or partition-consumable notification, all the input partitions of the consumer vertex are checked to decide whether it should be scheduled. With n producer vertices and n consumer vertices, the partitions would be checked O(n^2) times, which is very inefficient. If we add a SchedulingIntermediateDataSet and react to vertex state change notifications, relying on a counter in the SchedulingIntermediateDataSet, only O(n) partition checks are needed (this is what I did in [GitHub Pull Request #8309|https://github.com/apache/flink/pull/8309]). Another option is to maintain some member variables in LazyFromSourcesSchedulingStrategy to do the same thing as SchedulingIntermediateDataSet. What do you think? > Implement Lazy Scheduling Strategy > -- > > Key: FLINK-12229 > URL: https://issues.apache.org/jira/browse/FLINK-12229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the > input data are available. 
> Acceptance Criteria: > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12229) Implement Lazy Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12229: --- Description: Implement a {{SchedulingStrategy}} that covers the functionality of {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the input data are available. Acceptance Criteria: * New strategy is tested in isolation using test implementations (i.e., without having to submit a job) was: Implement a {{SchedulingStrategy}} that covers the functionality of {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., all vertices are scheduled at once. Acceptance Criteria: * New strategy is tested in isolation using test implementations (i.e., without having to submit a job) > Implement Lazy Scheduling Strategy > -- > > Key: FLINK-12229 > URL: https://issues.apache.org/jira/browse/FLINK-12229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the > input data are available. > Acceptance Criteria: > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12229) Implement Lazy Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-12229: -- Assignee: BoWang (was: Gary Yao) > Implement Lazy Scheduling Strategy > -- > > Key: FLINK-12229 > URL: https://issues.apache.org/jira/browse/FLINK-12229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., all vertices are scheduled at once. > Acceptance Criteria: > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution
[ https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819998#comment-16819998 ] BoWang commented on FLINK-10644: Thanks [~StephanEwen] for the comments. We will not approach the scheduler-related sub-tasks before the redesign work finishes, and will only try to implement sub-features that do not depend on the scheduler. > Batch Job: Speculative execution > > > Key: FLINK-10644 > URL: https://issues.apache.org/jira/browse/FLINK-10644 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: JIN SUN >Assignee: BoWang >Priority: Major > > Stragglers/outliers are tasks that run slower than most of the other tasks in a > batch job. This hurts job latency, as such a straggler will usually sit on the > critical path of the job and become the bottleneck. > Tasks may be slow for various reasons, including hardware degradation, > software misconfiguration, or noisy neighbors. It is hard for the JM to predict > the runtime. > To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark > have *_speculative execution_*. Speculative execution is a health-check > procedure that checks for tasks to be speculated, i.e., tasks running slower in an > ExecutionJobVertex than the median of all successfully completed tasks in > that EJV. Such slow tasks will be re-submitted to another TM: the slow task is > not stopped, but a new copy runs in parallel, and the others are killed once one > of them completes. > This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be > appended later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
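The median-based check described in the umbrella issue can be sketched as follows. The speculation `factor` and the method names are illustrative assumptions, not Flink's implementation: a running task becomes a speculation candidate when its runtime exceeds some multiple of the median runtime of already-completed tasks in the same ExecutionJobVertex.

```java
import java.util.Arrays;

// Hedged sketch of a straggler check: compare a running task's elapsed time
// against the median runtime of successfully completed tasks in the same EJV.
public class StragglerCheckSketch {

    static long medianMillis(long[] completedRuntimesMillis) {
        long[] sorted = completedRuntimesMillis.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return (n % 2 == 1) ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2;
    }

    // A hypothetical "factor" controls how far beyond the median counts as slow.
    static boolean shouldSpeculate(long runningMillis, long[] completedRuntimesMillis, double factor) {
        return completedRuntimesMillis.length > 0
                && runningMillis > factor * medianMillis(completedRuntimesMillis);
    }
}
```

With completed runtimes of 10, 20, and 30 ms and a factor of 1.5, a task running for 50 ms would be speculated while one at 25 ms would not.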
[jira] [Created] (FLINK-12221) Distinguish long tail task based on IOMetrics
BoWang created FLINK-12221: -- Summary: Distinguish long tail task based on IOMetrics Key: FLINK-12221 URL: https://issues.apache.org/jira/browse/FLINK-12221 Project: Flink Issue Type: Sub-task Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12220) Speculatively schedule execution for long tail task
BoWang created FLINK-12220: -- Summary: Speculatively schedule execution for long tail task Key: FLINK-12220 URL: https://issues.apache.org/jira/browse/FLINK-12220 Project: Flink Issue Type: Sub-task Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819782#comment-16819782 ] BoWang commented on FLINK-12070: Thanks [~kevin.cyj] for the evaluation. Some small questions: # Shall we add some background disk IO pressure to emulate the real production environment since clusters are often shared by many applications? # Should the OS disk cache size be carefully tuned? > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Major > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times. At the moment a > blocking result partition will be released once the consumers has processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12218) Report more record information in IOMetrics for Speculative Schedule
BoWang created FLINK-12218: -- Summary: Report more record information in IOMetrics for Speculative Schedule Key: FLINK-12218 URL: https://issues.apache.org/jira/browse/FLINK-12218 Project: Flink Issue Type: Sub-task Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11309) Make SpillableSubpartition repeatably read to enable
[ https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-11309: --- Issue Type: Sub-task (was: Bug) Parent: FLINK-10644 > Make SpillableSubpartition repeatably read to enable > > > Key: FLINK-11309 > URL: https://issues.apache.org/jira/browse/FLINK-11309 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Affects Versions: 1.6.2, 1.7.0, 1.7.1 >Reporter: BoWang >Assignee: BoWang >Priority: Critical > Labels: pull-request-available > Time Spent: 50m > Remaining Estimate: 0h > > Hi all, > When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some errors to make the execution fail in different phases. In some cases the job could recover from the failover and succeed, but in other cases the job retried several times and failed. > Example: > # If the failure occurred before the task read data, e.g., before *invokable.invoke()* in Task.java, failover could succeed. > # If the failure occurred after the task had read data, failover did not work. > > Problem diagnosis: > Running the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING*. Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and each data block is described as a *BufferConsumer* stored in a list called *buffers*; when a task requests input data from the *SpillableSubpartitionView*, the *BufferConsumer* is REMOVED from *buffers*. Thus, when a failure occurred after data had been read, some *BufferConsumers* had already been released, so although the tasks were retried, their input data was incomplete. > > Fix Proposal: > # A *BufferConsumer* should not be removed from *buffers* until the *ExecutionVertex* terminates. > # A *SpillableSubpartition* should not be released until the *ExecutionVertex* terminates. > # A *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, each of which corresponds to an *Execution*. > Design doc: > https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
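The three points of the fix proposal can be illustrated with a minimal sketch (hypothetical names, not Flink's actual API): buffers are retained until the producing vertex terminates, and each consuming Execution gets its own view with an independent read cursor, so a failed-over Execution can re-read from the start:

```java
// Illustrative sketch of the fix proposal: buffers are never removed on read,
// and each Execution reads through its own view with a private cursor.
import java.util.ArrayList;
import java.util.List;

public class RepeatableSubpartition {
    private final List<String> buffers = new ArrayList<>();

    public void add(String bufferConsumer) {
        buffers.add(bufferConsumer);
    }

    /** One view per Execution; creating or reading a view never mutates `buffers`. */
    public View createView() {
        return new View();
    }

    /** Called only when the producing ExecutionVertex terminates. */
    public void release() {
        buffers.clear();
    }

    public class View {
        private int cursor = 0;

        public String pollNext() {
            return cursor < buffers.size() ? buffers.get(cursor++) : null;
        }
    }
}
```

The design-choice point is the same as in the proposal: the cost of re-reading a failed consumer's input drops to creating a fresh view, because the data itself is still there.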
[jira] [Comment Edited] (FLINK-10644) Batch Job: Speculative execution
[ https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819774#comment-16819774 ] BoWang edited comment on FLINK-10644 at 4/17/19 6:49 AM: - Thanks [~greghogan] for the comments. 1) We use several rules to identify long-tail tasks; the process-slow ratio is one of the terms considered, and its default value is 2.0 according to production experience. 2) Currently the shuffle data indeed cannot be consumed multiple times; we are working on fixing this in FLINK-12070, after which all tasks of the job can be speculatively executed. was (Author: eaglewatcher): Thanks [~greghogan] for the comments. 1) we use several rules to judge the long tail tasks, and the process slow ratio one of the concerned terms, the default value of which is 2.0 according to the production experience. 2) Currently the shuffle data indeed can not be consumed multiple times, we are working on this to fix in [FLINK-12070|https://issues.apache.org/jira/browse/FLINK-12070], and all the tasks of the job could be speculatively executed. > Batch Job: Speculative execution > > > Key: FLINK-10644 > URL: https://issues.apache.org/jira/browse/FLINK-10644 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: JIN SUN >Assignee: BoWang >Priority: Major > > Stragglers/outliers are tasks that run slower than most of the tasks in a batch job. This impacts job latency, since a straggler tends to sit on the critical path of the job and become the bottleneck. Tasks may be slow for various reasons, including hardware degradation, software misconfiguration, or noisy neighbors. It is hard for the JM to predict the runtime. > To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark have *_speculative execution_*. Speculative execution is a health-check procedure that looks for tasks to speculate, i.e., tasks running slower in an ExecutionJobVertex than the median of all successfully completed tasks in that EJV. Such slow tasks are re-submitted to another TM; the slow task is not stopped, but a new copy runs in parallel, and the others are killed once one of them completes. > This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be appended later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
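The median-based rule quoted above (a running task becomes a speculation candidate when its elapsed time exceeds the slow ratio, 2.0 by default per the comment, times the median runtime of completed tasks in the same vertex) can be sketched as follows. This is an illustrative sketch, not Flink's actual scheduler code:

```java
// Illustrative straggler check: compare a running task's elapsed time against
// slowRatio * median(runtimes of successfully completed tasks in the same EJV).
import java.util.Arrays;

public class StragglerDetector {

    public static long median(long[] completedRuntimesMillis) {
        long[] sorted = completedRuntimesMillis.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2;
    }

    public static boolean isStraggler(long elapsedMillis,
                                      long[] completedRuntimesMillis,
                                      double slowRatio) {
        if (completedRuntimesMillis.length == 0) {
            return false; // nothing to compare against yet
        }
        return elapsedMillis > slowRatio * median(completedRuntimesMillis);
    }
}
```

A real implementation would additionally cap the number of concurrent speculative copies and, as the issue notes, cancel the losers once one copy finishes.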
[jira] [Assigned] (FLINK-10644) Batch Job: Speculative execution
[ https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-10644: -- Assignee: BoWang (was: ryantaocer) > Batch Job: Speculative execution > > > Key: FLINK-10644 > URL: https://issues.apache.org/jira/browse/FLINK-10644 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: JIN SUN >Assignee: BoWang >Priority: Major > > Stragglers/outliers are tasks that run slower than most of the tasks in a batch job. This impacts job latency, since a straggler tends to sit on the critical path of the job and become the bottleneck. Tasks may be slow for various reasons, including hardware degradation, software misconfiguration, or noisy neighbors. It is hard for the JM to predict the runtime. > To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark have *_speculative execution_*. Speculative execution is a health-check procedure that looks for tasks to speculate, i.e., tasks running slower in an ExecutionJobVertex than the median of all successfully completed tasks in that EJV. Such slow tasks are re-submitted to another TM; the slow task is not stopped, but a new copy runs in parallel, and the others are killed once one of them completes. > This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be appended later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution
[ https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819774#comment-16819774 ] BoWang commented on FLINK-10644: Thanks [~greghogan] for the comments. 1) We use several rules to identify long-tail tasks; the process-slow ratio is one of the terms considered, and its default value is 2.0 according to production experience. 2) Currently the shuffle data indeed cannot be consumed multiple times; we are working on fixing this in [FLINK-12070|https://issues.apache.org/jira/browse/FLINK-12070], and all the tasks of the job could be speculatively executed. > Batch Job: Speculative execution > > > Key: FLINK-10644 > URL: https://issues.apache.org/jira/browse/FLINK-10644 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: JIN SUN >Assignee: ryantaocer >Priority: Major > > Stragglers/outliers are tasks that run slower than most of the tasks in a batch job. This impacts job latency, since a straggler tends to sit on the critical path of the job and become the bottleneck. Tasks may be slow for various reasons, including hardware degradation, software misconfiguration, or noisy neighbors. It is hard for the JM to predict the runtime. > To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark have *_speculative execution_*. Speculative execution is a health-check procedure that looks for tasks to speculate, i.e., tasks running slower in an ExecutionJobVertex than the median of all successfully completed tasks in that EJV. Such slow tasks are re-submitted to another TM; the slow task is not stopped, but a new copy runs in parallel, and the others are killed once one of them completes. > This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be appended later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12136) Increase parallelism of tasks with large input data size
BoWang created FLINK-12136: -- Summary: Increase parallelism of tasks with large input data size Key: FLINK-12136 URL: https://issues.apache.org/jira/browse/FLINK-12136 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12136) Increase parallelism of tasks with too large input data size
[ https://issues.apache.org/jira/browse/FLINK-12136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12136: --- Summary: Increase parallelism of tasks with too large input data size (was: Increase parallelism of tasks with large input data size) > Increase parallelism of tasks with too large input data size > > > Key: FLINK-12136 > URL: https://issues.apache.org/jira/browse/FLINK-12136 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: BoWang >Assignee: BoWang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12135) Decrease parallelism of tasks with too small input data size
[ https://issues.apache.org/jira/browse/FLINK-12135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813065#comment-16813065 ] BoWang edited comment on FLINK-12135 at 4/9/19 7:13 AM: As the first step, we only merge partitions, i.e., only decrease parallelism. was (Author: eaglewatcher): Due to the high cost of splitting, in the first step we only merge partitions, i.e., only decrease parallelism > Decrease parallelism of tasks with too small input data size > > > Key: FLINK-12135 > URL: https://issues.apache.org/jira/browse/FLINK-12135 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: BoWang >Assignee: BoWang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12135) Decrease parallelism of tasks with too small input data size
[ https://issues.apache.org/jira/browse/FLINK-12135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813065#comment-16813065 ] BoWang commented on FLINK-12135: Due to the high cost of splitting, in the first step we only merge partitions, i.e., only decrease parallelism > Decrease parallelism of tasks with too small input data size > > > Key: FLINK-12135 > URL: https://issues.apache.org/jira/browse/FLINK-12135 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: BoWang >Assignee: BoWang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12134) Decrease parallelism of tasks with too small input data size
[ https://issues.apache.org/jira/browse/FLINK-12134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang closed FLINK-12134. -- Resolution: Duplicate > Decrease parallelism of tasks with too small input data size > > > Key: FLINK-12134 > URL: https://issues.apache.org/jira/browse/FLINK-12134 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: BoWang >Assignee: BoWang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12135) Decrease parallelism of tasks with too small input data size
BoWang created FLINK-12135: -- Summary: Decrease parallelism of tasks with too small input data size Key: FLINK-12135 URL: https://issues.apache.org/jira/browse/FLINK-12135 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12134) Decrease parallelism of tasks with too small input data size
BoWang created FLINK-12134: -- Summary: Decrease parallelism of tasks with too small input data size Key: FLINK-12134 URL: https://issues.apache.org/jira/browse/FLINK-12134 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-12070: -- Assignee: BoWang > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Major > > In order to avoid writing produced results multiple times for multiple consumers, and in order to speed up batch recoveries, we should make blocking result partitions consumable multiple times. At the moment a blocking result partition is released once its consumers have processed all data. Instead, the result partition should be released once the next blocking result has been produced and all consumers of the blocking result partition have terminated. Moreover, blocking results should not hold onto slot resources such as network buffers or memory, as is currently the case with {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11309) Make SpillableSubpartition repeatably read to enable
[ https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-11309: --- Description: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some errors to make the execution fail in different phases. In some cases the job could recover from the failover and succeed, but in other cases the job retried several times and failed. Example: # If the failure occurred before the task read data, e.g., before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after the task had read data, failover did not work. Problem diagnosis: Running the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING*. Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and each data block is described as a *BufferConsumer* stored in a list called *buffers*; when a task requests input data from the *SpillableSubpartitionView*, the *BufferConsumer* is REMOVED from *buffers*. Thus, when a failure occurred after data had been read, some *BufferConsumers* had already been released, so although the tasks were retried, their input data was incomplete. Fix Proposal: # A *BufferConsumer* should not be removed from *buffers* until the *ExecutionVertex* terminates. # A *SpillableSubpartition* should not be released until the *ExecutionVertex* terminates. # A *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, each of which corresponds to an *Execution*. Design doc: https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing was: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED*, and failover-strategy as *region*, I manually injected some errors to let the execution fail in different phases. In some cases, the job could recovery from failover and became succeed, but in some cases, the job retried several times and failed. Example: # If the failure occurred before task read data, e.g., failed before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after task having read data, failover did not work. Problem diagnose: Running the example described before, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and data block is described as *BufferConsumer* stored in a list called *buffers,* when task requires input data from *SpillableSubpartitionView,* *BufferConsumer* is REMOVED from buffers. Thus, when failures occurred after having read data, some *BufferConsumers* have already released, although tasks retried, the input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from buffers until *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until *ExecutionVertex* terminates. # *SpillableSubpartition* could creates multi *SpillableSubpartitionViews*, each of which is corresponding to a *Execution*. > Make SpillableSubpartition repeatably read to enable > > > Key: FLINK-11309 > URL: https://issues.apache.org/jira/browse/FLINK-11309 > Project: Flink > Issue Type: Bug > Components: Runtime / Operators >Affects Versions: 1.6.2, 1.7.0, 1.7.1 >Reporter: BoWang >Assignee: BoWang >Priority: Critical > Labels: pull-request-available > Time Spent: 50m > Remaining Estimate: 0h > > Hi all, > When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED* and the failover-strategy as *region*, and manually injected some errors to make the execution fail in different phases. In some cases the job could recover from the failover and succeed, but in other cases the job retried several times and failed. > Example: > # If the failure occurred before the task read data, e.g., before *invokable.invoke()* in Task.java, failover could succeed. > # If the failure occurred after the task had read data, failover did not work. > > Problem diagnosis: > Running the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING*. Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and each data block is described as a *BufferConsumer* stored in a list called *buffers*; when a task requests input data from the *SpillableSubpartitionView*, the *BufferConsumer* is
[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16808312#comment-16808312 ] BoWang commented on FLINK-12070: Totally agree with this proposal. I would like to take this Jira, and the [design doc|https://docs.google.com/document/d/1uXuJFiKODf241CKci3b0JnaF3zQ-Wt0V9wmC7kYwX-M/edit?usp=sharing] is ready. Would you mind having a look at the design? [~till.rohrmann] > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Priority: Major > > In order to avoid writing produced results multiple times for multiple consumers, and in order to speed up batch recoveries, we should make blocking result partitions consumable multiple times. At the moment a blocking result partition is released once its consumers have processed all data. Instead, the result partition should be released once the next blocking result has been produced and all consumers of the blocking result partition have terminated. Moreover, blocking results should not hold onto slot resources such as network buffers or memory, as is currently the case with {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12002) Adaptive Parallelism of Job Vertex Execution
[ https://issues.apache.org/jira/browse/FLINK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16806500#comment-16806500 ] BoWang commented on FLINK-12002: Hi all, the design doc is ready; any comments are appreciated: https://docs.google.com/document/d/1ZxnoJ3SOxUk1PL2xC1t-kepq28Pg20IL6eVKUUOWSKY/edit?usp=sharing > Adaptive Parallelism of Job Vertex Execution > > > Key: FLINK-12002 > URL: https://issues.apache.org/jira/browse/FLINK-12002 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: ryantaocer >Assignee: BoWang >Priority: Major > > In Flink, the parallelism of a job is a pre-specified parameter, which is usually an empirical value and thus might not be optimal for either performance or resource usage, depending on the amount of data processed by each task. Furthermore, a fixed parallelism cannot scale to the varying data sizes common in production clusters, where configurations are not changed often. > We propose to determine the job parallelism adaptively from the actual total input data size and an ideal data size processed by each task. The ideal size is pre-specified according to the properties of the operator, such as its preparation overhead compared with its data processing time. > Our basic idea of "split and merge" is to dispatch the data evenly across Reducers by splitting and/or merging the data buckets produced by Map. The data density skew problem is not covered. This kind of parallelism adjustment has no data correctness issue, since it doesn't break the condition that data with the same key is processed by a single task. We determine the proper parallelism of Reduce during scheduling, before it actually runs and after its input is ready, though not necessarily its total input data. In this context, adaptive parallelism is a better name. We think this scheduling improvement can benefit both batch and stream as long as we can obtain some clues about the input data. > Design doc: > https://docs.google.com/document/d/1ZxnoJ3SOxUk1PL2xC1t-kepq28Pg20IL6eVKUUOWSKY/edit?usp=sharing > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
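In its simplest form, the "ideal data size per task" rule described above reduces to dividing the actual total input size by the per-task target and clamping the result to a valid parallelism range. A rough illustrative sketch (names and bounds are hypothetical, not Flink's API):

```java
// Illustrative adaptive-parallelism computation: derive the number of reduce
// tasks from the actual total input size and a pre-specified ideal size per task.
public class AdaptiveParallelism {

    public static int decideParallelism(long totalInputBytes,
                                        long idealBytesPerTask,
                                        int minParallelism,
                                        int maxParallelism) {
        // ceil(totalInputBytes / idealBytesPerTask) without floating point
        long tasks = (totalInputBytes + idealBytesPerTask - 1) / idealBytesPerTask;
        // clamp into the configured range
        return (int) Math.max(minParallelism, Math.min(maxParallelism, tasks));
    }
}
```

Per the follow-up comment on FLINK-12135, the first step only merges buckets (decreases parallelism), since splitting a bucket is costly; the formula above is the sizing step that decides how many merged buckets to target.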
[jira] [Updated] (FLINK-12002) Adaptive Parallelism of Job Vertex Execution
[ https://issues.apache.org/jira/browse/FLINK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12002: --- Description: In Flink, the parallelism of a job is a pre-specified parameter, which is usually an empirical value and thus might not be optimal for either performance or resource usage, depending on the amount of data processed by each task. Furthermore, a fixed parallelism cannot scale to the varying data sizes common in production clusters, where configurations are not changed often. We propose to determine the job parallelism adaptively from the actual total input data size and an ideal data size processed by each task. The ideal size is pre-specified according to the properties of the operator, such as its preparation overhead compared with its data processing time. Our basic idea of "split and merge" is to dispatch the data evenly across Reducers by splitting and/or merging the data buckets produced by Map. The data density skew problem is not covered. This kind of parallelism adjustment has no data correctness issue, since it doesn't break the condition that data with the same key is processed by a single task. We determine the proper parallelism of Reduce during scheduling, before it actually runs and after its input is ready, though not necessarily its total input data. In this context, adaptive parallelism is a better name. We think this scheduling improvement can benefit both batch and stream as long as we can obtain some clues about the input data. Design doc: https://docs.google.com/document/d/1ZxnoJ3SOxUk1PL2xC1t-kepq28Pg20IL6eVKUUOWSKY/edit?usp=sharing was: In Flink the parallelism of job is a pre-specified parameter, which is usually an empirical value and thus might not be optimal for both performance and resource depending on the amount of data processed in each task. Furthermore, a fixed parallelism cannot scale to varying data size common in production cluster where we may not often change configurations. We propose to determine the job parallelism adaptive to the actual total input data size and an ideal data size processed by each task. The ideal size is pre-specified according to the properties of the operator such as the preparation overhead compared with data processing time. Our basic idea of "split and merge" is to make the data dispatched evenly acorss Reducers by spliting and/or merging data buckets produced by Map. The data density skew problem is not covered. This kind of parallelism adjustment doesn't have data correctness issue since it doesnt' break the condition that data with the same key is processed by a single task. We determine the proper parallelism of Reduce during scheduling before its actual running and after its input been ready though not necessary total input data. In such context, apdative parallelism is a better name. This scheduling improvement we think can benefit both batch and stream as long as we can obtain some clues about the input data. detailed design doc coming soon. > Adaptive Parallelism of Job Vertex Execution > > > Key: FLINK-12002 > URL: https://issues.apache.org/jira/browse/FLINK-12002 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: ryantaocer >Assignee: BoWang >Priority: Major > > In Flink, the parallelism of a job is a pre-specified parameter, which is usually an empirical value and thus might not be optimal for either performance or resource usage, depending on the amount of data processed by each task. Furthermore, a fixed parallelism cannot scale to the varying data sizes common in production clusters, where configurations are not changed often. > We propose to determine the job parallelism adaptively from the actual total input data size and an ideal data size processed by each task. The ideal size is pre-specified according to the properties of the operator, such as its preparation overhead compared with its data processing time. > Our basic idea of "split and merge" is to dispatch the data evenly across Reducers by splitting and/or merging the data buckets produced by Map. The data density skew problem is not covered. This kind of parallelism adjustment has no data correctness issue, since it doesn't break the condition that data with the same key is processed by a single task. We determine the proper parallelism of Reduce during scheduling, before it actually runs and after its input is ready, though not necessarily its total input data. In this context, adaptive parallelism is a better name. We think this scheduling improvement can benefit both batch and stream as long as we can obtain some clues about the input data. > Design doc: >
[jira] [Updated] (FLINK-12002) Adaptive Parallelism of Job Vertex Execution
[ https://issues.apache.org/jira/browse/FLINK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-12002: --- Component/s: Runtime / Operators > Adaptive Parallelism of Job Vertex Execution > > > Key: FLINK-12002 > URL: https://issues.apache.org/jira/browse/FLINK-12002 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: ryantaocer >Assignee: BoWang >Priority: Major > > In Flink, the parallelism of a job is a pre-specified parameter, which is usually an empirical value and thus might not be optimal for either performance or resource usage, depending on the amount of data processed by each task. Furthermore, a fixed parallelism cannot scale to the varying data sizes common in production clusters, where configurations are not changed often. > We propose to determine the job parallelism adaptively from the actual total input data size and an ideal data size processed by each task. The ideal size is pre-specified according to the properties of the operator, such as its preparation overhead compared with its data processing time. > Our basic idea of "split and merge" is to dispatch the data evenly across Reducers by splitting and/or merging the data buckets produced by Map. The data density skew problem is not covered. This kind of parallelism adjustment has no data correctness issue, since it doesn't break the condition that data with the same key is processed by a single task. We determine the proper parallelism of Reduce during scheduling, before it actually runs and after its input is ready, though not necessarily its total input data. In this context, adaptive parallelism is a better name. We think this scheduling improvement can benefit both batch and stream as long as we can obtain some clues about the input data. > > A detailed design doc is coming soon. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly
[ https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-11375: -- Assignee: BoWang > Concurrent modification to slot pool due to SlotSharingManager releaseSlot > directly > > > Key: FLINK-11375 > URL: https://issues.apache.org/jira/browse/FLINK-11375 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.7.1 >Reporter: shuai.xu >Assignee: BoWang >Priority: Major > Fix For: 1.8.0 > > > In the SlotPool, the AvailableSlots structure is lock-free, so all access to it should happen in the main thread of the SlotPool; accordingly, all the public methods are called through the SlotPoolGateway, except releaseSlot, which is called directly by the SlotSharingManager. This may cause a ConcurrentModificationException. > 2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], fields:(f0)) -> correlate: table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), select: item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED. 
> 2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] > org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices > meet exception, need to fail global execution graph > java.lang.reflect.UndeclaredThrowableException > at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503) > at > org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349) > at > org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132) > at > org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107) > at > org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > at java.lang.Thread.run(Thread.java:834) > Caused by: java.util.concurrent.ExecutionException: > java.util.ConcurrentModificationException > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125) > ... 23 more > Caused by: java.util.ConcurrentModificationException > at
[jira] [Commented] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly
[ https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16758108#comment-16758108 ] BoWang commented on FLINK-11375: Hi, has anyone fixed this? If not, I would like to take it. > Concurrent modification to slot pool due to SlotSharingManager releaseSlot > directly > > > Key: FLINK-11375 > URL: https://issues.apache.org/jira/browse/FLINK-11375 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.7.1 >Reporter: shuai.xu >Priority: Major > Fix For: 1.8.0 > > > In SlotPool, the AvailableSlots is lock free, so all access to it should > happen in the main thread of the SlotPool; hence, all public methods are called > through the SlotPoolGateway, except releaseSlot, which SlotSharingManager calls > directly. This may cause a ConcurrentModificationException. > 2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > BlinkStoreScanTableSource > feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> > SourceConversion(table:[_DataStreamTable_12, source: > [BlinkStoreScanTableSource > feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], > fields:(f0)) -> correlate: > table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), > select: > item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s > (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to > FINISHED. 
> 2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] > org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices > meet exception, need to fail global execution graph > java.lang.reflect.UndeclaredThrowableException > at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503) > at > org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349) > at > org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132) > at > org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107) > at > org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > at java.lang.Thread.run(Thread.java:834) > Caused by: java.util.concurrent.ExecutionException: > java.util.ConcurrentModificationException > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125) > ... 23 more > Caused by: java.util.ConcurrentModificationException > at
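The failure mode described above can be reproduced in isolation: a lock-free collection like SlotPool's AvailableSlots throws ConcurrentModificationException when it is structurally modified while being iterated, which is exactly what happens when SlotSharingManager bypasses the SlotPoolGateway and calls releaseSlot directly. Below is a minimal, hypothetical sketch using a plain HashMap (not Flink's actual classes) to show the fail-fast behavior:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class SlotPoolCmeSketch {

    // Returns true if a structural modification during iteration triggers
    // ConcurrentModificationException, mimicking SlotSharingManager calling
    // releaseSlot directly while the SlotPool's main thread walks the slots.
    public static boolean failsWhenMutatedDuringIteration() {
        Map<String, String> availableSlots = new HashMap<>();
        availableSlots.put("slot-1", "taskmanager-1");
        availableSlots.put("slot-2", "taskmanager-2");
        try {
            for (String slotId : availableSlots.keySet()) {
                // Removing through the map (not through the iterator) is the
                // same kind of structural modification that a direct
                // releaseSlot call makes; the fail-fast iterator detects it.
                availableSlots.remove(slotId);
            }
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWhenMutatedDuringIteration());
    }
}
```

Routing every mutation through a single thread (as the SlotPoolGateway does) sidesteps this entirely, which is why the fix is to funnel releaseSlot through the gateway rather than to add locking.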
[jira] [Closed] (FLINK-11071) Dynamic proxy classes cannot be resolved when deserializing job graph
[ https://issues.apache.org/jira/browse/FLINK-11071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang closed FLINK-11071. -- > Dynamic proxy classes cannot be resolved when deserializing job graph > - > > Key: FLINK-11071 > URL: https://issues.apache.org/jira/browse/FLINK-11071 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2, 1.7.0, 1.8.0 >Reporter: Oleg Zhukov >Assignee: BoWang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Attachments: SocketWindowWordCount.java > > Time Spent: 20m > Remaining Estimate: 0h > > It turns out to be impossible to use Java dynamic proxy objects in the job definition > (for example, as a MapFunction implementation). > During deserialization of the job graph, the default implementation of > ObjectInputStream.resolveProxyClass(..) is used, which does not use the > custom class loader (to look into the submitted jar) and therefore throws > ClassNotFoundException. > It looks like, in order to address this, > InstantiationUtil.ClassLoaderObjectInputStream needs to provide a custom > implementation of the resolveProxyClass(..) method as well (in addition to > resolveClass(..)). > To reproduce the issue, run the attached SocketWindowWordCount Flink > app. It is a slight variation of the canonical [SocketWindowWordCount > example|https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html] > with a dynamic proxy implementation of the flat map transformation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
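The fix direction described in the issue — overriding resolveProxyClass(..) in addition to resolveClass(..) so that a proxy's interfaces are looked up through the user-code class loader — can be sketched as follows. This is an illustrative, self-contained example, not Flink's actual InstantiationUtil code; the Greeter interface and handler are made up for the demo, and both sides here happen to share one class loader, which keeps the round trip trivially resolvable:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyResolveDemo {

    // ObjectInputStream that resolves plain classes AND dynamic proxy classes
    // through a user-supplied class loader. Without the resolveProxyClass
    // override, deserializing a proxy falls back to the default loader and
    // can fail with ClassNotFoundException for classes in a submitted jar.
    static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, classLoader);
        }

        @Override
        protected Class<?> resolveProxyClass(String[] interfaces) throws ClassNotFoundException {
            // Resolve each interface name through the custom loader, then
            // rebuild the proxy class against that same loader.
            Class<?>[] resolved = new Class<?>[interfaces.length];
            for (int i = 0; i < interfaces.length; i++) {
                resolved[i] = Class.forName(interfaces[i], false, classLoader);
            }
            return Proxy.getProxyClass(classLoader, resolved);
        }
    }

    public interface Greeter { String greet(String name); }

    // The invocation handler must be serializable for the proxy to survive
    // a serialization round trip.
    static class GreeterHandler implements InvocationHandler, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            return "hello " + args[0];
        }
    }

    // Serializes a dynamic proxy and reads it back through the custom stream.
    static String roundTrip() {
        try {
            Greeter greeter = (Greeter) Proxy.newProxyInstance(
                    ProxyResolveDemo.class.getClassLoader(),
                    new Class<?>[] {Greeter.class},
                    new GreeterHandler());

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(greeter);
            }

            try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()),
                    ProxyResolveDemo.class.getClassLoader())) {
                return ((Greeter) in.readObject()).greet("flink");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

The key point is that proxy instances are serialized with a special proxy descriptor, so the stream calls resolveProxyClass (not resolveClass) for them; overriding only resolveClass, as the pre-fix code did, leaves proxies resolved by the wrong loader.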
[jira] [Created] (FLINK-11411) Failover regions number of RestartPipelinedRegionStrategy not shown in log due to incorrect parameter
BoWang created FLINK-11411: -- Summary: Failover regions number of RestartPipelinedRegionStrategy not shown in log due to incorrect parameter Key: FLINK-11411 URL: https://issues.apache.org/jira/browse/FLINK-11411 Project: Flink Issue Type: Improvement Reporter: BoWang Assignee: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744769#comment-16744769 ] BoWang edited comment on FLINK-11344 at 1/17/19 7:55 AM: - [~hailong wang] Thanks for the comment. The execution attempt list indeed becomes a bit longer if failures occur, but I think constant restarting is rare, so a somewhat longer list should be tolerable. Displaying failed attempts gives us a glimpse of the runtime information without querying the logs, including the running time, which TaskManager each attempt ran on, etc. Based on this simple information, we can take action: e.g., by using *orderBy host* on the dashboard, we can find a bad host on which many attempts fail and react accordingly, e.g., add the machine to a blacklist. Thus, I think displaying all the execution attempts does more good than harm. was (Author: eaglewatcher): [~hailong wang] Thanks for the comment. The execution attempt list will indeed becomes a bit longer if failures occur, but I think keeps restarting is rare, so a bit longer list may be tolerable. Displaying failed attempt could give us a glimpse of the running information without query the log, including running time, on which TaskManager it runs etc. According these simple informations, we could make some actions, e.g., by *orderBy host* on dash board, we can also find the bad host on which many attempts fail to make corresponding operation actions, e.g., add the machine to blacklist. Thus, I think displaying all the execution attempts is more good than harm. 
> Display All Execution Attempt Information on Flink Web Dashboard > > > Key: FLINK-11344 > URL: https://issues.apache.org/jira/browse/FLINK-11344 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, only one Execution Attempt of each sub-task is shown in the web > dashboard; thus, only the succeeded Attempt is shown when a failover occurs. This > can make it inconvenient to rapidly locate the failure reasons of failed Attempts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744769#comment-16744769 ] BoWang commented on FLINK-11344: [~hailong wang] Thanks for the comment. The execution attempt list will indeed become a bit longer if failures occur, but I think constant restarting is rare, so a somewhat longer list should be tolerable. Displaying failed attempts gives us a glimpse of the runtime information without querying the logs, including the running time, which TaskManager each attempt ran on, etc. Based on this simple information, we can take action: e.g., by using *orderBy host* on the dashboard, we can find a bad host on which many attempts fail and react accordingly, e.g., add the machine to a blacklist. Thus, I think displaying all the execution attempts does more good than harm. > Display All Execution Attempt Information on Flink Web Dashboard > > > Key: FLINK-11344 > URL: https://issues.apache.org/jira/browse/FLINK-11344 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, only one Execution Attempt of each sub-task is shown in the web > dashboard; thus, only the succeeded Attempt is shown when a failover occurs. This > can make it inconvenient to rapidly locate the failure reasons of failed Attempts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-11344: --- Summary: Display All Execution Attempt Information on Flink Web Dashboard (was: Show All Execution Attempt Information on Flink Web Dashboard) > Display All Execution Attempt Information on Flink Web Dashboard > > > Key: FLINK-11344 > URL: https://issues.apache.org/jira/browse/FLINK-11344 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: BoWang >Assignee: BoWang >Priority: Minor > > Currently, only one Execution Attempt of each sub-task is shown in the web > dashboard; thus, only the succeeded Attempt is shown when a failover occurs. This > can make it inconvenient to rapidly locate the failure reasons of failed Attempts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11344) Show All Execution Attempt Information on Flink Web Dashboard
BoWang created FLINK-11344: -- Summary: Show All Execution Attempt Information on Flink Web Dashboard Key: FLINK-11344 URL: https://issues.apache.org/jira/browse/FLINK-11344 Project: Flink Issue Type: Improvement Components: Webfrontend Reporter: BoWang Assignee: BoWang Currently, only one Execution Attempt of each sub-task is shown in the web dashboard; thus, only the succeeded Attempt is shown when a failover occurs. This can make it inconvenient to rapidly locate the failure reasons of failed Attempts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes
[ https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-11309: --- Description: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED* and the failover strategy as *region*, and I manually injected some errors to make the execution fail in different phases. In some cases, the job could recover from the failure and succeed, but in other cases, the job retried several times and failed. Example: # If the failure occurred before the task read data, e.g., before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after the task had read data, failover did not work. Problem diagnosis: Running the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and each data block is represented as a *BufferConsumer* stored in a list called *buffers;* when a task requests input data from the *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from *buffers*. Thus, when a failure occurs after data has been read, some *BufferConsumers* have already been released, so even though the task is retried, its input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from *buffers* until the *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until the *ExecutionVertex* terminates. # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, each of which corresponds to an *Execution*. was: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED*, and failover-strategy as *region*, I manual injected some errors to let the execution fail in different phases. 
In some cases, the job could recovery from failover and became succeed, but in some cases, the job retried several times and failed. Example: # If the failure occurred before task read data, e.g., failed before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after task having read data, failover did not work. Problem diagnose: Running the example described before, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and data block is described as *BufferConsumer* stored in a list called *buffers,* when task requires input data from *SpillableSubpartitionView,* *BufferConsumer* is REMOVED from buffers. Thus, when failures occurred after having read data, some *BufferConsumers* have already released, although tasks retried, the input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from buffers until *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until *ExecutionVertex* terminates. # *SpillableSubpartition* could creates multi *SpillableSubpartitionViews*, each of which is corresponding to a *Execution*. > Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes > - > > Key: FLINK-11309 > URL: https://issues.apache.org/jira/browse/FLINK-11309 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2, 1.7.0, 1.7.1 >Reporter: BoWang >Assignee: BoWang >Priority: Critical > > Hi all, > When running the batch WordCount example, I configured the job execution > mode as *BATCH_FORCED* and the failover strategy as *region*, and I manually > injected some errors to make the execution fail in different phases. In some > cases, the job could recover from the failure and succeed, but in other cases, > the job retried several times and failed. 
> Example: > # If the failure occurred before the task read data, e.g., before > *invokable.invoke()* in Task.java, failover could succeed. > # If the failure occurred after the task had read data, failover did not work. > > Problem diagnosis: > Running the example described above, each ExecutionVertex is defined as a > restart region, and the ResultPartitionType between executions is *BLOCKING.* > Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to > write/read shuffle data, and each data block is represented as a *BufferConsumer* > stored in a list called *buffers;* when a task requests input data from the > *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from *buffers*. Thus, > when a failure occurs after data has been read, some *BufferConsumers* have > already been released, so even though the task is retried, its input data is incomplete.
[jira] [Updated] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes
[ https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-11309: --- Description: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED* and the failover strategy as *region*, and I manually injected some errors to make the execution fail in different phases. In some cases, the job could recover from the failure and succeed, but in other cases, the job retried several times and failed. Example: # If the failure occurred before the task read data, e.g., before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after the task had read data, failover did not work. Problem diagnosis: Running the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and each data block is represented as a *BufferConsumer* stored in a list called *buffers;* when a task requests input data from the *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from *buffers*. Thus, when a failure occurs after data has been read, some *BufferConsumers* have already been released, so even though the task is retried, its input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from *buffers* until the *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until the *ExecutionVertex* terminates. # *SpillableSubpartition* could create multiple *SpillableSubpartitionViews*, each of which corresponds to an *Execution*. was: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED*, and failover-strategy as *region*, I manual injected some errors to let the execution fail in different phases. 
In some cases, the job could recovery from failover and became succeed, but in some cases, the job retried several times and failed. Example: # If the failure occurred before task read data, e.g., failed before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after task having read data, failover did not work. Problem diagnose: Running the example described before, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and data block is described as *BufferConsumer* stored in a list called *buffers,* when task requires input data from *SpillableSubpartitionView,* *BufferConsumer* is REMOVED from buffers. Thus, when failures occurred after having read data, some *BufferConsumers* have already released, although tasks retried, the input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from buffers until *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until *ExecutionVertex* terminates. # Each *SpillableSubpartition* contains multi *SpillableSubpartitionViews*, each of which is corresponding to a *Execution*. > Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes > - > > Key: FLINK-11309 > URL: https://issues.apache.org/jira/browse/FLINK-11309 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2, 1.7.0, 1.7.1 >Reporter: BoWang >Assignee: BoWang >Priority: Critical > > Hi all, > When running the batch WordCount example, I configured the job execution > mode as *BATCH_FORCED* and the failover strategy as *region*, and I manually > injected some errors to make the execution fail in different phases. In some > cases, the job could recover from the failure and succeed, but in other cases, > the job retried several times and failed. 
> Example: > # If the failure occurred before the task read data, e.g., before > *invokable.invoke()* in Task.java, failover could succeed. > # If the failure occurred after the task had read data, failover did not work. > > Problem diagnosis: > Running the example described above, each ExecutionVertex is defined as a > restart region, and the ResultPartitionType between executions is *BLOCKING.* > Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to > write/read shuffle data, and each data block is represented as a *BufferConsumer* > stored in a list called *buffers;* when a task requests input data from the > *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from *buffers*. Thus, > when a failure occurs after data has been read, some *BufferConsumers* have > already been released, so even though the task is retried, its input data is incomplete. >
[jira] [Updated] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes
[ https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-11309: --- Description: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED* and the failover strategy as *region*, and I manually injected some errors to make the execution fail in different phases. In some cases, the job could recover from the failure and succeed, but in other cases, the job retried several times and failed. Example: # If the failure occurred before the task read data, e.g., before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after the task had read data, failover did not work. Problem diagnosis: Running the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and each data block is represented as a *BufferConsumer* stored in a list called *buffers;* when a task requests input data from the *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from *buffers*. Thus, when a failure occurs after data has been read, some *BufferConsumers* have already been released, so even though the task is retried, its input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from *buffers* until the *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until the *ExecutionVertex* terminates. # Each *SpillableSubpartition* contains multiple *SpillableSubpartitionViews*, each of which corresponds to an *Execution*. was: Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED*, and failover-strategy as *region*, I manual injected some errors to let the execution fail in different phases. 
In some cases, the job could recovery from failover and became succeed, but in some cases, the job retried several times and failed. Example: # If the failure occurred before task read data, e.g., failed before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after task having read data, failover did not work. Problem diagnose: Running the example described before, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and data block is described as *BufferConsumer* stored in a list called *buffers,* when task requires input data from *SpillableSubpartitionView,* *BufferConsumer* is REMOVED from buffers. Thus, when failures occurred after having read data, some *BufferConsumers* have already released, although tasks retried, the input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from buffers until *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until *ExecutionVertex* terminates. # Each *SpillableSubpartition* contains multi *SpillableSubpartitionView*s, each of which is corresponding to a *Execution*. > Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes > - > > Key: FLINK-11309 > URL: https://issues.apache.org/jira/browse/FLINK-11309 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2, 1.7.0, 1.7.1 >Reporter: BoWang >Assignee: BoWang >Priority: Critical > > Hi all, > When running the batch WordCount example, I configured the job execution > mode as *BATCH_FORCED* and the failover strategy as *region*, and I manually > injected some errors to make the execution fail in different phases. In some > cases, the job could recover from the failure and succeed, but in other cases, > the job retried several times and failed. 
> Example: > # If the failure occurred before the task read data, e.g., before > *invokable.invoke()* in Task.java, failover could succeed. > # If the failure occurred after the task had read data, failover did not work. > > Problem diagnosis: > Running the example described above, each ExecutionVertex is defined as a > restart region, and the ResultPartitionType between executions is *BLOCKING.* > Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to > write/read shuffle data, and each data block is represented as a *BufferConsumer* > stored in a list called *buffers;* when a task requests input data from the > *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from *buffers*. Thus, > when a failure occurs after data has been read, some *BufferConsumers* have > already been released, so even though the task is retried, its input data is incomplete. >
[jira] [Created] (FLINK-11309) Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes
BoWang created FLINK-11309: -- Summary: Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes Key: FLINK-11309 URL: https://issues.apache.org/jira/browse/FLINK-11309 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.7.1, 1.7.0, 1.6.2 Reporter: BoWang Assignee: BoWang Hi all, When running the batch WordCount example, I configured the job execution mode as *BATCH_FORCED* and the failover strategy as *region*, and I manually injected some errors to make the execution fail in different phases. In some cases, the job could recover from the failure and succeed, but in other cases, the job retried several times and failed. Example: # If the failure occurred before the task read data, e.g., before *invokable.invoke()* in Task.java, failover could succeed. # If the failure occurred after the task had read data, failover did not work. Problem diagnosis: Running the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is *BLOCKING.* Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to write/read shuffle data, and each data block is represented as a *BufferConsumer* stored in a list called *buffers;* when a task requests input data from the *SpillableSubpartitionView,* the *BufferConsumer* is REMOVED from *buffers*. Thus, when a failure occurs after data has been read, some *BufferConsumers* have already been released, so even though the task is retried, its input data is incomplete. Fix Proposal: # *BufferConsumer* should not be removed from *buffers* until the *ExecutionVertex* terminates. # *SpillableSubpartition* should not be released until the *ExecutionVertex* terminates. # Each *SpillableSubpartition* contains multiple *SpillableSubpartitionViews*, each of which corresponds to an *Execution*. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
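The fix proposal above boils down to retaining the written buffers (and reading them through per-attempt views) until the producing ExecutionVertex terminates, instead of destructively removing each BufferConsumer as it is consumed. A simplified, hypothetical sketch of the two behaviors — strings stand in for BufferConsumers, and none of these classes are Flink's real ones:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class SubpartitionReplaySketch {

    // Pre-fix behavior: the view removes buffers from the shared structure as
    // it reads, so a second (failover) read attempt starts from whatever is left.
    static final class DestructiveView {
        private final Deque<String> buffers;
        DestructiveView(Deque<String> buffers) { this.buffers = buffers; }
        String pollNext() { return buffers.poll(); }
    }

    // Proposed behavior: buffers are retained; each view tracks its own read
    // position, so a fresh view created after failover re-reads from the start.
    static final class ReplayableView {
        private final List<String> buffers;
        private int readIndex;
        ReplayableView(List<String> buffers) { this.buffers = buffers; }
        String pollNext() {
            return readIndex < buffers.size() ? buffers.get(readIndex++) : null;
        }
    }

    // First attempt reads one buffer and then "fails"; the retry gets a new view.
    static String retryAfterDestructiveRead() {
        Deque<String> buffers = new ArrayDeque<>(Arrays.asList("buffer-0", "buffer-1"));
        new DestructiveView(buffers).pollNext();        // attempt 1 consumes buffer-0
        return new DestructiveView(buffers).pollNext(); // attempt 2: buffer-0 is gone
    }

    static String retryAfterReplayableRead() {
        List<String> buffers = Arrays.asList("buffer-0", "buffer-1");
        new ReplayableView(buffers).pollNext();         // attempt 1 reads buffer-0
        return new ReplayableView(buffers).pollNext();  // attempt 2 re-reads buffer-0
    }

    public static void main(String[] args) {
        System.out.println(retryAfterDestructiveRead()); // incomplete replay input
        System.out.println(retryAfterReplayableRead());  // full replay from the start
    }
}
```

The destructive variant loses buffer-0 forever after the first attempt, which is exactly why the retried task sees incomplete input; the replayable variant keeps the data until the vertex is done, at the cost of holding (or spilling) the buffers longer.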
[jira] [Created] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x
BoWang created FLINK-11295: -- Summary: Rename configuration options of queryable state from query.x to queryable-state.x Key: FLINK-11295 URL: https://issues.apache.org/jira/browse/FLINK-11295 Project: Flink Issue Type: Improvement Reporter: BoWang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x
[ https://issues.apache.org/jira/browse/FLINK-11295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang updated FLINK-11295: --- Component/s: Configuration > Rename configuration options of queryable state from query.x to > queryable-state.x > - > > Key: FLINK-11295 > URL: https://issues.apache.org/jira/browse/FLINK-11295 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: BoWang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x
[ https://issues.apache.org/jira/browse/FLINK-11295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-11295: -- Assignee: BoWang > Rename configuration options of queryable state from query.x to > queryable-state.x > - > > Key: FLINK-11295 > URL: https://issues.apache.org/jira/browse/FLINK-11295 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: BoWang >Assignee: BoWang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11071) Dynamic proxy classes cannot be resolved when deserializing job graph
[ https://issues.apache.org/jira/browse/FLINK-11071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736888#comment-16736888 ] BoWang commented on FLINK-11071: Hi, has this problem been fixed yet? If not, I would like to fix this. > Dynamic proxy classes cannot be resolved when deserializing job graph > - > > Key: FLINK-11071 > URL: https://issues.apache.org/jira/browse/FLINK-11071 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2, 1.7.0, 1.8.0 >Reporter: Oleg Zhukov >Priority: Major > Attachments: SocketWindowWordCount.java > > > It turns out to be impossible to use Java dynamic proxy objects in the job definition > (for example, as a MapFunction implementation). > During deserialization of the job graph, the default implementation of > ObjectInputStream.resolveProxyClass(..) is used, which does not use the > custom class loader (to look into the submitted jar) and therefore throws > ClassNotFoundException. > It looks like, in order to address this, > InstantiationUtil.ClassLoaderObjectInputStream needs to provide a custom > implementation of the resolveProxyClass(..) method as well (in addition to > resolveClass(..)). > To reproduce the issue, run the attached SocketWindowWordCount Flink > app. It is a slight variation of the canonical [SocketWindowWordCount > example|https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html] > with a dynamic proxy implementation of the flat map transformation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11071) Dynamic proxy classes cannot be resolved when deserializing job graph
[ https://issues.apache.org/jira/browse/FLINK-11071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-11071: -- Assignee: BoWang > Dynamic proxy classes cannot be resolved when deserializing job graph > - > > Key: FLINK-11071 > URL: https://issues.apache.org/jira/browse/FLINK-11071 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2, 1.7.0, 1.8.0 >Reporter: Oleg Zhukov >Assignee: BoWang >Priority: Major > Attachments: SocketWindowWordCount.java > > > It turns out to be impossible to use Java dynamic proxy objects in the job definition > (for example, as a MapFunction implementation). > During deserialization of the job graph, the default implementation of > ObjectInputStream.resolveProxyClass(..) is used, which does not use the > custom class loader (to look into the submitted jar) and therefore throws > ClassNotFoundException. > It looks like, in order to address this, > InstantiationUtil.ClassLoaderObjectInputStream needs to provide a custom > implementation of the resolveProxyClass(..) method as well (in addition to > resolveClass(..)). > To reproduce the issue, run the attached SocketWindowWordCount Flink > app. It is a slight variation of the canonical [SocketWindowWordCount > > example|https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html] > with a dynamic proxy implementation of the flat map transformation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
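The fix discussed in the issue can be sketched as follows. This is an illustrative, hypothetical version of what a `ClassLoaderObjectInputStream`-style class might look like, not Flink's actual `InstantiationUtil` code: in addition to overriding `resolveClass(..)`, it overrides `resolveProxyClass(..)` so that the proxy's interfaces are resolved against the user-provided class loader (the one that can see the submitted jar).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.lang.reflect.Proxy;

// Hypothetical sketch (not Flink's implementation): an ObjectInputStream that
// resolves both plain classes and dynamic proxy classes against a custom
// class loader instead of the stream's default loader.
class ClassLoaderObjectInputStream extends ObjectInputStream {

    private final ClassLoader classLoader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        // Resolve ordinary classes via the custom loader.
        return Class.forName(desc.getName(), false, classLoader);
    }

    @Override
    protected Class<?> resolveProxyClass(String[] interfaces)
            throws IOException, ClassNotFoundException {
        // Without this override, the JDK default resolves the proxy's
        // interfaces with the caller's loader and throws ClassNotFoundException
        // for interfaces that only the custom loader can see.
        Class<?>[] resolved = new Class<?>[interfaces.length];
        for (int i = 0; i < interfaces.length; i++) {
            resolved[i] = Class.forName(interfaces[i], false, classLoader);
        }
        // Build the proxy class with the same loader used for its interfaces.
        return Proxy.getProxyClass(classLoader, resolved);
    }
}
```

With such a stream, deserializing a job graph that contains a dynamic proxy (e.g. a proxy-based MapFunction) would look up both the proxy's interfaces and its generated class through the user-code class loader.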
[jira] [Closed] (FLINK-11281) Two tests in StateBackendMigrationTestBase fail
[ https://issues.apache.org/jira/browse/FLINK-11281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang closed FLINK-11281. -- Resolution: Duplicate > Two tests in StateBackendMigrationTestBase fail > > > Key: FLINK-11281 > URL: https://issues.apache.org/jira/browse/FLINK-11281 > Project: Flink > Issue Type: Bug >Reporter: BoWang >Priority: Blocker > Attachments: fail.png > > > Hi all, > I found that Flink builds have two consistent failures in StateBackendMigrationTestBase > on recent PRs, which also occur in my development environment. The detailed > errors are as follows. > This currently doesn't pass because the ListSerializer doesn't yet respect the > reconfigured case. > java.lang.UnsupportedOperationException: The serializer should have been > reconfigured as a new instance; shouldn't be used. > at > org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:160) > at > org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:154) > at > org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.serializeKeyGroupAndKey(RocksDBSerializedCompositeKeyBuilder.java:159) > at > org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.setKeyAndKeyGroup(RocksDBSerializedCompositeKeyBuilder.java:96) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:383) > at > org.apache.flink.runtime.state.StateBackendMigrationTestBase.testKeySerializerUpgrade(StateBackendMigrationTestBase.java:401) > at > org.apache.flink.runtime.state.StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11281) Two tests in StateBackendMigrationTestBase fail
[ https://issues.apache.org/jira/browse/FLINK-11281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736815#comment-16736815 ] BoWang commented on FLINK-11281: OK, this is a duplicate issue; I will close it. > Two tests in StateBackendMigrationTestBase fail > > > Key: FLINK-11281 > URL: https://issues.apache.org/jira/browse/FLINK-11281 > Project: Flink > Issue Type: Bug >Reporter: BoWang >Priority: Blocker > Attachments: fail.png > > > Hi all, > I found that Flink builds have two consistent failures in StateBackendMigrationTestBase > on recent PRs, which also occur in my development environment. The detailed > errors are as follows. > This currently doesn't pass because the ListSerializer doesn't yet respect the > reconfigured case. > java.lang.UnsupportedOperationException: The serializer should have been > reconfigured as a new instance; shouldn't be used. > at > org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:160) > at > org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:154) > at > org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.serializeKeyGroupAndKey(RocksDBSerializedCompositeKeyBuilder.java:159) > at > org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.setKeyAndKeyGroup(RocksDBSerializedCompositeKeyBuilder.java:96) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:383) > at > org.apache.flink.runtime.state.StateBackendMigrationTestBase.testKeySerializerUpgrade(StateBackendMigrationTestBase.java:401) > at > org.apache.flink.runtime.state.StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11281) Two tests in StateBackendMigrationTestBase fail
BoWang created FLINK-11281: -- Summary: Two tests in StateBackendMigrationTestBase fail Key: FLINK-11281 URL: https://issues.apache.org/jira/browse/FLINK-11281 Project: Flink Issue Type: Bug Reporter: BoWang Attachments: fail.png Hi all, I found that Flink builds have two consistent failures in StateBackendMigrationTestBase on recent PRs, which also occur in my development environment. The detailed errors are as follows. This currently doesn't pass because the ListSerializer doesn't yet respect the reconfigured case. java.lang.UnsupportedOperationException: The serializer should have been reconfigured as a new instance; shouldn't be used. at org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:160) at org.apache.flink.runtime.testutils.statemigration.TestType$ReconfigurationRequiringTestTypeSerializer.serialize(TestType.java:154) at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.serializeKeyGroupAndKey(RocksDBSerializedCompositeKeyBuilder.java:159) at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.setKeyAndKeyGroup(RocksDBSerializedCompositeKeyBuilder.java:96) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:383) at org.apache.flink.runtime.state.StateBackendMigrationTestBase.testKeySerializerUpgrade(StateBackendMigrationTestBase.java:401) at org.apache.flink.runtime.state.StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10866) Queryable state can prevent cluster from starting
[ https://issues.apache.org/jira/browse/FLINK-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BoWang reassigned FLINK-10866: -- Assignee: BoWang > Queryable state can prevent cluster from starting > - > > Key: FLINK-10866 > URL: https://issues.apache.org/jira/browse/FLINK-10866 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Critical > Fix For: 1.8.0 > > > The {{KvStateServerImpl}} can currently prevent the {{TaskExecutor}} from > starting. > Currently, the QS server starts on port {{9067}} by default. If this port is > not free, then it fails and stops the whole initialization of the > {{TaskExecutor}}. I think the QS server should not stop the {{TaskExecutor}} > from starting. > We should at least change the default port to {{0}} to avoid port conflicts. > However, this will break all setups which don't explicitly set the QS port, > because then the port either needs to be configured explicitly or extracted from the logs. > Additionally, we should think about whether a QS server startup failure > should lead to a {{TaskExecutor}} failure or simply be logged. Both > approaches have pros and cons. Currently, a failing QS server will also > affect users who don't want to use QS. If we tolerate failures in the QS > server, then a user who wants to use QS might run into problems with state > not being reachable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10866) Queryable state can prevent cluster from starting
[ https://issues.apache.org/jira/browse/FLINK-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16733902#comment-16733902 ] BoWang commented on FLINK-10866: Hi, has anyone taken this JIRA? If not, I would like to take it. > Queryable state can prevent cluster from starting > - > > Key: FLINK-10866 > URL: https://issues.apache.org/jira/browse/FLINK-10866 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.8.0 > > > The {{KvStateServerImpl}} can currently prevent the {{TaskExecutor}} from > starting. > Currently, the QS server starts on port {{9067}} by default. If this port is > not free, then it fails and stops the whole initialization of the > {{TaskExecutor}}. I think the QS server should not stop the {{TaskExecutor}} > from starting. > We should at least change the default port to {{0}} to avoid port conflicts. > However, this will break all setups which don't explicitly set the QS port, > because then the port either needs to be configured explicitly or extracted from the logs. > Additionally, we should think about whether a QS server startup failure > should lead to a {{TaskExecutor}} failure or simply be logged. Both > approaches have pros and cons. Currently, a failing QS server will also > affect users who don't want to use QS. If we tolerate failures in the QS > server, then a user who wants to use QS might run into problems with state > not being reachable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
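The "default port {{0}}" proposal in the issue can be illustrated with a generic sketch. This is not Flink's KvStateServerImpl code, and the class and method names below are hypothetical: binding a server socket to port 0 asks the OS for any free ephemeral port, which avoids the startup failure a fixed default like 9067 can cause when the port is already taken.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical sketch (not Flink code): bind to port 0 so the OS assigns a
// free ephemeral port instead of failing when a fixed default port is in use.
final class EphemeralPortBinding {

    static int bindAndReportPort() throws IOException {
        try (ServerSocket socket = new ServerSocket()) {
            socket.bind(new InetSocketAddress(0)); // port 0 => OS picks a free port
            // The chosen port must be logged or exposed via an API, since
            // clients can no longer rely on a well-known default.
            return socket.getLocalPort();
        }
    }
}
```

The trade-off named in the issue follows directly from this: with port 0 there is no well-known port anymore, so setups that relied on the default must read the actual port from configuration or from the logs.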
[jira] [Created] (FLINK-11232) Empty Start Time of sub-task on web dashboard
BoWang created FLINK-11232: -- Summary: Empty Start Time of sub-task on web dashboard Key: FLINK-11232 URL: https://issues.apache.org/jira/browse/FLINK-11232 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.7.1, 1.7.0, 1.6.3, 1.6.2, 1.5.5 Reporter: BoWang Assignee: BoWang Attachments: empty.png -- This message was sent by Atlassian JIRA (v7.6.3#76005)