[jira] [Commented] (FLINK-32045) optimize task deployment performance for large-scale jobs

2023-05-11 Thread Weihua Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721774#comment-17721774
 ] 

Weihua Hu commented on FLINK-32045:
---

Thanks [~Thesharing] for your reply, your comments are very meaningful and 
valuable. Let me try to answer them one by one.
h3. Distribution of shuffle descriptors via blob server
IMO, there are two things that should be considered when deciding whether to 
enable distribution of shuffle descriptors via the blob server:
 # The size of the shuffle descriptors. This is related to the parallelism of 
the producers for a single consumer.
 # How many times these shuffle descriptors have to be transported to 
TaskManagers. This is related to the parallelism of the consumers of this 
producer.

So, I think it is better to use the number of edges in the 
ConsumedPartitionGroup to decide whether to enable blob server offload.
I'd also like to keep this logic internal (with a proper default value, for 
example 1000*1000, to be decided after some benchmarks), since it really 
requires advanced knowledge for users to figure out how to set it themselves.
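To make this concrete, here is a rough sketch of the kind of check I have in 
mind (the class, method and constant names are only illustrative, not existing 
Flink APIs, and the default threshold is a placeholder to be verified by 
benchmarks):
{code:java}
// Sketch only: names and the default value are hypothetical.
public final class BlobOffloadDecision {

    // Internal default edge-count threshold, e.g. 1000 * 1000; the real value
    // should be decided after benchmarking and not exposed to users.
    private static final long EDGE_COUNT_THRESHOLD = 1_000_000L;

    private BlobOffloadDecision() {}

    /**
     * Decides whether the serialized shuffle descriptors of a ConsumedPartitionGroup
     * should be distributed via the blob server.
     *
     * @param numProducers parallelism of the producers of the group
     * @param numConsumers parallelism of the consumers reading the group
     */
    public static boolean shouldOffloadViaBlobServer(int numProducers, int numConsumers) {
        // For an all-to-all distribution there is one edge per producer-consumer pair.
        long edgeCount = (long) numProducers * numConsumers;
        return edgeCount > EDGE_COUNT_THRESHOLD;
    }
}
{code}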
h3. How much performance it would improve with a cache for shuffle descriptors 
in the TaskManager
I have tested this in the following environment: a YARN cluster with 2000 
TaskManagers, each with 6 cores, 16 GB of memory and 
"taskmanager.numberOfTaskSlots" set to 10, running a simple WordCount job with 
20000 parallelism.
 * Without blob server offload, the job failed with a submitTask RPC timeout; 
all of the JobManager's CPU was spent serializing the submitTask RPCs.
 * With blob server offload but no TaskExecutor cache, deploying all tasks 
took 25s.
 * With blob server offload and the TaskExecutor cache, deploying all tasks 
took 15s.

h3. How to update the cache?
As you mentioned, it's too complicated to keep the caches in the JobManager 
and the TaskExecutor consistent. So, we will add some constraints to the cache 
(a rough sketch follows the list):
 # The cache is only enabled when necessary (the same condition as the 
distribution of shuffle descriptors via the blob server). In most cases the 
serialized shuffle descriptors are small and transported in the Akka message, 
so the cost of deserialization is very small and they do not need to be cached.
 # The cache of a job is cleared when the task executor disconnects from the 
job master.
 # The cache has a TTL. We should configure a proper default TTL value, for 
example 3 minutes (some batch jobs may deploy tasks lazily).
 # The cache has a maximum size. As you mentioned, an LRU or FIFO cache is 
reasonable. Since the number of slots of a TaskManager won't be too large, the 
cache size won't be too large either.
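As a minimal sketch of constraints 2-4, assuming a Guava-style cache on the 
TaskExecutor side (the class below is illustrative only, not the actual 
implementation, and the default values are placeholders):
{code:java}
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Illustrative sketch: the real cache would be keyed by the blob key of the
// serialized shuffle descriptors and hold the deserialized descriptors per job.
class ShuffleDescriptorCache<K, V> {

    private final Cache<K, V> cache =
            CacheBuilder.newBuilder()
                    .expireAfterAccess(Duration.ofMinutes(3)) // constraint 3: default TTL, e.g. 3 minutes
                    .maximumSize(1_000)                        // constraint 4: bounded size, LRU-style eviction
                    .build();

    // Only consulted when blob offload is enabled for the ConsumedPartitionGroup (constraint 1).
    V getOrLoad(K key, Callable<V> loader) throws ExecutionException {
        return cache.get(key, loader);
    }

    // Called when the task executor disconnects from the job master (constraint 2).
    void clearAll() {
        cache.invalidateAll();
    }
}
{code}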

For session mode (more precisely, OLAP): IMO, most of the scenarios are a lot 
of small queries. As mentioned above, they won't use the cache in most cases. 
And the cache is removed when the job finishes (the task executor disconnects 
from the job master), so it won't occupy too much memory for long.
 
 
Thanks again. Also thanks to [~zhuzh], [~wanglijie], [~Weijie Guo] for the 
previous offline discussions.
I'd be glad to hear any suggestions.

> optimize task deployment performance for large-scale jobs
> -
>
> Key: FLINK-32045
> URL: https://issues.apache.org/jira/browse/FLINK-32045
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Priority: Major
>
> h1. Background
> In FLINK-21110, we cache shuffle descriptors on the job manager side and 
> support using blob servers to offload these descriptors in order to reduce 
> the cost of task deployment.
> I think there are also some improvements we could make for large-scale jobs.
>  # The default min size to enable distribution via the blob server is 1MB. 
> But for a large wordcount job with 20000 parallelism, the size of the 
> serialized shuffle descriptors is only 300KB. This means users need to lower 
> "blob.offload.minsize", but the value is hard for users to decide.
>  # The task executor side still needs to load blob files and deserialize 
> shuffle descriptors for each task. Since these operations run in the 
> main thread, they may block other RPCs from the job manager.
> h1. Proposal
>  # Enable distributing shuffle descriptors via the blob server automatically. 
> This could be decided by the edge number of the current shuffle descriptors. 
> The blob offload will be enabled when the edge number exceeds an internal 
> threshold.
>  # Introduce a cache of deserialized shuffle descriptors on the task executor
> side. This could reduce the cost of reading from local blob files and 
> deserialization. Of course, the cache should have TTL to avoid occupying too 
> much memory. And the cache should have the same switch mechanism as the blob 
> server offload.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32045) optimize task deployment performance for large-scale jobs

2023-05-10 Thread Zhilong Hong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721632#comment-17721632
 ] 

Zhilong Hong commented on FLINK-32045:
--

Thank you for proposing these optimizations, Weihua!
 # For the threshold to enable the distribution of shuffle descriptors via the 
blob server: originally I was thinking about adding a new configuration called 
something like "blob.deployement.offload.minsize" (I forgot the original name). 
This configuration was eventually dropped, because we didn't want to introduce 
a new configuration that would require users to have advanced knowledge before 
configuring it.

However, I think enabling the distribution of shuffle descriptors via the blob 
server according to the parallelism is a better solution for this situation. 
It's more understandable and easier to configure. We can also set a large 
default value for this configuration. What do you think [~zhuzh]?


 # We thought about introducing a cache for shuffle descriptors in the 
TaskManager earlier. Since users usually won't set a large number for the 
configuration "taskmanager.numberOfTaskSlots", there would only be a few slots 
in a TaskManager (for example, 8), and there won't be a lot of deserialization 
work on the TaskManager side. So, I'm wondering how much performance a cache 
for shuffle descriptors in the TaskManager would actually bring.

Also, another question arises for the cache: how to update it? 
Currently, the cache in the JobManager is cleared in two scenarios: (1) a 
ConsumedPartitionGroup is released; (2) the producer of an IntermediateResult 
encounters a failover. To clear the caches in the TaskManager at the same time, 
we may need to introduce a few complicated RPC calls between the JobManager and 
the TaskManager. In my opinion, it's a bit complicated.
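For illustration, the extra RPC might look something like the following (the 
gateway and method below are purely hypothetical, not an existing Flink API):
{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.Acknowledge;

// Hypothetical addition to the TaskExecutor's RPC gateway. The JobManager would have to
// call it on every TaskExecutor whenever it clears its own cache, i.e. when a
// ConsumedPartitionGroup is released or the producer of an IntermediateResult fails over.
public interface ShuffleDescriptorCacheGateway {

    /**
     * Asks the task executor to drop the cached (deserialized) shuffle descriptors
     * of the given intermediate result for the given job.
     */
    CompletableFuture<Acknowledge> releaseCachedShuffleDescriptors(
            JobID jobId, IntermediateDataSetID intermediateDataSetId);
}
{code}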

The third concern is about session mode. If users submit a lot of jobs to a 
session at a rapid pace, the cache could fill up the heap memory in a short 
time and have an unexpected impact on users' tasks. We can use an LRUCache or 
a FIFOCache for this situation. However, it's not easy for us to decide the 
size of the cache, because we don't know how large the TaskManager would be.

In my opinion, introducing a cache for ShuffleDescriptors in the TaskManager 
may require more discussion. Please correct me if I missed anything or if 
anything I said is wrong. Thank you.
