[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-04 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426124#comment-16426124
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/4/18 8:24 PM:
---

[~jgrier] Amazon doesn't want to reveal internal details, hence they are sometimes 
pretty vague. My understanding is that the prefix (before the random/entropy part) 
has to be fixed. Either way, the latest proposal doesn't prevent the user from 
setting the first few chars as the random/entropy part. I just think it is 
important to give the user full control over key names.


was (Author: stevenz3wu):
[~jgrier] Amazon doesn't want to reveal internal details, hence sometimes are 
pretty vague. Mu understanding is that prefix (before the random/entropy part) 
has to be fixed. Either way, latest proposal doesn't prevent user from setting 
the first a few chars as random/entropy part. I just think it is important to 
give user the full control on key names.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-04 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426124#comment-16426124
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/4/18 8:20 PM:
---

[~jgrier] Amazon doesn't want to reveal internal details, hence sometimes are 
pretty vague. Mu understanding is that prefix (before the random/entropy part) 
has to be fixed. Either way, latest proposal doesn't prevent user from setting 
the first a few chars as random/entropy part. I just think it is important to 
give user the full control on key names.


was (Author: stevenz3wu):
[~jgrier] Amazon doesn't want to reveal internal details, hence sometimes are 
pretty vague. Mu understanding is that prefix (before the random/entropy part) 
has to be fixed. Either way, latest proposal doesn't prevent user from setting 
the first a few chars as random/entropy part. 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:43 PM:
---

[~jgrier] [~StephanEwen]

Here is our thinking. If you think it makes sense, we can submit a PR for 
checkpointing. As Stephan mentioned earlier, savepoints probably need to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random vs. hash: we are generating random hex chars for entropy; a hash 
should work equally well. I am not strongly biased either way, though I don't 
see much benefit of a hash over random chars. A deterministic hash doesn't seem 
to add much.

3) our current implementation does the entropy substitution during operator 
initialization, which means the entropy stays the same for the operator's 
lifecycle, but different operators get different entropy, which is the key to 
scaling parallel operators. Conceptually, a better way is probably to do the 
entropy substitution for each S3 write; we can make that change if it is 
desired. Practically, it probably doesn't make much difference in terms of 
spreading the load and throughput, because either way each operator gets its own 
entropy.
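
A minimal sketch of the substitution step described above, assuming the config 
keys from 1); the class and method names are hypothetical and not part of 
Flink's API:
{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating the _ENTROPY_KEY_ substitution; not a Flink API.
public final class EntropyInjection {

    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** Replaces the entropy marker in the configured checkpoint dir with random hex chars. */
    public static String inject(String checkpointDir, String entropyKey, int length) {
        StringBuilder entropy = new StringBuilder(length);
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = 0; i < length; i++) {
            entropy.append(HEX[rnd.nextInt(HEX.length)]);
        }
        return checkpointDir.replace(entropyKey, entropy.toString());
    }
}

// e.g. inject("s3://bucket/_ENTROPY_KEY_/checkpoints/job-id", "_ENTROPY_KEY_", 4)
// might return "s3://bucket/7f3a/checkpoints/job-id"
{code}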

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. We can make the change if this is desired. 
Practically, it probably doesn't make much difference in terms of spreading the 
load and throughput, because either way each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:24 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. We can make the change if this is desired. 
Practically, it probably doesn't make much difference in terms of spreading the 
load and throughput, because either way each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:23 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:23 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hex chars for entropy. hash 
should work equally well. I am not strongly biased either way, even though I 
don't see much benefit of hash over random. deterministic hash doesn't seem to 
give much benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424625#comment-16424625
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:22 PM:
---

[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven


was (Author: stevenz3wu):
[~jgrier] [~StephanEwen]

Here are our thinking. if you think it makes sense, we can submit a PR for 
checkpoint. As Stephan mentioned earlier, savepoint probably needs to be 
tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution 
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars
state.backend.fs.checkpointdir.injectEntropy.length: 4{code}
 

2) random v.s. hash: we are generating random hexes for entropy. hash should 
work equally well. I am not strongly biased either way, even though I don't see 
much benefit of hash over random. deterministic hash doesn't seem to give much 
benefit

3) our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better way is probably doing entropy 
substitution for each S3 write. Practically, it probably doesn't make much 
difference in terms of spreading the load and throughput, because either way 
each operator got its own entropy prefix

Thanks,

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423335#comment-16423335
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 5:53 AM:
---

I don't know if it has to be "the very first characters" for automatic 
repartitioning to work. Amazon didn't explicitly say it. Let me see if I can 
confirm it.

Amazon did explicitly mention in the doc that S3 automatically repartitions a 
bucket if the request rate grows *steadily*. For any sudden jump or big rollout, 
you have to work with the S3 team to pre-partition the bucket in anticipation of 
the traffic increase.

I still like the flexibility of giving the user full control over the checkpoint 
path / key name (including the entropy part).


was (Author: stevenz3wu):
I don't know if it has to be "the very first characters". Amazon didn't 
explicitly say it. Let me see if I can confirm it.

I still like the flexibility of giving user the control on key names (including 
entropy part).

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422976#comment-16422976
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/2/18 9:22 PM:
---

I think S3 has more sophisticated pattern searching for entropy (not just from 
the absolute beginning). We have asked the S3 team to shard the bucket with key 
names like
{code:java}
checkpoints/*/* or 
checkpoints/*-* {code}
 


was (Author: stevenz3wu):
I think S3 has more sophisticated pattern searching for entropy (not just from 
the absolutely beginning). we have asked S3 team to  shard the bucket with key 
names like "checkpoints/*/*" or 
"checkpoints/*-*" 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423080#comment-16423080
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/2/18 8:25 PM:
---

Reversing the components (split by the slash char) doesn't give the user control 
over key names, e.g. the user might want to keep the top level (first component) clean:
{code:java}
s3://bucket/checkpoints/...
s3://bucket/savepoints/...{code}


was (Author: stevenz3wu):
reversing the components (split by slash char) doesn't give user the control on 
key names. e.g. User might want to keep the top level (first component) clean.
s3://bucket/checkpoints/...s3://bucket/savepoints/...

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423072#comment-16423072
 ] 

Jamie Grier edited comment on FLINK-9061 at 4/2/18 8:17 PM:


So, what I'm suggesting is that we, optionally, split on '/' and reverse the 
components like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink

It's not very ops friendly but that's because S3 isn't a filesystem.  The 
hierarchy isn't real.  It's a flat keyspace (I know we all know that of 
course).  The equivalent of a directory listing that would group all the 
checkpoints for a single job would be:

aws s3 ls s3://my_bucket --recursive | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case.  What about others?
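
A minimal sketch of the optional reversal described above; the helper is 
hypothetical, not an existing Flink API, and only illustrates the idea:
{code:java}
// Hypothetical helper: reverses the key components of an s3:// path,
// keeping the bucket in place. Illustration only, not Flink code.
public final class PathReversal {

    public static String reverseKey(String s3Path) {
        String withoutScheme = s3Path.substring("s3://".length());
        int firstSlash = withoutScheme.indexOf('/');
        String bucket = withoutScheme.substring(0, firstSlash);
        String[] parts = withoutScheme.substring(firstSlash + 1).split("/");

        StringBuilder reversed = new StringBuilder("s3://").append(bucket);
        for (int i = parts.length - 1; i >= 0; i--) {
            reversed.append('/').append(parts[i]);
        }
        return reversed.toString();
    }
}

// reverseKey("s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789")
// -> "s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink"
{code}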

 


was (Author: jgrier):
So, what I'm suggesting is that we, optionally, split on '/' and reverse the 
components like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/CHK_000/JOB_ID/checkpoints

It's not very ops friendly but that's because S3 isn't a filesystem.  The 
hierarchy isn't real.  The equivalent of a directory listing that would group 
all the checkpoints for a single job would be:

aws s3 list s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case.  What about others?

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422843#comment-16422843
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 4/2/18 5:50 PM:
---

Usually a 4-char random prefix goes a long way. Even a 2-char random prefix can 
be sufficient unless the workload is super high. Again, as Steve Loughran said, 
nothing official, just based on experience and speculation. But I think we should 
give the user control over the number of characters for the entropy part.
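
(For context: N random hex chars give 16^N distinct prefixes, so 2 chars already 
yield 16^2 = 256 possible prefixes and 4 chars yield 16^4 = 65,536, which is why 
even a short random prefix spreads the write load across many S3 partitions.)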

[~jgrier] The Flink checkpoint path looks like 
"s3://bucket/checkpoint-path-prefix/chk-121/random-UUID". So yes, reversing the 
key name can work, because the last part is a random UUID. But I think we should 
give the user control over which part of the checkpoint path introduces the 
entropy, e.g. I may want to keep the top-level prefixes (checkpoints and savepoints):
{code:java}
s3://bucket/checkpoints/<entropy>/rest-of-checkpoint-path
s3://bucket/savepoints/<entropy>/rest-of-savepoint-path{code}
 


was (Author: stevenz3wu):
Usually 4-char random prefix can go a long way. Even 2-char random prefix can 
be sufficient unless super high workload. Again, as Steve Loughran, nothing 
official. just based on experience and speculation. But I think we should give 
user the control the number of characters for the entropy part.

[~jgrier]  flink checkpoint path is like 
"s3://bucket/checkpoint-path-prefix/chk-121/random-UUID". So yes, reversing key 
name can work. But I think we should give user the control on the part of 
checkpoint path to introduce entropy. e.g. I may want to maintain the top level 
prefixes (checkpoints and savepoints)
{code:java}
s3://bucket/checkpoints//rest-of-checkpoint-path
s3://bucket/savepoints//rest-of-savepointpoint-path{code}
 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-30 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420885#comment-16420885
 ] 

Jamie Grier edited comment on FLINK-9061 at 3/31/18 1:32 AM:
-

Maybe we should keep this super simple and make a change at the state backend 
level where we optionally just reverse the key name.  That should actually work 
very well.


was (Author: jgrier):
Maybe we should keep this super simple and make a change at the state backend 
level where we optionally just reverse the path.  That should actually work 
very well.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417145#comment-16417145
 ] 

Steve Loughran edited comment on FLINK-9061 at 3/28/18 10:33 AM:
-

The s3a connector will have the same issue, though there we can change the 
backoff policy, so you shouldn't see it other than as an increase in the throttle 
event metrics. But checkpointing will slow down.

Putting the most randomness at the head of the path is what S3 likes best, as its 
partitioning is based on the first few characters. And I mean "really at the head 
of the path": 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/




was (Author: ste...@apache.org):
The s3a connector will have the same issue, though there we can change the 
backoff policy, so you shouldn't see it other than an increase in the throttle 
event metrics. But checkpointing will slow down

Putting the most randomness at the head of the path is the one that s3 likes 
best, as its partitioning is based on the first few characters. And I mean 
"really at the head of the path" : 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/



> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416254#comment-16416254
 ] 

Jamie Grier edited comment on FLINK-9061 at 3/27/18 9:13 PM:
-

Yeah, so I completely agree that the response should be a 503 or better yet a 
429 but it's not.  I already ran this through the AWS support channels.  The 
response was essentially that this was "internally" a TooBusyException.  Here's 
their full response:
{quote}Based on the information provided, I understand that you are 
experiencing some internal errors (status code 500) from S3, which is impacting 
one of your Flink jobs. From the log dive on your provided request IDs, I 
observe that your PutObject request triggered Internal Error with 
TooBusyException. This happens when a bucket receives more requests than it can 
handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests 
per second or more than 300 GET requests per second. So, if your workload is to 
exceed this limit, you'd need to scale your bucket through partitioning. 
Currently, your key space isn't randomized and all your keys include 
"BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". 
Therefore, your bucket isn't being automatically partitioned by S3 and you 
received increased error rates after your requests increased.
{quote}
 


was (Author: jgrier):
Yeah, so I completely agree that should be a 503 but it's not.  I already ran 
this through the AWS channels.  The response was essentially that this was 
"internally" a TooBusyException.  Here's their full response:
{quote}Based on the information provided, I understand that you are 
experiencing some internal errors (status code 500) from S3, which is impacting 
one of your Flink jobs. From the log dive on your provided request IDs, I 
observe that your PutObject request triggered Internal Error with 
TooBusyException. This happens when a bucket receives more requests than it can 
handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests 
per second or more than 300 GET requests per second. So, if your workload is to 
exceed this limit, you'd need to scale your bucket through partitioning. 
Currently, your key space isn't randomized and all your keys include 
"BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". 
Therefore, your bucket isn't being automatically partitioned by S3 and you 
received increased error rates after your requests increased.
{quote}
 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414828#comment-16414828
 ] 

Jamie Grier edited comment on FLINK-9061 at 3/27/18 12:45 AM:
--

[~ste...@apache.org]

Here's the full stack trace:

{quote}"java.lang.Exception: Could not perform checkpoint 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.Exception: Could not complete snapshot 465 for operator 
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
 ... 8 common frames omitted
 Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b
 in order to obtain the stream state handle
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359)
 ... 13 common frames omitted
 Caused by: java.io.IOException: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended 
Request ID: BLAH_
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036)
 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
 
org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
 ... 18 common frames omitted
 Caused by: 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 We encountered an internal error. Please try again. (Service: Amazon S3; 
Status Code: 500; Error Code: InternalError; Request ID: BLAH)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
 
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
 

[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:49 PM:
-

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash/random-prefix solution can work for both scenarios, but the hash needs to be applied to the complete path (including the last random UUID part).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.
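
A minimal sketch of the _ENTROPY_KEY_ idea (hypothetical helper, not actual Flink API; the placeholder literal and bucket path are assumptions): the configured checkpoint path carries the placeholder, and each operator swaps it for a fresh 4-char random hex when it creates its checkpoint stream factory.

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Sketch only: each operator replaces the _ENTROPY_KEY_ placeholder with a
// fresh 4-char random hex, so writes from many operators fan out across
// different S3 partitions.
public class EntropyKeySketch {

    private static final String PLACEHOLDER = "_ENTROPY_KEY_";

    /** Replaces the placeholder with a random 4-char hex string. */
    static String resolveCheckpointPath(String configuredPath) {
        String entropy = String.format("%04x", ThreadLocalRandom.current().nextInt(0x10000));
        return configuredPath.replace(PLACEHOLDER, entropy);
    }

    public static void main(String[] args) {
        // e.g. s3://bucket/3fa9/flink/checkpoints -- a different prefix per call
        System.out.println(resolveCheckpointPath("s3://bucket/_ENTROPY_KEY_/flink/checkpoints"));
    }
}
{code}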

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414696#comment-16414696
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:46 PM:
-

It seems that our internal change *only* works with internal checkpoints; it does not work for external checkpoints and savepoints. A more general discussion may be needed on how to make it work for external checkpoints and savepoints. I know the Flink community is also thinking about merging internal/external checkpoints and savepoints.

Here are some initial thoughts for external checkpoints and savepoints (a key-layout sketch follows the list).
 * The uber metadata file (written by the job manager) keeps the same base path (without random/hash entropy). This is needed to implement a retention policy via prefix queries (S3 LIST). It is not an issue for internal checkpoints, because ZooKeeper gives us such a root reference even with entropy substitution.
 * The actual data files written by the operators on the task managers need entropy in their key names to spread the writes across different S3 partitions.
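
A minimal sketch of that key layout (illustrative only; the bucket, the `_metadata` file name, and the helper code are assumptions, not the exact Flink layout):

{code:java}
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

// Sketch only: the job manager's uber metadata file stays under the plain
// base path (so one prefix LIST finds every checkpoint), while each operator
// data file gets a random 4-char hex near the front of its key.
public class CheckpointKeyLayoutSketch {

    public static void main(String[] args) {
        String jobId = "fda9a54de951d8cf7a55b5cd833cb0f7";
        long checkpointId = 465;
        String base = "s3://bucket/flink/checkpoints/" + jobId + "/chk-" + checkpointId;

        // Written once by the job manager: no entropy, stable prefix.
        String metadataKey = base + "/_metadata";

        // Written by an operator on a task manager: entropy injected up front.
        String entropy = String.format("%04x", ThreadLocalRandom.current().nextInt(0x10000));
        String dataKey = "s3://bucket/" + entropy + "/flink/checkpoints/" + jobId
                + "/chk-" + checkpointId + "/" + UUID.randomUUID();

        System.out.println(metadataKey);
        System.out.println(dataKey);
    }
}
{code}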

 


was (Author: stevenz3wu):
It seems that our internal change *only* works with internal checkpoints; it does not work for external checkpoints and savepoints. I know the Flink community is also thinking about merging internal/external checkpoints and savepoints. A more general discussion may be needed.

Ideally, this is what I would like to see for external checkpoints and savepoints:
 * The uber metadata file (written by the job manager) keeps the same base path (without random/hash entropy). This is needed to implement a retention policy via prefix queries (S3 LIST). It is not an issue for internal checkpoints, because ZooKeeper gives us such a root reference even with entropy substitution.
 * The actual data files written by the operators on the task managers need entropy in their key names to spread the writes across different S3 partitions.

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:39 PM:
-

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml). For the hash approach, maybe it should be hash(base_path, operator_id)?

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:38 PM:
-

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml). For the hash approach, maybe it should be hash(base_path, operator_id)? (A sketch of that variant follows below.)

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.
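
The hash(base_path, operator_id) variant mentioned above could look roughly like the following (illustrative only; the operator id string and the choice of MD5 are assumptions):

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Sketch only: derive a short, deterministic hex prefix from the checkpoint
// base path plus the operator id, so each operator's files land under a
// different -- but stable -- S3 partition.
public class OperatorPrefixHashSketch {

    static String prefixFor(String basePath, String operatorId) throws Exception {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest((basePath + '/' + operatorId).getBytes(StandardCharsets.UTF_8));
        // First two digest bytes -> 4 hex chars of entropy.
        int prefix = ((digest[0] & 0xff) << 8) | (digest[1] & 0xff);
        return String.format("%04x", prefix);
    }

    public static void main(String[] args) throws Exception {
        String basePath = "s3://bucket/flink/checkpoints";
        // Always the same output for the same base path and operator id.
        System.out.println(prefixFor(basePath, "catch_all.reduce-113"));
    }
}
{code}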

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:34 PM:
-

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation (sketched below). Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.
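
For the first scenario, the deployment-tooling side might be as simple as the sketch below (the bucket, the config key names, and the threshold value are assumptions about one possible setup, not a prescription):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Sketch only: pick a static 4-char random hex per routing job at deploy
// time, bake it into the checkpoint path, and raise the memory threshold so
// the (tiny) task manager state is embedded in the job manager's metadata
// file instead of being written to S3 directly.
public class RoutingJobCheckpointConfigSketch {

    public static void main(String[] args) {
        String jobName = "routing-job-42"; // hypothetical job name
        String entropy = String.format("%04x", ThreadLocalRandom.current().nextInt(0x10000));

        // Lines the deployment tooling would emit into the job's flink-conf.yaml.
        System.out.println("state.checkpoints.dir: s3://bucket/" + entropy
                + "/flink/checkpoints/" + jobName);
        System.out.println("state.backend.fs.memory-threshold: 1048576");
    }
}
{code}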

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex. Our change does not work for external checkpoints and savepoints.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:28 PM:
-

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_. Basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex. Our change does not work for external checkpoints and savepoints.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:24 PM:
-

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint for a single large-state job.

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:21 PM:
-

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from the task managers; only the job manager writes one uber metadata file (with the state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:31 PM:


[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:30 PM:


[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:27 PM:


[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:27 PM:


[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:27 PM:


[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write; that was my earlier proposal of _ENTROPY_KEY_.

I can see that the hash approach can work for both scenarios.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios; I can see that the _hash_ approach can work for both.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For those, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That is where we want to inject a *dynamic* 4-char random hex into each S3 write.

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never take a savepoint for case #2 (the single large-state job).

 



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413628#comment-16413628
 ] 

Stephan Ewen edited comment on FLINK-9061 at 3/26/18 9:43 AM:
--

That would be a great contribution, valuable for many S3 users. Indeed, S3's partitioning by key prefix is the root of this problem.

FYI: the 1.5 release will have a few changes to the state backends that will reduce some S3 requests (mainly HEAD, if you use s3a). The change proposed here is still needed, though.

We could approach this in two ways:

1. Make the change to the state backend itself, to allow it to insert entropy 
into the paths. We can think about only doing this for checkpoints, but not for 
savepoints, because then savepoints would still be together in "one directory" 
(under the same prefix).

2. Make this a general change in the S3 filesystem, to use a deterministic path 
hash like Jamie suggested. That would help solve this across other use cases 
of S3 as well. Whenever the path contains the segment {{_hash_}}, it would be 
replaced by such a hash (see the sketch below).
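
A minimal sketch of what option 2 could look like, assuming a hypothetical 
path-rewriting step in the S3 filesystem wrapper (this is not the actual Flink 
FileSystem API; the marker name and hash length are illustrative only):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashSegmentRewriter {

    private static final String MARKER = "_hash_";

    // Replace the "_hash_" segment with a short deterministic hash of the
    // original path: the same logical path always maps to the same key, but
    // different paths spread across different S3 partitions.
    static String rewrite(String path) {
        if (!path.contains(MARKER)) {
            return path;
        }
        return path.replace(MARKER, shortHash(path));
    }

    private static String shortHash(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            // 4 hex chars are plenty for spreading keys over prefixes.
            int firstTwoBytes = ((digest[0] & 0xff) << 8) | (digest[1] & 0xff);
            return String.format("%04x", firstTwoBytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(rewrite("s3://bucket/_hash_/flink/checkpoints/job-a"));
        System.out.println(rewrite("s3://bucket/_hash_/flink/checkpoints/job-b"));
    }
}

Because the hash is a function of the original path, the rewritten location is 
reproducible, unlike purely random entropy.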


was (Author: stephanewen):
That would be a great contribution, valuable for many S3 users. Indeed, S3's 
partitioning by prefix

FYI: The 1.5 release will have a few changes to the state backends that will 
reduce some S3 requests (mainly HEAD, if you use s3a).
 The here proposed is still needed, though.

We could approach this in two ways:

1. Make the change to the state backend itself, to allow it to insert entropy 
into the paths. We can think about only doing this for checkpoints, but not for 
savepoints, because then savepoints would still be together in "one directory" 
(under the same prefix).

2. Make this a general change in the S3 filesystem, to use a deterministic path 
hash like Jamie suggested. That would help to solve this across other use cases 
of S3 as well. Whenever the path contains the segment `_hash_` this would be 
replaced by such a hash.



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413628#comment-16413628
 ] 

Stephan Ewen edited comment on FLINK-9061 at 3/26/18 9:42 AM:
--

That would be a great contribution, valuable for many S3 users. Indeed, S3's 
partitioning by prefix is exactly the problem here.

FYI: The 1.5 release will have a few changes to the state backends that will 
reduce some S3 requests (mainly HEAD requests, if you use s3a).
The change proposed here is still needed, though.

We could approach this in two ways:

1. Make the change to the state backend itself, to allow it to insert entropy 
into the paths. We can think about only doing this for checkpoints, but not for 
savepoints, because then savepoints would still be together in "one directory" 
(under the same prefix).

2. Make this a general change in the S3 filesystem, to use a deterministic path 
hash like Jamie suggested. That would help to solve this across other use cases 
of S3 as well. Whenever the path contains the segment `_hash_` this would be 
replaced by such a hash.


was (Author: stephanewen):
That would be a great contribution, valuable for many S3 users. Indeed, S3's 
partitioning by prefix 

FYI: The 1.5 release will have a few changes to the state backends that will 
reduce some S3 requests (mainly HEAD, if you use s3a).
The here proposed is still needed, though.

We could approach this in two ways:

  1. Make the change to the state backend itself, to allow it to insert entropy 
into the paths. We can think about only doing this for checkpoints, but not for 
savepoints, because then savepoints would still be together in "one directory" 
(under the same prefix).

  2. Make this a general change in the S3 filesystem, to use a deterministic 
path hash like Jamie suggested. That would help to solve this across other use 
cases of S3 as well. Whenever the path contains the segment {{_hash_}} this 
would be replaced by such a hash.



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413628#comment-16413628
 ] 

Stephan Ewen edited comment on FLINK-9061 at 3/26/18 9:41 AM:
--

That would be a great contribution, valuable for many S3 users. Indeed, S3's 
partitioning by prefix is exactly the problem here.

FYI: The 1.5 release will have a few changes to the state backends that will 
reduce some S3 requests (mainly HEAD requests, if you use s3a).
The change proposed here is still needed, though.

We could approach this in two ways:

  1. Make the change to the state backend itself, to allow it to insert entropy 
into the paths. We can think about only doing this for checkpoints, but not for 
savepoints, because then savepoints would still be together in "one directory" 
(under the same prefix).

  2. Make this a general change in the S3 filesystem, to use a deterministic 
path hash like Jamie suggested. That would help to solve this across other use 
cases of S3 as well. Whenever the path contains the segment {{_hash_}} this 
would be replaced by such a hash.


was (Author: stephanewen):
That would be a great contribution, valuable for many S3 users. Indeed, S3's 
partitioning by prefix 

FYI: The 1.5 release will have a few changes to the state backends that will 
reduce some S3 requests (mainly HEAD, if you use s3a).
The here proposed is still needed, though.

We could approach this in two ways:

  1. Make the change to the state backend itself, to allow it to insert entropy 
into the paths. We can think about only doing this for checkpoints, but not for 
savepoints, because then savepoints would still be together in "one directory" 
(under the same prefix).

  2. Make this a general change in the S3 filesystem, to use a deterministic 
path hash like Jamie suggested. That would help to solve this across other use 
cases of S3 as well. Whenever the path contains the segment *_hash_* this would 
be replaced by such a hash.



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-23 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412326#comment-16412326
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/24/18 3:06 AM:


Jamie,

yes, we ran into the same issue at Netflix. We did exactly what you described. 
Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in 
the path, there seems to be no way for S3 to partition the bucket to support a 
high request rate. I don't see another way around it. There is obviously a 
downside to such random chars in the S3 path: you can't do prefix listing 
anymore.
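
For illustration, a rough sketch of what that state backend modification could 
look like (a hypothetical helper, not our actual patch; the config keys are the 
ones shown above):

import java.util.concurrent.ThreadLocalRandom;

public class EntropyInjector {

    // Mirrors the config above: the placeholder token in the checkpoint dir
    // and whether injection is enabled at all.
    private final String entropyKey;
    private final boolean enabled;

    EntropyInjector(String entropyKey, boolean enabled) {
        this.entropyKey = entropyKey;
        this.enabled = enabled;
    }

    // Called whenever a new checkpoint file path is created: substitute the
    // placeholder with fresh 4-char random hex so concurrent writes spread
    // across different S3 partitions.
    String resolve(String configuredPath) {
        if (!enabled || !configuredPath.contains(entropyKey)) {
            return configuredPath;
        }
        String hex = String.format("%04x", ThreadLocalRandom.current().nextInt(0x10000));
        return configuredPath.replace(entropyKey, hex);
    }

    public static void main(String[] args) {
        EntropyInjector injector = new EntropyInjector("__ENTROPY_KEY__", true);
        System.out.println(injector.resolve(
                "s3://bucket/__ENTROPY_KEY__/flink/checkpoints/chk-42/state-file-1"));
    }
}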

 

Steven


was (Author: stevenz3wu):
Jamie,

yes, we run into the same issue at Netflix. We did exactly like what you said. 
Here are our config looks like.

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
 state.backend.fs.checkpointdir.injectEntropy.enabled: true
 state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

we modified state backend code to support it. Without the random chars in the 
path, there is no way for S3 to partition the bucket to support high request 
rate. I don't see other way around it. There is obviously down side with such 
random chars in s3 path. now you can't do prefix listing anymore.

 

Steven



[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-23 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412326#comment-16412326
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/24/18 1:02 AM:


Jamie,

yes, we ran into the same issue at Netflix. We did exactly what you described. 
Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in 
the path, there is no way for S3 to partition the bucket to support a high 
request rate. I don't see another way around it. There is obviously a downside 
to such random chars in the S3 path: you can't do prefix listing anymore.

 

Steven


was (Author: stevenz3wu):
Jamie,

yes, we run into the same issue at Netflix. We did exactly like what you said. 
Here are our config looks like.

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

we modified state backend code to support it. Without the random chars in the 
path, there is no way for S3 to partition the bucket to support high request 
rate. I don't see other way around it. There is obviously down side with such 
random chars in s3 path. now you can't do prefix listing anymore.

 

Steven