[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/1239 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-148775942 Manually merged
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1239#discussion_r42259842
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for key/value state implementations that are backed by a regular heap hash map. The
+ * concrete implementations define how the state is checkpointed.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> {
+
+    /** Map containing the actual key/value pairs */
+    private final HashMap<K, V> state;
+
+    /** The serializer for the keys */
+    private final TypeSerializer<K> keySerializer;
+
+    /** The serializer for the values */
+    private final TypeSerializer<V> valueSerializer;
+
+    /** The value that is returned when no other value has been associated with a key, yet */
+    private final V defaultValue;
+
+    /** The current key, which the next value methods will refer to */
+    private K currentKey;
+
+    /**
+     * Creates a new empty key/value state.
+     *
+     * @param keySerializer The serializer for the keys.
+     * @param valueSerializer The serializer for the values.
+     * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+     */
+    protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+                                  TypeSerializer<V> valueSerializer,
+                                  V defaultValue) {
+        this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>());
+    }
+
+    /**
+     * Creates a new key/value state for the given hash map of key/value pairs.
+     *
+     * @param keySerializer The serializer for the keys.
+     * @param valueSerializer The serializer for the values.
+     * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+     * @param state The state map to use in this kev/value state. May contain initial state.
+     */
+    protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+                                  TypeSerializer<V> valueSerializer,
+                                  V defaultValue,
+                                  HashMap<K, V> state) {
+        this.state = requireNonNull(state);
+        this.keySerializer = requireNonNull(keySerializer);
+        this.valueSerializer = requireNonNull(valueSerializer);
+        this.defaultValue = defaultValue;
+    }
+
+    //
+
+    @Override
+    public V value() {
+        V value = state.get(currentKey);
+        return value != null ? value : defaultValue;
--- End diff --

Ah, just saw the comment. Probably right, I need to do this in an add-on commit.
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1239#discussion_r42098258
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java ---
@@ -0,0 +1,145 @@
[... same hunk as quoted in StephanEwen's comment above ...]
+    @Override
+    public V value() {
+        V value = state.get(currentKey);
+        return value != null ? value : defaultValue;
--- End diff --

I think you should make a copy of the default value here. Otherwise you end up with the same objects for non-primitive types.
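gyfora's point can be shown outside Flink. The sketch below is a simplified, hypothetical heap key/value state (not the class in the diff) whose `value()` hands out a copy of the default instead of the shared instance. The `copier` function is an assumed stand-in for a deep copy; in Flink, the value serializer's `copy(T from)` method could play that role. With a mutable default such as a list, returning the shared instance would let a mutation made under one key leak into every other key that has no value yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/**
 * Hypothetical, simplified heap-backed key/value state (not Flink's class).
 * The copier stands in for a deep copy, e.g. a value serializer's copy method.
 */
final class SimpleHeapKvState<K, V> {
    private final Map<K, V> state = new HashMap<>();
    private final V defaultValue;
    private final UnaryOperator<V> copier;
    private K currentKey;

    SimpleHeapKvState(V defaultValue, UnaryOperator<V> copier) {
        this.defaultValue = defaultValue;
        this.copier = copier;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    /** Returns the stored value, or a fresh copy of the default so no two keys share it. */
    V value() {
        V value = state.get(currentKey);
        if (value != null) {
            return value;
        }
        return defaultValue == null ? null : copier.apply(defaultValue);
    }

    void update(V value) {
        state.put(currentKey, value);
    }
}

final class DefaultValueCopyDemo {
    public static void main(String[] args) {
        SimpleHeapKvState<String, List<String>> kv =
                new SimpleHeapKvState<>(new ArrayList<>(), list -> new ArrayList<>(list));

        kv.setCurrentKey("a");
        List<String> forA = kv.value(); // a copy of the default, not the default itself
        forA.add("x");
        kv.update(forA);

        kv.setCurrentKey("b");
        System.out.println(kv.value()); // prints [] because key "b" still sees an untouched default
    }
}
```

Without the copy, `kv.value()` for key "b" would return the same list object that was mutated under key "a".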
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146830200 I think when drawing the snapshot with `Checkpointed` you are already inside a streaming `Function` or internal operator, where one can easily access that information anyway. I think that is fine as it is; I agree with @gyfora that access from the `StateBackend` is more important, as that is potentially decoupled from drawing the snapshot itself. As for the other points listed by @StephanEwen, they sound reasonable to me.
Github user senorcarbone commented on a diff in the pull request: https://github.com/apache/flink/pull/1239#discussion_r41493576 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -322,73 +311,84 @@ public String getName() { return getEnvironment().getTaskNameWithSubtasks(); } + /** +* Gets the lock object on which all operations that involve data and state mutation have to lock. + +* @return The checkpoint lock object. +*/ public Object getCheckpointLock() { return lock; } + + public StreamConfig getConfiguration() { + return configuration; + } + + public MapgetAccumulatorMap() { + return accumulatorMap; + } + + public Output getHeadOutput() { + return operatorChain.getChainEntryPoint(); + } + + public RecordWriterOutput[] getStreamOutputs() { + return operatorChain.getStreamOutputs(); + } // // Checkpoint and Restore // - - @SuppressWarnings("unchecked") + @Override - public void setInitialState(StateHandle stateHandle) throws Exception { - - // We retrieve end restore the states for the chained operators. 
- List >> chainedStates = - (List >>) stateHandle.getState(this.userClassLoader); - - // We restore all stateful operators - for (int i = 0; i < chainedStates.size(); i++) { - Tuple2 > state = chainedStates.get(i); - // If state is not null we need to restore it - if (state != null) { - StreamOperator chainedOperator = outputHandler.getChainedOperators().get(i); - ((StatefulStreamOperator) chainedOperator).restoreInitialState(state); + public void setInitialState(StreamTaskStateList initialState) throws Exception { + LOG.info("Restoring checkpointed state to task {}", getName()); + + final StreamOperator[] allOperators = operatorChain.getAllOperators(); + final StreamTaskState[] states = initialState.getState(userClassLoader); + + for (int i = 0; i < states.length; i++) { + StreamTaskState state = states[i]; + StreamOperator operator = allOperators[i]; + + if (state != null && operator != null) { + LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName()); + operator.restoreState(state); + } + else if (operator != null) { + LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName()); } } } @Override public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception { - LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); synchronized (lock) { if (isRunning) { - try { - // We wrap the states of the chained operators in a list, marking non-stateful operators with null - List >> chainedStates = new ArrayList<>(); - // A wrapper handle is created for the List of statehandles - WrapperStateHandle stateHandle; - try { - - // We construct a list of states for chained tasks - for (StreamOperator chainedOperator : outputHandler.getChainedOperators()) { - if (chainedOperator instanceof StatefulStreamOperator) { - chainedStates.add(((StatefulStreamOperator) chainedOperator) - .getStateSnapshotFromFunction(checkpointId, timestamp)); -
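The new `setInitialState` in this hunk pairs the i-th checkpointed state with the i-th operator of the chain and skips slots that have no state. A self-contained sketch of that pairing logic follows. `RestorableOperator` and `ChainRestorer` are hypothetical names, and the states are plain strings only for illustration. Unlike the hunk, the sketch rejects arrays of different lengths up front instead of relying on an out-of-bounds index.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal operator interface (not Flink's StreamOperator). */
interface RestorableOperator {
    void restoreState(String state);
}

final class ChainRestorer {
    /**
     * Hands the i-th checkpointed state to the i-th operator in the chain.
     * A null state means that operator had nothing checkpointed; a null
     * operator slot is skipped. Returns how many operators were restored.
     */
    static int restore(RestorableOperator[] operators, String[] states) {
        if (operators.length != states.length) {
            throw new IllegalArgumentException(
                    "Chain has " + operators.length + " operators but " + states.length + " states");
        }
        int restored = 0;
        for (int i = 0; i < states.length; i++) {
            if (states[i] != null && operators[i] != null) {
                operators[i].restoreState(states[i]);
                restored++;
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        RestorableOperator[] chain = {
            s -> log.add("source:" + s),
            s -> log.add("map:" + s),
            null
        };
        int restored = restore(chain, new String[] {"offset=42", null, null});
        System.out.println(restored + " " + log); // prints 1 [source:offset=42]
    }
}
```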
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146465524 Overall looks very good. :+1: I personally like having the non-partitioned operator states and I use them in my programs, so I would prefer keeping them.
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146467354 So my argument is that the overhead of implementing the Checkpointed interface for simple counters, offsets, and the like is simply too much (and annoying). We could also introduce some annotations that the user can use to tag the state. Then the only thing we need to make sure of is that these support some custom checkpointing logic (like the state checkpointer interface). Another thing we should consider is that the Checkpointed interface will never allow any lazy state access logic.
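To make the overhead concrete: even a plain counter has to implement both a snapshot and a restore method. The sketch below uses a local `CheckpointedLike<T>` interface that only mirrors the shape of Flink's `Checkpointed<T>` (`snapshotState(long checkpointId, long checkpointTimestamp)` and `restoreState(T state)`) so that it compiles without Flink; `CountingMapper` is a hypothetical function.

```java
import java.io.Serializable;

/** Local stand-in that mirrors the shape of Flink's Checkpointed<T>; not imported from Flink. */
interface CheckpointedLike<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;
}

/** Hypothetical counting function: one line of logic, two methods of checkpointing boilerplate. */
final class CountingMapper implements CheckpointedLike<Long> {
    private long count;

    String map(String value) {
        count++;
        return value;
    }

    long getCount() {
        return count;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return count; // boxed into a Serializable Long
    }

    @Override
    public void restoreState(Long state) {
        count = state;
    }

    public static void main(String[] args) {
        CountingMapper before = new CountingMapper();
        before.map("a");
        before.map("b");
        Long snapshot = before.snapshotState(1L, System.currentTimeMillis());

        CountingMapper after = new CountingMapper();
        after.restoreState(snapshot);
        System.out.println(after.getCount()); // prints 2
    }
}
```

This pair of methods is the per-function boilerplate that the annotation idea would aim to remove for simple values such as counters and offsets.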
Github user senorcarbone commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146472181 On a related note, is anyone planning to complete snapshotting for cyclic graphs for the 0.10 release? Unfortunately I do not have time to work on it this week, but it would be great to support it in 0.10 already.
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146486097 Another important thing: I think it would be very useful to add some extra context information to the snapshot methods, something that can be used to identify the operator that took the snapshot, maybe a combination of task id + task index. I am currently trying to implement a SQL-based backend, and I would really need to store this information so that I can then work with the checkpoint data from the outside.
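One way to picture the requested context: the backend receives an identifier of the snapshotting operator instance together with the data and uses it as part of the storage key, so that rows can later be located from outside Flink. In this sketch, `SnapshotContext` and `ContextAwareBackend` are hypothetical, and a `HashMap` stands in for a SQL table whose primary key would be (operator id, subtask index, checkpoint id).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical context passed along with each snapshot call; not a Flink API. */
final class SnapshotContext {
    final String operatorId;
    final int subtaskIndex;
    final long checkpointId;

    SnapshotContext(String operatorId, int subtaskIndex, long checkpointId) {
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
        this.checkpointId = checkpointId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SnapshotContext)) {
            return false;
        }
        SnapshotContext other = (SnapshotContext) o;
        return subtaskIndex == other.subtaskIndex
                && checkpointId == other.checkpointId
                && operatorId.equals(other.operatorId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(operatorId, subtaskIndex, checkpointId);
    }
}

/** Toy backend: the map plays the role of a table keyed by (operator, subtask, checkpoint). */
final class ContextAwareBackend {
    private final Map<SnapshotContext, byte[]> table = new HashMap<>();

    void storeSnapshot(SnapshotContext context, byte[] data) {
        table.put(context, data.clone()); // defensive copy of the snapshot bytes
    }

    byte[] load(String operatorId, int subtaskIndex, long checkpointId) {
        return table.get(new SnapshotContext(operatorId, subtaskIndex, checkpointId));
    }

    int rowCount() {
        return table.size();
    }
}
```

Without the subtask index in the key, two parallel instances of the same operator would overwrite each other's rows for the same checkpoint.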
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146486750 I can live without the non-partitioned OperatorStates, but the context info for the snapshots is very hard to work around I think.
Github user senorcarbone commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146511645 @ktzoumas that's understandable! We could consider it for the maintenance release maybe if there is enough time
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146507802 Also, a close method in the StateBackend interface would come in handy, analogous to the initialize method, to close connections etc. in case of failures.
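A minimal sketch of the suggested lifecycle, assuming a backend with an initialize step and a matching close: the task closes the backend through try-with-resources, so connections are released whether processing succeeds or fails. `LifecycleBackend`, `ConnectionBackend`, and `TaskRunner` are hypothetical names, not Flink's API.

```java
/** Hypothetical backend lifecycle; names are illustrative, not Flink's StateBackend API. */
interface LifecycleBackend extends AutoCloseable {
    void initialize() throws Exception;
}

/** Toy backend that only tracks whether its (pretend) connection is open. */
final class ConnectionBackend implements LifecycleBackend {
    private boolean open;

    @Override
    public void initialize() {
        open = true; // e.g. open a database connection here
    }

    @Override
    public void close() {
        open = false; // e.g. close the connection here
    }

    boolean isOpen() {
        return open;
    }
}

final class TaskRunner {
    /**
     * Runs the task body and always closes the backend, also when the body throws.
     * If close() itself fails after a body failure, try-with-resources attaches that
     * exception as suppressed instead of hiding the original failure.
     */
    static void run(LifecycleBackend backend, Runnable body) throws Exception {
        backend.initialize();
        try (LifecycleBackend toClose = backend) {
            body.run();
        }
    }
}
```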
Github user ktzoumas commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146490272 @senorcarbone I think the cyclic graphs feature will be hard to get into 0.10
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1239#discussion_r41509657
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
@@ -471,7 +475,11 @@ private StreamGraph generateInternal(Listtransformatio
             transform.getName());
         if (transform.getStateKeySelector() != null) {
-            streamGraph.setKey(transform.getId(), transform.getStateKeySelector());
+            TypeSerializer keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
+            streamGraph.setKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
+        }
+        if (transform.getStateKeyType() != null) {
+
--- End diff --

empty if
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1239#discussion_r41528403
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -322,73 +311,84 @@ public String getName() {
[... same hunk as quoted in senorcarbone's comment above ...]
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1239#issuecomment-146594656 As for the context, I did not mean to add it to the Checkpointed interface but to the state backend method calls. A simpler solution for now would be to make the environment or the RuntimeContext accessible from the backend, so that it knows which task it is checkpointing for. This is probably the cleanest solution until we figure out the exact requirements. Otherwise :+1: from me.
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1239

[FLINK-2808] Rework state abstraction and clean up task / operator internals

This pull request fixes many related/intermixed issues. It was hard to split this into individual issues.

### Crucial bug fixes

- State snapshots for memory-backed state previously copied a reference into the StateHandle, after which the streaming program continued. If the state was mutated before Akka serialized it, the mutated state was checkpointed, rather than the state at the point the snapshot was drawn.
- Key/value state is checkpointed as a whole, rather than individually per key.
- Memory-backed state now has a maximum size that is checked upon checkpointing. Exceeding that size fails the checkpoint. Before, state that was too large simply resulted in an oversized Akka frame that was dropped, silently letting the program run without ever completing a checkpoint.

### User-facing changes

- The state backend is not only responsible for storing snapshots of the user state; it also defines how exactly the key/value state is represented in the first place. This allows us to plug in external key/value stores to store the Flink key/value state. The default implementations store the state in memory or in files.
- The state backend offers additional methods to checkpoint directly into streams.
- Arbitrary default state backends can be configured via a factory interface that creates them from the TaskManager configuration.
- Key/value state supports arbitrary types without extra checkpointer logic, but the user needs to supply the type of the state (via a class or a TypeInformation).
- Removed the non-partitioned `OperatorState`. The only type of state remaining in the `OperatorState` abstraction is partitioned key/value state in functions that are applied on a KeyedStream. Consequently, `mapWithState()` and related methods are only available on the `KeyedStream`.

### Internal cleanups

- Checkpoint barriers are forwarded earlier, to reduce the latency introduced by checkpoints.
- Fewer in-memory copies when checkpointing to the file system state backend.
- The StreamingRuntimeContext is used purely for UDF interaction, not to hand over components to the operators.
- The infinite reduce and aggregations work properly on key/value state, rather than maintaining their own maps.
- Made the OutputHandler (now OperatorChain) type safe and simpler.
- Made a clear distinction between the responsibilities of StreamTasks (input/output streams, setup of the operator chain, checkpoint coordination) and operators (scope of one function and runtime context).
- Cleaned up the checkpointing logic between operators (which checkpoint generic key/value state) and UDF operators (which checkpoint UDFs).
- Removed the Configuration from the operator open() method (it was easily confused with the UDF open(Configuration) method).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink statebackend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1239.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1239

commit 3d633f0d608d91cfa69455fa9a47c53bf753a677
Author: Stephan Ewen
Date: 2015-10-05T13:57:49Z
[hotfix] Correct name of HDFS tests from 'org.apache.flink.tachyon' to 'org.apache.flink.hdfstests'

commit 441c089552b3045062e8620ad9d2c8411fb387a8
Author: Stephan Ewen
Date: 2015-10-05T13:57:04Z
[FLINK-2808] [streaming] Refactor and extend state backend abstraction

commit 73b65e2196576b0e36730bd0c8d8d3ced56f9f4f
Author: Stephan Ewen
Date: 2015-10-07T11:54:05Z
[FLINK-2808] [streaming] Integrate extended state backend abstraction with streaming state handling
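The first and third crucial bug fixes can be illustrated together: serialize the state at the moment the snapshot is drawn, so that later mutations cannot leak into the checkpoint, and fail the checkpoint if the serialized size exceeds a limit. This is a sketch using plain Java serialization; `MemorySnapshotter` and its `maxStateSize` parameter are hypothetical and do not reflect the exact Flink classes or their default limit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

/** Hypothetical memory-backed snapshotter; not Flink's actual state backend class. */
final class MemorySnapshotter {
    private final int maxStateSize;

    MemorySnapshotter(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    /**
     * Serializes the whole map eagerly, so the returned bytes reflect the state at
     * snapshot time, and fails the checkpoint if the result exceeds the size limit.
     */
    byte[] snapshot(HashMap<String, Long> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        if (bytes.size() > maxStateSize) {
            throw new IOException("Size of the state is " + bytes.size()
                    + " bytes, larger than the maximum of " + maxStateSize + " bytes");
        }
        return bytes.toByteArray();
    }

    /** Reads a snapshot produced by snapshot(...) back into a fresh map. */
    @SuppressWarnings("unchecked")
    static HashMap<String, Long> restore(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<String, Long>) in.readObject();
        }
    }
}
```

Keeping only a reference to the live map until some later serialization step, as the old code did, would make the checkpoint contents depend on when that step runs; copying into bytes inside `snapshot` removes that dependency.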