[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-22 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385644#comment-17385644 ]

Zhu Zhu commented on FLINK-23218:
-

Thanks for confirming, [~trohrmann]!
And thanks for the explanation of the transient blob option. I think you are 
right that we can try re-offloading {{JobInformation}}, {{TaskInformation}} and 
{{ShuffleDescriptors}} before deploying a task. It may take some extra effort, 
though, to track and de-duplicate blobs on the BlobServer. So as a first step we 
will try introducing a {{read()}} API in {{PermanentBlobService}}, which might 
be simpler.
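
For illustration, a minimal sketch of what such a {{read()}} addition could look 
like. The names, signatures and stub types below are assumptions made for the 
example, not Flink's actual interfaces:

{code:java}
import java.io.File;
import java.io.IOException;

// Hypothetical sketch only; the real PermanentBlobService may differ.
interface PermanentBlobServiceSketch {

    // File-based access: hands out a reference that the cache must keep
    // valid for as long as the caller might use it.
    File getFile(String jobId, String blobKey) throws IOException;

    // Proposed read() API: returns the blob contents directly, so the caller
    // holds no file reference and the cached copy stays safe to evict and
    // re-download later.
    byte[] read(String jobId, String blobKey) throws IOException;
}
{code}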

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is part 2 of the optimization of task deployments. For the overall 
> description and part 1, please see FLINK-23005._
> For the ShuffleDescriptors of vertices with 8k parallelism, the serialized 
> value is more than 700 kilobytes; after compression it is around 200 
> kilobytes. The overall size of the 8k TaskDeploymentDescriptors is more than 
> 1.6 gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors become 
> a heavy burden for the garbage collector.
> In the TaskDeploymentDescriptor, the JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information 
> from the blob server once they begin to process the TaskDeploymentDescriptor. 
> This makes sure the JobManager doesn't need to keep all the copies in heap 
> memory until all the TaskDeploymentDescriptors are sent; there is only one 
> copy on the blob server. Like the JobInformation, we can distribute the 
> cached ShuffleDescriptors via the blob server if their overall size exceeds 
> the threshold.
> This improvement helps to avoid long garbage collection pauses during task 
> deployment.
> The cached ShuffleDescriptors on the blob server are removed once the 
> partitions related to them are no longer valid. This makes sure the blob 
> server won't fill up with cached ShuffleDescriptors, even if there is a 
> long-running session on the cluster.
> In part 3 we will limit the size of the ShuffleDescriptors in the 
> PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor 
> won't run out of disk space because of cached ShuffleDescriptors. For more 
> details please see FLINK-23354.





[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-22 Thread Till Rohrmann (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385585#comment-17385585 ]

Till Rohrmann commented on FLINK-23218:
---

Just for my understanding: When will a {{TaskExecutor}} reread the 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptors}}? Won't this 
only happen after the {{JM}} redeploys the {{Tasks}}? In this case, the {{JM}} 
could store these blobs again.

But I am ok with introducing a not-so-permanent blob entry that is eligible for 
LRU pruning. I think it is a good idea to solve this on the {{BlobCache}} side 
by providing an API which allows you to say that the cache does not need to 
keep this file, because, if in doubt, it can be downloaded from the server 
again, and it is not actively used during the execution of the {{Task}}.
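
A hedged sketch of the access pattern this suggests (the API names here are 
illustrative assumptions, not Flink's actual interfaces): the {{TaskExecutor}} 
reads the blob once to deserialize it, then hints to the cache that the local 
copy is dispensable, since it can be fetched from the {{BlobServer}} again if 
ever needed.

{code:java}
final class DispensableBlobUsageSketch {

    // Illustrative stand-in for the proposed cache-side API.
    interface BlobCacheStub {
        byte[] read(String blobKey);          // downloads from the server if absent
        void markDispensable(String blobKey); // hint: eligible for LRU pruning
    }

    static Object loadOnce(BlobCacheStub cache, String blobKey) {
        byte[] bytes = cache.read(blobKey);   // only needed while deserializing
        Object deserialized = deserialize(bytes);
        cache.markDispensable(blobKey);       // not used again while the task runs
        return deserialized;
    }

    private static Object deserialize(byte[] bytes) {
        return bytes; // placeholder for the actual deserialization
    }
}
{code}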






[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17385234#comment-17385234 ]

Zhu Zhu commented on FLINK-23218:
-

To summarize the investigation and discussion: a general blob cache size limit 
may be hard and complicated to implement. It can also be hard for users to 
understand that there is a config to limit the blob cache size but it only 
works for some of the permanent blobs.

Therefore, I'm thinking of limiting only the size of the {{ShuffleDescriptor}} 
blob cache and leaving the other existing blobs unaffected. In the first 
version we can hard-code the size limit to 100MB. That should be enough for the 
{{ShuffleDescriptor}} blob cache, because a {{ShuffleDescriptor}} blob for an 
8000x8000 ALL-to-ALL {{JobEdge}} is just 200KB+ and normally there will not be 
too many such blobs. This way we do not need to expose a new config to users, 
and existing blob usages will not be affected.

To achieve that, we can add a {{readAndTrack()}} method to 
{{PermanentBlobCache}} and use it only for {{ShuffleDescriptor}} blobs.
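
A minimal sketch of the idea, under the simplifying assumption of an in-memory 
map (the real cache stores files on disk, and all names besides 
{{readAndTrack()}} are made up for the example): tracked blobs live in an 
access-ordered map with a hard-coded 100MB budget, and the least recently used 
entries are evicted first, since an evicted blob can simply be re-downloaded 
from the BlobServer on its next read.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class TrackedBlobCacheSketch {

    private static final long SIZE_LIMIT = 100L * 1024 * 1024; // hard-coded 100MB

    // accessOrder = true: iteration goes from least to most recently used.
    private final LinkedHashMap<String, byte[]> tracked =
            new LinkedHashMap<>(16, 0.75f, true);
    private long trackedBytes;

    synchronized byte[] readAndTrack(String blobKey) {
        byte[] content = tracked.get(blobKey);
        if (content == null) {
            content = downloadFromBlobServer(blobKey);
            tracked.put(blobKey, content);
            trackedBytes += content.length;
            evictIfOverLimit();
        }
        return content;
    }

    private void evictIfOverLimit() {
        Iterator<Map.Entry<String, byte[]>> it = tracked.entrySet().iterator();
        while (trackedBytes > SIZE_LIMIT && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next(); // least recently used
            trackedBytes -= eldest.getValue().length;
            it.remove();
        }
    }

    private byte[] downloadFromBlobServer(String blobKey) {
        return new byte[0]; // placeholder for the actual BlobServer download
    }
}
{code}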

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 have so far improved the 
> performance of job initialization, failover and partition releasing. However, 
> task deployment is still slow. For a job with two vertices, each with 8k 
> parallelism and connected by an all-to-all edge, it takes 32.611s to deploy 
> all the tasks and transition them to running. If the parallelisms are 16k, it 
> may take more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of 
> the JobManager, the JobManager cannot handle other Akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
> All in all, there are currently two issues in the deployment of tasks for 
> large-scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this causes a failover of the entire region. The job may never 
> transition to running, since another heartbeat timeout can occur while the 
> new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following steps:
>  # The JobManager creates a TaskDeploymentDescriptor for each task in the 
> main thread
>  # The TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC 
> calls
>  # The TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> The ShuffleDescriptors in a TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected by an all-to-all edge with parallelism _N_, we currently 
> calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share the 
> same ShuffleDescriptors, since they all consume the same 
> IntermediateResultPartitions. We don't need to calculate the 
> ShuffleDescriptors for each downstream vertex individually; we can just cache 
> them. This decreases the overall complexity of calculating the 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors instead 
> of the original objects. To decrease the size of the Akka messages and reduce 
> the transmission of replicated data over the network, the serialized value 
> can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For the ShuffleDescriptors of vertices with 8k parallelism, the serialized 
> value is more than 700 kilobytes; after compression it is around 200 
> kilobytes. The overall size of the 8k TaskDeploymentDescriptors is more than 
> 1.6 gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors become 
> a heavy burden for the garbage collector.
> In the TaskDeploymentDescriptor, the JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information 
> from the blob server once they begin to process the TaskDeploymentDescriptor. 
> This makes sure the JobManager doesn't need to keep all the copies in heap 
> memory until all the TaskDeploymentDescriptors are sent; there is only one 
> copy on the blob server. Like the JobInformation, we can distribute the 
> cached ShuffleDescriptors via the blob server if their overall size exceeds 
> the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value is distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the performance 
> of our
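
As a rough illustration of part 1 of the proposal (a simplified sketch, not 
Flink's actual implementation; Flink's own serialization and compression 
utilities differ), the key point is that one compressed, serialized copy of the 
shared ShuffleDescriptors is produced once and reused by all N consumers:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

final class ShuffleDescriptorCacheSketch {

    private byte[] cachedCompressedDescriptors; // one copy shared by N consumers

    byte[] getOrCreate(Serializable shuffleDescriptors) throws IOException {
        if (cachedCompressedDescriptors == null) {
            // Serialize and compress once instead of N times.
            cachedCompressedDescriptors = compress(serialize(shuffleDescriptors));
        }
        return cachedCompressedDescriptors;
    }

    private static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    private static byte[] compress(byte[] bytes) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos =
                new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED))) {
            dos.write(bytes);
        }
        return bos.toByteArray();
    }
}
{code}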

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384876#comment-17384876 ]

Zhu Zhu commented on FLINK-23218:
-

I took a look at the code, and it looks to me like transient blobs are not 
deleted once read. {{AbstractTaskManagerFileHandler}} deletes the transient 
blobs of log files, but that does not apply to all transient blobs in the 
{{BlobServer}}.
However, I can see that transient blobs have a TTL by design, which starts 
ticking once the blob is generated and is refreshed on every read. Cached 
{{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptors}} may be used 
many times over a long time span, which can exceed the TTL. So I think the 
current transient blobs cannot meet the requirement.
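
A small sketch of that TTL bookkeeping as just described (illustrative only, 
not the real {{BlobServer}} code; the TTL value is a made-up example): the 
deadline starts ticking at creation and is pushed back on every read, so a blob 
that is reused only at long intervals can still expire between two reads.

{code:java}
import java.util.concurrent.ConcurrentHashMap;

final class TransientTtlSketch {

    private static final long TTL_MILLIS = 60 * 60 * 1000; // e.g. one hour

    private final ConcurrentHashMap<String, Long> expiry = new ConcurrentHashMap<>();

    void onCreate(String blobKey) {
        expiry.put(blobKey, System.currentTimeMillis() + TTL_MILLIS);
    }

    byte[] read(String blobKey) {
        Long deadline = expiry.get(blobKey);
        if (deadline == null || deadline < System.currentTimeMillis()) {
            throw new IllegalStateException("blob expired or unknown: " + blobKey);
        }
        expiry.put(blobKey, System.currentTimeMillis() + TTL_MILLIS); // refresh on read
        return fetch(blobKey);
    }

    private byte[] fetch(String blobKey) {
        return new byte[0]; // placeholder for the actual file read
    }
}
{code}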


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384868#comment-17384868 ]

Zhu Zhu commented on FLINK-23218:
-

I think the PR mentioned above should be 
https://github.com/apache/flink/pull/16498.




[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Zhilong Hong (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384827#comment-17384827 ]

Zhilong Hong commented on FLINK-23218:
--

Thank you for your reply, [~trohrmann].

??Doesn't the {{PermanentBlobCache}} already do ref counting???

{{PermanentBlobCache}} only does ref counting for the entire job: every time a 
slot is registered on a TaskExecutor, the ref count is incremented. As we are 
trying to distribute the cached ShuffleDescriptors via the blob server, the 
size of the blobs may grow rapidly as multiple failovers happen. In this 
situation, these blobs wouldn't be deleted, because the ref count is > 0. Thus 
we delete blobs in the {{PermanentBlobCache}} in LRU order. If a cached blob is 
deleted, it can be retrieved from the blob server again.

??We might think about making the {{JobInformation}} and {{TDDs}} transient 
blobs.??

Transient blobs are deleted from the blob server once they are downloaded to 
the cache. In my opinion, that may not be a good option.

I've already implemented a POC based on the size limit. Would you mind having a 
look at it if you get some free time? https://github.com/apache/flink/pull/16538


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-21 Thread Till Rohrmann (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384734#comment-17384734 ]

Till Rohrmann commented on FLINK-23218:
---

I think I am not fully understanding the problem yet, [~Thesharing]. Doesn't 
the {{PermanentBlobCache}} already do ref counting? Couldn't we say that we 
only consider blobs for LRU removal if they are no longer referenced? If we end 
up in a situation where we still use all blobs and run out of disk, then the 
user needs to increase the blob storage size.

As far as I remember, we rarely use the {{TransientBlobCache}}. In fact, we 
should only use it for sending log and stdout files to the {{JobManager}} 
process. Not sure whether they are a problem.

If this is not good enough, then we might think about making the 
{{JobInformation}} and {{TDDs}} transient blobs. I think they are only needed 
to deserialize the Java objects and are then no longer accessed.
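
A sketch of that eviction rule (names and structure are illustrative 
assumptions, not the actual {{PermanentBlobCache}}): a blob becomes an 
LRU-eviction candidate only once its reference count has dropped to zero; 
referenced blobs are never evicted.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class RefCountedLruSketch {

    private static final class Entry {
        int refCount;
        final long sizeBytes;
        Entry(long sizeBytes) { this.sizeBytes = sizeBytes; }
    }

    // accessOrder = true: iteration visits least recently used entries first.
    private final LinkedHashMap<String, Entry> blobs =
            new LinkedHashMap<>(16, 0.75f, true);

    synchronized void acquire(String blobKey, long sizeBytes) {
        blobs.computeIfAbsent(blobKey, k -> new Entry(sizeBytes)).refCount++;
    }

    synchronized void release(String blobKey) {
        Entry e = blobs.get(blobKey);
        if (e != null && e.refCount > 0) {
            e.refCount--;
        }
    }

    // Frees space in LRU order but skips anything that is still referenced.
    synchronized void evictDownTo(long budgetBytes) {
        long total = blobs.values().stream().mapToLong(e -> e.sizeBytes).sum();
        Iterator<Map.Entry<String, Entry>> it = blobs.entrySet().iterator();
        while (total > budgetBytes && it.hasNext()) {
            Map.Entry<String, Entry> candidate = it.next();
            if (candidate.getValue().refCount == 0) { // unreferenced => evictable
                total -= candidate.getValue().sizeBytes;
                it.remove();
            }
        }
    }
}
{code}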


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-20 Thread Zhilong Hong (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17383878#comment-17383878 ]

Zhilong Hong commented on FLINK-23218:
--

After investigation, we think there are three ways to limit the size of the 
blob cache.

First of all, we need to emphasize that a size limit is only applicable to the 
permanent blob cache. For transient blobs, the blob server deletes them once 
they are transmitted, so we cannot apply a size limit to them.
 # *Synchronization:* The blob cache periodically queries the blob server about 
which blobs are still alive and deletes all the dead blobs (see the sketch 
after this list).
 ## Pros: This makes sure that when a blob is removed from the cache, it is 
useless and won't be accessed anymore.
 ## Cons: This requires more HTTP calls and increases the pressure on the 
JobManager. When there are a lot of blobs on the blob server, it may take a lot 
of time to collect them on the blob server, send them to the TaskManager and 
compare them with the blob caches.
 # *Size limit:* The blob cache maintains a size tracker for blobs that can be 
deleted. Once the size of the blobs exceeds the limit, blobs are removed in LRU 
order. A deleted blob can be retrieved from the blob server once it's needed 
again.
 ## Pros: This requires no communication with the blob server, and it makes 
sure that the blobs of ShuffleDescriptors won't blow up the disk on the 
TaskExecutor.
 ## Cons: Some blobs cannot be deleted, like user code classloader jar files 
and custom files stored in the FileCache. If these files are deleted, access 
through their File handles will unexpectedly throw an IOException. We cannot 
track these blobs.
 # *Reference counter:* The blob cache maintains a reference counter for each 
blob. Once a blob is accessed, the counter is incremented; once the usage is 
released, the counter is decremented. When the counter is 0, we can remove the 
cached blob.
 ## Pros: This requires no communication with the blob server, and the counter 
can track all the permanent blobs.
 ## Cons: It's hard to maintain the correctness of the counter, since not all 
usages are explicitly released. It would also spread references to the blob 
cache throughout the code, which makes the implementation complicated.

In our opinion, Option 2 is preferable. Its implementation is not that 
complicated, and it ensures that the blobs introduced in FLINK-23218 won't blow 
up the disk.
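
For contrast with the preferred Option 2, a hedged sketch of what Option 1 
(synchronization) could look like; all names are illustrative assumptions, and 
the periodic server round trip is exactly the extra JobManager pressure noted 
in the cons above.

{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class BlobSyncSketch {

    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start() {
        // Periodically reconcile the local cache with the server.
        scheduler.scheduleAtFixedRate(this::syncWithServer, 5, 5, TimeUnit.MINUTES);
    }

    private void syncWithServer() {
        Set<String> alive = queryAliveBlobKeysFromServer(); // assumed round trip
        cache.keySet().removeIf(blobKey -> !alive.contains(blobKey));
    }

    private Set<String> queryAliveBlobKeysFromServer() {
        return new HashSet<>(); // placeholder for the actual server query
    }
}
{code}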


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-19 Thread Zhilong Hong (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17383265#comment-17383265 ]

Zhilong Hong commented on FLINK-23218:
--

Hi, [~trohrmann]. While trying to implement the size limit for the blob cache 
on the TaskExecutor, we found that we cannot simply delete the least recently 
used permanent blob from the cache when the size limit is exceeded. 
{{BlobLibraryCacheManager}} on the TaskExecutor caches the File pointer for the 
user code classloader. If we delete the permanent blob, access through these 
classloaders would raise an IOException. I'm not sure how to solve this problem 
without making the blob cache more complicated. Do you think it's a good idea 
to implement the synchronization rather than the size limit? Thank you in 
advance.

??(Mentioned here) For each TaskManager, its blob cache syncs the status of all 
blobs with the blob server every 5 minutes (it's configurable). If a blob is 
removed from the blob server, it is removed from the blob cache, too.??
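
A tiny illustration of the hazard (plain Java, not Flink's actual 
{{BlobLibraryCacheManager}}): once a classloader has been created over a cached 
blob file, deleting that file out from under it makes later class or resource 
loads fail, which is why such blobs cannot simply be evicted in LRU order.

{code:java}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

final class ClassLoaderPinningSketch {

    // Creates a classloader over a jar file that lives in the blob cache.
    static ClassLoader loaderOverCachedBlob(File cachedJar) throws Exception {
        URL[] urls = {cachedJar.toURI().toURL()};
        return new URLClassLoader(urls);
        // If cachedJar is now evicted (deleted) by a size-limited cache, a
        // later loadClass()/getResource() on this loader can fail with an
        // IOException, even though the loader itself is still referenced.
    }
}
{code}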


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-11 Thread Yang Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17378872#comment-17378872 ]

Yang Wang commented on FLINK-23218:
---

Just a side input: the YARN local resource cache has a similar configuration 
({{yarn.nodemanager.localizer.cache.target-size-mb}}), whose default value is 
10240 MB. Other advanced options (e.g. cleanup.interval-ms, 
max-files-per-directory, etc.) can also be configured.


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Zhilong Hong (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377977#comment-17377977 ]

Zhilong Hong commented on FLINK-23218:
--

Thank you so much for your suggestions, [~trohrmann] and [~zhuzh]. I'll open 
another JIRA issue to propose the idea of cleaning the blob cache on the 
TaskExecutor based on the size limit.

Furthermore, I've asked [~fly_in_gis] about the size limitation of the local 
files for TaskExecutors running in containers on YARN or pods on Kubernetes. It 
turns out that running out of space only happens when the disk on the 
_physical_ node is full. So I think 10GB sounds good to me.


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377963#comment-17377963 ]

Zhu Zhu commented on FLINK-23218:
-

I gave it another thought, and 10GB sounds good to me now. Even if we want 
users to explicitly limit the size when configuring "blob.offload.minsize" for 
large-scale jobs, adding a limit by default is always better than no limit.


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377961#comment-17377961 ]

Zhu Zhu commented on FLINK-23218:
-

10GB looks a bit too large as a default limit for the blob size.
If we want the limit to be very large by default, maybe we can make it 
unlimited by default? Users who change the configuration 
"blob.offload.minsize" for large-scale jobs should be aware of the residual 
issue and set the size limit.


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-09 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377936#comment-17377936
 ] 

Till Rohrmann commented on FLINK-23218:
---

Do you think that setting the size limit to 10 GB would be possible? This 
would require that every machine Flink runs on has 10 GB of free disk 
capacity by default.


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377811#comment-17377811
 ] 

Zhu Zhu commented on FLINK-23218:
-

1. To not affect existing users, I'd prefer the limit not to be too small; 
otherwise deployment performance may suffer, and more downloads would put a 
heavier load on the master/DFS. 1 GB sounds good to me. Note that for most 
users, jobs are small-scale, so the {{ShuffleDescriptors}} cache will be very 
small and will not be shipped via blobs. A large limit therefore will not 
cause new issues (compared with the current state, where there is no limit at 
all).
2. For small-scale jobs, the {{ShuffleDescriptors}} cache will not be shipped 
via blobs, so the residue problem will not get worse. Even for large-scale 
jobs, IIRC, the compressed {{ShuffleDescriptors}} cache of an 8000x8000 
shuffle is 200k+ bytes, which still does not exceed the 1 MB blob offloading 
threshold (see the estimate below). Therefore I think we can document the 
configuration "blob.offload.minsize" to make users aware of the residuals and 
the blob size limit.
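
As a back-of-envelope check, combining the figures quoted in this issue 
(roughly 200 KB of compressed ShuffleDescriptors per all-to-all consumer set, 
8000 consumers, and the 1 MB default offload threshold):

\[
\underbrace{8000}_{\text{consumers}} \times 200\ \mathrm{KB} \approx 1.6\ \mathrm{GB}
\quad \text{(uncached: one copy per TaskDeploymentDescriptor)}
\]
\[
200\ \mathrm{KB} < 1\ \mathrm{MB}\ (\texttt{blob.offload.minsize})
\;\Rightarrow\; \text{a single cached copy is not offloaded by default}
\]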





[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-07 Thread Zhilong Hong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376501#comment-17376501
 ] 

Zhilong Hong commented on FLINK-23218:
--

Thank you so much for your suggestion, [~trohrmann]. The size limit will make 
sure that the disk won't run out of space. I think it's a better solution. But 
there are more details that need to be discussed:
 # How large should the default value of the size limit be? I'm not sure what 
factors should be considered. Maybe 1 GiB?
 # There will be residues when the job is over. These residues will not be 
removed until one hour (configurable via {{blob.service.cleanup.interval}}) 
after the job finishes (as PermanentBlobCleanupTask does in session mode). Is 
that okay?
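
For reference, a minimal sketch of tightening that cleanup interval 
programmatically, assuming the key takes a value in seconds (which the 
one-hour default suggests); the surrounding class is just scaffolding for the 
example:

{code:java}
import org.apache.flink.configuration.Configuration;

public class BlobCleanupConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Shorten the cleanup interval from the default one hour (3600 s)
        // to 10 minutes, so residues of finished jobs are reclaimed sooner.
        conf.setLong("blob.service.cleanup.interval", 600L);
        System.out.println(conf);
    }
}
{code}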


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-07 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376472#comment-17376472
 ] 

Till Rohrmann commented on FLINK-23218:
---

I think your proposal for the cleanup of blobs can work [~Thesharing]. 

Have you also considered adding a size limit and deleting blobs in LRU order 
if the size limit is exceeded? This mechanism could also work and would put 
less stress on the {{JobManager}} process because of fewer RPCs. Moreover, it 
could in general be a good mechanism to harden the {{TaskManager}} processes, 
because otherwise you might run out of disk space while all the blobs are 
still required. With the LRU cache, we would drop some blobs and then download 
them again if really needed.
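
A minimal sketch of that LRU idea, assuming blobs are tracked by a string key 
and their on-disk size; the class and the deletion hook are hypothetical, not 
Flink code:

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Size-bounded LRU index of cached blobs: least-recently-used entries are
 *  evicted (deleted from disk) once the total size exceeds the limit. */
final class LruBlobIndex {
    private final long sizeLimitBytes;
    private long totalBytes;
    // accessOrder = true: iteration goes from least- to most-recently used.
    private final LinkedHashMap<String, Long> blobSizes =
            new LinkedHashMap<>(16, 0.75f, true);

    LruBlobIndex(long sizeLimitBytes) {
        this.sizeLimitBytes = sizeLimitBytes;
    }

    /** Record an insertion or access; evict LRU blobs while over the limit. */
    synchronized void touch(String blobKey, long sizeBytes) {
        Long previous = blobSizes.put(blobKey, sizeBytes);
        totalBytes += sizeBytes - (previous == null ? 0L : previous);
        Iterator<Map.Entry<String, Long>> it = blobSizes.entrySet().iterator();
        while (totalBytes > sizeLimitBytes && it.hasNext()) {
            Map.Entry<String, Long> eldest = it.next();
            it.remove();
            totalBytes -= eldest.getValue();
            deleteFromDisk(eldest.getKey());
        }
    }

    private void deleteFromDisk(String blobKey) {
        // Hypothetical hook: delete the local blob file. If the blob is
        // needed again later, it is re-downloaded from the blob server.
    }
}
{code}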


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-06 Thread Zhilong Hong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375416#comment-17375416
 ] 

Zhilong Hong commented on FLINK-23218:
--

Thank you for your reply, [~trohrmann]. 
 # As for the compression step, the main concern is the time this procedure 
costs. We've tested it in several scenarios (see the compression sketch after 
this list):
 ## For vertices connected with an all-to-all edge, when parallelism = 1000, 
serializing and compressing the shuffle descriptors takes 21 ms.
 ## When parallelism = 8000, it takes 139 ms. It's worth noting that thanks to 
the cache, we only need to compress once.
 ## For pointwise edges, when parallelism = 8000, the compression takes less 
than 1 ms.
 ## As for decompression on the TaskManager, when parallelism = 8000 and the 
edge is all-to-all, it takes less than 1 ms to decompress and deserialize the 
ShuffleDescriptors.
 ## So I think there's no drawback caused by the compression step, for either 
low-scale or large-scale jobs.
 # However, there's one blocker on which we'd like to request your opinion.
 ## Distributing the ShuffleDescriptors via the blob server aims to solve the 
GC issue: since Akka cannot send the messages as fast as the 
TaskDeploymentDescriptors are created, the cache would become a heavy burden 
for the garbage collector to deal with.
 ## While trying to distribute the ShuffleDescriptors via the blob server, we 
found that it may become a disaster if we don't clean up. When a large number 
of failovers happens, a lot of cached blobs accumulate on the local disk. In 
extreme cases, the blobs would exhaust the disk space.
 ## It's worth noting that JobInformation and TaskInformation have the same 
issue. In OLAP scenarios, if a lot of jobs are submitted, the blob caches on 
the TaskManagers stay around until one hour after each job finishes.
 ## So we need to remove the blob cache once it becomes useless. For the 
JobManager, we can remove it when a failover happens or the partition is 
released. But for the TaskManager, the blob cache is harder to deal with: at 
present it is only removed one hour after the job finishes.
 ## We've come up with a proposal: each TaskManager's blob cache syncs the 
status of all blobs with the blob server every 5 minutes (configurable). If a 
blob has been removed from the blob server, it is removed from the blob cache, 
too (a sketch of this sync loop follows at the end of this comment).
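
To make the measurements in point 1 concrete, here is a minimal sketch of 
compressing and decompressing a serialized ShuffleDescriptors payload. The 
actual codec Flink uses is not specified in this thread; {{java.util.zip}} 
serves as a stand-in:

{code:java}
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Illustrative compress/decompress round trip for a serialized payload. */
final class DescriptorCompression {
    static byte[] compress(byte[] serialized) {
        Deflater deflater = new Deflater();
        deflater.setInput(serialized);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(serialized.length / 3 + 64);
        byte[] buffer = new byte[8192];
        while (!deflater.finished()) {
            out.write(buffer, 0, deflater.deflate(buffer));
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream(compressed.length * 3 + 64);
        byte[] buffer = new byte[8192];
        while (!inflater.finished()) {
            out.write(buffer, 0, inflater.inflate(buffer));
        }
        inflater.end();
        return out.toByteArray();
    }
}
{code}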

We're looking forward to your suggestions and opinions on this blocker issue. 
Thank you so much in advance.
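
And a minimal sketch of the sync proposal in point 2.5, assuming a 
hypothetical {{BlobView}} abstraction over the blob server and the local 
cache; none of these names are real Flink APIs:

{code:java}
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Periodically drop local blobs the blob server no longer knows about. */
final class BlobCacheSyncTask {
    /** Hypothetical abstraction over the blob server and the local cache. */
    interface BlobView {
        Set<String> liveBlobKeysOnServer();
        Set<String> locallyCachedBlobKeys();
        void deleteLocally(String blobKey);
    }

    static ScheduledExecutorService start(BlobView view, long intervalMinutes) {
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            Set<String> live = view.liveBlobKeysOnServer();
            for (String key : view.locallyCachedBlobKeys()) {
                if (!live.contains(key)) {
                    // Removed on the server, so remove it locally as well.
                    view.deleteLocally(key);
                }
            }
        }, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
        return executor;
    }
}
{code}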


[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-05 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17374869#comment-17374869
 ] 

Till Rohrmann commented on FLINK-23218:
---

Thanks for creating this proposal [~Thesharing]. Are there any known drawbacks 
of the compression step for low-scale jobs?

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover, and partition releasing. 
> However, task deployment is still slow. For a job with two vertices, each 
> vertex has 8k parallelism and they are connected with the all-to-all edge. It 
> takes 32.611s to deploy all the tasks and make them transition to running. If 
> the parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> JobManager, the JobManager cannot deal with other Akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
> The JobManager creates a TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors _N_ times. However, for these 
> vertices, they share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times;
> we can just cache the serialized value of the ShuffleDescriptors instead of the
> original objects. To decrease the size of Akka messages and reduce the
> transmission of replicated data over the network, these serialized values can
> be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the JobManager doesn't need to 
> keep all the copies in heap memory until the TaskDeploymentDescriptors are all 
> sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the performance 
> of