Jenkins build is back to normal : flink-snapshot-deployment #239

2016-09-30 Thread Apache Jenkins Server
See 



buildbot success on flink-docs-release-0.10

2016-09-30 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10.
Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/347

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





flink git commit: [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

2016-09-30 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.1 caa0fbb21 -> 90d77594f


[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't 
block on polls

Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()'
may take a very long time. This is mostly relevant for low-throughput Kafka topics.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90d77594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90d77594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90d77594

Branch: refs/heads/release-1.1
Commit: 90d77594fffda1d8d15658d363c478ea6430514e
Parents: caa0fbb
Author: Stephan Ewen 
Authored: Thu Sep 29 18:09:51 2016 +0200
Committer: Stephan Ewen 
Committed: Fri Sep 30 12:39:53 2016 +0200

--
 .../kafka/internal/Kafka09Fetcher.java  |  73 +++--
 .../connectors/kafka/Kafka09FetcherTest.java| 304 +++
 2 files changed, 355 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 9c861c9..1da2259 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer 
API.
@@ -74,18 +76,24 @@ public class Kafka09Fetcher extends AbstractFetcher implem
/** The maximum number of milliseconds to wait for a fetch batch */
private final long pollTimeout;
 
-   /** Mutex to guard against concurrent access to the non-threadsafe 
Kafka consumer */
-   private final Object consumerLock = new Object();
+   /** The next offsets that the main thread should commit */
+   private final AtomicReference> 
nextOffsetsToCommit;
+   
+   /** The callback invoked by Kafka once an offset commit is complete */
+   private final OffsetCommitCallback offsetCommitCallback;
 
/** Reference to the Kafka consumer, once it is created */
private volatile KafkaConsumer consumer;
-
+   
/** Reference to the proxy, forwarding exceptions from the fetch thread 
to the main thread */
private volatile ExceptionProxy errorHandler;
 
/** Flag to mark the main work loop as alive */
private volatile boolean running = true;
 
+   /** Flag tracking whether the latest commit request has completed */
+   private volatile boolean commitInProgress;
+
// 

 
public Kafka09Fetcher(
@@ -105,6 +113,8 @@ public class Kafka09Fetcher extends AbstractFetcher implem
this.runtimeContext = runtimeContext;
this.kafkaProperties = kafkaProperties;
this.pollTimeout = pollTimeout;
+   this.nextOffsetsToCommit = new AtomicReference<>();
+   this.offsetCommitCallback = new CommitCallback();
 
// if checkpointing is enabled, we are not automatically 
committing to Kafka.

kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -203,19 +213,23 @@ public class Kafka09Fetcher extends AbstractFetcher implem
 
// main fetch loop
while (running) {
+
+   // check if there is something to commit
+   final Map 
toCommit = nextOffsetsToCommit.getAndSet(null);
+   if (toCommit != null && !commitInProg

[09/10] flink git commit: [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

2016-09-30 Thread sewen
[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't 
block on polls

Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()'
may take a very long time. This is mostly relevant for low-throughput Kafka topics.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92f4539a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92f4539a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92f4539a

Branch: refs/heads/master
Commit: 92f4539afc714f7dbd293c3ad677b3b5807c6911
Parents: 6f8f5eb
Author: Stephan Ewen 
Authored: Thu Sep 29 18:09:51 2016 +0200
Committer: Stephan Ewen 
Committed: Fri Sep 30 12:38:46 2016 +0200

--
 .../kafka/internal/Kafka09Fetcher.java  |  73 +++--
 .../connectors/kafka/Kafka09FetcherTest.java| 304 +++
 2 files changed, 355 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/92f4539a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 9c861c9..1da2259 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer 
API.
@@ -74,18 +76,24 @@ public class Kafka09Fetcher extends AbstractFetcher implem
/** The maximum number of milliseconds to wait for a fetch batch */
private final long pollTimeout;
 
-   /** Mutex to guard against concurrent access to the non-threadsafe 
Kafka consumer */
-   private final Object consumerLock = new Object();
+   /** The next offsets that the main thread should commit */
+   private final AtomicReference> 
nextOffsetsToCommit;
+   
+   /** The callback invoked by Kafka once an offset commit is complete */
+   private final OffsetCommitCallback offsetCommitCallback;
 
/** Reference to the Kafka consumer, once it is created */
private volatile KafkaConsumer consumer;
-
+   
/** Reference to the proxy, forwarding exceptions from the fetch thread 
to the main thread */
private volatile ExceptionProxy errorHandler;
 
/** Flag to mark the main work loop as alive */
private volatile boolean running = true;
 
+   /** Flag tracking whether the latest commit request has completed */
+   private volatile boolean commitInProgress;
+
// 

 
public Kafka09Fetcher(
@@ -105,6 +113,8 @@ public class Kafka09Fetcher extends AbstractFetcher implem
this.runtimeContext = runtimeContext;
this.kafkaProperties = kafkaProperties;
this.pollTimeout = pollTimeout;
+   this.nextOffsetsToCommit = new AtomicReference<>();
+   this.offsetCommitCallback = new CommitCallback();
 
// if checkpointing is enabled, we are not automatically 
committing to Kafka.

kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -203,19 +213,23 @@ public class Kafka09Fetcher extends AbstractFetcher implem
 
// main fetch loop
while (running) {
+
+   // check if there is something to commit
+   final Map 
toCommit = nextOffsetsToCommit.getAndSet(null);
+   if (toCommit != null && !commitInProgress) {
+   // reset the work-to-be committed, so 
we d

[02/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

2016-09-30 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 2036f69..f638ddd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -317,7 +318,7 @@ public class StreamMockEnvironment implements Environment {
@Override
public void acknowledgeCheckpoint(
long checkpointId,
-   ChainedStateHandle 
chainedStateHandle, List keyGroupStateHandles,
+   CheckpointStateHandles checkpointStateHandles,
long synchronousDurationMillis, long 
asynchronousDurationMillis,
long bytesBufferedInAlignment, long 
alignmentDurationNanos) {
}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 430c6de..247edd6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -24,12 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
@@ -41,11 +43,12 @@ import java.util.Collections;
 import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Extension of {@link OneInputStreamOperatorTestHarness} that allows the 
operator to get
- * a {@link KeyedStateBackend}.
+ * a {@link AbstractKeyedStateBackend}.
  *
  */
 public class KeyedOneInputStreamOperatorTestHarness
@@ -53,7 +56,7 @@ public class KeyedOneInputStreamOperatorTestHarness
 
// in case the operator creates one we store it here so that we
// can snapshot its state
-   private KeyedStateBackend keyedStateBackend = null;
+   private AbstractKeyedStateBackend keyedStateBackend = null;
 
// when we restore we keep the state here so that we can call restore
// when the operator requests the keyed state backend
@@ -114,7 +117,7 @@ public class KeyedOneInputStreamOperatorTestHarness
final KeyGroupRange keyGroupRange = 
(KeyGroupRange) invocationOnMock.getArguments()[2];
 
if(keyedStateBackend != null) {
-   keyedStateBackend.close();
+   keyedStateBackend.dispose();
}
 
if (restoredKeyedState == null) {
@@ -148,7 +151,7 @@ public class KeyedOneInputStreamOperatorTe

[04/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

2016-09-30 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 73e2808..2f21574 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -80,11 +80,11 @@ public abstract class StateBackendTestBase {
return getStateBackend().createStreamFactory(new JobID(), 
"test_op");
}
 
-   protected  KeyedStateBackend createKeyedBackend(TypeSerializer 
keySerializer) throws Exception {
+   protected  AbstractKeyedStateBackend 
createKeyedBackend(TypeSerializer keySerializer) throws Exception {
return createKeyedBackend(keySerializer, new 
DummyEnvironment("test", 1, 0));
}
 
-   protected  KeyedStateBackend createKeyedBackend(TypeSerializer 
keySerializer, Environment env) throws Exception {
+   protected  AbstractKeyedStateBackend 
createKeyedBackend(TypeSerializer keySerializer, Environment env) throws 
Exception {
return createKeyedBackend(
keySerializer,
10,
@@ -92,7 +92,7 @@ public abstract class StateBackendTestBase {
env);
}
 
-   protected  KeyedStateBackend createKeyedBackend(
+   protected  AbstractKeyedStateBackend createKeyedBackend(
TypeSerializer keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
@@ -104,14 +104,15 @@ public abstract class StateBackendTestBase {
keySerializer,
numberOfKeyGroups,
keyGroupRange,
-   env.getTaskKvStateRegistry());
+   env.getTaskKvStateRegistry())
+;
}
 
-   protected  KeyedStateBackend 
restoreKeyedBackend(TypeSerializer keySerializer, KeyGroupsStateHandle 
state) throws Exception {
+   protected  AbstractKeyedStateBackend 
restoreKeyedBackend(TypeSerializer keySerializer, KeyGroupsStateHandle 
state) throws Exception {
return restoreKeyedBackend(keySerializer, state, new 
DummyEnvironment("test", 1, 0));
}
 
-   protected  KeyedStateBackend restoreKeyedBackend(
+   protected  AbstractKeyedStateBackend restoreKeyedBackend(
TypeSerializer keySerializer,
KeyGroupsStateHandle state,
Environment env) throws Exception {
@@ -123,7 +124,7 @@ public abstract class StateBackendTestBase {
env);
}
 
-   protected  KeyedStateBackend restoreKeyedBackend(
+   protected  AbstractKeyedStateBackend restoreKeyedBackend(
TypeSerializer keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
@@ -144,7 +145,7 @@ public abstract class StateBackendTestBase {
@SuppressWarnings("unchecked")
public void testValueState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
-   KeyedStateBackend backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+   AbstractKeyedStateBackend backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
ValueStateDescriptor kvId = new 
ValueStateDescriptor<>("id", String.class, null);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
@@ -195,7 +196,7 @@ public abstract class StateBackendTestBase {
assertEquals("u3", state.value());
assertEquals("u3", getSerializedValue(kvState, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-   backend.close();
+   backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
 
snapshot1.discardState();
@@ -211,7 +212,7 @@ public abstract class StateBackendTestBase {
assertEquals("2", restored1.value());
assertEquals("2", getSerializedValue(restoredKvState1, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-   backend.close();
+   backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
 
snapshot2.discardState();
@@ -230,7 +231,7 @@ public abstract class StateBackendTestBase {
assertEquals("u3", restored2.value());
assertEquals("u3", getSer

[05/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

2016-09-30 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9adaa86..c39e436 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import com.google.common.collect.Iterables;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -34,21 +36,21 @@ import 
org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
-
 import org.junit.Assert;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
@@ -56,6 +58,8 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -1459,7 +1463,7 @@ public class CheckpointCoordinatorTest {
maxConcurrentAttempts,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
-   new ExecutionVertex[] { commitVertex }, 
+   new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new 
StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -1531,7 +1535,7 @@ public class CheckpointCoordinatorTest {
maxConcurrentAttempts, // max two 
concurrent checkpoints
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
-   new ExecutionVertex[] { commitVertex }, 
+   new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new 
StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -1791,29 +1795,29 @@ public class CheckpointCoordinatorTest {
 
for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
ChainedStateHandle 
nonPartitionedState = generateStateForVertex(jobVertexID1, index);
+   ChainedStateHandle 
partitionableState = generateChainedPartitionableStateHandle(jobVertexID1, 
index, 2, 8);
List partitionedKeyGroupState = 
generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index));
 
+   CheckpointStateHandles checkpointStateHandles = new 
CheckpointStateHandles(nonPartitionedState, partitionableState, 
partitionedKeyGroupState);
AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-   jid,
-   
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-   checkpointId,
-   

[07/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

2016-09-30 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index f5e3618..7e4eded 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import com.google.common.collect.Iterables;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.Preconditions;
@@ -47,27 +49,36 @@ public class TaskState implements StateObject {
/** handles to non-partitioned states, subtaskindex -> subtaskstate */
private final Map subtaskStates;
 
-   /** handles to partitioned states, subtaskindex -> keyed state */
+   /** handles to partitionable states, subtaskindex -> partitionable 
state */
+   private final Map> 
partitionableStates;
+
+   /** handles to key-partitioned states, subtaskindex -> keyed state */
private final Map keyGroupsStateHandles;
 
+
/** parallelism of the operator when it was checkpointed */
private final int parallelism;
 
/** maximum parallelism of the operator when the job was first created 
*/
private final int maxParallelism;
 
-   public TaskState(JobVertexID jobVertexID, int parallelism, int 
maxParallelism) {
+   private final int chainLength;
+
+   public TaskState(JobVertexID jobVertexID, int parallelism, int 
maxParallelism, int chainLength) {
Preconditions.checkArgument(
parallelism <= maxParallelism,
"Parallelism " + parallelism + " is not smaller 
or equal to max parallelism " + maxParallelism + ".");
+   Preconditions.checkArgument(chainLength > 0, "There has to be 
at least one operator in the operator chain.");
 
this.jobVertexID = jobVertexID;
 
this.subtaskStates = new HashMap<>(parallelism);
+   this.partitionableStates = new HashMap<>(parallelism);
this.keyGroupsStateHandles = new HashMap<>(parallelism);
 
this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
+   this.chainLength = chainLength;
}
 
public JobVertexID getJobVertexID() {
@@ -85,6 +96,20 @@ public class TaskState implements StateObject {
}
}
 
+   public void putPartitionableState(
+   int subtaskIndex,
+   ChainedStateHandle 
partitionableState) {
+
+   Preconditions.checkNotNull(partitionableState);
+
+   if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+   throw new IndexOutOfBoundsException("The given sub task 
index " + subtaskIndex +
+   " exceeds the maximum number of sub 
tasks " + subtaskStates.size());
+   } else {
+   partitionableStates.put(subtaskIndex, 
partitionableState);
+   }
+   }
+
public void putKeyedState(int subtaskIndex, KeyGroupsStateHandle 
keyGroupsStateHandle) {
Preconditions.checkNotNull(keyGroupsStateHandle);
 
@@ -106,6 +131,15 @@ public class TaskState implements StateObject {
}
}
 
+   public ChainedStateHandle 
getPartitionableState(int subtaskIndex) {
+   if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+   throw new IndexOutOfBoundsException("The given sub task 
index " + subtaskIndex +
+   " exceeds the maximum number of sub 
tasks " + subtaskStates.size());
+   } else {
+   return partitionableStates.get(subtaskIndex);
+   }
+   }
+
public KeyGroupsStateHandle getKeyGroupState(int subtaskIndex) {
if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
throw new IndexOutOfBoundsException("The given sub task 
index " + subtaskIndex +
@@ -131,6 +165,10 @@ public class TaskState implements StateObject {
return maxParallelism;
}
 
+   public int getChainLength() {
+   return chainLength;
+   }
+
public Collection getKeyGroupStates() {
return keyGroupsStateHandles.values();
}
@@ -147,

[08/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

2016-09-30 Thread sewen
[FLINK-4379] [checkpoints] Introduce rescalable operator state

This introduces the Operator State Backend, which stores state that is not
partitioned by a key. It replaces the 'Checkpointed' interface.

Additionally, this introduces CheckpointStateHandles as a container for all
checkpoint-related state handles.

This closes #2512


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53ed6ada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53ed6ada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53ed6ada

Branch: refs/heads/master
Commit: 53ed6adac8cbe6b5dcb692dc9b94970f3ec5887c
Parents: 2afc092
Author: Stefan Richter 
Authored: Wed Aug 31 23:59:27 2016 +0200
Committer: Stephan Ewen 
Committed: Fri Sep 30 12:38:46 2016 +0200

--
 .../streaming/state/AbstractRocksDBState.java   |   6 +-
 .../state/RocksDBKeyedStateBackend.java |  75 ++--
 .../streaming/state/RocksDBStateBackend.java|   8 +-
 .../state/RocksDBAsyncSnapshotTest.java |  12 +-
 .../state/RocksDBStateBackendConfigTest.java|  48 ++-
 .../api/common/functions/RuntimeContext.java| 125 +-
 .../util/AbstractRuntimeUDFContext.java |  28 +-
 .../flink/api/common/state/OperatorState.java   |  70 ---
 .../flink/api/common/state/ValueState.java  |   2 +-
 .../java/typeutils/runtime/JavaSerializer.java  | 116 +
 .../flink/hdfstests/FileStateBackendTest.java   |  26 +-
 .../AbstractCEPBasePatternOperator.java |   3 +-
 .../operator/AbstractCEPPatternOperator.java|   2 -
 .../AbstractKeyedCEPPatternOperator.java|   2 -
 .../checkpoint/CheckpointCoordinator.java   | 127 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |   5 -
 .../checkpoint/OperatorStateRepartitioner.java  |  42 ++
 .../runtime/checkpoint/PendingCheckpoint.java   |  95 +++--
 .../RoundRobinOperatorStateRepartitioner.java   | 190 +
 .../flink/runtime/checkpoint/SubtaskState.java  |   9 -
 .../flink/runtime/checkpoint/TaskState.java |  79 +++-
 .../savepoint/SavepointV1Serializer.java|  97 -
 .../deployment/TaskDeploymentDescriptor.java|  50 ++-
 .../flink/runtime/execution/Environment.java|  16 +-
 .../flink/runtime/executiongraph/Execution.java |  25 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   2 -
 .../runtime/executiongraph/ExecutionVertex.java |   6 +-
 .../runtime/jobgraph/tasks/StatefulTask.java|  11 +-
 .../checkpoint/AcknowledgeCheckpoint.java   |  67 ++-
 .../runtime/state/AbstractCloseableHandle.java  | 126 --
 .../state/AbstractKeyedStateBackend.java| 342 +++
 .../runtime/state/AbstractStateBackend.java |  43 +-
 .../flink/runtime/state/ChainedStateHandle.java |   7 +-
 .../runtime/state/CheckpointStateHandles.java   | 103 +
 .../flink/runtime/state/ClosableRegistry.java   |  84 
 .../state/DefaultOperatorStateBackend.java  | 215 ++
 .../runtime/state/KeyGroupRangeOffsets.java |   2 +
 .../runtime/state/KeyGroupsStateHandle.java |   6 -
 .../flink/runtime/state/KeyedStateBackend.java  | 301 ++---
 .../runtime/state/OperatorStateBackend.java |  35 ++
 .../runtime/state/OperatorStateHandle.java  | 109 +
 .../flink/runtime/state/OperatorStateStore.java |  47 +++
 ...artitionableCheckpointStateOutputStream.java |  96 +
 .../state/RetrievableStreamStateHandle.java |   2 +-
 .../flink/runtime/state/SnapshotProvider.java   |  45 ++
 .../apache/flink/runtime/state/StateObject.java |   6 +-
 .../apache/flink/runtime/state/StateUtil.java   |  37 --
 .../state/filesystem/FileStateHandle.java   |   8 +-
 .../state/filesystem/FsStateBackend.java|   6 +-
 .../state/heap/HeapKeyedStateBackend.java   | 210 -
 .../state/memory/ByteStreamStateHandle.java |  13 +-
 .../state/memory/MemoryStateBackend.java|   9 +-
 .../ActorGatewayCheckpointResponder.java|  11 +-
 .../taskmanager/CheckpointResponder.java|  15 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  11 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 421 +++
 .../checkpoint/CheckpointStateRestoreTest.java  |  46 +-
 .../CompletedCheckpointStoreTest.java   |   2 +-
 .../checkpoint/PendingCheckpointTest.java   |   2 +-
 .../checkpoint/PendingSavepointTest.java|   2 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   5 -
 .../checkpoint/savepoint/SavepointV1Test.java   |  20 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java|  20 +-
 .../messages/CheckpointMessagesTest.java|  17 +-
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../operators/testutils/MockEnvironment.java|   3 +-
 .../runtime/query/QueryableStateClientTest.java |   4 +-
 .../runtime

[06/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

2016-09-30 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 5612f73..7293a84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,178 +18,55 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.RunnableFuture;
 
 /**
- * A keyed state backend is responsible for managing keyed state. The state 
can be checkpointed
- * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ * A keyed state backend provides methods for managing keyed state.
  *
  * @param  The key by which state is keyed.
  */
-public abstract class KeyedStateBackend {
-
-   /** {@link TypeSerializer} for our key. */
-   protected final TypeSerializer keySerializer;
-
-   /** The currently active key. */
-   protected K currentKey;
-
-   /** The key group of the currently active key */
-   private int currentKeyGroup;
-
-   /** So that we can give out state when the user uses the same key. */
-   protected HashMap> keyValueStatesByName;
-
-   /** For caching the last accessed partitioned state */
-   private String lastName;
-
-   @SuppressWarnings("rawtypes")
-   private KvState lastState;
-
-   /** The number of key-groups aka max parallelism */
-   protected final int numberOfKeyGroups;
-
-   /** Range of key-groups for which this backend is responsible */
-   protected final KeyGroupRange keyGroupRange;
-
-   /** KvStateRegistry helper for this task */
-   protected final TaskKvStateRegistry kvStateRegistry;
-
-   protected final ClassLoader userCodeClassLoader;
-
-   public KeyedStateBackend(
-   TaskKvStateRegistry kvStateRegistry,
-   TypeSerializer keySerializer,
-   ClassLoader userCodeClassLoader,
-   int numberOfKeyGroups,
-   KeyGroupRange keyGroupRange) {
-
-   this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry);
-   this.keySerializer = Preconditions.checkNotNull(keySerializer);
-   this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
-   this.numberOfKeyGroups = 
Preconditions.checkNotNull(numberOfKeyGroups);
-   this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-   }
+public interface KeyedStateBackend {
 
/**
-* Closes the state backend, releasing all internal resources, but does 
not delete any persistent
-* checkpoint data.
-*
-* @throws Exception Exceptions can be forwarded and will be logged by 
the system
+* Sets the current key that is used for partitioned state.
+* @param newKey The new current key.
 */
-   public void close() throws Exception {
-   if (kvStateRegistry != null) {
-   kvStateRegistry.unregisterAll();
-   }
-
-   lastName = null;
-   lastState = null;
-   keyValueStatesByName = null;
-   }
+ 

[01/10] flink git commit: [FLINK-4573] [web dashboard] Fix potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler

2016-09-30 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 477d1c5d4 -> 92f4539af


[FLINK-4573] [web dashboard] Fix potential resource leak due to unclosed 
RandomAccessFile in TaskManagerLogHandler

This closes #2556


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2afc0924
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2afc0924
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2afc0924

Branch: refs/heads/master
Commit: 2afc092461cf68cf0f3c26a3ab4c58a7bd68cf71
Parents: 477d1c5
Author: Liwei Lin 
Authored: Tue Sep 27 20:49:52 2016 +0800
Committer: Stephan Ewen 
Committed: Fri Sep 30 11:32:39 2016 +0200

--
 .../webmonitor/handlers/TaskManagerLogHandler.java| 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2afc0924/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 5343049..2f0d438 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -210,7 +210,15 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
LOG.error("Displaying 
TaskManager log failed.", e);
return;
}
-   long fileLength = raf.length();
+   long fileLength;
+   try {
+   fileLength = 
raf.length();
+   } catch (IOException ioe) {
+   display(ctx, request, 
"Displaying TaskManager log failed.");
+   LOG.error("Displaying 
TaskManager log failed.", ioe);
+   raf.close();
+   throw ioe;
+   }
final FileChannel fc = 
raf.getChannel();
 
HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);



[10/10] flink git commit: [FLINK-4379] [checkpoints] Fix minor bug and improve debug logging

2016-09-30 Thread sewen
[FLINK-4379] [checkpoints] Fix minor bug and improve debug logging


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f8f5eb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f8f5eb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f8f5eb3

Branch: refs/heads/master
Commit: 6f8f5eb3b9ba07cd3bb4d9f7edd43d4b8862acbe
Parents: 53ed6ad
Author: Stephan Ewen 
Authored: Thu Sep 29 21:12:38 2016 +0200
Committer: Stephan Ewen 
Committed: Fri Sep 30 12:38:46 2016 +0200

--
 .../streaming/runtime/tasks/StreamTask.java  | 19 +--
 1 file changed, 13 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6f8f5eb3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1725eca..88c3ba4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -717,6 +717,13 @@ public abstract class StreamTask>
 

cancelables.registerClosable(asyncCheckpointRunnable);

asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
+
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("{} - finished synchronous 
part of checkpoint {}." +
+   "Alignment duration: {} 
ms, snapshot duration {} ms",
+   getName(), 
checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+   }
+
return true;
} else {
return false;
@@ -998,12 +1005,12 @@ public abstract class StreamTask>
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos 
- asyncStartNanos) / 1_000_000;
 
-   if (nonPartitionedStateHandles.isEmpty() && 
keyedStates.isEmpty()) {
-   
owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+   if (nonPartitionedStateHandles.isEmpty() && 
partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
+   
owner.getEnvironment().acknowledgeCheckpoint(
+   checkpointId,
syncDurationMillies, 
asyncDurationMillis,

bytesBufferedInAlignment, alignmentDurationNanos);
-   } else  {
-
+   } else {
CheckpointStateHandles allStateHandles 
= new CheckpointStateHandles(

nonPartitionedStateHandles,

partitioneableStateHandles,
@@ -1016,8 +1023,8 @@ public abstract class StreamTask>
}
 
if (LOG.isDebugEnabled()) {
-   LOG.debug("Finished asynchronous 
checkpoints for checkpoint {} on task {}. Returning handles on " +
-   "keyed states {}.", 
checkpointId, name, keyedStates);
+   LOG.debug("{} - finished asynchronous 
part of checkpoint {}. Asynchronous duration: {} ms", 
+   owner.getName(), 
checkpointId, asyncDurationMillis);
}
}
catch (Exception e) {



[03/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

2016-09-30 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a73f3b2..0ca89ef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,29 +18,35 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.concurrent.RunnableFuture;
+
 /**
  * Base class for all stream operators. Operators that contain a user function 
should extend the class 
  * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass 
of this class). 
@@ -90,7 +96,12 @@ public abstract class AbstractStreamOperator
private transient KeySelector stateKeySelector2;
 
/** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
-   private transient KeyedStateBackend keyedStateBackend;
+   private transient AbstractKeyedStateBackend keyedStateBackend;
+
+   /** Operator state backend */
+   private transient OperatorStateBackend operatorStateBackend;
+
+   private transient Collection 
lazyRestoreStateHandles;
 
protected transient MetricGroup metrics;
 
@@ -116,9 +127,14 @@ public abstract class AbstractStreamOperator
return metrics;
}
 
+   @Override
+   public void restoreState(Collection stateHandles) {
+   this.lazyRestoreStateHandles = stateHandles;
+   }
+
/**
 * This method is called immediately before any elements are processed, 
it should contain the
-* operator's initialization logic.
+* operator's initialization logic, e.g. state initialization.
 *
 * The default implementation does nothing.
 * 
@@ -126,24 +142,39 @@ public abstract class AbstractStreamOperator
 */
@Override
public void open() throws Exception {
+   initOperatorState();
+   initKeyedState();
+   }
+
+   private void initKeyedState() {
try {
TypeSerializer keySerializer = 
config.getStateKeySerializer(getUserCodeClassloader());
// create a keyed state backend if there is keyed 
state, as indicated by the presence of a key serializer
if (null != keySerializer) {
-   ExecutionConfig execConf = 
container.getEnvironment().getExecutionConfig();;
 
KeyGroupRange subTaskKeyGroupRange = 
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(

container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),

container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),

container.getIndexInSubtaskGroup());
 
-   keyedStateBackend = 
container.createKeye

[1/2] flink git commit: [FLINK-4711] Let the Task trigger partition state requests and handle their responses

2016-09-30 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 7758571ae -> 477d1c5d4


http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 0c79c4e..47a4090 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -46,7 +47,6 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
-import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
@@ -76,6 +76,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.PriorityQueue;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -188,15 +189,14 @@ public class StreamTaskTest {

ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
+   PartitionStateChecker partitionStateChecker = 
mock(PartitionStateChecker.class);
+   Executor executor = mock(Executor.class);
NetworkEnvironment network = mock(NetworkEnvironment.class);

when(network.getResultPartitionManager()).thenReturn(partitionManager);

when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
 
-   JobManagerCommunicationFactory jobManagerCommunicationFactory = 
mock(JobManagerCommunicationFactory.class);
-   
when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
-
TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
new JobID(), "Job Name", new JobVertexID(), new 
ExecutionAttemptID(),
new SerializedValue<>(new ExecutionConfig()),
@@ -215,7 +215,6 @@ public class StreamTaskTest {
mock(MemoryManager.class),
mock(IOManager.class),
network,
-   jobManagerCommunicationFactory,
mock(BroadcastVariableManager.class),
mock(TaskManagerConnection.class),
mock(InputSplitProvider.class),
@@ -223,7 +222,10 @@ public class StreamTaskTest {
libCache,
mock(FileCache.class),
new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
-   mock(TaskMetricGroup.class));
+   mock(TaskMetricGroup.class),
+   consumableNotifier,
+   partitionStateChecker,
+   executor);
}

// 




[2/2] flink git commit: [FLINK-4711] Let the Task trigger partition state requests and handle their responses

2016-09-30 Thread trohrmann
[FLINK-4711] Let the Task trigger partition state requests and handle their 
responses

This PR changes the partition state check in a way that the Task is now 
responsible
for triggering the state check instead of the SingleInputGate. Furthermore, the 
operation
returns a future containing the JobManager's answer. That way we don't have to 
route the
response through the TaskManager and can add automatic retries in case of a 
timeout.

The PR removes the JobManagerCommunicationFactory and gets rid of the excessive
PartitionStateChecker and ResultPartitionConsumableNotifier creation. Instead 
of creating
one PartitionStateChecker for each SingleInputGate, we create one for the 
TaskManager which
is reused across all SingleInputGates. The same applies to the
ResultPartitionConsumableNotifier.

This closes #2569.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/477d1c5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/477d1c5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/477d1c5d

Branch: refs/heads/master
Commit: 477d1c5d4ca6f469b3c87bc1f7962ece805cae1d
Parents: 7758571
Author: Till Rohrmann 
Authored: Thu Sep 29 16:19:30 2016 +0200
Committer: Till Rohrmann 
Committed: Fri Sep 30 09:07:29 2016 +0200

--
 .../runtime/io/network/PartitionState.java  | 54 +++
 .../io/network/netty/PartitionStateChecker.java |  5 +-
 .../io/network/partition/ResultPartition.java   | 31 ---
 .../ResultPartitionConsumableNotifier.java  |  5 +-
 .../io/network/partition/ResultPartitionID.java |  2 +
 .../partition/consumer/SingleInputGate.java | 16 ++--
 ...orGatewayJobManagerCommunicationFactory.java | 61 -
 .../ActorGatewayPartitionStateChecker.java  | 34 ---
 ...atewayResultPartitionConsumableNotifier.java |  8 +-
 .../JobManagerCommunicationFactory.java | 47 --
 .../apache/flink/runtime/taskmanager/Task.java  | 95 +++-
 .../flink/runtime/taskmanager/TaskActions.java  | 51 +++
 .../flink/runtime/jobmanager/JobManager.scala   |  6 +-
 .../runtime/messages/TaskControlMessages.scala  | 14 +--
 .../flink/runtime/taskmanager/TaskManager.scala | 75 +---
 .../io/network/NetworkEnvironmentTest.java  | 46 --
 .../consumer/LocalInputChannelTest.java | 41 +
 .../partition/consumer/SingleInputGateTest.java | 38 
 .../partition/consumer/TestSingleInputGate.java | 11 ++-
 .../partition/consumer/UnionInputGateTest.java  |  6 +-
 .../runtime/jobmanager/JobManagerTest.java  | 54 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java  | 13 +--
 .../runtime/taskmanager/TaskManagerTest.java|  3 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |  9 +-
 .../flink/runtime/taskmanager/TaskTest.java | 26 +++---
 .../tasks/InterruptSensitiveRestoreTest.java| 10 ++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 14 +--
 27 files changed, 420 insertions(+), 355 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
new file mode 100644
index 000..083412b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Contains information about the state of a result partition.
+ */
+public class PartitionState {
+   private final IntermediateDataSe