[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r202574665 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. 
+ * + * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. + * + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and + * this can be configured using {@link
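The quoted Javadoc describes two rules. Part files are named from a prefix, the subtask index, and a rolling counter (e.g. `part-1-17`). A part file is rolled when it grows past the size limit or gets older than the rollover interval. Below is a minimal sketch of those two rules. `PartFilePolicy` and its methods are hypothetical names, not Flink API, and the 384MB default is interpreted here as 384 * 1024 * 1024 bytes.

```java
/**
 * Hypothetical helper illustrating the naming and rolling rules described in the
 * quoted StreamingFileSink Javadoc. This is not Flink's API.
 */
final class PartFilePolicy {

    // Defaults as stated in the quoted Javadoc (384MB read as 384 * 1024 * 1024 bytes).
    static final long DEFAULT_PART_SIZE = 384L * 1024 * 1024;
    static final long DEFAULT_ROLLOVER_INTERVAL = Long.MAX_VALUE;

    private final long maxPartSize;
    private final long rolloverIntervalMs;

    PartFilePolicy(long maxPartSize, long rolloverIntervalMs) {
        this.maxPartSize = maxPartSize;
        this.rolloverIntervalMs = rolloverIntervalMs;
    }

    /** Builds a name such as "part-1-17": prefix, subtask index, rolling counter. */
    static String partFileName(int subtaskIndex, long partCounter) {
        return "part-" + subtaskIndex + "-" + partCounter;
    }

    /** Roll when the file is bigger than the size limit or older than the rollover interval. */
    boolean shouldRoll(long currentSize, long creationTimeMs, long nowMs) {
        return currentSize > maxPartSize || nowMs - creationTimeMs > rolloverIntervalMs;
    }
}
```

With the default interval of `Long.MAX_VALUE`, the time condition can never fire, so only the size limit causes a roll.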
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r202556032 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. 
+ * + * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. + * + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and + * this can be configured using {@link
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201374783

--- Diff: flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for the {@link LocalResumableWriter}.
+ */
+public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
--- End diff --

It is good practice to make these fields `final`.
---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201400623 --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+	private static final Random RND = new Random();
+
+	private static final String testData1 = "THIS IS A TEST 1.";
+	private static final String testData2 = "THIS IS A TEST 2.";
+	private static final String testData3 = "THIS IS A TEST 3.";
+
+	private Path basePathForTest;
+
+	private static FileSystem fileSystem;
+
+	public abstract Path getBasePath() throws Exception;
+
+	public abstract FileSystem initializeFileSystem();
+
+	public Path getBasePathForTest() {
+		return basePathForTest;
+	}
+
+	private FileSystem getFileSystem() {
+		if (fileSystem == null) {
+			fileSystem = initializeFileSystem();
+		}
+		return fileSystem;
+	}
+
+	private ResumableWriter getNewFileSystemWriter() throws IOException {
+		return getFileSystem().createRecoverableWriter();
+	}
+
+	@Before
+	public void prepare() throws Exception {
+		basePathForTest = new Path(getBasePath(), randomName());
+		getFileSystem().mkdirs(basePathForTest);
+	}
+
+	@After
+	public void cleanup() throws Exception {
+		getFileSystem().delete(basePathForTest, true);
+	}
+
+	@Test
+	public void testCloseWithNoData() throws Exception {
+		final ResumableWriter writer = getNewFileSystemWriter();
+
+		final Path testDir = getBasePathForTest();
+
+		final Path path = new Path(testDir + File.separator + "part-0");
--- End diff --

Avoid `File.separator` when building cross-platform paths; use `new Path(testDir, "part-0")` instead.
---
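The concern here is that `File.separator` is platform-specific (`\` on Windows). Flink file-system paths, by contrast, are URI-style and use `/` as the separator. The sketch below contrasts the two approaches using plain strings. `UriPaths` is a hypothetical helper for illustration only, not Flink's `Path` implementation. The reviewer's `new Path(testDir, "part-0")` delegates the joining to `Path` directly.

```java
import java.io.File;

/** Hypothetical helper contrasting URI-style joining with File.separator concatenation. */
final class UriPaths {

    private UriPaths() {}

    /** Joins with '/', independent of the operating system (URI-style). */
    static String join(String parent, String child) {
        // Avoid a doubled slash when the parent already ends with '/'.
        return parent.endsWith("/") ? parent + child : parent + "/" + child;
    }

    /** The pattern flagged in the review: yields '\' on Windows and '/' elsewhere. */
    static String joinWithFileSeparator(String parent, String child) {
        return parent + File.separator + child;
    }
}
```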
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201369555

--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ---
@@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
 			return recoverable;
 		}
 	}
+
+	/**
+	 * Called when resuming execution after a failure and waits until the lease
+	 * of the file we are resuming is free.
+	 *
+	 * The lease of the file we are resuming writing/committing to may still
+	 * belong to the process that failed previously and whose state we are
+	 * recovering.
+	 *
+	 * @param path The path to the file we want to resume writing to.
+	 */
+	private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+		Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+		final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+		dfs.recoverLease(path);
+		boolean isclosed = dfs.isFileClosed(path);
+
+		final StopWatch sw = new StopWatch();
--- End diff --

Let's use `Deadline` from the Flink utils instead, to reduce external dependencies.
---
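A deadline replaces the start-then-poll pattern of a stopwatch with a single absolute point in time. The class below is a hypothetical, self-contained stand-in that only illustrates the idea. It is not Flink's `Deadline`, whose exact API should be checked in the Flink sources. The sketch relies on `System.nanoTime()`, which is monotonic, and compares values by subtraction so that it stays correct even if the counter overflows.

```java
import java.time.Duration;

/** Hypothetical deadline stand-in based on the monotonic System.nanoTime() clock. */
final class SimpleDeadline {

    private final long deadlineNanos;

    private SimpleDeadline(long deadlineNanos) {
        this.deadlineNanos = deadlineNanos;
    }

    /** Creates a deadline that lies the given duration in the future. */
    static SimpleDeadline fromNow(Duration duration) {
        return new SimpleDeadline(System.nanoTime() + duration.toNanos());
    }

    /** Compares by subtraction, as recommended for nanoTime values. */
    boolean hasTimeLeft() {
        return deadlineNanos - System.nanoTime() > 0;
    }

    /** Remaining time; negative once the deadline has passed. */
    Duration timeLeft() {
        return Duration.ofNanos(deadlineNanos - System.nanoTime());
    }
}
```

A polling loop would then check `deadline.hasTimeLeft()` rather than comparing an elapsed stopwatch time against a timeout constant.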
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201374633

--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ---
@@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
 			return recoverable;
 		}
 	}
+
+	/**
+	 * Called when resuming execution after a failure and waits until the lease
+	 * of the file we are resuming is free.
+	 *
+	 * The lease of the file we are resuming writing/committing to may still
+	 * belong to the process that failed previously and whose state we are
+	 * recovering.
+	 *
+	 * @param path The path to the file we want to resume writing to.
+	 */
+	private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+		Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+		final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+		dfs.recoverLease(path);
+		boolean isclosed = dfs.isFileClosed(path);
+
+		final StopWatch sw = new StopWatch();
+		sw.start();
+
+		while (!isclosed) {
+			if (sw.getTime() > LEASE_TIMEOUT) {
+				break;
+			}
+
+			try {
--- End diff --

This basically locks the thread in for up to `LEASE_TIMEOUT`, making it impossible to cancel. I would either propagate the `InterruptedException`, or rethrow it as an `IOException` indicating that recovering the lease failed (that is fine here, because this is a single-purpose utility function).
---
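One way to follow this suggestion is to poll against a deadline and turn an interrupt into an `InterruptedIOException` (a subclass of `IOException`). The thread's interrupt flag is restored first, so cancellation is not swallowed. In this sketch the lease check is a hypothetical `BooleanSupplier` that stands in for `dfs.isFileClosed(path)`. `LeaseWait` and the poll-interval parameter are assumptions, not code from the PR.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper that fails fast on interrupt instead of blocking until timeout. */
final class LeaseWait {

    static boolean waitUntil(BooleanSupplier isClosed, Duration timeout, Duration pollInterval)
            throws IOException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        boolean closed = isClosed.getAsBoolean();
        while (!closed && deadline - System.nanoTime() > 0) {
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers (e.g. task cancellation) still see it.
                Thread.currentThread().interrupt();
                InterruptedIOException ioe =
                        new InterruptedIOException("Interrupted while waiting for the lease to be revoked");
                ioe.initCause(e);
                throw ioe;
            }
            closed = isClosed.getAsBoolean();
        }
        // false means the timeout expired before the file was closed.
        return closed;
    }
}
```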
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201375290

--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java ---
@@ -130,7 +130,7 @@ public static boolean hasHDFSDelegationToken() throws Exception {
 	 */
 	public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
 		String versionString = VersionInfo.getVersion();
-		String[] versionParts = versionString.split(".");
+		String[] versionParts = versionString.split("\\.");
--- End diff --

Good catch!
---
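The bug fixed here comes from `String.split` taking a regular expression, in which `.` matches any character. Every character therefore becomes a delimiter, all resulting substrings are empty and trailing, and `split(".")` returns an empty array. Below is a minimal sketch of the version check. It takes the version string as a parameter instead of calling Hadoop's `VersionInfo.getVersion()`, and `VersionCheck` is a hypothetical name. `Pattern.quote(".")` would work equally well in place of `"\\."`.

```java
/** Hypothetical sketch of a minimum-version check on a "major.minor[.patch]" string. */
final class VersionCheck {

    /** The buggy variant: "." is a regex matching any character, so the result is empty. */
    static int naivePartCount(String version) {
        return version.split(".").length;
    }

    static boolean isMinVersion(String version, int major, int minor) {
        // Escape the dot so that split uses a literal '.' as the delimiter.
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Cannot parse version: " + version);
        }
        int actualMajor = Integer.parseInt(parts[0]);
        int actualMinor = Integer.parseInt(parts[1]);
        return actualMajor > major || (actualMajor == major && actualMinor >= minor);
    }
}
```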
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201401033 --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.fail; + +public abstract class AbstractResumableWriterTest extends TestLogger { + + private static final Random RND = new Random(); + + private static final String testData1 = "THIS IS A TEST 1."; + private static final String testData2 = "THIS IS A TEST 2."; + private static final String testData3 = "THIS IS A TEST 3."; + + private Path basePathForTest; + + private static FileSystem fileSystem; + + public abstract Path getBasePath() throws Exception; + + public abstract FileSystem initializeFileSystem(); + + public Path getBasePathForTest() { + return basePathForTest; + } + + private FileSystem getFileSystem() { + if (fileSystem == null) { + fileSystem = initializeFileSystem(); + } + return fileSystem; + } + + private ResumableWriter getNewFileSystemWriter() throws IOException { + return getFileSystem().createRecoverableWriter(); + } + + @Before + public void prepare() throws Exception { + basePathForTest = new Path(getBasePath(), randomName()); + getFileSystem().mkdirs(basePathForTest); + } + + @After + public void cleanup() throws Exception { + getFileSystem().delete(basePathForTest, true); + } + + @Test + public void testCloseWithNoData() throws Exception { + final ResumableWriter writer = getNewFileSystemWriter(); + + final Path testDir = getBasePathForTest(); + + final Path path = new Path(testDir + File.separator + "part-0"); + + final RecoverableFsDataOutputStream stream = writer.open(path); + for (Map.Entry fileContents : 
getFileContentByPath(testDir).entrySet()) { + Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress.")); + Assert.assertTrue(fileContents.getValue().isEmpty()); + } + + stream.closeForCommit().commit(); + + for (Map.Entry fileContents : getFileContentByPath(testDir).entrySet()) { + Assert.assertEquals("part-0", fileContents.getKey().getName()); + Assert.assertTrue(fileContents.getValue().isEmpty()); + } + } + + @Test + public void testCommitAfterNormalClose() throws Exception { + final ResumableWriter writer = getNewFileSystemWriter(); + + final Path testDir = getBasePathForTest(); + + final Path path = new Path(testDir.getPath() + File.separator + "part-0"); + + try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + stream.write(testData1.getBytes(Charset.forName("UTF-8"))); --- End diff -- Use `StandardCharsets.UTF_8` instead of "UTF-8". ---
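`StandardCharsets.UTF_8` (Java 7+) is a constant, so it avoids a by-name charset lookup that could only fail at runtime. The sketch below also mirrors the naming the test above expects: a hidden `.part-0.inprogress.<suffix>` file that becomes `part-0` on commit. It uses `java.nio.file` on the local disk. `InProgressFile` is hypothetical and is not Flink's local resumable writer, and the random UUID suffix is an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/** Hypothetical "write to a hidden in-progress file, then commit by atomic rename" sketch. */
final class InProgressFile {

    private final Path inProgress;
    private final Path target;

    private InProgressFile(Path inProgress, Path target) {
        this.inProgress = inProgress;
        this.target = target;
    }

    static InProgressFile open(Path dir, String name) throws IOException {
        Path tmp = dir.resolve("." + name + ".inprogress." + UUID.randomUUID());
        Files.createFile(tmp);
        return new InProgressFile(tmp, dir.resolve(name));
    }

    void write(String data) throws IOException {
        // A constant charset instead of Charset.forName("UTF-8").
        Files.write(inProgress, data.getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    }

    /** Makes the data visible under its final name in one atomic rename. */
    Path commit() throws IOException {
        return Files.move(inProgress, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```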
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201375170

--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ---
@@ -41,6 +44,8 @@
 @Internal
 class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+	private static final long LEASE_TIMEOUT = 10L;
--- End diff --

Can we add digit grouping characters here? That makes the value easier to read.
---
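Since Java 7, underscores may appear between the digits of a numeric literal; they group the digits without changing the value. The value `100_000L` below is hypothetical and is not the PR's constant. Expressing the timeout as a `java.time.Duration` is one alternative that also documents the unit, assuming the timeout is meant in milliseconds.

```java
import java.time.Duration;

/** Hypothetical constants showing digit grouping in numeric literals. */
final class LeaseTimeouts {

    // Underscores only aid readability; both literals denote the same long value.
    static final long GROUPED = 100_000L;
    static final long UNGROUPED = 100000L;

    // Alternative that makes the unit explicit (milliseconds assumed here).
    static final Duration AS_DURATION = Duration.ofMillis(100_000L);
}
```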
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201342618 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. 
+ * + * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. + * + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and + * this can be configured using {@link
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201059374 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A bucket is the directory organization of the output of the {@link BucketingSink}. + * + * + * For each incoming element in the {@code BucketingSink}, the user-specified + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is + * queried to see in which bucket this element should be written to. 
+ */ +public class Bucket { + + private static final String PART_PREFIX = "part"; + + private final Path bucketPath; + + private int subtaskIndex; + + private long partCounter; + + private long creationTime; + + private long lastWrittenTime; + + private final long maxPathSize; + + private final long rolloverTime; + + private final long inactivityTime; + + private final Writer outputFormatWriter; + + private final ResumableWriter fsWriter; + + private RecoverableFsDataOutputStream currentOpenPartStream; + + private List pending = new ArrayList<>(); + + private Map> pendingPerCheckpoint = new HashMap<>(); + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long rolloverTime, + long inactivityTime, + Writer writer, + BucketState bucketstate) throws IOException { + + this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer); + + // the constructor must have already initialized the filesystem writer + Preconditions.checkState(fsWriter != null); + + // we try to resume the previous in-progress file, if the filesystem + // supports such operation. If not, we just commit the file and start fresh. 
+ + final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + if (resumable != null) { + this.currentOpenPartStream = fsWriter.recover(resumable); + this.creationTime = bucketstate.getCreationTime(); + } + + // we commit pending files for previous checkpoints to the last successful one + // (from which we are recovering from) + for (List commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (ResumableWriter.CommitRecoverable commitable: commitables) { + fsWriter.recoverForCommit(commitable).commit(); + } + } + + this.pending = new ArrayList<>(); + this.pendingPerCheckpoint = new HashMap<>(); + } + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201059444 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A bucket is the directory organization of the output of the {@link BucketingSink}. + * + * + * For each incoming element in the {@code BucketingSink}, the user-specified + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is + * queried to see in which bucket this element should be written to. 
+ */ +public class Bucket { + + private static final String PART_PREFIX = "part"; + + private final Path bucketPath; + + private int subtaskIndex; + + private long partCounter; + + private long creationTime; + + private long lastWrittenTime; + + private final long maxPathSize; + + private final long rolloverTime; + + private final long inactivityTime; + + private final Writer outputFormatWriter; + + private final ResumableWriter fsWriter; + + private RecoverableFsDataOutputStream currentOpenPartStream; + + private List<ResumableWriter.CommitRecoverable> pending = new ArrayList<>(); + + private Map<Long, List<ResumableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>(); + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long rolloverTime, + long inactivityTime, + Writer writer, + BucketState bucketstate) throws IOException { + + this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer); + + // the constructor must have already initialized the filesystem writer + Preconditions.checkState(fsWriter != null); + + // we try to resume the previous in-progress file, if the filesystem + // supports such operation. If not, we just commit the file and start fresh. + + final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + if (resumable != null) { + this.currentOpenPartStream = fsWriter.recover(resumable); + this.creationTime = bucketstate.getCreationTime(); + } + + // we commit pending files for previous checkpoints to the last successful one + // (from which we are recovering from) + for (List<ResumableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (ResumableWriter.CommitRecoverable commitable: commitables) { + fsWriter.recoverForCommit(commitable).commit(); --- End diff -- You are right! ---
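The constructor above resumes the previous in-progress part file through `fsWriter.recover(resumable)`. On a local file system, one plausible way to support this is to record the file length when a checkpoint is taken and, on recovery, truncate the file back to that length before appending again, so bytes written after the checkpoint are discarded. The sketch below models that idea with plain `java.nio`; `LocalResumeSketch`, `persist` and `recover` are hypothetical names, not the `ResumableWriter` API from this PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical model: resume an in-progress part file from a checkpointed length. */
class LocalResumeSketch {

    /** "Persist": force data to disk and use the current length as the resume token. */
    static long persist(FileChannel channel) throws IOException {
        channel.force(true);
        return channel.position();
    }

    /** "Recover": discard bytes written after the checkpoint and continue at that offset. */
    static FileChannel recover(Path inProgressFile, long persistedLength) throws IOException {
        FileChannel channel = FileChannel.open(inProgressFile, StandardOpenOption.WRITE);
        channel.truncate(persistedLength);
        channel.position(persistedLength);
        return channel;
    }

    static void write(FileChannel channel, String s) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /** Write, checkpoint, write more, "fail", recover, write again; return the file contents. */
    static String demo() {
        try {
            Path file = Files.createTempFile("part-0-0", ".inprogress");
            long token = -1; // resume token, set when the "checkpoint" is taken
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
                write(ch, "a\nb\n");
                token = persist(ch);   // checkpoint taken here
                write(ch, "lost\n");   // written after the checkpoint, before the failure
            }
            try (FileChannel ch = recover(file, token)) {
                write(ch, "c\n");      // writing continues from the checkpointed offset
            }
            String contents = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            return contents;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

File systems without truncate or append support would need a different strategy, which is presumably why the comment in the constructor mentions falling back to committing the file and starting fresh.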
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201058746 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A bucket is the directory organization of the output of the {@link BucketingSink}. + * + * + * For each incoming element in the {@code BucketingSink}, the user-specified + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is + * queried to see in which bucket this element should be written to. 
+ */ +public class Bucket { + + private static final String PART_PREFIX = "part"; + + private final Path bucketPath; + + private int subtaskIndex; + + private long partCounter; + + private long creationTime; + + private long lastWrittenTime; + + private final long maxPathSize; + + private final long rolloverTime; + + private final long inactivityTime; + + private final Writer outputFormatWriter; + + private final ResumableWriter fsWriter; + + private RecoverableFsDataOutputStream currentOpenPartStream; + + private List<ResumableWriter.CommitRecoverable> pending = new ArrayList<>(); + + private Map<Long, List<ResumableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>(); + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long rolloverTime, + long inactivityTime, + Writer writer, + BucketState bucketstate) throws IOException { + + this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer); + + // the constructor must have already initialized the filesystem writer + Preconditions.checkState(fsWriter != null); + + // we try to resume the previous in-progress file, if the filesystem + // supports such operation. If not, we just commit the file and start fresh.
+ + final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + if (resumable != null) { + this.currentOpenPartStream = fsWriter.recover(resumable); + this.creationTime = bucketstate.getCreationTime(); + } + + // we commit pending files for previous checkpoints to the last successful one + // (from which we are recovering from) + for (List<ResumableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (ResumableWriter.CommitRecoverable commitable: commitables) { + fsWriter.recoverForCommit(commitable).commit(); + } + } + + this.pending = new ArrayList<>(); + this.pendingPerCheckpoint = new HashMap<>(); + } + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long
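The `pending` list and the `pendingPerCheckpoint` map suggest the usual two-phase bookkeeping: part files closed since the last checkpoint wait in `pending`; on a snapshot they are filed under that checkpoint's id; when a checkpoint is reported complete, every entry with an id up to and including it is committed. This is also why the recovery constructor commits everything restored from state, since that state comes from a completed checkpoint. A minimal sketch of the bookkeeping, assuming a `TreeMap` keyed by checkpoint id and using plain strings as stand-ins for `CommitRecoverable`s (`PendingFilesSketch` and its methods are hypothetical):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-checkpoint bookkeeping of pending part files. */
class PendingFilesSketch {
    private List<String> pending = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    /** A part file was closed (rolled) and now waits for a checkpoint. */
    void onPartFileClosed(String partFile) {
        pending.add(partFile);
    }

    /** On snapshot, files closed since the last checkpoint are tied to this checkpoint id. */
    void onSnapshot(long checkpointId) {
        if (!pending.isEmpty()) {
            pendingPerCheckpoint.put(checkpointId, pending);
            pending = new ArrayList<>();
        }
    }

    /** Completion of checkpoint N also covers earlier checkpoints whose notification was missed. */
    void onCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue()); // stand-in for committer.commit()
            it.remove();                            // also removes the entry from the backing map
        }
    }

    List<String> committed() {
        return committed;
    }

    int pendingCheckpoints() {
        return pendingPerCheckpoint.size();
    }
}
```

Using an ordered map makes "commit everything up to checkpoint N" a single `headMap` view rather than a scan over all keys.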
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201058329 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A bucket is the directory organization of the output of the {@link BucketingSink}. + * + * + * For each incoming element in the {@code BucketingSink}, the user-specified + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is + * queried to see in which bucket this element should be written to. 
+ */ +public class Bucket { + + private static final String PART_PREFIX = "part"; + + private final Path bucketPath; + + private int subtaskIndex; + + private long partCounter; + + private long creationTime; + + private long lastWrittenTime; + + private final long maxPathSize; + + private final long rolloverTime; + + private final long inactivityTime; + + private final Writer outputFormatWriter; + + private final ResumableWriter fsWriter; + + private RecoverableFsDataOutputStream currentOpenPartStream; + + private List<ResumableWriter.CommitRecoverable> pending = new ArrayList<>(); + + private Map<Long, List<ResumableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>(); + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long rolloverTime, + long inactivityTime, + Writer writer, + BucketState bucketstate) throws IOException { + + this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer); + + // the constructor must have already initialized the filesystem writer + Preconditions.checkState(fsWriter != null); + + // we try to resume the previous in-progress file, if the filesystem + // supports such operation. If not, we just commit the file and start fresh. + + final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + if (resumable != null) { + this.currentOpenPartStream = fsWriter.recover(resumable); + this.creationTime = bucketstate.getCreationTime(); + } + + // we commit pending files for previous checkpoints to the last successful one + // (from which we are recovering from) + for (List<ResumableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (ResumableWriter.CommitRecoverable commitable: commitables) { + fsWriter.recoverForCommit(commitable).commit(); --- End diff -- Should this use `commitAfterRecovery()`? ---
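The question is about idempotence. A pending file restored from state may already have been committed before the failure (the job can fail after the commit but before the next checkpoint), so a plain `commit()` may fail because the temporary file no longer exists, while a recovery-time commit should treat "already committed" as success. The sketch below illustrates that distinction with a rename-based commit on the local file system; `RenameCommitterSketch` is hypothetical and only models the behaviour, it is not the committer implementation from this PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical rename-based committer illustrating commit vs. commitAfterRecovery. */
class RenameCommitterSketch {
    private final Path tempFile;
    private final Path targetFile;

    RenameCommitterSketch(Path tempFile, Path targetFile) {
        this.tempFile = tempFile;
        this.targetFile = targetFile;
    }

    /** Normal path: the temporary file must exist; a missing file is an error. */
    void commit() throws IOException {
        Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Recovery path: tolerate a commit that already happened before the failure. */
    void commitAfterRecovery() throws IOException {
        if (Files.exists(tempFile)) {
            commit();
        } else if (!Files.exists(targetFile)) {
            throw new IOException("Neither " + tempFile + " nor " + targetFile + " exists");
        }
        // otherwise the file was already committed: nothing to do
    }

    /** Commits, then replays the commit as recovery would; true if only the plain replay fails. */
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("bucket");
            Path temp = Files.createFile(dir.resolve(".part-0-0.inprogress"));
            Path target = dir.resolve("part-0-0");
            RenameCommitterSketch committer = new RenameCommitterSketch(temp, target);

            committer.commit();              // committed, then the job fails before the next checkpoint
            committer.commitAfterRecovery(); // replayed from restored state: must not fail

            boolean plainReplayFails;
            try {
                committer.commit();          // a plain replay finds no temporary file
                plainReplayFails = false;
            } catch (IOException expected) {
                plainReplayFails = true;
            }

            boolean ok = plainReplayFails && Files.exists(target) && !Files.exists(temp);
            Files.delete(target);
            Files.delete(dir);
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```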
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201042618 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. 
+ * + * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. + * + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and + * this can be configured using {@link
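The Javadoc describes two roll triggers (part size and roll-over interval) and part-file names built from the `part` prefix, the subtask index and a counter. The `Bucket` fields quoted above (`maxPathSize`, `rolloverTime`, `inactivityTime`, `creationTime`, `lastWrittenTime`) suggest a third, inactivity-based trigger. A minimal sketch of such a check, assuming sizes in bytes and times in milliseconds; `RollingSketch`, `partFileName` and `shouldRoll` are hypothetical names:

```java
/** Hypothetical roll-over decision and part-file naming for a single bucket. */
class RollingSketch {
    static final String PART_PREFIX = "part";

    /** Name of a part file, e.g. "part-1-17" for subtask 1 and part counter 17. */
    static String partFileName(int subtaskIndex, long partCounter) {
        return PART_PREFIX + "-" + subtaskIndex + "-" + partCounter;
    }

    /**
     * True if the in-progress part file should be closed and a new one opened:
     * it is bigger than maxPartSize, older than rolloverInterval, or has not been
     * written to for longer than inactivityInterval.
     */
    static boolean shouldRoll(long currentSize, long creationTime, long lastWrittenTime, long now,
                              long maxPartSize, long rolloverInterval, long inactivityInterval) {
        return currentSize > maxPartSize
                || now - creationTime > rolloverInterval
                || now - lastWrittenTime > inactivityInterval;
    }
}
```

With the documented defaults (part size `384L * 1024 * 1024` bytes, roll-over interval `Long.MAX_VALUE`) the time-based condition never fires, so only size or inactivity would trigger a roll.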
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201042180 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. 
+ * + * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. + * + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and + * this can be configured using {@link
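The default `DateTimeBucketer` is described as creating one new bucket every hour. One way to get that behaviour is to derive the bucket directory from the processing time, formatted with an hour-granularity pattern, so all elements processed within the same hour land in the same directory under `basePath`. In the sketch below, the pattern `yyyy-MM-dd--HH`, the UTC time zone and the class name `HourlyBucketSketch` are assumptions for illustration:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical hourly bucket assignment: elements processed in the same hour share a bucket. */
class HourlyBucketSketch {
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneId.of("UTC"));

    /** Returns the bucket directory under basePath for an element processed at the given time. */
    static String bucketPath(String basePath, long processingTimeMillis) {
        return basePath + "/" + HOURLY.format(Instant.ofEpochMilli(processingTimeMillis));
    }
}
```

A custom `Bucketer` set via `setBucketer(Bucketer)` would replace this time-based rule with, for example, a property of the element.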
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r200909011 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Javadoc. --- End diff -- Thanks for the catch! I will update. ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r200835177 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Javadoc. --- End diff -- Wrong Javadoc. ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/6281

[FLINK-9750] Add new StreamingFileSink with ResumableWriter.

## What is the purpose of the change

This PR is the first step towards introducing a new streaming file system sink that works on top of Flink's file system abstraction and provides support for both row-based and block-based formats (like ORC/Parquet). The current version only supports the LocalFileSystem.

## Brief change log

The first commit introduces the new `ResumableWriter` abstraction and an implementation for the `LocalFileSystem`, while the second introduces the new `StreamingFileSink`.

## Verifying this change

This change adds tests in the `LocalStreamingFileSinkTest` and the `BucketStateSerializerTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented yet

**NOTE TO REVIEWER**: Logging is still missing.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink bucketing-local-inv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6281.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6281

commit 64cdd7b57f6f71db3fa8fe90d9eb37e784b9ae56
Author: Stephan Ewen
Date: 2018-06-29T17:27:58Z

    [hotfix] [core] Remove unused class AbstractMultiFSDataInputStream

commit 2a7ed070dd7b57a58a7588b7ce03c4032a3283fc
Author: Stephan Ewen
Date: 2018-06-29T17:15:54Z

    [FLINK-9751][filesystem] Add PersistentResumableWriter interface.

commit da64fedced17cef7d53448dac360a26ae9d32204
Author: kkloudas
Date: 2018-07-06T14:38:08Z

    [FLINK-9750] Add new StreamingFileSink on top of the ResumableWriter.

    This commit introduces the new sink and tests it on the LocalFileSystem.

---