[GitHub] [flink] StephanEwen commented on pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

2020-11-03 Thread GitBox


StephanEwen commented on pull request #13595:
URL: https://github.com/apache/flink/pull/13595#issuecomment-720663470







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] StephanEwen commented on pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

2020-10-28 Thread GitBox


StephanEwen commented on pull request #13595:
URL: https://github.com/apache/flink/pull/13595#issuecomment-718141948


   Looks good from my side!
   
   Let's wait for @zhijiangW and @gaoyunhaii and CI to give their ok as well.
   
   We can start looking at the documentation in the meantime. I think we need a 
quick description of this feature and how to configure it in the memory 
configuration section: 
 - https://ci.apache.org/projects/flink/flink-docs-master  ==> Deployment & 
Operations ==> Memory Configuration
 - Or add it to the tuning page: 
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_tuning.html




[GitHub] [flink] StephanEwen commented on pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

2020-10-28 Thread GitBox


StephanEwen commented on pull request #13595:
URL: https://github.com/apache/flink/pull/13595#issuecomment-717865867


   @wsry Your suggestion sounds good to me!




[GitHub] [flink] StephanEwen commented on pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

2020-10-27 Thread GitBox


StephanEwen commented on pull request #13595:
URL: https://github.com/apache/flink/pull/13595#issuecomment-717350591


   Thanks for explaining, @wsry 
   
   Maybe we can find a good combination of "min-parallelism" and "min-buffers" 
to make the switch to sort-based shuffle work well for users. That way, the 
very low parallelism cases would use the partition-based shuffle and the 
higher-parallelism cases the sort-based shuffle.
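   A back-of-the-envelope sketch of why the combination helps, assuming the partition-based (hash) shuffle needs roughly one buffer per subpartition plus one, and the sort-based shuffle needs a fixed "min-buffers" regardless of parallelism. The class and method names are hypothetical, not Flink code; 50 and 512 are only the example values mentioned in this thread. Under those assumptions the per-partition requirement never exceeds max(min-parallelism, min-buffers), however large the parallelism gets.

```java
// Hypothetical sketch, not Flink code: per-partition minimum buffer demand
// when switching between hash and sort shuffle based on parallelism.
final class BufferDemand {

    // Assumption: the hash shuffle needs one buffer per subpartition plus one.
    static int hashShuffleMinBuffers(int numSubpartitions) {
        return numSubpartitions + 1;
    }

    // Assumption: the sort shuffle needs a fixed "min-buffers", independent of parallelism.
    static int minBuffersRequired(int numSubpartitions, int minParallelism, int sortMinBuffers) {
        return numSubpartitions < minParallelism
                ? hashShuffleMinBuffers(numSubpartitions)
                : sortMinBuffers;
    }

    public static void main(String[] args) {
        int minParallelism = 50;
        int sortMinBuffers = 512;
        int worst = 0;
        for (int p = 1; p <= 10_000; p++) {
            worst = Math.max(worst, minBuffersRequired(p, minParallelism, sortMinBuffers));
        }
        // Bounded by max(minParallelism, sortMinBuffers) for any parallelism.
        System.out.println(worst); // 512
    }
}
```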




[GitHub] [flink] StephanEwen commented on pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

2020-10-27 Thread GitBox


StephanEwen commented on pull request #13595:
URL: https://github.com/apache/flink/pull/13595#issuecomment-717204918


   @wsry I have two more questions on the minimum buffers logic and configuration:
   
   (1)
   
   I previously thought that this is to ensure that the sort buffers are not 
too small. But I now see that they can actually be smaller, because the minimum 
is `Math.min(minBuffersPerSortMergePartition, numSubpartitions + 1)`. So with a 
low parallelism, the sort shuffle might run with something like 5 buffers 
(assuming a user turns it on for all shuffles). Is that on purpose?
   
   Does it make sense to just use the `minBuffersPerSortMergePartition` value 
and not lower it below that?
   
   BTW: I think I now understand why you called it "max buffers" before - 
because it is the maximum to which the minimum requirement can grow.
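   To make (1) concrete, here is a small sketch contrasting the rule quoted above with the proposed one. Only the `Math.min(...)` expression comes from the PR; the class and method names are made up for illustration.

```java
// Illustrative only: the two minimum-buffer rules discussed in (1).
final class SortMergeMinBuffers {

    // Rule quoted from the PR: grows with the number of subpartitions and is
    // capped by the configured value (hence the earlier name "max buffers").
    static int currentRule(int minBuffersPerSortMergePartition, int numSubpartitions) {
        return Math.min(minBuffersPerSortMergePartition, numSubpartitions + 1);
    }

    // Proposed rule: always use the configured value, never less.
    static int proposedRule(int minBuffersPerSortMergePartition) {
        return minBuffersPerSortMergePartition;
    }

    public static void main(String[] args) {
        System.out.println(currentRule(512, 4));    // 5
        System.out.println(currentRule(512, 2000)); // 512
        System.out.println(proposedRule(512));      // 512
    }
}
```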
   
   (2)
   
   The current default for `taskmanager.network.sort-shuffle.min-buffers` may be 
too high for mini clusters, at least if we end up doing what is proposed in (1) 
and do not lower it for lower parallelism.
   
   The current min is 512 buffers (16 MB) and the mini cluster has only 64 MB 
network memory by default. So as soon as there are four shuffles, the mini 
cluster will break, which is pretty soon.
   We should probably set a different min on the mini cluster, like 64 buffers 
or so.
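   A quick check of the arithmetic in (2), assuming the default 32 KB network buffer (memory segment) size and ignoring every other consumer of network memory, so the result is an optimistic upper bound. The class and method names are hypothetical.

```java
// Hypothetical back-of-the-envelope helper for the numbers in (2).
final class MiniClusterBudget {
    static final long KB = 1024L;
    static final long MB = 1024L * KB;

    // Upper bound on how many sort-merge partitions can hold their minimum at once,
    // ignoring all other users of network memory (input gates, other partitions, ...).
    static long maxConcurrentPartitions(long networkMemoryBytes, long bufferSizeBytes, int minBuffers) {
        long totalBuffers = networkMemoryBytes / bufferSizeBytes;
        return totalBuffers / minBuffers;
    }

    public static void main(String[] args) {
        long network = 64 * MB; // mini cluster default quoted above
        long buffer = 32 * KB;  // assumption: default 32 KB buffer size
        System.out.println(512 * buffer / MB);                             // 16 (MB per partition)
        System.out.println(maxConcurrentPartitions(network, buffer, 512)); // 4
        System.out.println(maxConcurrentPartitions(network, buffer, 64));  // 32
    }
}
```

With 512 buffers per partition only four partitions fit; a minimum of 64 buffers would leave room for about 32 under the same assumptions.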




[GitHub] [flink] StephanEwen commented on pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

2020-10-26 Thread GitBox


StephanEwen commented on pull request #13595:
URL: https://github.com/apache/flink/pull/13595#issuecomment-716450042


   The updated logic looks good. But the name `maxBuffersPerSortMergePartition` 
seems confusing. I think we should rename this to 
`minBuffersPerSortMergePartition` and also rename the config key to 
`taskmanager.network.sort-shuffle.min-buffers`.
   
   What is a good default for the minimum number of buffers? Maybe something 
like 512?




[GitHub] [flink] StephanEwen commented on pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

2020-10-26 Thread GitBox


StephanEwen commented on pull request #13595:
URL: https://github.com/apache/flink/pull/13595#issuecomment-716404546


   @wsry 
   
   I am a bit confused about the use of max buffers. That value is the upper 
limit of buffers that will be assigned to the shuffle from the global pool. It 
is only used for streaming pipelined connections, to limit the amount of 
in-flight buffers (so checkpoints don't take too long). For batch it should not 
be used, because there is no harm in using as much memory as possible.
   
   The min-buffers value is actually what decouples the memory use from the 
parallelism. If the min-buffers is related to the number of subpartitions, then 
we still have the problem that shuffles fail at large parallelism (when the 
parallelism is higher than the number of available memory buffers).
   
   So, in conclusion, I think we should not have a max value (because it does 
not help in decoupling from parallelism), and we also need to decouple the min 
value from the parallelism.
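   A minimal sketch of the sizing rule argued for here, assuming a batch sort-merge partition's buffer pool is described by a minimum and a maximum buffer count, with `Integer.MAX_VALUE` standing in for "no max". The parallelism-coupled minimum (one buffer per subpartition plus one) is an assumption used for contrast; none of these names are Flink's actual API.

```java
// Hypothetical sketch, not Flink code: pool sizing for batch (sort-merge) partitions.
final class BatchPoolSizing {
    final int minBuffers;
    final int maxBuffers;

    BatchPoolSizing(int minBuffers, int maxBuffers) {
        this.minBuffers = minBuffers;
        this.maxBuffers = maxBuffers;
    }

    // Coupled rule (assumption): the minimum grows with parallelism.
    static BatchPoolSizing coupled(int numSubpartitions) {
        return new BatchPoolSizing(numSubpartitions + 1, Integer.MAX_VALUE);
    }

    // Decoupled rule: a fixed configured minimum and no maximum, since batch
    // partitions do not need to limit in-flight data for checkpoints.
    static BatchPoolSizing decoupled(int configuredMinBuffers) {
        return new BatchPoolSizing(configuredMinBuffers, Integer.MAX_VALUE);
    }

    // The pool can only be created if its minimum fits into the free buffers.
    boolean fits(int freeBuffers) {
        return minBuffers <= freeBuffers;
    }

    public static void main(String[] args) {
        int freeBuffers = 2048; // e.g. 64 MB of 32 KB buffers
        System.out.println(coupled(5000).fits(freeBuffers));  // false
        System.out.println(decoupled(512).fits(freeBuffers)); // true
    }
}
```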
   
   I also liked the idea of having the 
`taskmanager.network.sort-shuffle.min-parallelism` as a flag. That way low 
parallelisms (< 50) could use the hash shuffle and larger parallelisms could 
use the sort shuffle.
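   The flag could be applied per result partition roughly as follows. This is a sketch under the assumption that the partition's number of subpartitions is compared against `taskmanager.network.sort-shuffle.min-parallelism`; the enum and class names are hypothetical, and 50 is only the example threshold from this comment.

```java
// Hypothetical sketch: picking the blocking shuffle per result partition
// from a min-parallelism threshold. Not Flink's actual API.
enum BlockingShuffleType { HASH, SORT_MERGE }

final class ShuffleSelector {
    // Stand-in for the value of taskmanager.network.sort-shuffle.min-parallelism.
    private final int minParallelism;

    ShuffleSelector(int minParallelism) {
        if (minParallelism < 1) {
            throw new IllegalArgumentException("minParallelism must be >= 1");
        }
        this.minParallelism = minParallelism;
    }

    // Parallelism below the threshold keeps the hash shuffle; at or above it, sort-merge.
    BlockingShuffleType select(int numSubpartitions) {
        return numSubpartitions < minParallelism
                ? BlockingShuffleType.HASH
                : BlockingShuffleType.SORT_MERGE;
    }

    public static void main(String[] args) {
        ShuffleSelector selector = new ShuffleSelector(50);
        System.out.println(selector.select(10));   // HASH
        System.out.println(selector.select(1000)); // SORT_MERGE
    }
}
```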


