flink git commit: [FLINK-3225] Enforce translation to DataSetNodes

2016-02-01 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 7ecb70105 -> 297564646


[FLINK-3225] Enforce translation to DataSetNodes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29756464
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29756464
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29756464

Branch: refs/heads/tableOnCalcite
Commit: 2975646469bcc6b29b36e5d8bb61663af6da8749
Parents: 7ecb701
Author: Fabian Hueske 
Authored: Mon Feb 1 23:45:16 2016 +0100
Committer: Fabian Hueske 
Committed: Mon Feb 1 23:47:08 2016 +0100

--
 .../flink/api/java/table/JavaBatchTranslator.scala  | 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/29756464/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 66bfbe7..7e91190 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.api.java.table
 
 import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.Table
-import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
 
@@ -61,20 +61,19 @@ class JavaBatchTranslator extends PlanTranslator {
 // get the planner for the plan
 val planner = lPlan.getCluster.getPlanner
 
-// we do not have any special requirements for the output
-val outputProps = RelTraitSet.createEmpty()
 
 println("---")
 println("Input Plan:")
 println("---")
 println(RelOptUtil.toString(lPlan))
-
+
 // decorrelate
 val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
 
 // optimize the logical Flink plan
 val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-val optPlan = optProgram.run(planner, decorPlan, outputProps)
+val flinkOutputProps = RelTraitSet.createEmpty()
+val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps)
 
 println("---")
 println("Optimized Plan:")
@@ -83,7 +82,10 @@ class JavaBatchTranslator extends PlanTranslator {
 
 // optimize the logical Flink plan
 val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES)
-val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps)
+val dataSetOutputProps = RelTraitSet.createEmpty()
+  .plus(DataSetConvention.INSTANCE)
+  .plus(RelCollations.of()).simplify()
+val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps)
 
 println("-")
 println("DataSet Plan:")

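For context, the Calcite mechanism this commit leans on, as a minimal sketch rather than the committed code: requesting a convention trait in the required output trait set makes the planner either translate every operator into that convention or fail fast, instead of silently returning a partially translated plan. DataSetConvention and the three-argument Program.run(...) match the calls in the diff above; the wrapper method itself is illustrative.

    // Sketch only: forcing translation into the DataSet convention.
    import org.apache.calcite.plan.{RelOptPlanner, RelTraitSet}
    import org.apache.calcite.rel.{RelCollations, RelNode}
    import org.apache.calcite.tools.{Programs, RuleSet}
    import org.apache.flink.api.table.plan.nodes.dataset.DataSetConvention

    def translateToDataSetNodes(planner: RelOptPlanner, plan: RelNode, rules: RuleSet): RelNode = {
      // the empty collation keeps the traits minimal; simplify() canonicalizes them
      val requiredTraits = RelTraitSet.createEmpty()
        .plus(DataSetConvention.INSTANCE)
        .plus(RelCollations.of())
        .simplify()
      // with DataSetConvention in the traits, every node must become a DataSetRel
      Programs.ofRules(rules).run(planner, plan, requiredTraits)
    }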


flink git commit: [FLINK-3300] fix concurrency bug in YarnJobManager

2016-02-01 Thread mxm
Repository: flink
Updated Branches:
  refs/heads/master 086acf681 -> 2a49eaaf3


[FLINK-3300] fix concurrency bug in YarnJobManager

Adds message passing between Hadoop's async resource manager client and
the YarnJobManager actor.

This closes #1561.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a49eaaf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a49eaaf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a49eaaf

Branch: refs/heads/master
Commit: 2a49eaaf3c949864457aee0ffd99343a50ac7285
Parents: 086acf6
Author: Maximilian Michels 
Authored: Fri Jan 29 11:21:08 2016 +0100
Committer: Maximilian Michels 
Committed: Mon Feb 1 10:09:37 2016 +0100

--
 .../org/apache/flink/yarn/YarnJobManager.scala  | 425 +--
 .../org/apache/flink/yarn/YarnMessages.scala|  21 +-
 2 files changed, 231 insertions(+), 215 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2a49eaaf/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
--
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 135f87e..92eb4be 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -180,6 +180,7 @@ class YarnJobManager(
 _ ! decorateMessage(JobManagerStopped)
   }
 
+  // Shutdown and discard all queued messages
   context.system.shutdown()
 
 case RegisterApplicationClient =>
@@ -220,6 +221,85 @@ class YarnJobManager(
 case StartYarnSession(hadoopConfig, webServerPort) =>
   startYarnSession(hadoopConfig, webServerPort)
 
+
+    case YarnContainersAllocated(containers: JavaList[Container]) =>
+      val newlyAllocatedContainers = containers.asScala
+
+      newlyAllocatedContainers.foreach {
+        container => log.info(s"Got new container for allocation: ${container.getId}")
+      }
+
+      allocatedContainersList ++= containers.asScala
+      numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
+
+      allocateContainers()
+
+      if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
+        log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+          s"are not needed right now. Returning them")
+        for (container <- allocatedContainersList) {
+          rmClientOption match {
+            case Some(client) => client.releaseAssignedContainer(container.getId)
+            case None =>
+          }
+        }
+        allocatedContainersList = List()
+      }
+
+
+    case YarnContainersCompleted(statuses: JavaList[ContainerStatus]) =>
+
+      val completedContainerStatuses = statuses.asScala
+      val idStatusMap = completedContainerStatuses
+        .map(status => (status.getContainerId, status)).toMap
+
+      completedContainerStatuses.foreach {
+        status => log.info(s"Container ${status.getContainerId} is completed " +
+          s"with diagnostics: ${status.getDiagnostics}")
+      }
+
+      // get failed containers (returned containers are also completed, so we have to
+      // distinguish if it was running before).
+      val (completedContainers, remainingRunningContainers) = runningContainersList
+        .partition(idStatusMap contains _.getId)
+
+      completedContainers.foreach {
+        container =>
+          val status = idStatusMap(container.getId)
+          failedContainers += 1
+          runningContainers -= 1
+          log.info(s"Container ${status.getContainerId} was a running container. " +
+            s"Total failed containers $failedContainers.")
+          val detail = status.getExitStatus match {
+            case -103 => "Vmem limit exceeded";
+            case -104 => "Pmem limit exceeded";
+            case _ => ""
+          }
+          messageListener foreach {
+            _ ! decorateMessage(
+              YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
+                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
+            )
+          }
+      }
+
+      runningContainersList = remainingRunningContainers
+
+      // maxFailedContainers == -1 is infinite number of retries.
+      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
+        val msg = s"Stopping YARN session because the number of failed " +
+          s"containers ($failedContainers) exceeded the maximum failed container " +
+          s"count ($maxFailedContainers). This number is controlled by " +
+          s"the

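The diff above is cut off by the archive; as a hedged sketch of the pattern it implements (not the committed code), the fix keeps Hadoop's asynchronous callback thread from touching actor state by forwarding immutable messages to the YarnJobManager actor, so every state mutation happens on the actor's own thread. YarnContainersAllocated and YarnContainersCompleted are the message names visible in the diff; the bridge class below is illustrative.

    // Sketch only: the AMRMClientAsync callback thread sends messages instead of
    // mutating YarnJobManager fields directly.
    import java.util.{List => JavaList}
    import akka.actor.ActorRef
    import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus, NodeReport}
    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync

    // message types as named in the diff (definitions assumed for this sketch)
    case class YarnContainersAllocated(containers: JavaList[Container])
    case class YarnContainersCompleted(statuses: JavaList[ContainerStatus])

    class ActorCallbackBridge(jobManager: ActorRef) extends AMRMClientAsync.CallbackHandler {

      override def onContainersAllocated(containers: JavaList[Container]): Unit =
        jobManager ! YarnContainersAllocated(containers)  // processed on the actor thread

      override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit =
        jobManager ! YarnContainersCompleted(statuses)

      // remaining callbacks stubbed for brevity
      override def onShutdownRequest(): Unit = ()
      override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = ()
      override def onError(e: Throwable): Unit = ()
      override def getProgress: Float = 1.0f
    }
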
flink git commit: [hotfix] Remove 'ByteArrayInputView' and replace deserialization in TypeInformationSerializationSchema with more efficient reusable buffers.

2016-02-01 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 67b380d61 -> 92efcd34a


[hotfix] Remove 'ByteArrayInputView' and replace deserialization in 
TypeInformationSerializationSchema with more efficient reusable buffers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92efcd34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92efcd34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92efcd34

Branch: refs/heads/master
Commit: 92efcd34a5da2bccb07666f2c647974ea3e7c94f
Parents: 67b380d
Author: Stephan Ewen 
Authored: Mon Feb 1 14:39:24 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 1 17:29:02 2016 +0100

--
 .../typeutils/runtime/ByteArrayInputView.java   | 40 
 .../runtime/kryo/KryoClearedBufferTest.java |  8 +++-
 .../runtime/util/DataInputDeserializer.java | 48 
 .../runtime/util/DataOutputSerializer.java  | 25 ++
 ...eInformationKeyValueSerializationSchema.java | 44 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 17 +--
 .../TypeInformationSerializationSchema.java | 14 --
 .../TypeInformationSerializationSchemaTest.java |  2 +-
 8 files changed, 109 insertions(+), 89 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
deleted file mode 100644
index 48d6a3d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-public class ByteArrayInputView extends DataInputStream implements DataInputView {
-
-   public ByteArrayInputView(byte[] data) {
-   super(new ByteArrayInputStream(data));
-   }
-
-   @Override
-   public void skipBytesToRead(int numBytes) throws IOException {
-   while (numBytes > 0) {
-   int skipped = skipBytes(numBytes);
-   numBytes -= skipped;
-   }
-   }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index ab2e45f..7572408 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -22,13 +22,16 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -69,7 +72,8 @@ public class KryoClearedBufferTest {

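As a rough sketch of the reusable-buffer idea behind this hotfix (assuming setBuffer(byte[], int, int) re-points the view, which is what the DataInputDeserializer changes above enable), deserialization can reuse a single input view instead of allocating a stream wrapper per record:

    // Sketch only: one reusable input view for all records.
    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.runtime.util.DataInputDeserializer

    class ReusingDeserializer[T](serializer: TypeSerializer[T]) {
      // created once, re-pointed at each incoming byte array
      private val inputView = new DataInputDeserializer(new Array[Byte](0), 0, 0)

      def deserialize(bytes: Array[Byte]): T = {
        inputView.setBuffer(bytes, 0, bytes.length)  // replaces `new ByteArrayInputView(bytes)`
        serializer.deserialize(inputView)
      }
    }
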
flink git commit: [fix] [docs] Fix typo in savepoints documentation

2016-02-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 92efcd34a -> 6c0a83e4f


[fix] [docs] Fix typo in savepoints documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c0a83e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c0a83e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c0a83e4

Branch: refs/heads/master
Commit: 6c0a83e4fa1ef3d68df31bf01618972c4a445b21
Parents: 92efcd3
Author: Till Rohrmann 
Authored: Mon Feb 1 18:16:07 2016 +0100
Committer: Till Rohrmann 
Committed: Mon Feb 1 18:16:07 2016 +0100

--
 docs/apis/streaming/savepoints.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6c0a83e4/docs/apis/streaming/savepoints.md
--
diff --git a/docs/apis/streaming/savepoints.md b/docs/apis/streaming/savepoints.md
index 910f503..f04845c 100644
--- a/docs/apis/streaming/savepoints.md
+++ b/docs/apis/streaming/savepoints.md
@@ -79,7 +79,7 @@ For savepoints **only stateful tasks matter**. In the above example, the source
 
 Each task is identified by its **generated task IDs** and **subtask index**. In the above example the state of the source (**s1**, **s2**) and map tasks (**m1**, **m2**) is identified by their respective task ID (*0xC322EC* for the source tasks and *0x27B3EF* for the map tasks) and subtask index. There is no state for the sinks (**t1**, **t2**). Their IDs therefore do not matter.
 
-Important The IDs are generated **deterministically** from your program structure. This means that as long as your program does not change, the IDs do not change. **The only allowed changes are within the user function, e.g. you can change the implemented `MapFunction` without changing the typology**. In this case, it is straight forward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but gets problematic as soon as you make changes to the topology, because they result in changed IDs and the savepoint state cannot be mapped to your program any more.
+Important The IDs are generated **deterministically** from your program structure. This means that as long as your program does not change, the IDs do not change. **The only allowed changes are within the user function, e.g. you can change the implemented `MapFunction` without changing the topology**. In this case, it is straight forward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but gets problematic as soon as you make changes to the topology, because they result in changed IDs and the savepoint state cannot be mapped to your program any more.
 
 Recommended In order to be able to change your program and **have fixed IDs**, the *DataStream* API provides a method to manually specify the task IDs. Each operator provides a **`uid(String)`** method to override the generated ID. The ID is a String, which will be deterministically hashed to a 16-byte hash value. It is **important** that the specified IDs are **unique per transformation and job**. If this is not the case, job submission will fail.
 
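To illustrate the recommendation in that last paragraph (a sketch; the source and map function below are placeholders, while uid(String) is the documented method):

    // Sketch only: fixed operator IDs keep savepoint state mappable even when
    // the surrounding topology changes.
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.addSource(new MyEventSource)   // placeholder source
      .uid("event-source")             // stable ID instead of the generated hash
      .map(event => enrich(event))     // placeholder transformation
      .uid("enrich-map")
      .print()

    env.execute("savepoint-friendly job")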



flink git commit: [hotfix] fix non-exhaustive match warning

2016-02-01 Thread mxm
Repository: flink
Updated Branches:
  refs/heads/master 2a49eaaf3 -> ef58cf302


[hotfix] fix non-exhaustive match warning


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef58cf30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef58cf30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef58cf30

Branch: refs/heads/master
Commit: ef58cf30291f49a5037248cbf10a0d9acd67b09e
Parents: 2a49eaa
Author: Maximilian Michels 
Authored: Mon Feb 1 12:52:26 2016 +0100
Committer: Maximilian Michels 
Committed: Mon Feb 1 12:54:13 2016 +0100

--
 .../main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ef58cf30/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
--
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index dc17742..df7e6c6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1692,6 +1692,7 @@ object JobManager {
 
 result match {
   case scala.util.Failure(f) => throw f
+  case _ =>
 }
   }
 

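The added wildcard makes the match exhaustive; as a standalone sketch of the same idea:

    // Sketch only: a match on Try must cover both Failure and Success, or scalac
    // emits a "match may not be exhaustive" warning.
    import scala.util.{Failure, Success, Try}

    def rethrowIfFailed(result: Try[Unit]): Unit = result match {
      case Failure(f) => throw f
      case Success(_) => // nothing to do; the commit's `case _ =>` plays this role
    }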


flink git commit: [FLINK-3281] IndexOutOfBoundsException when range partition on empty DataSet

2016-02-01 Thread chengxiang
Repository: flink
Updated Branches:
  refs/heads/master 6c0a83e4f -> 233c0147b


[FLINK-3281] IndexOutOfBoundsException when range partition on empty DataSet


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/233c0147
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/233c0147
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/233c0147

Branch: refs/heads/master
Commit: 233c0147b1e9c18034d37f803e8974365a5a7d86
Parents: 6c0a83e
Author: chengxiang li 
Authored: Tue Feb 2 14:09:52 2016 +0800
Committer: chengxiang li 
Committed: Tue Feb 2 14:09:52 2016 +0800

--
 .../operators/udf/RangeBoundaryBuilder.java | 16 +-
 .../api/scala/operators/PartitionITCase.scala   | 32 
 2 files changed, 41 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
index cd163d3..09bd42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
@@ -60,13 +60,15 @@ public class RangeBoundaryBuilder<T> extends RichMapPartitionFunction<T, Object[][]>
-			double avgRange = sampledData.size() / (double) parallelism;
-			int numKey = comparator.getFlatComparators().length;
-			for (int i = 1; i < parallelism; i++) {
-				T record = sampledData.get((int) (i * avgRange));
-				Object[] keys = new Object[numKey];
-				comparator.extractKeys(record, keys, 0);
-				boundaries[i-1] = keys;
+			if (sampledData.size() > 0) {
+				double avgRange = sampledData.size() / (double) parallelism;
+				int numKey = comparator.getFlatComparators().length;
+				for (int i = 1; i < parallelism; i++) {
+					T record = sampledData.get((int) (i * avgRange));
+					Object[] keys = new Object[numKey];
+					comparator.extractKeys(record, keys, 0);
+					boundaries[i-1] = keys;
+				}
 			}
 
 			out.collect(boundaries);

http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
--
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index ca8bcd9..df13cd4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -50,6 +50,38 @@ class PartitionITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(
   }
 
   @Test
+  def testEmptyHashPartition(): Unit = {
+/*
+ * Test hash partition by tuple field
+ */
+val env = ExecutionEnvironment.getExecutionEnvironment
+val ds = env.fromCollection(Seq[Tuple1[String]]())
+
+val unique = ds.partitionByHash(0)
+
+unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+env.execute()
+
+expected = ""
+  }
+
+  @Test
+  def testEmptyRangePartition(): Unit = {
+/*
+ * Test hash partition by tuple field
+ */
+val env = ExecutionEnvironment.getExecutionEnvironment
+val ds = env.fromCollection(Seq[Tuple1[String]]())
+
+val unique = ds.partitionByRange(0)
+
+unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+env.execute()
+
+expected = ""
+  }
+
+  @Test
   def testHashPartitionByTupleField(): Unit = {
 /*
  * Test hash partition by tuple field

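For intuition, the logic the new guard protects, as a standalone sketch rather than the Flink class: boundaries are picked from a sorted sample, and an empty sample must yield no split points instead of an out-of-bounds access.

    // Sketch only: pick (parallelism - 1) boundary records from a sorted sample.
    def rangeBoundaries[T](sortedSample: IndexedSeq[T], parallelism: Int): Seq[T] =
      if (sortedSample.isEmpty) {
        Seq.empty  // FLINK-3281: an empty DataSet produces an empty sample
      } else {
        val avgRange = sortedSample.size / parallelism.toDouble
        // i ranges over 1 until parallelism, so (i * avgRange).toInt < sortedSample.size
        (1 until parallelism).map(i => sortedSample((i * avgRange).toInt))
      }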


[1/5] flink git commit: [FLINK-3306] [core] Fix auto-type registry util

2016-02-01 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master ef58cf302 -> 5e2fb3cb5


[FLINK-3306] [core] Fix auto-type registry util


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4bc47af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4bc47af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4bc47af

Branch: refs/heads/master
Commit: c4bc47afa6147dd25d8b579e764b30c9c13ee8ea
Parents: d902d16
Author: Stephan Ewen 
Authored: Fri Jan 29 17:08:32 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 1 14:45:55 2016 +0100

--
 .../api/io/avro/AvroRecordInputFormatTest.java  | 115 -
 .../flink/api/java/ExecutionEnvironment.java|  51 +++-
 .../flink/api/java/typeutils/TypeExtractor.java |   5 +
 .../typeutils/runtime/kryo/Serializers.java | 128 +++
 .../kryo/KryoGenericTypeSerializerTest.java |  31 ++---
 .../typeutils/runtime/kryo/SerializersTest.java |  61 +++--
 6 files changed, 217 insertions(+), 174 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
--
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index c04435c..42cbebe 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -22,10 +22,7 @@ import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
@@ -219,41 +216,43 @@ public class AvroRecordInputFormatTest {
 	 */
 	@Test
 	public void testDeserializeToGenericType() throws IOException {
-		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema);
-
-		FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader);
-		// initialize Record by reading it from disk (thats easier than creating it by hand)
-		GenericData.Record rec = new GenericData.Record(userSchema);
-		dataFileReader.next(rec);
-		// check if record has been read correctly
-		assertNotNull(rec);
-		assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-		assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
-
-		// now serialize it with our framework:
-
-		TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class);
-		ExecutionConfig ec = new ExecutionConfig();
-		Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-		Serializers.recursivelyRegisterType(( (GenericTypeInfo) te).getTypeClass(), ec);
-
-		TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
-		Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
-		Assert.assertTrue(
-				ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
-				ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
-		ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
-		tser.serialize(rec, target);
-
-		GenericData.Record newRec = tser.deserialize(target.getInputView());
-
-		// check if it is still the same
-		assertNotNull(newRec);
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
-		assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() );
-		assertEquals(null, newRec.get("type_long_test"));
-
+		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
+
+		try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+			// initialize Record by reading it from disk (thats easier than creating it by hand)
+			GenericData.Record rec = new

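The diff above is truncated by the archive; for context, a minimal usage sketch of the utility being fixed (the record class is a placeholder; the method signature matches the diff):

    // Sketch only: register a type, and the types reachable through its fields,
    // with Kryo before creating a serializer.
    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers

    val config = new ExecutionConfig
    Serializers.recursivelyRegisterType(classOf[MyAvroRecord], config)  // MyAvroRecord is a placeholder
    // serializers created from `config` now see the registered Kryo types
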
[4/5] flink git commit: [FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time

2016-02-01 Thread sewen
[FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time

The auto-magic for Joda Time was limited to very few classes, and it was not transparent which cases would be handled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6110dc3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6110dc3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6110dc3

Branch: refs/heads/master
Commit: b6110dc35a17340653a39209038041a5e28054b4
Parents: c4bc47a
Author: Stephan Ewen 
Authored: Fri Jan 29 18:51:03 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 1 14:46:06 2016 +0100

--
 flink-java/pom.xml  | 30 +++---
 .../flink/api/java/ExecutionEnvironment.java| 26 
 .../typeutils/runtime/kryo/Serializers.java | 42 +++-
 .../api/operators/TimestampedCollector.java | 12 +++---
 .../runtime/KryoGenericTypeSerializerTest.scala | 37 +++--
 .../api/scala/runtime/TupleSerializerTest.scala | 29 --
 6 files changed, 68 insertions(+), 108 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/pom.xml
--
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 8383a4a..a31e89d 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -70,22 +70,6 @@ under the License.
 
 		<dependency>
-			<groupId>de.javakaffee</groupId>
-			<artifactId>kryo-serializers</artifactId>
-			<version>0.27</version>
-			<exclusions>
-				<exclusion>
-					<groupId>joda-time</groupId>
-					<artifactId>joda-time</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.joda</groupId>
-					<artifactId>joda-convert</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
@@ -104,6 +88,20 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<version>2.5</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.joda</groupId>
+			<artifactId>joda-convert</artifactId>
+			<version>1.7</version>
+			<scope>test</scope>
+		</dependency>


 

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 10cb5e3..253ffa3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.api.java;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
+import com.esotericsoftware.kryo.Serializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -46,7 +47,10 @@ import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.*;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -54,14 +58,22 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
 import org.apache.flink.util.Visitor;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo.Serializer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import 

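With the auto-magic removed, registration becomes the user's job; a minimal sketch (registerTypeWithKryoSerializer is the ExecutionConfig method; JodaDateTimeSerializer ships in de.javakaffee:kryo-serializers, the dependency the pom change above demotes to test scope):

    // Sketch only: explicit registration of a Kryo serializer for a Joda type.
    import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
    import org.apache.flink.api.scala._
    import org.joda.time.DateTime

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.registerTypeWithKryoSerializer(
      classOf[DateTime], classOf[JodaDateTimeSerializer])
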
[5/5] flink git commit: [hotfix] Add deprecation message to old Key interface

2016-02-01 Thread sewen
[hotfix] Add deprecation message to old Key interface


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e2fb3cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e2fb3cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e2fb3cb

Branch: refs/heads/master
Commit: 5e2fb3cb5eb0ab181d62e864bb0443d656de2f83
Parents: b6110dc
Author: Stephan Ewen 
Authored: Sun Jan 31 23:19:12 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 1 14:46:06 2016 +0100

--
 flink-core/src/main/java/org/apache/flink/types/Key.java | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5e2fb3cb/flink-core/src/main/java/org/apache/flink/types/Key.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/types/Key.java b/flink-core/src/main/java/org/apache/flink/types/Key.java
index ceeeb57..cc1978c 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Key.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Key.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
 /**
@@ -31,7 +30,11 @@ package org.apache.flink.types;
  * @see org.apache.flink.types.Value
  * @see org.apache.flink.core.io.IOReadableWritable
  * @see java.lang.Comparable
+ * 
+ * @deprecated The Key type is a relict of a deprecated and removed API and will be removed
+ * in future versions as well.
  */
+@Deprecated
 public interface Key<T> extends Value, Comparable<T> {

/**



[2/5] flink git commit: [hotfix] Clean up warnings in Serializers util class

2016-02-01 Thread sewen
[hotfix] Clean up warnings in Serializers util class


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d902d164
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d902d164
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d902d164

Branch: refs/heads/master
Commit: d902d164556e631d1c8edea475515901691c639c
Parents: 8c8f1c4
Author: Stephan Ewen 
Authored: Fri Jan 29 16:29:31 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 1 14:45:55 2016 +0100

--
 .../typeutils/runtime/kryo/Serializers.java | 44 +---
 1 file changed, 30 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d902d164/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 76f8eb4..0ea8691 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -60,48 +61,51 @@ public class Serializers {
 	 * to Kryo.
 	 * It also watches for types which need special serializers.
 	 */
-	private static Set alreadySeen = new HashSet();
+	private static Set alreadySeen = new HashSet<>();
 
 	public static void recursivelyRegisterType(Class type, ExecutionConfig config) {
 		alreadySeen.add(type);
-		if(type.isPrimitive()) {
+
+		if (type.isPrimitive()) {
 			return;
 		}
 		config.registerKryoType(type);
 		addSerializerForType(config, type);
 
 		Field[] fields = type.getDeclaredFields();
-		for(Field field : fields) {
+		for (Field field : fields) {
 			if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
 				continue;
 			}
 			Type fieldType = field.getGenericType();
-			if(fieldType instanceof ParameterizedType) { // field has generics
+			if (fieldType instanceof ParameterizedType) { // field has generics
 				ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType;
-				for(Type t: parameterizedFieldType.getActualTypeArguments()) {
-					if(TypeExtractor.isClassType(t) ) {
-						Class clazz = TypeExtractor.typeToClass(t);
-						if(!alreadySeen.contains(clazz)) {
+				for (Type t: parameterizedFieldType.getActualTypeArguments()) {
+					if (TypeExtractor.isClassType(t) ) {
+						Class clazz = TypeExtractor.typeToClass(t);
+						if (!alreadySeen.contains(clazz)) {
 							recursivelyRegisterType(TypeExtractor.typeToClass(t), config);
 						}
 					}
 				}
 			}
 			Class clazz = field.getType();
-			if(!alreadySeen.contains(clazz)) {
+			if (!alreadySeen.contains(clazz)) {
 				recursivelyRegisterType(clazz, config);
 			}
 		}
 	}
 
 	public static void addSerializerForType(ExecutionConfig reg, Class type) {
-		if(GenericData.Record.class.isAssignableFrom(type)) {
+		if (GenericData.Record.class.isAssignableFrom(type)) {
 			registerGenericAvro(reg);
 		}
-		if(SpecificRecordBase.class.isAssignableFrom(type)) {
-			registerSpecificAvro(reg, (Class) type);
+		if (SpecificRecordBase.class.isAssignableFrom(type)) {
+			@SuppressWarnings("unchecked")
+			Class specRecordClass = (Class) type;
+
[3/5] flink git commit: [hotfix] [streaming] Tasks print subtask in log statements

2016-02-01 Thread sewen
[hotfix] [streaming] Tasks print subtask in log statements


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c8f1c45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c8f1c45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c8f1c45

Branch: refs/heads/master
Commit: 8c8f1c45face7105c9b60bcfd3f818eea19d42d1
Parents: ef58cf3
Author: Stephan Ewen 
Authored: Thu Jan 28 16:32:17 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 1 14:45:55 2016 +0100

--
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8c8f1c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
--
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b91c570..e4b6b6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -379,7 +379,7 @@ public abstract class StreamTask
 * @return The name of the task.
 */
public String getName() {
-   return getEnvironment().getTaskInfo().getTaskName();
+   return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
}
 
/**



[2/2] flink git commit: [FLINK-3265][tests] adapt RMQSource checkpointing test to runtime behavior

2016-02-01 Thread mxm
[FLINK-3265][tests] adapt RMQSource checkpointing test to runtime behavior

The methods snapshotState and notifyCheckpointComplete are always invoked mutually exclusively at runtime. The RMQSource relies on this, but the test made the false assumption that the two methods may be called at the same time.

This closes #1569.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67b380d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67b380d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67b380d6

Branch: refs/heads/master
Commit: 67b380d617a942c11ab29a8d62d67b770245bb63
Parents: 83e6a2b
Author: Maximilian Michels 
Authored: Mon Feb 1 16:30:52 2016 +0100
Committer: Maximilian Michels 
Committed: Mon Feb 1 16:51:35 2016 +0100

--
 .../flink/streaming/connectors/rabbitmq/RMQSourceTest.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/67b380d6/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
--
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index aa19e5d..e0eed70 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -140,7 +140,9 @@ public class RMQSourceTest {
}
 
 		// check if the messages are being acknowledged and the transaction comitted
-		source.notifyCheckpointComplete(snapshotId);
+		synchronized (DummySourceContext.lock) {
+			source.notifyCheckpointComplete(snapshotId);
+		}
totalNumberOfAcks += numIds;
 
}

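A sketch of the invariant the adapted test now respects (the lock parameter stands in for DummySourceContext.lock from the diff): at runtime both calls are driven while holding the source's checkpoint lock, so the test has to serialize them on the same monitor.

    // Sketch only: snapshotState() and notifyCheckpointComplete() never overlap,
    // because both run under the checkpoint lock.
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource

    def takeSnapshot(source: RMQSource[_], checkpointLock: AnyRef, id: Long): Unit =
      checkpointLock.synchronized {
        source.snapshotState(id, System.currentTimeMillis())
      }

    def confirmCheckpoint(source: RMQSource[_], checkpointLock: AnyRef, id: Long): Unit =
      checkpointLock.synchronized {
        source.notifyCheckpointComplete(id)
      }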


[1/2] flink git commit: Revert "[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ"

2016-02-01 Thread mxm
Repository: flink
Updated Branches:
  refs/heads/master 5e2fb3cb5 -> 67b380d61


Revert "[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ"

This reverts commit 6b01a89020f2de3f7710cf72336291b1e8ca8562. The
introduced locks are not necessary. The checkpointing test case only
needs to be adapted to the checkpointing runtime behavior.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83e6a2b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83e6a2b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83e6a2b5

Branch: refs/heads/master
Commit: 83e6a2b5f7621a2bfd8797ab071a94492bbac3d5
Parents: 5e2fb3c
Author: Maximilian Michels 
Authored: Mon Feb 1 16:28:07 2016 +0100
Committer: Maximilian Michels 
Committed: Mon Feb 1 16:50:48 2016 +0100

--
 .../connectors/rabbitmq/RMQSource.java  |  4 +-
 .../connectors/rabbitmq/RMQSourceTest.java  | 79 
 .../source/MessageAcknowledgingSourceBase.java  | 51 ++---
 ...ltipleIdsMessageAcknowledgingSourceBase.java | 24 +++---
 4 files changed, 34 insertions(+), 124 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/83e6a2b5/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
--
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 59bc057..09bb07c 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -196,9 +196,7 @@ public class RMQSource extends MultipleIdsMessageAcknowledgingSourceBase

http://git-wip-us.apache.org/repos/asf/flink/blob/83e6a2b5/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
--
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 0a3de84..aa19e5d 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -23,7 +23,6 @@ import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -32,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -105,83 +103,6 @@ public class RMQSourceTest {
sourceThread.join();
}
 
-	/**
-	 * Make sure concurrent access to snapshotState() and notifyCheckpointComplete() don't cause
-	 * an issue.
-	 *
-	 * Without proper synchronization, the test will fail with a concurrent modification exception
-	 *
-	 */
-	@Test
-	public void testConcurrentAccess() throws Exception {
-		source.autoAck = false;
-		sourceThread.start();
-
-		final Tuple1 error = new Tuple1<>(null);
-
-		Thread.sleep(5);
-
-		Thread snapshotThread = new Thread(new Runnable() {
-			public long id = 0;
-
-			@Override
-			public void run() {
-				while (!Thread.interrupted()) {
-					try {
-						source.snapshotState(id++, 0);
-					} catch (Exception e) {
-