[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-16 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/1239


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-148775942
  
Manually merged




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1239#discussion_r42259842
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for key/value state implementations that are backed by a regular heap hash map. The
+ * concrete implementations define how the state is checkpointed.
+ * 
+ * @param  The type of the key.
+ * @param  The type of the value.
+ * @param  The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractHeapKvState implements KvState {
+
+   /** Map containing the actual key/value pairs */
+   private final HashMap state;
+   
+   /** The serializer for the keys */
+   private final TypeSerializer keySerializer;
+
+   /** The serializer for the values */
+   private final TypeSerializer valueSerializer;
+   
+   /** The value that is returned when no other value has been associated with a key, yet */
+   private final V defaultValue;
+   
+   /** The current key, which the next value methods will refer to */
+   private K currentKey;
+   
+   /**
+* Creates a new empty key/value state.
+* 
+* @param keySerializer The serializer for the keys.
+* @param valueSerializer The serializer for the values.
+* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+*/
+   protected AbstractHeapKvState(TypeSerializer keySerializer,
+                                 TypeSerializer valueSerializer,
+                                 V defaultValue) {
+   this(keySerializer, valueSerializer, defaultValue, new HashMap());
+   }
+
+   /**
+* Creates a new key/value state for the given hash map of key/value pairs.
+* 
+* @param keySerializer The serializer for the keys.
+* @param valueSerializer The serializer for the values.
+* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+* @param state The state map to use in this key/value state. May contain initial state.
+*/
+   protected AbstractHeapKvState(TypeSerializer keySerializer,
+                                 TypeSerializer valueSerializer,
+                                 V defaultValue,
+                                 HashMap state) {
+   this.state = requireNonNull(state);
+   this.keySerializer = requireNonNull(keySerializer);
+   this.valueSerializer = requireNonNull(valueSerializer);
+   this.defaultValue = defaultValue;
+   }
+
+   // 

+   
+   @Override
+   public V value() {
+   V value = state.get(currentKey);
+   return value != null ? value : defaultValue;
--- End diff --

Ah, I just saw the comment. You are probably right; I need to do this in an 
add-on commit.



[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-15 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1239#discussion_r42098258
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for key/value state implementations that are backed by a regular heap hash map. The
+ * concrete implementations define how the state is checkpointed.
+ * 
+ * @param  The type of the key.
+ * @param  The type of the value.
+ * @param  The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractHeapKvState implements KvState {
+
+   /** Map containing the actual key/value pairs */
+   private final HashMap state;
+   
+   /** The serializer for the keys */
+   private final TypeSerializer keySerializer;
+
+   /** The serializer for the values */
+   private final TypeSerializer valueSerializer;
+   
+   /** The value that is returned when no other value has been associated with a key, yet */
+   private final V defaultValue;
+   
+   /** The current key, which the next value methods will refer to */
+   private K currentKey;
+   
+   /**
+* Creates a new empty key/value state.
+* 
+* @param keySerializer The serializer for the keys.
+* @param valueSerializer The serializer for the values.
+* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+*/
+   protected AbstractHeapKvState(TypeSerializer keySerializer,
+                                 TypeSerializer valueSerializer,
+                                 V defaultValue) {
+   this(keySerializer, valueSerializer, defaultValue, new HashMap());
+   }
+
+   /**
+* Creates a new key/value state for the given hash map of key/value pairs.
+* 
+* @param keySerializer The serializer for the keys.
+* @param valueSerializer The serializer for the values.
+* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+* @param state The state map to use in this key/value state. May contain initial state.
+*/
+   protected AbstractHeapKvState(TypeSerializer keySerializer,
+                                 TypeSerializer valueSerializer,
+                                 V defaultValue,
+                                 HashMap state) {
+   this.state = requireNonNull(state);
+   this.keySerializer = requireNonNull(keySerializer);
+   this.valueSerializer = requireNonNull(valueSerializer);
+   this.defaultValue = defaultValue;
+   }
+
+   // 

+   
+   @Override
+   public V value() {
+   V value = state.get(currentKey);
+   return value != null ? value : defaultValue;
--- End diff --

I think you should make a copy of the default value here. Otherwise every key 
ends up sharing the same default object for non-primitive types.
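
To illustrate the aliasing concern, here is a minimal, self-contained sketch. It does not use the Flink classes: `HeapState` is a hypothetical stand-in for the heap-backed state in the diff, and the `copier` function is an assumption that plays the role a serializer's copy method might play.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a heap-backed key/value state; not the Flink class. */
class DefaultValueCopySketch {

    static final class HeapState<K, V> {
        private final Map<K, V> state = new HashMap<>();
        private final V defaultValue;
        private final UnaryOperator<V> copier; // assumption: stands in for a serializer-based copy
        private K currentKey;

        HeapState(V defaultValue, UnaryOperator<V> copier) {
            this.defaultValue = defaultValue;
            this.copier = copier;
        }

        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        /** Returns the stored value, or a fresh copy of the default so callers never share it. */
        V value() {
            V value = state.get(currentKey);
            if (value != null) {
                return value;
            }
            return defaultValue == null ? null : copier.apply(defaultValue);
        }

        void update(V value) {
            state.put(currentKey, value);
        }
    }

    public static void main(String[] args) {
        HeapState<String, List<Integer>> s =
                new HeapState<String, List<Integer>>(new ArrayList<Integer>(), l -> new ArrayList<>(l));

        s.setCurrentKey("a");
        List<Integer> forA = s.value();
        forA.add(1);      // mutates only the copy handed out for key "a"
        s.update(forA);

        s.setCurrentKey("b");
        System.out.println(s.value()); // prints []
    }
}
```

Without the copy, the list mutated for key "a" would be the default object itself, so key "b" would also see the added element.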



[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-09 Thread mbalassi
Github user mbalassi commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146830200
  
I think when drawing the snapshot with `Checkpointed` you are already 
inside a streaming `Function` or internal operator, where one can easily access 
that information anyway. 
I think it is fine that way. I agree with @gyfora that access from the 
`StateBackend` is more important, as that is potentially decoupled from drawing 
the snapshot itself.
As for the other points listed by @StephanEwen, they sound reasonable to me.




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread senorcarbone
Github user senorcarbone commented on a diff in the pull request:

https://github.com/apache/flink/pull/1239#discussion_r41493576
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -322,73 +311,84 @@ public String getName() {
return getEnvironment().getTaskNameWithSubtasks();
}
 
+   /**
+* Gets the lock object on which all operations that involve data and 
state mutation have to lock. 
+
+* @return The checkpoint lock object.
+*/
public Object getCheckpointLock() {
return lock;
}
+   
+   public StreamConfig getConfiguration() {
+   return configuration;
+   }
+
+   public Map getAccumulatorMap() {
+   return accumulatorMap;
+   }
+   
+   public Output getHeadOutput() {
+   return operatorChain.getChainEntryPoint();
+   }
+   
+   public RecordWriterOutput[] getStreamOutputs() {
+   return operatorChain.getStreamOutputs();
+   }
 
// 

//  Checkpoint and Restore
// 

-
-   @SuppressWarnings("unchecked")
+   
@Override
-   public void setInitialState(StateHandle stateHandle) 
throws Exception {
-
-   // We retrieve end restore the states for the chained operators.
-   List>> chainedStates = 
-   (List>>) stateHandle.getState(this.userClassLoader);
-
-   // We restore all stateful operators
-   for (int i = 0; i < chainedStates.size(); i++) {
-   Tuple2> state = chainedStates.get(i);
-   // If state is not null we need to restore it
-   if (state != null) {
-   StreamOperator chainedOperator = 
outputHandler.getChainedOperators().get(i);
-   ((StatefulStreamOperator) 
chainedOperator).restoreInitialState(state);
+   public void setInitialState(StreamTaskStateList initialState) throws 
Exception {
+   LOG.info("Restoring checkpointed state to task {}", getName());
+   
+   final StreamOperator[] allOperators = 
operatorChain.getAllOperators();
+   final StreamTaskState[] states = 
initialState.getState(userClassLoader);
+   
+   for (int i = 0; i < states.length; i++) {
+   StreamTaskState state = states[i];
+   StreamOperator operator = allOperators[i];
+   
+   if (state != null && operator != null) {
+   LOG.debug("Task {} in chain ({}) has 
checkpointed state", i, getName());
+   operator.restoreState(state);
+   }
+   else if (operator != null) {
+   LOG.debug("Task {} in chain ({}) does not have 
checkpointed state", i, getName());
}
}
}
 
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws 
Exception {
-
LOG.debug("Starting checkpoint {} on task {}", checkpointId, 
getName());

synchronized (lock) {
if (isRunning) {
-   try {
-   // We wrap the states of the chained 
operators in a list, marking non-stateful operators with null
-   List>> chainedStates = new ArrayList<>();
 
-   // A wrapper handle is created for the 
List of statehandles
-   WrapperStateHandle stateHandle;
-   try {
-
-   // We construct a list of 
states for chained tasks
-   for (StreamOperator 
chainedOperator : outputHandler.getChainedOperators()) {
-   if (chainedOperator 
instanceof StatefulStreamOperator) {
-   
chainedStates.add(((StatefulStreamOperator) chainedOperator)
-   
.getStateSnapshotFromFunction(checkpointId, timestamp));
-  

[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146465524
  
Overall looks very good. :+1: 

I personally like having the non-partitioned operator states and I use them 
in my programs, so I would prefer keeping them. 




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146467354
  
So my argument is that the overhead of implementing the Checkpointed 
interface for simple counters, offsets and the like is simply too much (and 
annoying).

We can also introduce some annotations that the user can use to tag the 
state with. Then the only thing we need to make sure of is that these support 
some custom checkpointing logic (like the state checkpointer interface).

Another thing we should consider is that the Checkpointed interface will 
never allow any lazy state access logic.
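
As a rough illustration of that overhead, the sketch below shows a function whose only state is a single counter. The nested `Checkpointed` interface is a stand-in declared locally so the example compiles on its own, and `CountingMapper` is a hypothetical name.

```java
import java.io.Serializable;

class CheckpointedCounterSketch {

    /** Stand-in for a snapshot/restore callback interface; declared here so the sketch compiles alone. */
    interface Checkpointed<T extends Serializable> {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

        void restoreState(T state) throws Exception;
    }

    /** One counter of state still requires both callbacks to be written by hand. */
    static final class CountingMapper implements Checkpointed<Long> {
        private long count;

        String map(String value) {
            count++;
            return count + ": " + value;
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return count;
        }

        @Override
        public void restoreState(Long state) {
            // The complete state is handed over eagerly at restore time.
            count = state;
        }
    }

    public static void main(String[] args) {
        CountingMapper original = new CountingMapper();
        original.map("a");
        original.map("b");
        Long snapshot = original.snapshotState(1L, 0L);

        CountingMapper recovered = new CountingMapper();
        recovered.restoreState(snapshot);
        System.out.println(recovered.map("c")); // prints 3: c
    }
}
```

Because `restoreState` receives the complete state object up front, a callback of this shape leaves no room for loading state lazily.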




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread senorcarbone
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146472181
  
On a related note, is anyone planning to complete snapshotting for cyclic 
graphs for the 0.10 release? Unfortunately I do not have time to work on it 
this week, but it would be great to support it for 0.10 already.




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146486097
  
Another important thing:

I think it would be very useful to add some extra context information to 
the snapshot methods. Something that can be used to identify the operator that 
took the snapshot. Maybe a combination of task id + task index.

I am currently trying to implement a SQL-based backend, and I would really 
need to store this information so I can then work with the checkpoint data from 
the outside.
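
A minimal sketch of what such context information could look like follows. `SnapshotContext` and `storageKey` are hypothetical names invented for this illustration, and the key layout is an arbitrary choice.

```java
import java.util.Objects;

/** Hypothetical sketch; these names are invented for illustration. */
class SnapshotContextSketch {

    /** Identifies which parallel operator instance took a snapshot. */
    static final class SnapshotContext {
        final String taskId;
        final int subtaskIndex;

        SnapshotContext(String taskId, int subtaskIndex) {
            this.taskId = Objects.requireNonNull(taskId);
            this.subtaskIndex = subtaskIndex;
        }

        /** Builds a key under which an external store could file the checkpoint data. */
        String storageKey(long checkpointId) {
            return taskId + "/" + subtaskIndex + "/" + checkpointId;
        }
    }

    public static void main(String[] args) {
        SnapshotContext ctx = new SnapshotContext("map-task", 3);
        System.out.println(ctx.storageKey(42L)); // prints map-task/3/42
    }
}
```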




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146486750
  
I can live without the non-partitioned OperatorStates, but the context info 
for the snapshots is very hard to work around I think.




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread senorcarbone
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146511645
  
@ktzoumas That's understandable! Maybe we could consider it for the 
maintenance release if there is enough time.




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146507802
  
Also, a close method in the StateBackend interface would come in handy, 
similar to the initialize method, to close connections etc. in case of failures.
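
A small sketch of the lifecycle this suggests, under the assumption that the backend exposes a symmetric initialize/close pair. `Backend`, `RecordingBackend`, and `runTask` are hypothetical names, not the interface from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

class BackendLifecycleSketch {

    /** Hypothetical backend contract with a symmetric initialize/close pair. */
    interface Backend extends AutoCloseable {
        void initialize();

        @Override
        void close(); // narrowed to throw no checked exception, to keep the sketch short
    }

    /** Records lifecycle calls so their order can be inspected. */
    static final class RecordingBackend implements Backend {
        final List<String> calls = new ArrayList<>();

        @Override
        public void initialize() {
            calls.add("initialize");
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    /** Runs the task body and closes the backend even if the body fails. */
    static void runTask(Backend backend, Runnable body) {
        try (Backend b = backend) {
            b.initialize();
            body.run();
        }
    }

    public static void main(String[] args) {
        RecordingBackend backend = new RecordingBackend();
        try {
            runTask(backend, () -> {
                throw new IllegalStateException("simulated failure");
            });
        } catch (IllegalStateException expected) {
            // The failure propagates, but close() has already run.
        }
        System.out.println(backend.calls); // prints [initialize, close]
    }
}
```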




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread ktzoumas
Github user ktzoumas commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146490272
  
@senorcarbone I think the cyclic graphs feature will be hard to get into 
0.10




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1239#discussion_r41509657
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -471,7 +475,11 @@ private StreamGraph generateInternal(List transformatio
transform.getName());
 
if (transform.getStateKeySelector() != null) {
-   streamGraph.setKey(transform.getId(), transform.getStateKeySelector());
+   TypeSerializer keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
+   streamGraph.setKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
+   }
+   if (transform.getStateKeyType() != null) {
+   
--- End diff --

empty if




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1239#discussion_r41528403
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -322,73 +311,84 @@ public String getName() {
return getEnvironment().getTaskNameWithSubtasks();
}
 
+   /**
+* Gets the lock object on which all operations that involve data and 
state mutation have to lock. 
+
+* @return The checkpoint lock object.
+*/
public Object getCheckpointLock() {
return lock;
}
+   
+   public StreamConfig getConfiguration() {
+   return configuration;
+   }
+
+   public Map getAccumulatorMap() {
+   return accumulatorMap;
+   }
+   
+   public Output getHeadOutput() {
+   return operatorChain.getChainEntryPoint();
+   }
+   
+   public RecordWriterOutput[] getStreamOutputs() {
+   return operatorChain.getStreamOutputs();
+   }
 
// 

//  Checkpoint and Restore
// 

-
-   @SuppressWarnings("unchecked")
+   
@Override
-   public void setInitialState(StateHandle stateHandle) 
throws Exception {
-
-   // We retrieve end restore the states for the chained operators.
-   List>> chainedStates = 
-   (List>>) stateHandle.getState(this.userClassLoader);
-
-   // We restore all stateful operators
-   for (int i = 0; i < chainedStates.size(); i++) {
-   Tuple2> state = chainedStates.get(i);
-   // If state is not null we need to restore it
-   if (state != null) {
-   StreamOperator chainedOperator = 
outputHandler.getChainedOperators().get(i);
-   ((StatefulStreamOperator) 
chainedOperator).restoreInitialState(state);
+   public void setInitialState(StreamTaskStateList initialState) throws 
Exception {
+   LOG.info("Restoring checkpointed state to task {}", getName());
+   
+   final StreamOperator[] allOperators = 
operatorChain.getAllOperators();
+   final StreamTaskState[] states = 
initialState.getState(userClassLoader);
+   
+   for (int i = 0; i < states.length; i++) {
+   StreamTaskState state = states[i];
+   StreamOperator operator = allOperators[i];
+   
+   if (state != null && operator != null) {
+   LOG.debug("Task {} in chain ({}) has 
checkpointed state", i, getName());
+   operator.restoreState(state);
+   }
+   else if (operator != null) {
+   LOG.debug("Task {} in chain ({}) does not have 
checkpointed state", i, getName());
}
}
}
 
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws 
Exception {
-
LOG.debug("Starting checkpoint {} on task {}", checkpointId, 
getName());

synchronized (lock) {
if (isRunning) {
-   try {
-   // We wrap the states of the chained 
operators in a list, marking non-stateful operators with null
-   List>> chainedStates = new ArrayList<>();
 
-   // A wrapper handle is created for the 
List of statehandles
-   WrapperStateHandle stateHandle;
-   try {
-
-   // We construct a list of 
states for chained tasks
-   for (StreamOperator 
chainedOperator : outputHandler.getChainedOperators()) {
-   if (chainedOperator 
instanceof StatefulStreamOperator) {
-   
chainedStates.add(((StatefulStreamOperator) chainedOperator)
-   
.getStateSnapshotFromFunction(checkpointId, timestamp));
-   

[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-08 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1239#issuecomment-146594656
  
As for the context, I did not mean to add it to the Checkpointed interface 
but to the state backend method calls. 

Another, simpler solution for now would be to make the environment or the 
runtime context accessible from the backend, so it knows which task it is 
checkpointing for. This is probably the cleanest solution for now until we 
figure out the exact requirements.

Otherwise :+1: from me. 




[GitHub] flink pull request: [FLINK-2808] Rework state abstraction and clea...

2015-10-07 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1239

[FLINK-2808] Rework state abstraction and clean up task / operator internals

This pull request fixes many related/intermixed issues. It was hard to 
split this into individual issues.

### Crucial bug fixes

  - State snapshots for memory backed state previously copied a reference 
into the StateHandle, after which the streaming program continued. If the state 
was mutated prior to serialization by Akka, the mutated state was checkpointed, 
rather than the state at the point of drawing the snapshot.
  
  - Key/value state is checkpointed as a whole, rather than individually 
per key.
  
  - Memory-backed state now has a maximum size that is checked upon 
checkpointing. Exceeding that size fails the checkpoint. Previously, state that 
was too large simply resulted in an oversized Akka frame that was dropped, 
silently letting the program run without ever completing a checkpoint.
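
The first and third fixes above can be sketched roughly as follows. This is an illustration only: plain Java serialization stands in for Flink's serializers, the class and method names are hypothetical, and the size limit in `main` is an arbitrary value chosen for the demo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

/** Illustrative only; plain Java serialization stands in for Flink's serializers. */
class EagerSnapshotSketch {

    /** Serializes the state right away and rejects snapshots larger than maxBytes. */
    static byte[] snapshot(Serializable state, int maxBytes) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        if (bytes.size() > maxBytes) {
            throw new IOException("Snapshot of " + bytes.size()
                    + " bytes exceeds the limit of " + maxBytes + " bytes");
        }
        return bytes.toByteArray();
    }

    /** Reads back a snapshot produced by snapshot(). */
    static Object restore(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Integer> counts = new HashMap<>();
        counts.put("a", 1);

        byte[] data = snapshot(counts, 4096); // limit chosen arbitrarily for the demo
        counts.put("a", 2);                   // mutation after the snapshot was drawn

        System.out.println(restore(data));    // prints {a=1}
    }
}
```

Copying only a reference would have let the later `put` leak into the checkpoint; serializing at snapshot time freezes the state, and the explicit size check turns an oversized snapshot into a visible failure.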


### User-facing changes

  - The state backends are not only responsible for storing snapshots of the 
user state, they also define exactly how the key/value state is represented 
in the first place. This allows us to plug in external key/value stores to 
store the Flink key/value state. Default implementations store the state in 
memory / files.
  
  - State backend offers additional methods to checkpoint directly into 
streams.
  
  - One can configure arbitrary default state backends via a factory 
interface that creates them from the TaskManager configuration.
  
  - Key/value state supports arbitrary types without extra checkpointer 
logic, but the user needs to supply the type of the state (via class or 
TypeInformation).
  
  - Removed the non-partitioned `OperatorState`. The only type of state 
remaining through the `OperatorState` abstraction is partitioned key/value 
state in functions that are applied on a KeyedStream. Consequently, the 
`mapWithState()` and related methods are only available on the `KeyedStream`.
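
To make the division of roles in the list above more concrete, here is a simplified sketch. All interfaces and method names in it are hypothetical and only mirror the roles described; they are not the signatures introduced by this pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical interfaces; they only mirror the roles described in the list above. */
class PluggableBackendSketch {

    /** Key/value state scoped to a current key. */
    interface KvState<K, V> {
        void setCurrentKey(K key);

        V value();

        void update(V value);
    }

    /** The backend decides how key/value state is represented. */
    interface StateBackend {
        <K, V> KvState<K, V> createKvState(Class<V> valueType, V defaultValue);
    }

    /** Creates a backend from configuration entries. */
    interface StateBackendFactory {
        StateBackend createFromConfig(Map<String, String> config);
    }

    /** Heap-based representation; an external store could implement the same interface. */
    static final class HeapBackend implements StateBackend {
        @Override
        public <K, V> KvState<K, V> createKvState(Class<V> valueType, V defaultValue) {
            final Map<K, V> map = new HashMap<>();
            return new KvState<K, V>() {
                private K currentKey;

                @Override
                public void setCurrentKey(K key) {
                    currentKey = key;
                }

                @Override
                public V value() {
                    V v = map.get(currentKey);
                    return v != null ? v : defaultValue;
                }

                @Override
                public void update(V value) {
                    map.put(currentKey, value);
                }
            };
        }
    }

    public static void main(String[] args) {
        StateBackendFactory factory = config -> new HeapBackend(); // ignores config in this sketch
        StateBackend backend = factory.createFromConfig(new HashMap<>());

        KvState<String, Integer> counter = backend.createKvState(Integer.class, 0);
        counter.setCurrentKey("user-1");
        counter.update(counter.value() + 1);
        System.out.println(counter.value()); // prints 1
    }
}
```

The point of the indirection is that user code only sees `KvState`, so swapping the heap map for an external key/value store would not change the calling code.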
  

### Internal cleanups

  - Checkpoint barriers are forwarded earlier, to reduce latency introduced 
by checkpoints.

  - Fewer in-memory copies when checkpointing to the file system state 
backend

  - The StreamingRuntimeContext is used purely for UDF interaction, not to 
hand over components to the operators. 

  - The infinite reduce and aggregations work properly on key/value state, 
rather than maintaining their own maps
  
  - Made the OutputHandler (now OperatorChain) type safe and simpler
  
  - Made a clear distinction between the responsibilities of StreamTasks 
(input/output streams, setup of operator chain, checkpoint coordination) and 
operators (scope of one function and runtime context)
  
  - Cleaned up checkpointing logic between operator (checkpoints generic 
key/value state) and UDF operators (checkpoint UDFs)
  
  - Removed Configuration from the operator open() method (was used in 
confusion with the UDF open(Configuration) method)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink statebackend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1239


commit 3d633f0d608d91cfa69455fa9a47c53bf753a677
Author: Stephan Ewen 
Date:   2015-10-05T13:57:49Z

[hotfix] Correct name of HDFS tests from 'org.apache.flink.tachyon' to 
'org.apache.flink.hdfstests'

commit 441c089552b3045062e8620ad9d2c8411fb387a8
Author: Stephan Ewen 
Date:   2015-10-05T13:57:04Z

[FLINK-2808] [streaming] Refactor and extend state backend abstraction

commit 73b65e2196576b0e36730bd0c8d8d3ced56f9f4f
Author: Stephan Ewen 
Date:   2015-10-07T11:54:05Z

[FLINK-2808] [streaming] Integrate extended state backend abstraction with 
streaming state handling



