[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426124#comment-16426124 ] Steven Zhen Wu edited comment on FLINK-9061 at 4/4/18 8:24 PM:
---
[~jgrier] Amazon doesn't want to reveal internal details, so their guidance is sometimes pretty vague. My understanding is that the prefix (before the random/entropy part) has to be fixed. Either way, the latest proposal doesn't prevent the user from setting the first few chars as the random/entropy part. I just think it is important to give the user full control over key names.

was (Author: stevenz3wu):
[~jgrier] Amazon doesn't want to reveal internal details, hence sometimes are pretty vague. Mu understanding is that prefix (before the random/entropy part) has to be fixed. Either way, latest proposal doesn't prevent user from setting the first a few chars as random/entropy part. I just think it is important to give user the full control on key names.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Jamie Grier
>            Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale jobs (those with many total tasks). The issue is that we are writing all the checkpoint data under a common key prefix. This is the worst-case scenario for S3 performance since the key is used as a partition key.
>
> In the worst case, checkpoints fail with a 500 status code coming back from S3 and an internal error type of TooBusyException.
>
> One possible solution would be to add a hook in the Flink filesystem code that allows me to "rewrite" paths. For example, say I have the checkpoint directory set to:
>
> s3://bucket/flink/checkpoints
>
> I would hook that and rewrite that path to:
>
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original path
>
> This would distribute the checkpoint write load around the S3 cluster evenly.
>
> For reference: https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>
> Any other people hit this issue? Any other ideas for solutions? This is a pretty serious problem for people trying to checkpoint to S3.
>
> -Jamie

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
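To make the [HASH] rewrite idea from the issue description concrete, here is a minimal, hypothetical sketch (the class and method names are made up; this is not Flink's actual filesystem hook) of rewriting a configured checkpoint path so that a short hash of the original path becomes the leading key component:
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class CheckpointPathRewriter {

    // Rewrites s3://bucket/flink/checkpoints into s3://bucket/<hash>/flink/checkpoints,
    // where <hash> is derived from the original path, so different configured paths
    // land in different S3 partitions while the mapping stays deterministic.
    public static String injectHashPrefix(String checkpointPath) throws NoSuchAlgorithmException {
        int schemeEnd = checkpointPath.indexOf("://") + 3;
        int bucketEnd = checkpointPath.indexOf('/', schemeEnd);
        String bucketPart = checkpointPath.substring(0, bucketEnd); // "s3://bucket"
        String keyPart = checkpointPath.substring(bucketEnd);       // "/flink/checkpoints"

        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(checkpointPath.getBytes(StandardCharsets.UTF_8));
        // keep only the first two bytes (4 hex chars); a short prefix already spreads keys
        String hash = String.format("%02x%02x", digest[0] & 0xff, digest[1] & 0xff);
        return bucketPart + "/" + hash + keyPart;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // prints something like s3://bucket/1a2b/flink/checkpoints
        System.out.println(injectHashPrefix("s3://bucket/flink/checkpoints"));
    }
}
{code}
With such a hook, one configured directory always maps to the same hashed prefix, so all files of a job remain discoverable, while different jobs (different configured paths) spread across S3 partitions.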
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424625#comment-16424625 ] Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:43 PM:
---
[~jgrier] [~StephanEwen] Here is our thinking. If you think it makes sense, we can submit a PR for checkpoints. As Stephan mentioned earlier, savepoints probably need to be tackled separately.

1) New config to enable dynamic entropy injection:
{code:java}
# user has full control of the checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring for entropy substitution
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4
{code}
2) Random vs. hash: we are generating random hex chars for entropy. A hash should work equally well. I am not strongly biased either way, though I don't see much benefit of a hash over random; a deterministic hash doesn't seem to buy us much.

3) Our current implementation does the entropy substitution during operator initialization, which means the entropy stays the same for the operator's lifecycle, but different operators get different entropy, which is the key for scaling parallel operators (see the sketch after this comment). Conceptually, a better approach is probably to do the entropy substitution for each S3 write. We can make that change if it is desired. Practically, it probably doesn't make much difference in terms of spreading the load and throughput, because either way each operator gets its own entropy.

Thanks, Steven

was (Author: stevenz3wu):
[~jgrier] [~StephanEwen] Here are our thinking. if you think it makes sense, we can submit a PR for checkpoint. As Stephan mentioned earlier, savepoint probably needs to be tackled separately.

1) new config to enable dynamic entropy injection
{code:java}
# user has full control on checkpoint path (including entropy key substitution)
# _ENTROPY_KEY_ can be at any part of the checkpoint path
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...
# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true
# substring for entropy substitution
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_
# optional: number of chars. default is 4
state.backend.fs.checkpointdir.injectEntropy.length: 4
{code}
2) random v.s. hash: we are generating random hex chars for entropy. hash should work equally well. I am not strongly biased either way, even though I don't see much benefit of hash over random. deterministic hash doesn't seem to give much benefit

3) our current implementation does the entropy substitution during operator initialization. Conceptually, a better way is probably doing entropy substitution for each S3 write. We can make the change if this is desired.
Practically, it probably doesn't make much difference in terms of spreading the load and throughput, because either way each operator got its own entropy prefix

Thanks, Steven
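As an illustration of point 3 above (not the actual patch; class and method names are made up), a minimal Java sketch of substituting the configured _ENTROPY_KEY_ marker with a few random hex chars when an operator initializes its checkpoint path:
{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Minimal sketch of the proposed behaviour: the configured checkpoint directory contains
// the marker _ENTROPY_KEY_, and each operator replaces it once, at initialization time,
// with a short random hex string, so parallel operators write to different prefixes.
public class EntropyInjector {

    private static final String HEX = "0123456789abcdef";

    static String randomHex(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(HEX.charAt(ThreadLocalRandom.current().nextInt(HEX.length())));
        }
        return sb.toString();
    }

    // entropyKey and entropyLength correspond to the proposed
    // state.backend.fs.checkpointdir.injectEntropy.{key,length} settings
    static String resolveCheckpointDir(String configuredDir, String entropyKey, int entropyLength) {
        return configuredDir.replace(entropyKey, randomHex(entropyLength));
    }

    public static void main(String[] args) {
        String dir = resolveCheckpointDir(
                "s3://bucket/_ENTROPY_KEY_/flink/checkpoints", "_ENTROPY_KEY_", 4);
        System.out.println(dir); // e.g. s3://bucket/3f9a/flink/checkpoints
    }
}
{code}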
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423335#comment-16423335 ] Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 5:53 AM:
---
I don't know if it has to be "the very first characters" for automatic repartitioning to work. Amazon didn't explicitly say it; let me see if I can confirm it. Amazon did explicitly mention in the doc that S3 automatically repartitions a bucket if the request rate grows *steadily*. For any sudden jump or big rollout, you have to work with the S3 team to pre-partition the bucket in anticipation of the traffic increase. I still like the flexibility of giving the user full control over the checkpoint path / key name (including the entropy part).

was (Author: stevenz3wu):
I don't know if it has to be "the very first characters". Amazon didn't explicitly say it. Let me see if I can confirm it. I still like the flexibility of giving user the control on key names (including entropy part).
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422976#comment-16422976 ] Steven Zhen Wu edited comment on FLINK-9061 at 4/2/18 9:22 PM:
---
I think S3 has more sophisticated pattern searching for entropy (not just from the absolute beginning). We have asked the S3 team to shard the bucket with key names like
{code:java}
checkpoints// or checkpoints/-
{code}

was (Author: stevenz3wu):
I think S3 has more sophisticated pattern searching for entropy (not just from the absolutely beginning). we have asked S3 team to shard the bucket with key names like "checkpoints/*/*" or "checkpoints/*-*"
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423080#comment-16423080 ] Steven Zhen Wu edited comment on FLINK-9061 at 4/2/18 8:25 PM:
---
Reversing the components (split by the slash char) doesn't give the user control over key names. E.g., the user might want to keep the top level (first component) clean:
{code:java}
s3://bucket/checkpoints/...
s3://bucket/savepoints/...
{code}

was (Author: stevenz3wu):
reversing the components (split by slash char) doesn't give user the control on key names. e.g. User might want to keep the top level (first component) clean. s3://bucket/checkpoints/...s3://bucket/savepoints/...
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423072#comment-16423072 ] Jamie Grier edited comment on FLINK-9061 at 4/2/18 8:17 PM:
---
So, what I'm suggesting is that we, optionally, split on '/' and reverse the components, like so:

s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789

becomes

s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints

It's not very ops friendly, but that's because S3 isn't a filesystem. The hierarchy isn't real. It's a flat keyspace (I know we all know that, of course). The equivalent of a directory listing that would group all the checkpoints for a single job would be:

aws s3 ls s3://my_bucket | grep "JOB_ID/checkpoints"

I think that would work just fine for our use case. What about others?

was (Author: jgrier):
So, what I'm suggesting is that we, optionally, split on '/' and reverse the components like so: s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789 becomes s3://my_bucket/123456789/CHK_000/JOB_ID/checkpoints It's not very ops friendly but that's because S3 isn't a filesystem. The hierarchy isn't real. The equivalent of a directory listing that would group all the checkpoints for a single job would be: aws s3 list s3://my_bucket | grep "JOB_ID/checkpoints" I think that would work just fine for our use case. What about others?
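For concreteness, a rough sketch of that optional reversal as a plain string transformation (class and method names are made up; not the actual state backend change). Note that a full reversal also moves the "flink" component to the tail, slightly more than the example above shows:
{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReversedKeyLayout {

    // s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789
    // becomes s3://my_bucket/123456789/chk_000/JOB_ID/checkpoints/flink
    static String reverseComponents(String path) {
        int schemeEnd = path.indexOf("://") + 3;
        int bucketEnd = path.indexOf('/', schemeEnd);
        String bucket = path.substring(0, bucketEnd);   // "s3://my_bucket"
        String key = path.substring(bucketEnd + 1);     // "flink/checkpoints/..."
        List<String> parts = Arrays.asList(key.split("/"));
        Collections.reverse(parts);                     // random suffix becomes the leading component
        return bucket + "/" + String.join("/", parts);
    }

    public static void main(String[] args) {
        System.out.println(
                reverseComponents("s3://my_bucket/flink/checkpoints/JOB_ID/chk_000/123456789"));
    }
}
{code}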
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422843#comment-16422843 ] Steven Zhen Wu edited comment on FLINK-9061 at 4/2/18 5:50 PM:
---
Usually a 4-char random prefix can go a long way. Even a 2-char random prefix can be sufficient unless the workload is super high. Again, as Steve Loughran said, nothing official here, just experience and speculation. But I think we should give the user control over the number of characters in the entropy part.

[~jgrier] The Flink checkpoint path looks like "s3://bucket/checkpoint-path-prefix/chk-121/random-UUID". So yes, reversing the key name can work, because the last part is a random UUID. But I think we should give the user control over which part of the checkpoint path introduces the entropy. E.g., I may want to keep the top-level prefixes (checkpoints and savepoints):
{code:java}
s3://bucket/checkpoints//rest-of-checkpoint-path
s3://bucket/savepoints//rest-of-savepoint-path
{code}

was (Author: stevenz3wu):
Usually 4-char random prefix can go a long way. Even 2-char random prefix can be sufficient unless super high workload. Again, as Steve Loughran, nothing official. just based on experience and speculation. But I think we should give user the control the number of characters for the entropy part. [~jgrier] flink checkpoint path is like "s3://bucket/checkpoint-path-prefix/chk-121/random-UUID". So yes, reversing key name can work. But I think we should give user the control on the part of checkpoint path to introduce entropy. e.g. I may want to maintain the top level prefixes (checkpoints and savepoints)
{code:java}
s3://bucket/checkpoints//rest-of-checkpoint-path
s3://bucket/savepoints//rest-of-savepointpoint-path
{code}
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420885#comment-16420885 ] Jamie Grier edited comment on FLINK-9061 at 3/31/18 1:32 AM:
---
Maybe we should keep this super simple and make a change at the state backend level where we optionally just reverse the key name. That should actually work very well.

was (Author: jgrier):
Maybe we should keep this super simple and make a change at the state backend level where we optionally just reverse the path. That should actually work very well.
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417145#comment-16417145 ] Steve Loughran edited comment on FLINK-9061 at 3/28/18 10:33 AM:
---
The s3a connector will have the same issue, though there we can change the backoff policy, so you shouldn't see it other than as an increase in the throttle event metrics. But checkpointing will slow down.

Putting the most randomness at the head of the path is what S3 likes best, as its partitioning is based on the first few characters. And I mean "really at the head of the path": https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/

was (Author: ste...@apache.org):
The s3a connector will have the same issue, though there we can change the backoff policy, so you shouldn't see it other than an increase in the throttle event metrics. But checkpointing will slow down Putting the most randomness at the head of the path is the one that s3 likes best, as its partitioning is based on the first few characters. And I mean "really at the head of the path" : https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416254#comment-16416254 ] Jamie Grier edited comment on FLINK-9061 at 3/27/18 9:13 PM:
---
Yeah, so I completely agree that the response should be a 503 or, better yet, a 429, but it's not. I already ran this through the AWS support channels. The response was essentially that this was "internally" a TooBusyException. Here's their full response:

{quote}Based on the information provided, I understand that you are experiencing some internal errors (status code 500) from S3, which is impacting one of your Flink jobs. From the log dive on your provided request IDs, I observe that your PutObject request triggered Internal Error with TooBusyException. This happens when a bucket receives more requests than it can handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests per second or more than 300 GET requests per second. So, if your workload is to exceed this limit, you'd need to scale your bucket through partitioning. Currently, your key space isn't randomized and all your keys include "BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". Therefore, your bucket isn't being automatically partitioned by S3 and you received increased error rates after your requests increased.
{quote}

was (Author: jgrier):
Yeah, so I completely agree that should be a 503 but it's not. I already ran this through the AWS channels. The response was essentially that this was "internally" a TooBusyException. Here's their full response:

{quote}Based on the information provided, I understand that you are experiencing some internal errors (status code 500) from S3, which is impacting one of your Flink jobs. From the log dive on your provided request IDs, I observe that your PutObject request triggered Internal Error with TooBusyException. This happens when a bucket receives more requests than it can handle or is allowed [1]. By default, S3 limits 100 PUT/LIST/DELETE requests per second or more than 300 GET requests per second. So, if your workload is to exceed this limit, you'd need to scale your bucket through partitioning. Currently, your key space isn't randomized and all your keys include "BUCKET/SERVICE/flink/checkpoints/faa473252e9bf42d07f618923fa22af1/chk-13/". Therefore, your bucket isn't being automatically partitioned by S3 and you received increased error rates after your requests increased.
{quote}
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414828#comment-16414828 ] Jamie Grier edited comment on FLINK-9061 at 3/27/18 12:45 AM:
---
[~ste...@apache.org] Here's the full stack trace:

{quote}
java.lang.Exception: Could not perform checkpoint 465 for operator catch_all.reduce -> (catch_all.late, catch_all.metric) (113/256).
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 465 for operator catch_all.reduce -> (catch_all.late, catch_all.metric) (113/256).
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
	... 8 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system output stream to s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b in order to obtain the stream state handle
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
	at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
	at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
	at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
	at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359)
	... 13 common frames omitted
Caused by: java.io.IOException: org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended Request ID: BLAH_
	at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036)
	at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987)
	at org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
	... 18 common frames omitted
Caused by: org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: BLAH)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:49 PM:
---
[~StephanEwen] [~jgrier] We run into the S3 throttling issue in two kinds of scenarios:

* Thousands of massively parallel/stateless routing jobs for the Keystone data pipeline. For that, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions (see the sketch below). That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers; only the job manager writes one uber metadata file (with state embedded).
* A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. That's where we want to inject a *dynamic* 4-char random hex for each S3 write. That was my earlier proposal of _ENTROPY_KEY_: basically, each operator creates its FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex.

I can see that the hash/random prefix solution can work for both scenarios, but the hash needs to be applied to the complete path (including the last random UUID part). I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint of a single large-state job.

was (Author: stevenz3wu):
[~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios.
* thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded)
* single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of _ENTROPY_KEY_. Basically, when each operator creates the FsCheckpointStreamFactory object with _ENTROPY_KEY_ substituted with a 4-char random hex. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml). But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job.
This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
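For illustration, here is a minimal sketch of what the *dynamic* entropy injection described in the comment above could look like: an __ENTROPY_KEY__ marker in the configured checkpoint base path is replaced with a fresh 4-char random hex whenever a write target is resolved. The class and method names are hypothetical and not Flink's actual FsCheckpointStreamFactory API.

{code:java}
import java.util.concurrent.ThreadLocalRandom;

/**
 * Illustrative sketch only: replaces an __ENTROPY_KEY__ marker in a configured
 * checkpoint base path with a fresh 4-char random hex, so that writes from many
 * operators land under different S3 key prefixes (partitions).
 */
public final class EntropyKeyResolver {

    private static final String ENTROPY_KEY = "__ENTROPY_KEY__";
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** e.g. "s3://bucket/__ENTROPY_KEY__/checkpoints/job-42" -> "s3://bucket/3fa9/checkpoints/job-42" */
    public static String resolve(String configuredPath) {
        if (!configuredPath.contains(ENTROPY_KEY)) {
            return configuredPath;
        }
        return configuredPath.replace(ENTROPY_KEY, randomHex(4));
    }

    private static String randomHex(int length) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(HEX[rnd.nextInt(HEX.length)]);
        }
        return sb.toString();
    }

    private EntropyKeyResolver() {}
}
{code}

Because the hex is generated per write rather than per job, each operator's files fan out across S3 partitions, which is the behavior the dynamic case needs.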
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414696#comment-16414696 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:46 PM: - It seems that our internal change *only* works with internal checkpoints. It doesn't work for external checkpoints and savepoints. A more general discussion may be needed on how to make it work for external checkpoints and savepoints. I know the Flink community is also thinking about merging internal/external checkpoints and savepoints. Here are some initial thoughts for external checkpoints and savepoints.
* The uber metadata file (written by the jobmanager) keeps the same base path (without random/hash entropy). This is needed to help implement retention policies via prefix query (S3 list). This is not an issue for internal checkpoints, because ZooKeeper gives us such a root reference even with entropy substitution.
* The actual data files written by operators from task managers need to have entropy in the key name to spread the writes to different S3 partitions.
was (Author: stevenz3wu): It seems that our internal change *only* works with internal checkpoints. It doesn't work for external checkpoints and savepoints. I know the Flink community is also thinking about merging internal/external checkpoints and savepoints. A more general discussion may be needed. Ideally, here is what I would like to see for external checkpoints and savepoints.
* The uber metadata file (written by the jobmanager) keeps the same base path (without random/hash entropy). This is needed to help implement retention policies via prefix query (S3 list). This is not an issue for internal checkpoints, because ZooKeeper gives us such a root reference even with entropy substitution.
* The actual data files written by operators from task managers need to have entropy in the key name to spread the writes to different S3 partitions.
> S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
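As a rough sketch of the split proposed in the comment above (all names hypothetical), the uber metadata file could keep the stable, entropy-free base path so retention tooling can still find checkpoints with a plain prefix listing, while operator data files get a random prefix:

{code:java}
/**
 * Hypothetical helper illustrating the external-checkpoint/savepoint layout sketched
 * above: stable metadata path for listing/retention, entropy-prefixed data files for
 * spreading S3 write load.
 */
public final class CheckpointPathScheme {

    private final String bucketUri;   // e.g. "s3://bucket"
    private final String jobSubPath;  // e.g. "checkpoints/my-job"

    public CheckpointPathScheme(String bucketUri, String jobSubPath) {
        this.bucketUri = bucketUri;
        this.jobSubPath = jobSubPath;
    }

    /** Uber metadata file written by the jobmanager: no entropy, so a prefix
     *  listing of bucketUri/jobSubPath still finds every checkpoint. */
    public String metadataPath(long checkpointId) {
        return bucketUri + "/" + jobSubPath + "/chk-" + checkpointId + "/_metadata";
    }

    /** Data file written from a task manager: the random hex comes first, spreading
     *  writes across different S3 partitions. */
    public String dataFilePath(long checkpointId, String fileName, String randomHex) {
        return bucketUri + "/" + randomHex + "/" + jobSubPath + "/chk-" + checkpointId + "/" + fileName;
    }
}
{code}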
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:39 PM: - [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__. Basically, when each operator creates the FsCheckpointStreamFactory object with_ _ENTROPY___KEY__ substituted with a 4-char random hex. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml). But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__. Basically, when each operator creates the FsCheckpointStreamFactory object with_ _ENTROPY___KEY__ substituted with a 4-char random hex. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml). For the hash approach, maybe it should hash(base_path, operator_id)? But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). 
The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:38 PM: - [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__. Basically, when each operator creates the FsCheckpointStreamFactory object with_ _ENTROPY___KEY__ substituted with a 4-char random hex. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml). For the hash approach, maybe it should hash(base_path, operator_id)? But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__. Basically, when each operator creates the FsCheckpointStreamFactory object with_ _ENTROPY___KEY__ substituted with a 4-char random hex. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). 
The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
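The hash(base_path, operator_id) idea raised in the comment above could look roughly like the following: a short deterministic prefix derived from the base path and the operator id, so the same operator always writes under the same S3 partition while different operators fan out. This is illustrative code, not an existing Flink class.

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch of a deterministic 4-hex-char prefix computed as hash(base_path, operator_id). */
public final class OperatorPrefixHash {

    public static String prefixFor(String basePath, String operatorId) {
        byte[] digest = sha256(basePath + "#" + operatorId);
        // First two bytes of the digest -> 4 hex chars of entropy.
        int v = ((digest[0] & 0xff) << 8) | (digest[1] & 0xff);
        return String.format("%04x", v);
    }

    private static byte[] sha256(String s) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should always be available", e);
        }
    }

    private OperatorPrefixHash() {}
}
{code}

Being deterministic, this spreads concurrent writes from many operators without per-write randomness, though it does not add extra entropy within a single operator's output.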
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:34 PM: - [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__. Basically, when each operator creates the FsCheckpointStreamFactory object with_ _ENTROPY___KEY__ substituted with a 4-char random hex. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__. Basically, when each operator creates the FsCheckpointStreamFactory object with __ENTROPY___KEY__ substituted with a 4-char random hex. Our change doesn't work for external checkpoint and savepoint. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). 
The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:28 PM: - [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__. Basically, when each operator creates the FsCheckpointStreamFactory object with __ENTROPY___KEY__ substituted with a 4-char random hex. Our change doesn't work for external checkpoint and savepoint. I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. 
> > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
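For the first scenario above (thousands of small routing jobs), the deployment-side tooling might look roughly like the sketch below: a static 4-char random hex is baked into each job's checkpoint directory at deploy time, and `state.backend.fs.memory-threshold` is raised so task managers keep their small state inline and only the jobmanager writes the uber metadata file to S3. The checkpoint-directory key shown is an assumption and should be checked against the Flink version in use; only the memory-threshold key is taken directly from the comment above.

{code:java}
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical deployment-tooling snippet; config key names may differ by Flink version. */
public final class RoutingJobDeployConfig {

    public static Properties checkpointConfig(String bucket, String jobName) {
        // Static per-job entropy, chosen once at deployment time.
        String staticHex = String.format("%04x", ThreadLocalRandom.current().nextInt(0x10000));

        Properties flinkConf = new Properties();
        // Assumed key name for the FsStateBackend checkpoint directory.
        flinkConf.setProperty("state.backend.fs.checkpointdir",
                "s3://" + bucket + "/" + staticHex + "/checkpoints/" + jobName);
        // Keep small operator state embedded in the jobmanager's metadata file instead of
        // separate S3 objects (the exact upper limit depends on the Flink version).
        flinkConf.setProperty("state.backend.fs.memory-threshold", "1048576");
        return flinkConf;
    }

    private RoutingJobDeployConfig() {}
}
{code}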
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:24 PM: - [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we are likely to get throttled when taking a savepoint for single large-state job. was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. 
> > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:21 PM: - [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only job manager writes one uber metadata file (with state embedded) * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. 
For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:31 PM: [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. but the hash need to be applied to the complete path (not just the prefix configured in flink-conf.yaml) But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? 
Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:30 PM: [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. 
> > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:27 PM: [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of _ENTROPY___KEY_ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of _ENTROPY__KEY_ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. 
> > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:27 PM: [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of _ENTROPY___KEY_ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. 
> > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414085#comment-16414085 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 4:27 PM: [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. that was my earlier proposal of _ENTROPY__KEY_ I can see that hash approach can work for both scenario. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). was (Author: stevenz3wu): [~StephanEwen] [~jgrier] We run into S3 throttling issue for two kinds of scenarios. I can see that _hash_ approach can work for both scenario. * thousands of massively parallel/stateless routing jobs for Keystone data pipeline. for that, we configure a *static* 4-char random hex for the checkpoint path for each routing job during deployment so that S3 writes *from many jobs* are spread different S3 partitions. that requires no change in Flink, just some deployment tooling automation. * single large-state job with TBs state and thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread to different S3 partitions. that's where we want to inject *dynamic* 4-char random hex for each S3 write. But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise, we can never take a savepoint for case #2 (single large-state job). > S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413628#comment-16413628 ] Stephan Ewen edited comment on FLINK-9061 at 3/26/18 9:43 AM: -- That would be a great contribution, valuable for many S3 users. Indeed, S3's partitioning by prefix is at the root of this problem. FYI: The 1.5 release will have a few changes to the state backends that will reduce some S3 requests (mainly HEAD, if you use s3a). The change proposed here is still needed, though. We could approach this in two ways:
1. Make the change to the state backend itself, to allow it to insert entropy into the paths. We can think about only doing this for checkpoints, but not for savepoints, because then savepoints would still be together in "one directory" (under the same prefix).
2. Make this a general change in the S3 filesystem, to use a deterministic path hash like Jamie suggested. That would help to solve this across other use cases of S3 as well. Whenever the path contains the segment {{_hash_}}, it would be replaced by such a hash.
was (Author: stephanewen): That would be a great contribution, valuable for many S3 users. Indeed, S3's partitioning by prefix is at the root of this problem. FYI: The 1.5 release will have a few changes to the state backends that will reduce some S3 requests (mainly HEAD, if you use s3a). The change proposed here is still needed, though. We could approach this in two ways:
1. Make the change to the state backend itself, to allow it to insert entropy into the paths. We can think about only doing this for checkpoints, but not for savepoints, because then savepoints would still be together in "one directory" (under the same prefix).
2. Make this a general change in the S3 filesystem, to use a deterministic path hash like Jamie suggested. That would help to solve this across other use cases of S3 as well. Whenever the path contains the segment `_hash_`, it would be replaced by such a hash.
> S3 checkpoint data not partitioned well -- causes errors and poor performance > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Jamie Grier >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
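A minimal sketch of option 2 above, assuming the S3 filesystem replaces a special {{_hash_}} path segment with a deterministic hash (illustrative only, not the actual Flink filesystem code):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Sketch: whenever a path contains the segment "_hash_", replace it with a short,
 * deterministic hash so the same logical path always maps to the same physical S3 key,
 * while different paths fan out across S3 partitions.
 */
public final class HashSegmentRewriter {

    private static final String HASH_SEGMENT = "_hash_";

    /** e.g. "s3://bucket/_hash_/flink/checkpoints/chk-7/f" -> "s3://bucket/<8-hex-chars>/flink/checkpoints/chk-7/f" */
    public static String rewrite(String path) {
        if (!path.contains(HASH_SEGMENT)) {
            return path;
        }
        CRC32 crc = new CRC32();
        crc.update(path.getBytes(StandardCharsets.UTF_8));
        // Hashing the complete path here; whether to hash the full key or only the
        // configured base directory is one of the open questions in this thread.
        return path.replace(HASH_SEGMENT, Long.toHexString(crc.getValue()));
    }

    private HashSegmentRewriter() {}
}
{code}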
[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412326#comment-16412326 ] Steven Zhen Wu edited comment on FLINK-9061 at 3/24/18 3:06 AM: Jamie, yes, we ran into the same issue at Netflix. We did exactly what you said. Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in the path, there seems to be no way for S3 to partition the bucket to support a high request rate; I don't see another way around it. There is obviously a downside to such random chars in the S3 path: you can't do prefix listing anymore. Steven
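For illustration, a rough sketch of what such entropy injection could look like (the class below is hypothetical, not the actual Netflix patch): it swaps the configured placeholder for a few random characters when a checkpoint path is resolved, which is exactly why prefix listing over all checkpoints stops working.

{code:java}
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical sketch (not the actual Netflix patch): replaces a configured
 * placeholder in the checkpoint directory with random characters so that
 * checkpoint paths start under different S3 partitions.
 */
public class EntropyInjectingPathFactory {

    private static final String ALPHABET = "0123456789abcdef";

    private final boolean enabled;   // state.backend.fs.checkpointdir.injectEntropy.enabled
    private final String entropyKey; // state.backend.fs.checkpointdir.injectEntropy.key

    public EntropyInjectingPathFactory(boolean enabled, String entropyKey) {
        this.enabled = enabled;
        this.entropyKey = entropyKey;
    }

    /** Replaces the placeholder with fresh random characters, e.g. once per checkpoint. */
    public String resolve(String configuredCheckpointDir) {
        if (!enabled || !configuredCheckpointDir.contains(entropyKey)) {
            return configuredCheckpointDir;
        }
        return configuredCheckpointDir.replace(entropyKey, randomChars(4));
    }

    private static String randomChars(int length) {
        StringBuilder sb = new StringBuilder(length);
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        EntropyInjectingPathFactory factory =
                new EntropyInjectingPathFactory(true, "__ENTROPY_KEY__");
        // e.g. s3://bucket/__ENTROPY_KEY__/flink/checkpoints -> s3://bucket/3f9a/flink/checkpoints
        System.out.println(factory.resolve("s3://bucket/__ENTROPY_KEY__/flink/checkpoints"));
    }
}
{code}

With random (rather than deterministic) characters, each resolution lands under a different prefix, trading listability for write throughput.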