[FLINK-3242] Also Set User-specified StateBackend without Checkpointing

Before, the user-specified StateBackend would not be set when generating the
JobGraph if checkpointing was disabled.

This closes #1516


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b88c2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b88c2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b88c2c

Branch: refs/heads/master
Commit: 83b88c2c606f0d36bc04a7250629eb00516af919
Parents: f6d2ce9
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Jan 18 11:53:31 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 28 14:30:28 2016 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../runtime/state/StateBackendITCase.java       | 134 +++++++++++++++++++
 2 files changed, 135 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 50c6a15..56b16a4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -317,10 +317,10 @@ public class StreamingJobGraphGenerator {
 
                final CheckpointConfig ceckpointCfg = 
streamGraph.getCheckpointConfig();
                
+               config.setStateBackend(streamGraph.getStateBackend());
                
config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
                if (ceckpointCfg.isCheckpointingEnabled()) {
                        
config.setCheckpointMode(ceckpointCfg.getCheckpointingMode());
-                       config.setStateBackend(streamGraph.getStateBackend());
                }
                else {
                        // the "at-least-once" input handler is slightly 
cheaper (in the absence of checkpoints),

http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
new file mode 100644
index 0000000..cdfef85
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.state;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
+
+       /**
+        * Verify that the user-specified state backend is used even if 
checkpointing is disabled.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testStateBackendWithoutCheckpointing() throws Exception {
+
+               StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               see.setParallelism(1);
+
+               see.setNumberOfExecutionRetries(0);
+               see.setStateBackend(new FailingStateBackend());
+
+
+               see.fromElements(new Tuple2<>("Hello", 1))
+                       .keyBy(0)
+                       .map(new RichMapFunction<Tuple2<String,Integer>, 
String>() {
+                               private static final long serialVersionUID = 1L;
+
+                               @Override
+                               public void open(Configuration parameters) 
throws Exception {
+                                       super.open(parameters);
+                                       
getRuntimeContext().getKeyValueState("test", String.class, "");
+                               }
+
+                               @Override
+                               public String map(Tuple2<String, Integer> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       })
+                       .print();
+
+               boolean caughtSuccess = false;
+               try {
+                       see.execute();
+               } catch (JobExecutionException e) {
+                       if (e.getCause() instanceof SuccessException) {
+                               caughtSuccess = true;
+                       } else {
+                               throw e;
+                       }
+               }
+
+               assertTrue(caughtSuccess);
+       }
+
+
+       public static class FailingStateBackend extends 
StateBackend<FailingStateBackend> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void initializeForJob(Environment env) throws Exception {
+                       throw new SuccessException();
+               }
+
+               @Override
+               public void disposeAllStateForCurrentJob() throws Exception {
+
+               }
+
+               @Override
+               public void close() throws Exception {
+
+               }
+
+               @Override
+               public <K, V> KvState<K, V, FailingStateBackend> 
createKvState(String stateId,
+                       String stateName,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<V> valueSerializer,
+                       V defaultValue) throws Exception {
+                       return null;
+               }
+
+               @Override
+               public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID,
+                       long timestamp) throws Exception {
+                       return null;
+               }
+
+               @Override
+               public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(S state,
+                       long checkpointID,
+                       long timestamp) throws Exception {
+                       return null;
+               }
+       }
+
+       static final class SuccessException extends Exception {
+               private static final long serialVersionUID = 
-9218191172606739598L;
+       }
+
+}

Reply via email to