[jira] [Created] (FLINK-18038) StateBackendLoader logs application-defined state before it is fully configured

2020-05-29 Thread Steve Bairos (Jira)
Steve Bairos created FLINK-18038:


 Summary: StateBackendLoader logs application-defined state before 
it is fully configured
 Key: FLINK-18038
 URL: https://issues.apache.org/jira/browse/FLINK-18038
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.9.1
Reporter: Steve Bairos


In the [StateBackendLoader|https://github.com/apache/flink/blob/bb46756b84940a6134910e74406bfaff4f2f37e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L201], there's this log line:
{code:java}
logger.info("Using application-defined state backend: {}", fromApplication); 
{code}
This message can be inaccurate, though: immediately after it is logged, if fromApplication is a ConfigurableStateBackend, we call its .configure() method and the logged instance is replaced by a newly configured StateBackend.
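For context, here is a paraphrased sketch of the current ordering in StateBackendLoader (not verbatim; the exact configure() signature varies by Flink version):
{code:java}
// Current ordering (paraphrased): log first, configure afterwards.
logger.info("Using application-defined state backend: {}", fromApplication);

if (fromApplication instanceof ConfigurableStateBackend) {
    logger.info("Configuring application-defined state backend with job/cluster config");
    // configure() returns a new, fully configured StateBackend, so the
    // instance logged above is immediately superseded.
    backend = ((ConfigurableStateBackend) fromApplication).configure(config);
} else {
    backend = fromApplication;
}
{code}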

To me, it seems like it would be better to log the state backend after it has been fully configured. With the current ordering, we get confusing logs like this:
{code:java}
2020-05-29 21:39:44,387 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask   - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 's3://pinterest-montreal/checkpoints/xenon-dev-001-20191210/Xenon/BasicJavaStream', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=UNDEFINED, numberOfTransferingThreads=-1}
2020-05-29 21:39:44,387 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask   - Configuring application-defined state backend with job/cluster config{code}
This makes it ambiguous whether settings in our flink-conf.yaml, like "state.backend.incremental: true", are actually being applied.
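A minimal sketch of the proposed reordering (not a definitive patch, just the shape of the change):
{code:java}
// Proposed fix: configure first, then log the effective backend.
StateBackend backend;
if (fromApplication instanceof ConfigurableStateBackend) {
    logger.info("Configuring application-defined state backend with job/cluster config");
    backend = ((ConfigurableStateBackend) fromApplication).configure(config);
} else {
    backend = fromApplication;
}
// The log now reflects settings merged in from flink-conf.yaml,
// e.g. state.backend.incremental.
logger.info("Using application-defined state backend: {}", backend);
{code}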

I can make a diff for the change if there aren't any objections.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17583) Allow option to store a savepoint's _metadata file separate from its data files

2020-05-08 Thread Steve Bairos (Jira)
Steve Bairos created FLINK-17583:


 Summary: Allow option to store a savepoint's _metadata file 
separate from its data files
 Key: FLINK-17583
 URL: https://issues.apache.org/jira/browse/FLINK-17583
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.9.1
Reporter: Steve Bairos


(In the description I mainly talk about savepoints, but the plan applies to checkpoints as well.)

We have a deployment framework that often needs to return a list of valid savepoints in S3 under a certain prefix. Our assumption is that if an S3 object's key ends with '_metadata', then it marks a valid savepoint. So, to generate the list of valid savepoints, we need to locate all of the _metadata files that start with a given prefix.

For example, if our S3 bucket's paths look like this:
{code:java}
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/_metadata
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c165546-c326-43c0-9f47-f9a2cfd000ed
... thousands of other savepoint data files
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c757e5b-92b7-47b8-bfe8-cfe70eb28702
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/_metadata
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/41297fd5-40df-4683-bfb6-534bfddae92a
... thousands of other savepoint data files
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/acbe839a-1ec7-4b41-9d87-595d557c2ac6
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/_metadata
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/2d2f5551-56a7-4fea-b25b-b0156660c650
... thousands of other savepoint data files
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/c8c410df-5fb0-46a0-84c5-43e1575e8dc5
... dozens of other savepoint dirs
{code}

To get a list of all the savepoints that my-job1 could possibly be started from, we want every savepoint under the prefix:
{code:java}
s3://bucket/savepoints/my-job1 {code}
Ideally, we would be able to get a list like this from S3:
{code:java}
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/_metadata
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/_metadata
s3://bucket/savepoints/my-job1/2020-04-02/savepoint-987654-1100110011/_metadata{code}
Unfortunately, there is no easy way to get this list, because S3's API only lets you filter by prefix, not by suffix. Listing all objects with the prefix 's3://bucket/savepoints/my-job1' and then filtering for keys that contain _metadata doesn't scale either, because the listing must page through the thousands of savepoint data files that share the prefix, such as:
{code:java}
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c165546-c326-43c0-9f47-f9a2cfd000ed
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-1a2b3c4d5e/9c757e5b-92b7-47b8-bfe8-cfe70eb28702
s3://bucket/savepoints/my-job1/2020-04-01/savepoint-123456-99/acbe839a-1ec7-4b41-9d87-595d557c2ac6
etc.{code}
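To make the cost concrete, here is a minimal sketch of that filtering approach, assuming the AWS SDK for Java v2 (the class and method names below are ours, not Flink's); it has to page through every data file just to find a handful of _metadata keys:
{code:java}
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.util.List;
import java.util.stream.Collectors;

public class SavepointLister {
    // Lists every object under the prefix and filters client-side.
    // With thousands of data files per savepoint, this pages through
    // the entire listing to find the few _metadata keys.
    static List<String> findSavepoints(S3Client s3, String bucket, String prefix) {
        ListObjectsV2Request request = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix)
                .build();
        return s3.listObjectsV2Paginator(request).contents().stream()
                .map(S3Object::key)
                .filter(key -> key.endsWith("/_metadata"))
                .collect(Collectors.toList());
    }
}
{code}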
 

I propose adding a configuration, in a similar vein to the S3 entropy injector, that lets us store the _metadata file under a separate path from the savepoint's data files. For example, with this hypothetical configuration (a sketch of the rewriting logic follows the example below):
{code:java}
state.checkpoints.split.key: _datasplit_
state.checkpoints.split.metadata.dir: metadata
state.checkpoints.split.data.dir: data{code}
When a user triggers a savepoint with the path
{code:java}
s3://bucket/savepoints/_datasplit_/my-job1/2020-05-07/ {code}
the resulting savepoint would look like this:
{code:java}
s3://bucket/savepoints/metadata/my-job1/2020-05-07/savepoint-654321-abcdef9876/_metadata
s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/a50fc483-3581-4b55-a37e-b7c61b3ee47f
s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/b0c6b7c0-6b94-43ae-8678-2f7640af1523
s3://bucket/savepoints/data/my-job1/2020-05-07/savepoint-654321-abcdef9876/c1855b35-c0b7-4347-9352-88423998e5ec{code}
Notice that the metadata's prefix is 
{code:java}
 s3://bucket/savepoints/metadata/my-job1/2020-05-07/{code}
and the data files' prefix is
{code:java}
 s3://bucket/savepoints/data/my-job1/2020-05-07/{code}
That way, if I want to list all the savepoints for my-job1, I can just list the objects under the metadata prefix:
{code:java}
aws s3 ls --recursive s3://bucket/savepoints/metadata/my-job1/{code}
and get a clean list of just the _metadata files.
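Here is a minimal sketch of how the split-key rewriting could work (the option values and the resolver class are invented for illustration; this is not an existing Flink API):
{code:java}
import java.util.regex.Pattern;

public class SplitKeyPathResolver {
    // Hypothetical values of the proposed options:
    //   state.checkpoints.split.key:          _datasplit_
    //   state.checkpoints.split.metadata.dir: metadata
    //   state.checkpoints.split.data.dir:     data
    private static final String SPLIT_KEY = "_datasplit_";
    private static final String METADATA_DIR = "metadata";
    private static final String DATA_DIR = "data";

    // Rewrites the first occurrence of the split key, much like the
    // entropy injector rewrites its entropy key.
    static String resolve(String configuredPath, boolean isMetadataFile) {
        String replacement = isMetadataFile ? METADATA_DIR : DATA_DIR;
        return configuredPath.replaceFirst(Pattern.quote(SPLIT_KEY), replacement);
    }

    public static void main(String[] args) {
        String dir = "s3://bucket/savepoints/_datasplit_/my-job1/2020-05-07/savepoint-654321-abcdef9876/";
        // -> s3://bucket/savepoints/metadata/my-job1/.../_metadata
        System.out.println(resolve(dir + "_metadata", true));
        // -> s3://bucket/savepoints/data/my-job1/.../a50fc483-3581-4b55-a37e-b7c61b3ee47f
        System.out.println(resolve(dir + "a50fc483-3581-4b55-a37e-b7c61b3ee47f", false));
    }
}
{code}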

 

One alternative we've thought about is entropy injection. It technically does separate the _metadata file from the rest of the data as well, but it makes a mess of entropy dirs in S3, so it's not our ideal choice.
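For reference, these are the existing entropy injection options we considered (values illustrative). Flink removes the entropy key from the metadata file's path and substitutes random characters for data files, which is why it incidentally separates the two:
{code:java}
s3.entropy.key: _entropy_
s3.entropy.length: 4

# A savepoint path of s3://bucket/savepoints/_entropy_/my-job1/ then yields
# data files under e.g. s3://bucket/savepoints/ab3f/my-job1/... and the
# _metadata file under s3://bucket/savepoints/my-job1/..., scattering data
# across one random entropy dir per snapshot.
{code}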

 

I'm happy to take a shot at implementing this.