[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r202574665
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Sink that emits its input elements to {@link FileSystem} files within buckets. This is
+ * integrated with the checkpointing mechanism to provide exactly once semantics.
+ *
+ *
+ * When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files,
+ * with at least one for each parallel subtask of the sink which is writing data to that bucket.
+ * These part files contain the actual output data.
+ *
+ *
+ * The sink uses a {@link Bucketer} to determine in which bucket directory each element should
+ * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
+ * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ *
+ *
+ * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
+ * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
+ * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask.
+ * When a part file becomes bigger than the user-specified part size or when the part file becomes older
+ * than the user-specified roll over interval the current part file is closed, the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
+ * this can be configured using {@link 

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r202556032
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---
@@ -0,0 +1,397 @@
[quoted diff identical to the StreamingFileSink.java excerpt above; the review comment itself was truncated in the archive]

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201374783
  
--- Diff: flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for the {@link LocalResumableWriter}.
+ */
+public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+
+   @ClassRule
+   public static TemporaryFolder tempFolder = new TemporaryFolder();
--- End diff --

It is good practice to make these final
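    
    For illustration, the declaration with the suggested modifier (a minimal sketch of the reviewed test class):
    
        @ClassRule
        public static final TemporaryFolder tempFolder = new TemporaryFolder();
    
    JUnit only requires @ClassRule fields to be public and static, so adding final costs nothing and prevents accidental reassignment.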


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201400623
  
--- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+   private static final Random RND = new Random();
+
+   private static final String testData1 = "THIS IS A TEST 1.";
+   private static final String testData2 = "THIS IS A TEST 2.";
+   private static final String testData3 = "THIS IS A TEST 3.";
+
+   private Path basePathForTest;
+
+   private static FileSystem fileSystem;
+
+   public abstract Path getBasePath() throws Exception;
+
+   public abstract FileSystem initializeFileSystem();
+
+   public Path getBasePathForTest() {
+   return basePathForTest;
+   }
+
+   private FileSystem getFileSystem() {
+   if (fileSystem == null) {
+   fileSystem = initializeFileSystem();
+   }
+   return fileSystem;
+   }
+
+   private ResumableWriter getNewFileSystemWriter() throws IOException {
+   return getFileSystem().createRecoverableWriter();
+   }
+
+   @Before
+   public void prepare() throws Exception {
+   basePathForTest = new Path(getBasePath(), randomName());
+   getFileSystem().mkdirs(basePathForTest);
+   }
+
+   @After
+   public void cleanup() throws Exception {
+   getFileSystem().delete(basePathForTest, true);
+   }
+
+   @Test
+   public void testCloseWithNoData() throws Exception {
+   final ResumableWriter writer = getNewFileSystemWriter();
+
+   final Path testDir = getBasePathForTest();
+
+   final Path path = new Path(testDir + File.separator + "part-0");
--- End diff --

Avoid `File.separator` for cross-platform paths; use `new Path(testDir, "part-0");` instead.


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201369555
  
--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ---
@@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
return recoverable;
}
}
+
+   /**
+* Called when resuming execution after a failure and waits until the lease
+* of the file we are resuming is free.
+*
+* The lease of the file we are resuming writing/committing to may still
+* belong to the process that failed previously and whose state we are
+* recovering.
+*
+* @param path The path to the file we want to resume writing to.
+*/
+   private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+   Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+   final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+   dfs.recoverLease(path);
+   boolean isclosed = dfs.isFileClosed(path);
+
+   final StopWatch sw = new StopWatch();
--- End diff --

Let's use `Deadline` from the Flink utils instead to reduce external dependencies.
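    
    A sketch of the wait loop rewritten against Flink's Deadline (assuming org.apache.flink.api.common.time.Deadline and a millisecond LEASE_TIMEOUT; not the code that was eventually committed):
    
        final Deadline deadline = Deadline.fromNow(Duration.ofMillis(LEASE_TIMEOUT));
    
        boolean isClosed = dfs.isFileClosed(path);
        while (!isClosed && deadline.hasTimeLeft()) {
            Thread.sleep(500L);                // poll until HDFS reports the file closed
            isClosed = dfs.isFileClosed(path); // InterruptedException handling is the subject of the next comment
        }
        return isClosed;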


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201374633
  
--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ---
@@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
return recoverable;
}
}
+
+   /**
+* Called when resuming execution after a failure and waits until the lease
+* of the file we are resuming is free.
+*
+* The lease of the file we are resuming writing/committing to may still
+* belong to the process that failed previously and whose state we are
+* recovering.
+*
+* @param path The path to the file we want to resume writing to.
+*/
+   private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+   Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+   final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+   dfs.recoverLease(path);
+   boolean isclosed = dfs.isFileClosed(path);
+
+   final StopWatch sw = new StopWatch();
+   sw.start();
+
+   while (!isclosed) {
+   if (sw.getTime() > LEASE_TIMEOUT) {
+   break;
+   }
+
+   try {
--- End diff --

This basically locks the thread in for up to LEASE_TIMEOUT time, making it not possible to cancel. I would either propagate the InterruptedException, or rethrow it as an IOException indicating that recovering the lease failed (because this is a single-purpose util function that works here).
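    
    For illustration, the rethrow variant could look like this (a sketch, not the PR's final code):
    
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            // restore the interrupt flag so the enclosing task still observes the cancellation
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for the lease on " + path + " to be revoked.", e);
        }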


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201375290
  
--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java ---
@@ -130,7 +130,7 @@ public static boolean hasHDFSDelegationToken() throws Exception {
 */
public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
String versionString = VersionInfo.getVersion();
-   String[] versionParts = versionString.split(".");
+   String[] versionParts = versionString.split("\\.");
--- End diff --

Good catch!
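    
    The underlying reason, shown with a hypothetical version string: String.split takes a regular expression, so an unescaped dot matches every character and all the resulting (empty) trailing fields are discarded:
    
        String[] broken = "2.8.3".split(".");   // length 0 -- the regex "." consumes every character
        String[] fixed  = "2.8.3".split("\\."); // length 3 -- {"2", "8", "3"}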


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201401033
  
--- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+   private static final Random RND = new Random();
+
+   private static final String testData1 = "THIS IS A TEST 1.";
+   private static final String testData2 = "THIS IS A TEST 2.";
+   private static final String testData3 = "THIS IS A TEST 3.";
+
+   private Path basePathForTest;
+
+   private static FileSystem fileSystem;
+
+   public abstract Path getBasePath() throws Exception;
+
+   public abstract FileSystem initializeFileSystem();
+
+   public Path getBasePathForTest() {
+   return basePathForTest;
+   }
+
+   private FileSystem getFileSystem() {
+   if (fileSystem == null) {
+   fileSystem = initializeFileSystem();
+   }
+   return fileSystem;
+   }
+
+   private ResumableWriter getNewFileSystemWriter() throws IOException {
+   return getFileSystem().createRecoverableWriter();
+   }
+
+   @Before
+   public void prepare() throws Exception {
+   basePathForTest = new Path(getBasePath(), randomName());
+   getFileSystem().mkdirs(basePathForTest);
+   }
+
+   @After
+   public void cleanup() throws Exception {
+   getFileSystem().delete(basePathForTest, true);
+   }
+
+   @Test
+   public void testCloseWithNoData() throws Exception {
+   final ResumableWriter writer = getNewFileSystemWriter();
+
+   final Path testDir = getBasePathForTest();
+
+   final Path path = new Path(testDir + File.separator + "part-0");
+
+   final RecoverableFsDataOutputStream stream = writer.open(path);
+   for (Map.Entry fileContents : getFileContentByPath(testDir).entrySet()) {
+   Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+   Assert.assertTrue(fileContents.getValue().isEmpty());
+   }
+
+   stream.closeForCommit().commit();
+
+   for (Map.Entry fileContents : getFileContentByPath(testDir).entrySet()) {
+   Assert.assertEquals("part-0", fileContents.getKey().getName());
+   Assert.assertTrue(fileContents.getValue().isEmpty());
+   }
+   }
+
+   @Test
+   public void testCommitAfterNormalClose() throws Exception {
+   final ResumableWriter writer = getNewFileSystemWriter();
+
+   final Path testDir = getBasePathForTest();
+
+   final Path path = new Path(testDir.getPath() + File.separator + "part-0");
+
+   try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+   stream.write(testData1.getBytes(Charset.forName("UTF-8")));
--- End diff --

Use `StandardCharsets.UTF_8` instead of "UTF-8".
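    
    The drop-in replacement (java.nio.charset.StandardCharsets has shipped with the JDK since Java 7):
    
        stream.write(testData1.getBytes(StandardCharsets.UTF_8));
    
    Besides being shorter, this avoids the per-call charset lookup that Charset.forName performs.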


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201375170
  
--- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ---
@@ -41,6 +44,8 @@
 @Internal
 class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 
+   private static final long LEASE_TIMEOUT = 10L;
--- End diff --

Can we add digit grouping chars here? Makes it easier to read...
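    
    For illustration, with Java 7 underscore separators in the literal (the concrete value below is hypothetical, since the quoted constant appears truncated in the archive):
    
        private static final long LEASE_TIMEOUT = 100_000L;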


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201342618
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---
@@ -0,0 +1,397 @@
[quoted diff identical to the StreamingFileSink.java excerpt above; the review comment itself was truncated in the archive]

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201059374
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java ---
@@ -0,0 +1,297 @@
[quoted diff identical to the Bucket.java excerpt below; the review comment itself was truncated in the archive]

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201059444
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java ---
@@ -0,0 +1,297 @@
[quotes the same Bucket.java excerpt as the question below, ending at the line]
+   fsWriter.recoverForCommit(commitable).commit();
--- End diff --

You are right!


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201058746
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java ---
@@ -0,0 +1,297 @@
[quoted diff identical to the Bucket.java excerpt below; the review comment itself was truncated in the archive]

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201058329
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bucket is the directory organization of the output of the {@link BucketingSink}.
+ *
+ *
+ * For each incoming element in the {@code BucketingSink}, the user-specified
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is
+ * queried to see in which bucket this element should be written to.
+ */
+public class Bucket {
+
+   private static final String PART_PREFIX = "part";
+
+   private final Path bucketPath;
+
+   private int subtaskIndex;
+
+   private long partCounter;
+
+   private long creationTime;
+
+   private long lastWrittenTime;
+
+   private final long maxPathSize;
+
+   private final long rolloverTime;
+
+   private final long inactivityTime;
+
+   private final Writer outputFormatWriter;
+
+   private final ResumableWriter fsWriter;
+
+   private RecoverableFsDataOutputStream currentOpenPartStream;
+
+   private List pending = new ArrayList<>();
+
+   private Map> pendingPerCheckpoint = new HashMap<>();
+
+   public Bucket(
+   ResumableWriter fsWriter,
+   int subtaskIndex,
+   Path bucketPath,
+   long initialPartCounter,
+   long maxPartSize,
+   long rolloverTime,
+   long inactivityTime,
+   Writer writer,
+   BucketState bucketstate) throws IOException {
+
+   this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer);
+
+   // the constructor must have already initialized the filesystem writer
+   Preconditions.checkState(fsWriter != null);
+
+   // we try to resume the previous in-progress file, if the filesystem
+   // supports such operation. If not, we just commit the file and start fresh.
+
+   final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
+   if (resumable != null) {
+   this.currentOpenPartStream = fsWriter.recover(resumable);
+   this.creationTime = bucketstate.getCreationTime();
+   }
+
+   // we commit pending files for previous checkpoints to the last successful one
+   // (from which we are recovering from)
+   for (List commitables: bucketstate.getPendingPerCheckpoint().values()) {
+   for (ResumableWriter.CommitRecoverable commitable: commitables) {
+   fsWriter.recoverForCommit(commitable).commit();
--- End diff --

Should this use `commitAfterRecovery()`?
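    
    If it should, the recovery loop would become (a sketch of the change the question suggests, using a commit variant that tolerates the file having already been published before the failure):
    
        for (ResumableWriter.CommitRecoverable commitable : commitables) {
            // does not fail if the commit already happened before the crash
            fsWriter.recoverForCommit(commitable).commitAfterRecovery();
        }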


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201042618
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---
@@ -0,0 +1,397 @@
[quoted diff identical to the StreamingFileSink.java excerpt above; the review comment itself was truncated in the archive]

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201042180
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---
@@ -0,0 +1,397 @@
[quoted diff identical to the StreamingFileSink.java excerpt above; the review comment itself was truncated in the archive]

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r200909011
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java ---
@@ -0,0 +1,36 @@
[quotes the same Writer.java excerpt as the comment below, ending at the placeholder line]
+ * Javadoc.
--- End diff --

Thanks for the catch! I will update.


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-08 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r200835177
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Javadoc.
--- End diff --

wrong Java doc
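    
    For context, the flagged Javadoc is the placeholder text "Javadoc." on a new public interface. An illustrative replacement, inferred from the interface's imports rather than quoted from the PR:
    
        /**
         * A writer that encodes incoming elements and writes them
         * to an {@link FSDataOutputStream}.
         */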


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-07 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/6281

[FLINK-9750] Add new StreamingFileSink with ResumableWriter.

## What is the purpose of the change

This PR is the first step towards introducing a new Streaming Filesystem sink that works on top of Flink's filesystem abstraction and provides support for both row and block-based formats (like ORC/Parquet).

The current version only supports the LocalFileSystem.

## Brief change log

The first commit introduces the new `ResumableWriter` abstraction and an implementation for the `LocalFileSystem`, while the second introduces the new `StreamingFileSink`.

## Verifying this change

This change adds tests in the `LocalStreamingFileSinkTest` and the `BucketStateSerializerTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented yet

**NOTE TO REVIEWER**: Logging is still missing.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink bucketing-local-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6281


commit 64cdd7b57f6f71db3fa8fe90d9eb37e784b9ae56
Author: Stephan Ewen 
Date:   2018-06-29T17:27:58Z

[hotfix] [core] Remove unused class AbstractMultiFSDataInputStream

commit 2a7ed070dd7b57a58a7588b7ce03c4032a3283fc
Author: Stephan Ewen 
Date:   2018-06-29T17:15:54Z

[FLINK-9751][filesystem] Add PersistentResumableWriter interface.

commit da64fedced17cef7d53448dac360a26ae9d32204
Author: kkloudas 
Date:   2018-07-06T14:38:08Z

[FLINK-9750] Add new StreamingFileSink on top of the ResumableWriter.

This commit introduces the new sink and tests it on the LocalFileSystem.




---