[FLINK-3242] Also Set User-specified StateBackend without Checkpointing. Before this change, the user-specified StateBackend was not set when generating the JobGraph if checkpointing was disabled.
This closes #1516 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b88c2c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b88c2c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b88c2c Branch: refs/heads/master Commit: 83b88c2c606f0d36bc04a7250629eb00516af919 Parents: f6d2ce9 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Jan 18 11:53:31 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Jan 28 14:30:28 2016 +0100 ---------------------------------------------------------------------- .../api/graph/StreamingJobGraphGenerator.java | 2 +- .../runtime/state/StateBackendITCase.java | 134 +++++++++++++++++++ 2 files changed, 135 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 50c6a15..56b16a4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -317,10 +317,10 @@ public class StreamingJobGraphGenerator { final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig(); + config.setStateBackend(streamGraph.getStateBackend()); config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled()); if (ceckpointCfg.isCheckpointingEnabled()) { config.setCheckpointMode(ceckpointCfg.getCheckpointingMode()); - config.setStateBackend(streamGraph.getStateBackend()); } 
else { // the "at-least-once" input handler is slightly cheaper (in the absence of checkpoints), http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java new file mode 100644 index 0000000..cdfef85 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.state; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertTrue; + + +public class StateBackendITCase extends StreamingMultipleProgramsTestBase { + + /** + * Verify that the user-specified state backend is used even if checkpointing is disabled. + * + * @throws Exception + */ + @Test + public void testStateBackendWithoutCheckpointing() throws Exception { + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(1); + + see.setNumberOfExecutionRetries(0); + see.setStateBackend(new FailingStateBackend()); + + + see.fromElements(new Tuple2<>("Hello", 1)) + .keyBy(0) + .map(new RichMapFunction<Tuple2<String,Integer>, String>() { + private static final long serialVersionUID = 1L; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + getRuntimeContext().getKeyValueState("test", String.class, ""); + } + + @Override + public String map(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + }) + .print(); + + boolean caughtSuccess = false; + try { + see.execute(); + } catch (JobExecutionException e) { + if (e.getCause() instanceof SuccessException) { + caughtSuccess = true; + } else { + throw e; + } + } + + 
assertTrue(caughtSuccess); + } + + + public static class FailingStateBackend extends StateBackend<FailingStateBackend> { + private static final long serialVersionUID = 1L; + + @Override + public void initializeForJob(Environment env) throws Exception { + throw new SuccessException(); + } + + @Override + public void disposeAllStateForCurrentJob() throws Exception { + + } + + @Override + public void close() throws Exception { + + } + + @Override + public <K, V> KvState<K, V, FailingStateBackend> createKvState(String stateId, + String stateName, + TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer, + V defaultValue) throws Exception { + return null; + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, + long timestamp) throws Exception { + return null; + } + + @Override + public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, + long checkpointID, + long timestamp) throws Exception { + return null; + } + } + + static final class SuccessException extends Exception { + private static final long serialVersionUID = -9218191172606739598L; + } + +}