azagrebin commented on a change in pull request #7047: [FLINK-10522] Check if 
RecoverableWriter supportsResume() and act accordingly.
URL: https://github.com/apache/flink/pull/7047#discussion_r238593584
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
 ##########
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpCommitter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverable;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableFsDataOutputStream;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableWriter;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Tests for the {@link Bucket} class.
+ */
+public class BucketTest {
+
+       private final PartFileWriter.PartFileFactory<String, Integer> factory =
+                       new RowWisePartWriter.Factory<>(new 
SimpleStringEncoder<>());
+
+       private final RollingPolicy<String, Integer> rollingPolicy = 
DefaultRollingPolicy.create().build();
+
+       // --------------------------- Checking Restore 
---------------------------
+
+       @Test
+       public void 
inProgressFileShouldBeCommittedIfWriterDoesNotSupportResume() throws 
IOException {
+               final StubNonResumableWriter nonResumableWriter = new 
StubNonResumableWriter();
+               final Bucket<String, Integer> bucket = 
getRestoredBucketWithOnlyInProgressPart(nonResumableWriter);
+
+               Assert.assertThat(nonResumableWriter, 
hasMethodCallCountersEqualTo(1, 0, 1));
+               Assert.assertThat(bucket, hasNullInProgressFile(true));
+       }
+
+       @Test
+       public void inProgressFileShouldBeRestoredIfWriterSupportsResume() 
throws IOException {
+               final StubResumableWriter resumableWriter = new 
StubResumableWriter();
+               final Bucket<String, Integer> bucket = 
getRestoredBucketWithOnlyInProgressPart(resumableWriter);
+
+               Assert.assertThat(resumableWriter, 
hasMethodCallCountersEqualTo(1, 1, 0));
+               Assert.assertThat(bucket, hasNullInProgressFile(false));
+       }
+
+       @Test
+       public void pendingFilesShouldBeRestored() throws IOException {
+               final int expectedRecoverForCommitCounter = 10;
+
+               final StubNonResumableWriter writer = new 
StubNonResumableWriter();
+               final Bucket<String, Integer> bucket = 
getRestoredBucketWithOnlyPendingParts(writer, expectedRecoverForCommitCounter);
+
+               Assert.assertThat(writer, hasMethodCallCountersEqualTo(0, 0, 
expectedRecoverForCommitCounter));
+               Assert.assertThat(bucket, hasNullInProgressFile(true));
+       }
+
+       // ---------------------------------- Matchers 
----------------------------------
+
+       private static TypeSafeMatcher<Bucket<String, Integer>> 
hasNullInProgressFile(final boolean isNull) {
+
+               return new TypeSafeMatcher<Bucket<String, Integer>>() {
+                       @Override
+                       protected boolean matchesSafely(Bucket<String, Integer> 
bucket) {
+                               final PartFileWriter<String, Integer> 
inProgressPart = bucket.getInProgressPart();
+                               return isNull == (inProgressPart == null);
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("a Bucket with its 
inProgressPart being ")
+                                               .appendText(isNull ? " null." : 
" not null.");
+                       }
+               };
+       }
+
+       private static TypeSafeMatcher<BaseStubWriter> 
hasMethodCallCountersEqualTo(
+                       final int supportsResumeCalls,
+                       final int recoverCalls,
+                       final int recoverForCommitCalls) {
+
+               return new TypeSafeMatcher<BaseStubWriter>() {
+                       @Override
+                       protected boolean matchesSafely(BaseStubWriter writer) {
+                               return writer.getSupportsResumeCallCounter() == 
supportsResumeCalls &&
+                                               writer.getRecoverCallCounter() 
== recoverCalls &&
+                                               
writer.getRecoverForCommitCallCounter() == recoverForCommitCalls;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("a Writer where:")
+                                               .appendText(" supportsResume 
was called ").appendValue(supportsResumeCalls).appendText(" times,")
+                                               .appendText(" recover was 
called ").appendValue(recoverCalls).appendText(" times,")
+                                               .appendText(" and 
recoverForCommit was called ").appendValue(recoverForCommitCalls).appendText(" 
times.")
+                                               .appendText("'");
+                       }
+               };
+       }
+
+       // ---------------------------------- Utility Methods 
----------------------------------
+
+       private Bucket<String, Integer> 
getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws 
IOException {
+               final BucketState<Integer> stateWithOnlyInProgressFile =
+                               new BucketState<>(5, new Path(), 12345L, new 
NoOpRecoverable(), new HashMap<>());
+               return Bucket.restore(writer, 0, 1L, factory, rollingPolicy, 
stateWithOnlyInProgressFile);
+       }
+
+       private Bucket<String, Integer> 
getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int 
numberOfPendingParts) throws IOException {
+               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
completePartsPerCheckpoint =
+                               
createPendingPartsPerCheckpoint(numberOfPendingParts);
+
+               final BucketState<Integer> initStateWithOnlyInProgressFile =
+                               new BucketState<>(5, new Path(), 12345L, null, 
completePartsPerCheckpoint);
+               return Bucket.restore(writer, 0, 1L, factory, rollingPolicy, 
initStateWithOnlyInProgressFile);
+       }
+
+       private Map<Long, List<RecoverableWriter.CommitRecoverable>> 
createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingCommittablesPerCheckpoint = new HashMap<>();
+               for (int checkpointId = 0; checkpointId < noOfCheckpoints; 
checkpointId++) {
+                       final List<RecoverableWriter.CommitRecoverable> pending 
= new ArrayList<>();
+                       pending.add(new NoOpRecoverable());
+                       pendingCommittablesPerCheckpoint.put((long) 
checkpointId, pending);
+               }
+               return pendingCommittablesPerCheckpoint;
+       }
+
+       // ---------------------------------- Test Classes 
----------------------------------
+
+       /**
+        * A test implementation of a {@link RecoverableWriter} that does not 
support
+        * resuming, i.e. keep on writing to the in-progress file at the point 
we were
+        * before the failure.
 
 Review comment:
   Comments for 3 stub classes look like they need to be updated accordingly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to