[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2021-11-09 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20038:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> 
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new 
> shuffle manner, thus to benefit different scenarios. New shuffle manner tend 
> to bring in new abilities which could be leveraged by scheduling layer to 
> provide better performance.
> From my understanding, the characteristics of shuffle manner is exposed by 
> ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...), and 
> leveraged by scheduling layer to conduct job. But seems that Flink doesn't 
> provide a way to describe the new characteristics from a plugged in shuffle 
> manner. I also find that scheduling layer have some weak assumptions on 
> ResultPartitionType. I detail by below example.
> In our internal Flink, we developed a new shuffle manner for batch jobs. Its 
> characteristics can be summarized briefly as follows:
> 1. The upstream task shuffle-writes its data to DISK;
> 2. The upstream task commits data while producing and notifies downstream that the 
> data is "consumable" BEFORE the task finishes;
> 3. The downstream task is notified when upstream data is consumable and can be 
> scheduled according to available resources;
> 4. When a downstream task fails over, only that task needs to be restarted, because 
> upstream data is written to disk and is replayable.
> We can describe the characteristics of this new shuffle manner as follows (see the 
> first sketch below):
> a. isPipelined=true – the downstream task can consume data before the upstream task 
> finishes;
> b. hasBackPressure=false – the upstream task shuffle-writes data to disk and can 
> finish by itself regardless of whether a downstream task consumes the data in time.
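
As a rough illustration, the two traits above could be stated explicitly as a small 
value object. The following is a minimal, hypothetical Java sketch; ShuffleTraits and 
diskBackedPipelined are illustrative names, not actual Flink API.

{code:java}
// Hypothetical sketch only -- these names are NOT Flink API. It restates
// characteristics (a) and (b) above as an explicit pair of traits.
public final class ShuffleTraits {

    private final boolean pipelined;      // data is consumable while still being produced
    private final boolean backPressured;  // producer is throttled by slow consumers

    public ShuffleTraits(boolean pipelined, boolean backPressured) {
        this.pipelined = pipelined;
        this.backPressured = backPressured;
    }

    public boolean isPipelined() {
        return pipelined;
    }

    public boolean hasBackPressure() {
        return backPressured;
    }

    /** Traits of the disk-backed shuffle manner described in this issue. */
    public static ShuffleTraits diskBackedPipelined() {
        // (a) downstream may start consuming before the producer finishes;
        // (b) the producer writes to disk and never blocks on slow consumers.
        return new ShuffleTraits(true, false);
    }
}
{code}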
> But this new ResultPartitionType (isPipelined=true, hasBackPressure=false) seems to 
> contradict the partition lifecycle management in the current scheduling layer:
> 1. The new shuffle manner needs the partition tracker for lifecycle management, but 
> current Flink assumes that ALL "isPipelined=true" result partitions are released on 
> consumption and will not be taken care of by the partition tracker 
> ([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
>  – this assumption does not hold for this case.
> From my understanding, ResultPartitionType#isPipelined() indicates whether data can 
> be consumed while being produced, which is orthogonal to whether the partition is 
> released on consumption. I propose to fix this and fully respect the original 
> meaning of ResultPartitionType#isPipelined() (a second sketch follows below).
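
To make the proposal concrete, below is a minimal, hypothetical Java sketch in which 
the tracking decision keys on an explicit "released on consumption" trait instead of 
isPipelined(). It is not the actual JobMasterPartitionTrackerImpl code; PartitionInfo, 
releasedOnConsumption() and startTrackingPartition() are assumed names for 
illustration only.

{code:java}
// Hypothetical sketch of the proposed decoupling -- NOT the actual
// JobMasterPartitionTrackerImpl code. PartitionInfo, releasedOnConsumption()
// and startTrackingPartition() are illustrative names only.
import java.util.HashSet;
import java.util.Set;

final class PartitionTrackerSketch {

    /** Illustrative stand-in for a result partition and the traits of its type. */
    record PartitionInfo(String partitionId, boolean pipelined, boolean releasedOnConsumption) {}

    private final Set<String> trackedPartitions = new HashSet<>();

    void startTrackingPartition(PartitionInfo partition) {
        // Current assumption (per the linked code): every pipelined partition is
        // released on consumption, so it is simply skipped:
        //     if (partition.pipelined()) { return; }
        //
        // Proposed check: skip only partitions that really are released on
        // consumption; a disk-backed pipelined partition stays tracked.
        if (partition.releasedOnConsumption()) {
            return;
        }
        trackedPartitions.add(partition.partitionId());
    }

    boolean isTracked(String partitionId) {
        return trackedPartitions.contains(partitionId);
    }

    public static void main(String[] args) {
        PartitionTrackerSketch tracker = new PartitionTrackerSketch();
        // Disk-backed pipelined partition: consumable while produced, but must be tracked.
        tracker.startTrackingPartition(new PartitionInfo("p-1", true, false));
        // Classic pipelined partition released on consumption: not tracked.
        tracker.startTrackingPartition(new PartitionInfo("p-2", true, true));
        System.out.println(tracker.isTracked("p-1"));   // true
        System.out.println(tracker.isTracked("p-2"));   // false
    }
}
{code}

With such a split, the disk-backed partition above (pipelined=true, 
releasedOnConsumption=false) keeps being tracked, while classic pipelined partitions 
are still skipped.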



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2021-10-31 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20038:
---
Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but it is unassigned, and neither it nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> 
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2021-04-29 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20038:
---
Priority: Minor  (was: Major)

> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> 
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Priority: Minor
>  Labels: auto-deprioritized-major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2021-04-29 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20038:
---
Labels: auto-deprioritized-major  (was: stale-major)

> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> 
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Priority: Major
>  Labels: auto-deprioritized-major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2021-04-22 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20038:
---
Labels: stale-major  (was: )

> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> 
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Priority: Major
>  Labels: stale-major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2020-11-09 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-20038:
--
Component/s: Runtime / Network

> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> 
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Jin Xing
>Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2020-11-09 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-20038:
--
Description: 
After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new 
shuffle manner, thus to benefit different scenarios. New shuffle manner tend to 
bring in new abilities which could be leveraged by scheduling layer to provide 
better performance.

From my understanding, the characteristics of shuffle manner is exposed by 
ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...), and 
leveraged by scheduling layer to conduct job. But seems that Flink doesn't 
provide a way to describe the new characteristics from a plugged in shuffle 
manner. I also find that scheduling layer have some weak assumptions on 
ResultPartitionType. I detail by below example.

In our internal Flink, we develop a new shuffle manner for batch jobs. 
Characteristics can be summarized as below briefly:
1. Upstream task shuffle writes data to DISK;
2. Upstream task commits data while producing and notify "consumable" to 
downstream BEFORE task finished;
3. Downstream is notified when upstream data is consumable and can be scheduled 
according to resource;
4. When downstream task failover, only itself needs to be restarted because 
upstream data is written into disk and replayable;

We can tell the character of this new shuffle manner as:
a. isPipelined=true – downstream task can consume data before upstream finished;
b. hasBackPressure=false – upstream task shuffle writes data to disk and can 
finish by itself no matter if there's downstream task consumes the data in time.

But above new ResultPartitionType(isPipelined=true, hasBackPressure=false) 
seems contradicts the partition lifecycle management in current scheduling 
layer:
1. The above new shuffle manner needs partition tracker for lifecycle 
management, but current Flink assumes that ALL "isPipelined=true" result 
partitions are released on consumption and will not be taken care of by 
partition tracker 
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
 – the limitation is not correct for this case.

From my understanding, the method of ResultPartitionType#isPipelined() 
indicates whether data can be consumed while being produced, and it's 
orthogonal to whether the partition is released on consumption. I propose to 
have a fix on this and fully respect to the original meaning of 
ResultPartitionType#isPipelined().

  was:
After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new 
shuffle manager, thus to benefit different scenarios. New shuffle manner tend 
to bring in new abilities which could be leveraged by scheduling layer to 
provide better performance.

From my understanding, the characteristics of shuffle manager is exposed by 
ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...), and 
leveraged by scheduling layer to conduct job. But seems that Flink doesn't 
provide a way to describe the new characteristics from a plugged in shuffle 
manner. I also find that scheduling layer have some weak assumptions on 
ResultPartitionType. I detail by below example.

In our internal Flink, we develop a new shuffle manner for batch jobs. 
Characteristics can be summarized as below briefly:
1. Upstream task shuffle writes data to DISK;
2. Upstream task commits data while producing and notify "consumable" to 
downstream BEFORE task finished;
3. Downstream is notified when upstream data is consumable and can be scheduled 
according to resource;
4. When downstream task failover, only itself needs to be restarted because 
upstream data is written into disk and replayable;

We can tell the character of this new shuffle manner as:
a. isPipelined=true – downstream task can consume data before upstream finished;
b. hasBackPressure=false – upstream task shuffle writes data to disk and can 
finish by itself no matter if there's downstream task consumes the data in time.

But above new ResultPartitionType(isPipelined=true, hasBackPressure=false) 
seems contradicts the partition lifecycle management in current scheduling 
layer:
1. The above new shuffle manner needs partition tracker for lifecycle 
management, but current Flink assumes that ALL "isPipelined=true" result 
partitions are released on consumption and will not be taken care of by 
partition tracker 
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
 – the limitation is not correct for this case.

From my understanding, the method of ResultPartitionType#isPipelined() 
indicates whether data can be consumed while being produced, and it's 
orthogonal to whether the partition is released on consumption. I propose to 
have a fix on this and fully respect to the original meaning of 
ResultPartitionType#isPipelined().

[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2020-11-09 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-20038:
--
Description: 
After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new 
shuffle manager, thus to benefit different scenarios. New shuffle manner tend 
to bring in new abilities which could be leveraged by scheduling layer to 
provide better performance.

From my understanding, the characteristics of shuffle manager is exposed by 
ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...), and 
leveraged by scheduling layer to conduct job. But seems that Flink doesn't 
provide a way to describe the new characteristics from a plugged in shuffle 
manner. I also find that scheduling layer have some weak assumptions on 
ResultPartitionType. I detail by below example.

In our internal Flink, we develop a new shuffle manner for batch jobs. 
Characteristics can be summarized as below briefly:
1. Upstream task shuffle writes data to DISK;
2. Upstream task commits data while producing and notify "consumable" to 
downstream BEFORE task finished;
3. Downstream is notified when upstream data is consumable and can be scheduled 
according to resource;
4. When downstream task failover, only itself needs to be restarted because 
upstream data is written into disk and replayable;

We can tell the character of this new shuffle manner as:
a. isPipelined=true – downstream task can consume data before upstream finished;
b. hasBackPressure=false – upstream task shuffle writes data to disk and can 
finish by itself no matter if there's downstream task consumes the data in time.

But above new ResultPartitionType(isPipelined=true, hasBackPressure=false) 
seems contradicts the partition lifecycle management in current scheduling 
layer:
1. The above new shuffle manner needs partition tracker for lifecycle 
management, but current Flink assumes that ALL "isPipelined=true" result 
partitions are released on consumption and will not be taken care of by 
partition tracker 
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
 – the limitation is not correct for this case.

From my understanding, the method of ResultPartitionType#isPipelined() 
indicates whether data can be consumed while being produced, and it's 
orthogonal to whether the partition is released on consumption. I propose to 
have a fix on this and fully respect to the original meaning of 
ResultPartitionType#isPipelined().

  was:
After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new 
shuffle manager, thus to benefit different scenarios. New shuffle manner tend 
to bring in new abilities which could be leveraged by scheduling layer to 
provide better performance.

From my understanding, the characteristics of shuffle manner is exposed by 
ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...), and 
leveraged by scheduling layer to conduct job. But seems that Flink doesn't 
provide a way to describe the new characteristics from a plugged in shuffle 
manner. I also find that scheduling layer have some weak assumptions on 
ResultPartitionType. I detail by below example.

In our internal Flink, we develop a new shuffle manner for batch jobs. 
Characteristics can be summarized as below briefly:
1. Upstream task shuffle writes data to DISK;
2. Upstream task commits data while producing and notify "consumable" to 
downstream BEFORE task finished;
3. Downstream is notified when upstream data is consumable and can be scheduled 
according to resource;
4. When downstream task failover, only itself needs to be restarted because 
upstream data is written into disk and replayable;

We can tell the character of this new shuffle manner as:
a. isPipelined=true – downstream task can consume data before upstream finished;
b. hasBackPressure=false – upstream task shuffle writes data to disk and can 
finish by itself no matter if there's downstream task consumes the data in time.

But above new ResultPartitionType(isPipelined=true, hasBackPressure=false) 
seems contradicts the partition lifecycle management in current scheduling 
layer:
1. The above new shuffle manner needs partition tracker for lifecycle 
management, but current Flink assumes that ALL "isPipelined=true" result 
partitions are released on consumption and will not be taken care of by 
partition tracker 
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
 – the limitation is not correct for this case.

From my understanding, the method of ResultPartitionType#isPipelined() 
indicates whether data can be consumed while being produced, and it's 
orthogonal to whether the partition is released on consumption. I propose to 
have a fix on this and fully respect to the original meaning of 
ResultPartitionType#isPipelined().

[jira] [Updated] (FLINK-20038) Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.

2020-11-09 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-20038:
--
Description: 
After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new 
shuffle manager, thus to benefit different scenarios. New shuffle manner tend 
to bring in new abilities which could be leveraged by scheduling layer to 
provide better performance.

From my understanding, the characteristics of shuffle manner is exposed by 
ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...), and 
leveraged by scheduling layer to conduct job. But seems that Flink doesn't 
provide a way to describe the new characteristics from a plugged in shuffle 
manner. I also find that scheduling layer have some weak assumptions on 
ResultPartitionType. I detail by below example.

In our internal Flink, we develop a new shuffle manner for batch jobs. 
Characteristics can be summarized as below briefly:
1. Upstream task shuffle writes data to DISK;
2. Upstream task commits data while producing and notify "consumable" to 
downstream BEFORE task finished;
3. Downstream is notified when upstream data is consumable and can be scheduled 
according to resource;
4. When downstream task failover, only itself needs to be restarted because 
upstream data is written into disk and replayable;

We can tell the character of this new shuffle manner as:
a. isPipelined=true – downstream task can consume data before upstream finished;
b. hasBackPressure=false – upstream task shuffle writes data to disk and can 
finish by itself no matter if there's downstream task consumes the data in time.

But above new ResultPartitionType(isPipelined=true, hasBackPressure=false) 
seems contradicts the partition lifecycle management in current scheduling 
layer:
1. The above new shuffle manner needs partition tracker for lifecycle 
management, but current Flink assumes that ALL "isPipelined=true" result 
partitions are released on consumption and will not be taken care of by 
partition tracker 
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
 – the limitation is not correct for this case.

From my understanding, the method of ResultPartitionType#isPipelined() 
indicates whether data can be consumed while being produced, and it's 
orthogonal to whether the partition is released on consumption. I propose to 
have a fix on this and fully respect to the original meaning of 
ResultPartitionType#isPipelined().

  was:
After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new 
shuffle manner, thus to benefit different scenarios. New shuffle manner tend to 
bring in new abilities which could be leveraged by scheduling layer to provide 
better performance.

From my understanding, the characteristics of shuffle manner is exposed by 
ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...), and 
leveraged by scheduling layer to conduct job. But seems that Flink doesn't 
provide a way to describe the new characteristics from a plugged in shuffle 
manner. I also find that scheduling layer have some weak assumptions on 
ResultPartitionType. I detail by below example.

In our internal Flink, we develop a new shuffle manner for batch jobs. 
Characteristics can be summarized as below briefly:
1. Upstream task shuffle writes data to DISK;
2. Upstream task commits data while producing and notify "consumable" to 
downstream BEFORE task finished;
3. Downstream is notified when upstream data is consumable and can be scheduled 
according to resource;
4. When downstream task failover, only itself needs to be restarted because 
upstream data is written into disk and replayable;

We can tell the character of this new shuffle manner as:
a. isPipelined=true – downstream task can consume data before upstream finished;
b. hasBackPressure=false – upstream task shuffle writes data to disk and can 
finish by itself no matter if there's downstream task consumes the data in time.

But above new ResultPartitionType(isPipelined=true, hasBackPressure=false) 
seems contradicts the partition lifecycle management in current scheduling 
layer:
1. The above new shuffle manner needs partition tracker for lifecycle 
management, but current Flink assumes that ALL "isPipelined=true" result 
partitions are released on consumption and will not be taken care of by 
partition tracker 
([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
 – the limitation is not correct for this case.

From my understanding, the method of ResultPartitionType#isPipelined() 
indicates whether data can be consumed while being produced, and it's 
orthogonal to whether the partition is released on consumption. I propose to 
have a fix on this and fully respect to the original meaning of 
ResultPartitionType#isPipelined().