buildbot success on flink-docs-release-0.9

2016-04-13 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.9 
while building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.9/builds/294

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: lares_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' 
triggered this build
Build Source Stamp: [branch release-0.9] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





buildbot success on flink-docs-release-0.10

2016-04-13 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10 
while building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/179

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: lares_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





[3/5] flink git commit: [FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.

2016-04-13 Thread sewen
[FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcd27f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcd27f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcd27f4

Branch: refs/heads/master
Commit: 2dcd27f403c2a7f10791bfe21c45e2a326aa46a1
Parents: e40e29d
Author: Stephan Ewen 
Authored: Wed Apr 13 15:45:51 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 20:50:49 2016 +0200

--
 .../kafka/internals/AbstractFetcher.java|  77 +++--
 .../kafka/internals/ExceptionProxy.java |  60 +++-
 .../connectors/kafka/util/KafkaUtils.java   |  33 +-
 .../AbstractFetcherTimestampsTest.java  | 306 +++
 .../kafka/testutils/MockRuntimeContext.java |  46 ++-
 5 files changed, 478 insertions(+), 44 deletions(-)
--
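
The refactored state-holder creation in the diff below leans on Java's standard workaround for generic array creation: since "new KafkaTopicPartitionState<KPH>[n]" is illegal, a raw array is created and cast, with the unchecked warning suppressed. A minimal sketch of the idiom in isolation (a hedged illustration, not Flink code):

    @SuppressWarnings("unchecked")
    static <E> E[] newGenericArray(int size) {
        // Java forbids 'new E[size]', so create a raw Object array and cast it.
        // This is safe as long as the array never escapes the creating code as E[].
        return (E[]) new Object[size];
    }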


http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 594aa66..8183575 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -328,45 +328,66 @@ public abstract class AbstractFetcher<T, KPH> {
ClassLoader userCodeClassLoader)
throws IOException, ClassNotFoundException
{
-   @SuppressWarnings("unchecked")
-   KafkaTopicPartitionState<KPH>[] partitions =
-   (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState[assignedPartitions.size()];
-
-   int pos = 0;
-   for (KafkaTopicPartition partition : assignedPartitions) {
-   // create the kafka version specific partition handle
-   KPH kafkaHandle = createKafkaPartitionHandle(partition);
+   switch (timestampWatermarkMode) {

-   // create the partition state
-   KafkaTopicPartitionState<KPH> partitionState;
-   switch (timestampWatermarkMode) {
-   case NO_TIMESTAMPS_WATERMARKS:
-   partitionState = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
-   break;
-   case PERIODIC_WATERMARKS: {
+   case NO_TIMESTAMPS_WATERMARKS: {
+   @SuppressWarnings("unchecked")
+   KafkaTopicPartitionState<KPH>[] partitions =
+   (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState[assignedPartitions.size()];
+
+   int pos = 0;
+   for (KafkaTopicPartition partition : 
assignedPartitions) {
+   // create the kafka version specific 
partition handle
+   KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+   partitions[pos++] = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
+   }
+
+   return partitions;
+   }
+
+   case PERIODIC_WATERMARKS: {
+   @SuppressWarnings("unchecked")
+   KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+   (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+   new KafkaTopicPartitionStateWithPeriodicWatermarks[assignedPartitions.size()];
+
+   int pos = 0;
+   for (KafkaTopicPartition partition : 
assignedPartitions) {
+   KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+
AssignerWithPeriodicWatermarks<T> assignerInstance =


[1/5] flink git commit: [hotfix] [kafka consumer] Increase Kafka test stability

2016-04-13 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master b0a7a1b81 -> 1a34f2165


[hotfix] [kafka consumer] Increase Kafka test stability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2728f924
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2728f924
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2728f924

Branch: refs/heads/master
Commit: 2728f924cf64cd62929b8f0e394a1d4335af8156
Parents: b0a7a1b
Author: Stephan Ewen 
Authored: Wed Apr 13 10:38:37 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 20:50:48 2016 +0200

--
 .../connectors/kafka/KafkaConsumerTestBase.java| 13 -
 1 file changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2728f924/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8ff67b4..a65a411 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1278,13 +1278,24 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
new 
Tuple2Partitioner(parallelism)))
.setParallelism(parallelism);
 
-   writeEnv.execute("Write sequence");
+   try {
+   writeEnv.execute("Write sequence");
+   }
+   catch (Exception e) {
+   LOG.error("Write attempt failed, trying again", 
e);
+   deleteTestTopic(topicName);
+   
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+   continue;
+   }
+   
LOG.info("Finished writing sequence");
 
//  Validate the Sequence 

// we need to validate the sequence, because kafka's 
producers are not exactly once
LOG.info("Validating sequence");
+
+   
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));

final StreamExecutionEnvironment readEnv = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);

readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
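
The stabilization above is a retry-with-cleanup loop: if the write job fails, the test deletes the topic, waits until no job is running, and starts over. Distilled into a standalone sketch (a hedged illustration; runAttempt and cleanUp are hypothetical placeholders, not Flink API):

    import java.util.concurrent.Callable;

    // Retry a flaky action, cleaning up between attempts.
    static void retryWithCleanup(int maxAttempts, Callable<?> runAttempt, Runnable cleanUp) throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                runAttempt.call();   // e.g. writeEnv.execute("Write sequence")
                return;              // success, no further attempts
            }
            catch (Exception e) {
                if (attempt == maxAttempts) {
                    throw e;         // out of retries, surface the failure
                }
                cleanUp.run();       // e.g. delete the topic, wait until no job runs
            }
        }
    }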



[4/5] flink git commit: [FLINK-3743] upgrading breeze from 0.11.2 to 0.12

2016-04-13 Thread sewen
[FLINK-3743] upgrading breeze from 0.11.2 to 0.12

This closes #1876


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a34f216
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a34f216
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a34f216

Branch: refs/heads/master
Commit: 1a34f2165b217a4579df410dd7ccb879d6a58083
Parents: 2dcd27f
Author: Todd Lisonbee 
Authored: Mon Apr 11 23:20:43 2016 -0700
Committer: Stephan Ewen 
Committed: Wed Apr 13 20:50:49 2016 +0200

--
 flink-libraries/flink-ml/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1a34f216/flink-libraries/flink-ml/pom.xml
--
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index 986cb0f..0a3531e 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -44,7 +44,7 @@

org.scalanlp
breeze_${scala.binary.version}
-   0.11.2
+   0.12

 




[5/5] flink git commit: [FLINK-3745] [runtime] Fix early stopping of stream sources

2016-04-13 Thread sewen
[FLINK-3745] [runtime] Fix early stopping of stream sources


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc

Branch: refs/heads/master
Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992
Parents: 2728f92
Author: Stephan Ewen 
Authored: Wed Apr 13 12:26:42 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 20:50:49 2016 +0200

--
 .../tasks/StoppableSourceStreamTask.java| 15 +++-
 .../tasks/SourceStreamTaskStoppingTest.java | 94 
 .../runtime/tasks/SourceStreamTaskTest.java | 40 +
 .../streaming/timestamp/TimestampITCase.java| 18 ++--
 .../test/classloading/jar/UserCodeType.java |  1 +
 5 files changed, 123 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
index 5173796..7ff39b7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -31,9 +31,20 @@ import 
org.apache.flink.streaming.api.operators.StoppableStreamSource;
 public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>>
implements StoppableTask {
 
+   private volatile boolean stopped;
+
@Override
-   public void stop() {
-   this.headOperator.stop();
+   protected void run() throws Exception {
+   if (!stopped) {
+   super.run();
+   }
}
 
+   @Override
+   public void stop() {
+   stopped = true;
+   if (this.headOperator != null) {
+   this.headOperator.stop();
+   }
+   }
 }
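
The race being fixed: stop() may arrive before run() has started (so headOperator can still be null), and run() must not start a source that was already stopped. Reduced to its essentials, as a hedged sketch outside the Flink task lifecycle (runSource is a hypothetical stand-in):

    class StopGuardSketch {
        private volatile boolean stopped;

        void run() {
            if (!stopped) {
                runSource();   // start the source only if stop() has not happened yet
            }
        }

        void stop() {
            stopped = true;    // volatile write: visible to a later run()
            // if the source already started, it should additionally be signalled to stop
        }

        private void runSource() { /* hypothetical: emit elements until stopped */ }
    }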

http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
new file mode 100644
index 000..ab9e59b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * These tests verify that the stoppable source stream task reacts correctly to
+ * stop() requests, whether stop() is called before or while the task is running.
+ */
+public class SourceStreamTaskStoppingTest {
+
+
+   // test flag for testStop()
+   static boolean stopped = false;
+
+   @Test
+   public void testStop() {
+   final StoppableSourceStreamTask 
sourceTask = new StoppableSourceStreamTask<>();
+   sourceTask.headOperator = new 

[2/5] flink git commit: [FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer

2016-04-13 Thread sewen
[FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer

This closes #1877
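
With both emitter setters folded into one overloaded name, registration looks the same for the periodic and the punctuated variant. A minimal usage sketch (the consumer construction and MyPeriodicAssigner are hypothetical; only the method name comes from this commit):

    // MyPeriodicAssigner is a hypothetical AssignerWithPeriodicWatermarks<String>.
    FlinkKafkaConsumer09<String> consumer =
            new FlinkKafkaConsumer09<>("topic", schema, properties);   // hypothetical setup

    consumer.assignTimestampsAndWatermarks(new MyPeriodicAssigner());
    // The punctuated variant uses the same method name, but the two are mutually exclusive:
    // consumer.assignTimestampsAndWatermarks(new MyPunctuatedAssigner());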


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e40e29da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e40e29da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e40e29da

Branch: refs/heads/master
Commit: e40e29da9b68ec6da59f7b5372cb1483283c0530
Parents: 8570b6d
Author: Aljoscha Krettek 
Authored: Wed Apr 13 11:41:39 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 20:50:49 2016 +0200

--
 .../connectors/kafka/FlinkKafkaConsumerBase.java|  4 ++--
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java| 12 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 0ca8fd5..ed5c72f 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -148,7 +148,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 * @param assigner The timestamp assigner / watermark generator to use.
 * @return The consumer object, to allow function chaining.   
 */
-   public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+   public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
checkNotNull(assigner);

if (this.periodicWatermarkAssigner != null) {
@@ -182,7 +182,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 * @param assigner The timestamp assigner / watermark generator to use.
 * @return The consumer object, to allow function chaining.   
 */
-   public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+   public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
checkNotNull(assigner);

if (this.punctuatedWatermarkAssigner != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f4ef995..9b517df 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -44,12 +44,12 @@ public class FlinkKafkaConsumerBaseTest {
@Test
public void testEitherWatermarkExtractor() {
try {
-   new 
DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+   new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks)
 null);
fail();
} catch (NullPointerException ignored) {}
 
try {
-   new 
DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+   new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)
 null);
fail();
} catch (NullPointerException ignored) {}

@@ -59,16 +59,16 @@ public class FlinkKafkaConsumerBaseTest {
final AssignerWithPunctuatedWatermarks 
punctuatedAssigner = 

[2/2] flink git commit: [FLINK-2909] [gelly] Graph Generators

2016-04-13 Thread greg
[FLINK-2909] [gelly] Graph Generators

Initial set of scale-free graph generators:
- Complete graph
- Cycle graph
- Empty graph
- Grid graph
- Hypercube graph
- Path graph
- RMat graph
- Singleton edge graph
- Star graph

This closes #1807


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a7a1b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a7a1b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a7a1b8

Branch: refs/heads/master
Commit: b0a7a1b813b29dbfd5c959ee2929372c31befda8
Parents: 5350bc4
Author: Greg Hogan 
Authored: Tue Mar 15 16:59:02 2016 -0400
Committer: EC2 Default User 
Committed: Wed Apr 13 18:25:29 2016 +

--
 docs/apis/batch/libs/gelly.md   | 644 +++
 docs/page/css/flink.css |  21 +
 .../flink/util/LongValueSequenceIterator.java   | 193 ++
 .../util/LongValueSequenceIteratorTest.java |  92 +++
 .../java/org/apache/flink/api/java/Utils.java   |   2 +-
 .../apache/flink/graph/examples/Graph500.java   | 121 
 .../graph/generator/AbstractGraphGenerator.java |  33 +
 .../flink/graph/generator/CompleteGraph.java| 112 
 .../flink/graph/generator/CycleGraph.java   |  60 ++
 .../flink/graph/generator/EmptyGraph.java   |  79 +++
 .../flink/graph/generator/GraphGenerator.java   |  51 ++
 .../graph/generator/GraphGeneratorUtils.java| 119 
 .../apache/flink/graph/generator/GridGraph.java | 165 +
 .../flink/graph/generator/HypercubeGraph.java   |  65 ++
 .../apache/flink/graph/generator/PathGraph.java |  60 ++
 .../apache/flink/graph/generator/RMatGraph.java | 316 +
 .../graph/generator/SingletonEdgeGraph.java | 107 +++
 .../apache/flink/graph/generator/StarGraph.java | 100 +++
 .../random/AbstractGeneratorFactory.java|  72 +++
 .../flink/graph/generator/random/BlockInfo.java |  82 +++
 .../random/JDKRandomGeneratorFactory.java   |  72 +++
 .../random/MersenneTwisterFactory.java  |  72 +++
 .../graph/generator/random/RandomGenerable.java |  41 ++
 .../random/RandomGenerableFactory.java  |  57 ++
 .../graph/generator/AbstractGraphTest.java  |  33 +
 .../graph/generator/CompleteGraphTest.java  |  84 +++
 .../flink/graph/generator/CycleGraphTest.java   |  83 +++
 .../flink/graph/generator/EmptyGraphTest.java   |  78 +++
 .../flink/graph/generator/GridGraphTest.java|  93 +++
 .../graph/generator/HypercubeGraphTest.java |  85 +++
 .../flink/graph/generator/PathGraphTest.java|  83 +++
 .../flink/graph/generator/RMatGraphTest.java|  70 ++
 .../graph/generator/SingletonEdgeGraphTest.java |  84 +++
 .../flink/graph/generator/StarGraphTest.java|  85 +++
 .../apache/flink/graph/generator/TestUtils.java | 103 +++
 35 files changed, 3616 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/docs/apis/batch/libs/gelly.md
--
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index d803be2..53d628d 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2012,3 +2012,647 @@ vertex represents a group of vertices and each edge 
represents a group of edges
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
 {% top %}
+
+Graph Generators
+---
+
+Gelly provides a collection of scalable graph generators. Each generator is
+
+* parallelizable, in order to create large datasets
+* scale-free, generating the same graph regardless of parallelism
+* thrifty, using as few operators as possible
+
+Graph generators are configured using the builder pattern. The parallelism of 
generator
+operators can be set explicitly by calling `setParallelism(parallelism)`. 
Lowering the
+parallelism will reduce the allocation of memory and network buffers.
+
+Graph-specific configuration must be called first, then configuration common 
to all
+generators, and lastly the call to `generate()`. The following example 
configures a
+grid graph with two dimensions, configures the parallelism, and generates the 
graph.
+
+
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+boolean wrapEndpoints = false;
+
+int parallelism = 4;
+
+Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
+.addDimension(2, wrapEndpoints)
+.addDimension(4, wrapEndpoints)
+.setParallelism(parallelism)
+.generate();
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.GridGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+

flink git commit: [FLINK-3589] Allow setting Operator parallelism to default value

2016-04-13 Thread greg
Repository: flink
Updated Branches:
  refs/heads/master 6c061684f -> 5350bc48a


[FLINK-3589] Allow setting Operator parallelism to default value

Adds the public field ExecutionConfig.PARALLELISM_DEFAULT as a flag
value to indicate that the default parallelism should be used.

Adds the public field ExecutionConfig.PARALLELISM_UNKNOWN as a flag
value to indicate that the parallelism should remain unchanged.

This closes #1778
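
A hedged sketch of the two flag values in use (the surrounding data set is arbitrary; only the constants come from this commit):

    DataSet<String> words = env.fromElements("to", "be", "or", "not");
    FilterOperator<String> filtered = words.filter(s -> s.length() > 2);

    filtered.setParallelism(4);                                    // explicit value
    filtered.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);  // back to the default
    // ExecutionConfig.PARALLELISM_UNKNOWN leaves the current setting unchanged.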


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5350bc48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5350bc48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5350bc48

Branch: refs/heads/master
Commit: 5350bc48af37a1d6296de5b290b870b7d68407fa
Parents: 6c06168
Author: Greg Hogan 
Authored: Wed Mar 9 12:41:45 2016 -0500
Committer: Greg Hogan 
Committed: Wed Apr 13 11:59:13 2016 -0400

--
 .../org/apache/flink/client/LocalExecutor.java  |  5 +-
 .../apache/flink/client/cli/ProgramOptions.java |  3 +-
 .../client/program/ContextEnvironment.java  |  3 +-
 .../flink/api/common/ExecutionConfig.java   | 30 ++---
 .../java/org/apache/flink/api/common/Plan.java  | 17 +++---
 .../flink/api/common/operators/Operator.java| 16 ++---
 .../flink/api/common/ExecutionConfigTest.java   | 33 +-
 .../flink/api/java/ExecutionEnvironment.java|  2 +-
 .../apache/flink/api/java/LocalEnvironment.java |  3 +-
 .../flink/api/java/operators/DataSink.java  |  3 +-
 .../api/java/operators/DeltaIteration.java  |  8 ++-
 .../flink/api/java/operators/Operator.java  | 20 +++---
 .../flink/api/java/operator/OperatorTest.java   | 64 
 .../translation/ReduceTranslationTests.java |  5 +-
 .../flink/optimizer/dag/OptimizerNode.java  | 17 +++---
 .../flink/runtime/jobgraph/JobVertex.java   |  3 +-
 .../flink/streaming/api/graph/StreamNode.java   |  3 +-
 .../translation/ReduceTranslationTest.scala |  7 ++-
 18 files changed, 185 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
--
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 25da5c7..65bf5d8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.client;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -206,7 +207,7 @@ public class LocalExecutor extends PlanExecutor {
 */
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-   final int parallelism = plan.getDefaultParallelism() == -1 ? 1 
: plan.getDefaultParallelism();
+   final int parallelism = plan.getDefaultParallelism() == 
ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();
 
Optimizer pc = new Optimizer(new DataStatistics(), 
this.configuration);
pc.setDefaultParallelism(parallelism);
@@ -271,7 +272,7 @@ public class LocalExecutor extends PlanExecutor {
 * @throws Exception Thrown, if the optimization process that creates 
the execution plan failed.
 */
public static String optimizerPlanAsJSON(Plan plan) throws Exception {
-   final int parallelism = plan.getDefaultParallelism() == -1 ? 1 
: plan.getDefaultParallelism();
+   final int parallelism = plan.getDefaultParallelism() == 
ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();
 
Optimizer pc = new Optimizer(new DataStatistics(), new 
Configuration());
pc.setDefaultParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
--
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 73d49b5..368ec19 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -18,6 +18,7 @@
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
+import 

flink git commit: [hotfix] [tests] Add cross transformation to OverwriteObjects manual test

2016-04-13 Thread greg
Repository: flink
Updated Branches:
  refs/heads/master e7d78e63a -> 6c061684f


[hotfix] [tests] Add cross transformation to OverwriteObjects manual test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c061684
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c061684
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c061684

Branch: refs/heads/master
Commit: 6c061684f93ddcd375f37d3ef71f6dd66783df8b
Parents: e7d78e6
Author: Greg Hogan 
Authored: Wed Apr 13 10:48:52 2016 -0400
Committer: Greg Hogan 
Committed: Wed Apr 13 10:51:06 2016 -0400

--
 .../flink/test/manual/OverwriteObjects.java | 67 ++--
 1 file changed, 63 insertions(+), 4 deletions(-)
--
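
The manual test's invariant is that a transformation must produce the same result regardless of the object-reuse setting; the new cross test applies the same recipe as the existing reduce and join tests. Condensed into a hedged sketch (runTransformation is a hypothetical stand-in for the transformation under test):

    env.getConfig().enableObjectReuse();
    ChecksumHashCode withReuse = DataSetUtils.checksumHashCode(runTransformation(env));

    env.getConfig().disableObjectReuse();
    ChecksumHashCode withoutReuse = DataSetUtils.checksumHashCode(runTransformation(env));

    Assert.assertEquals(withReuse, withoutReuse);   // object reuse must not change results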


http://git-wip-us.apache.org/repos/asf/flink/blob/6c061684/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java 
b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
index 1a70c69..23d8e67 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.manual;
 
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -73,6 +74,7 @@ public class OverwriteObjects {
testReduce(env);
testGroupedReduce(env);
testJoin(env);
+   testCross(env);
}
}
 
@@ -127,7 +129,7 @@ public class OverwriteObjects {
Assert.assertEquals(disabledChecksum, enabledChecksum);
}
 
-   public class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
+   private class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
private Scrambler scrambler;
 
public OverwriteObjectsReduce(boolean keyed) {
@@ -252,7 +254,7 @@ public class OverwriteObjects {
}
}
 
-   public class OverwriteObjectsJoin implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
+   private class OverwriteObjectsJoin implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
private Scrambler scrambler = new Scrambler(true);
 
@Override
@@ -263,12 +265,69 @@ public class OverwriteObjects {
 
// 

 
-   private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env) {
+   public void testCross(ExecutionEnvironment env) throws Exception {
+   /*
+* Test CrossDriver
+*/
+
+   LOG.info("Testing cross");
+
+   DataSet<Tuple2<IntValue, IntValue>> small = getDataSet(env, 100, 20);
+   DataSet<Tuple2<IntValue, IntValue>> large = getDataSet(env, 1, 2000);
+
+   // test NESTEDLOOP_BLOCKED_OUTER_FIRST and 
NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse enabled
+
+   env.getConfig().enableObjectReuse();
+
+   ChecksumHashCode enabledChecksumWithHuge = 
DataSetUtils.checksumHashCode(small
+   .crossWithHuge(large)
+   .with(new OverwriteObjectsCross()));
+
+   ChecksumHashCode enabledChecksumWithTiny = 
DataSetUtils.checksumHashCode(small
+   .crossWithTiny(large)
+   .with(new OverwriteObjectsCross()));
+
+   Assert.assertEquals(enabledChecksumWithHuge, 
enabledChecksumWithTiny);
+
+   // test NESTEDLOOP_BLOCKED_OUTER_FIRST and 
NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse disabled
+
+   env.getConfig().disableObjectReuse();
+
+   ChecksumHashCode disabledChecksumWithHuge = 
DataSetUtils.checksumHashCode(small
+   .crossWithHuge(large)
+   .with(new OverwriteObjectsCross()));
+
+   ChecksumHashCode disabledChecksumWithTiny = 
DataSetUtils.checksumHashCode(small
+   .crossWithTiny(large)
+   .with(new OverwriteObjectsCross()));
+
+   Assert.assertEquals(disabledChecksumWithHuge, 
disabledChecksumWithTiny);
+
+ 

flink git commit: [hotfix] [storm] Add proper exception forwarding to FlinkTopology.copyObject

2016-04-13 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 8e036c38e -> e7d78e63a


[hotfix] [storm] Add proper exception forwarding to FlinkTopology.copyObject


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7d78e63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7d78e63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7d78e63

Branch: refs/heads/master
Commit: e7d78e63ae8e0fc49ad62a77dc4dd90afde85d49
Parents: 8e036c3
Author: Till Rohrmann 
Authored: Wed Apr 13 14:55:35 2016 +0200
Committer: Till Rohrmann 
Committed: Wed Apr 13 14:56:17 2016 +0200

--
 .../src/main/java/org/apache/flink/storm/api/FlinkTopology.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e7d78e63/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
--
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
index 6706a91..2546f17 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
@@ -142,7 +142,7 @@ public class FlinkTopology {
getClass().getClassLoader()
);
} catch (IOException | ClassNotFoundException e) {
-   throw new RuntimeException("Failed to copy object.");
+   throw new RuntimeException("Failed to copy object.", e);
}
}
 



flink git commit: [FLINK-3735] Make DataSetUnionRule match only for union-all

2016-04-13 Thread vasia
Repository: flink
Updated Branches:
  refs/heads/master db6528be0 -> 8e036c38e


[FLINK-3735] Make DataSetUnionRule match only for union-all

This closes #1874
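
The effect of the new matches guard is that only plans whose LogicalUnion has all = true (SQL UNION ALL) reach the converter; a distinct UNION no longer translates through this rule. A hedged Java restatement of the predicate:

    // "... UNION ALL ..." -> LogicalUnion with all == true  -> rule matches
    // "... UNION ..."     -> LogicalUnion with all == false -> rule does not match
    static boolean matchesUnion(org.apache.calcite.rel.logical.LogicalUnion union) {
        return union.all;   // mirrors DataSetUnionRule.matches below
    }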


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e036c38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e036c38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e036c38

Branch: refs/heads/master
Commit: 8e036c38e3fb7e48aa91a09debab3be6a24deefe
Parents: db6528b
Author: vasia 
Authored: Tue Apr 12 15:11:54 2016 +0200
Committer: vasia 
Committed: Wed Apr 13 12:36:04 2016 +0200

--
 .../plan/rules/dataSet/DataSetUnionRule.scala   | 34 
 1 file changed, 21 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8e036c38/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
index 32400d0..cd1de1e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.table.plan.rules.dataSet
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalUnion
class DataSetUnionRule
  extends ConverterRule(
classOf[LogicalUnion],
Convention.NONE,
DataSetConvention.INSTANCE,
"FlinkUnionRule")
  {
 
-def convert(rel: RelNode): RelNode = {
+  /**
+   * Only translate UNION ALL
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
+union.all
+  }
+
+  def convert(rel: RelNode): RelNode = {
 
-  val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-  val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-  val convLeft: RelNode = RelOptRule.convert(union.getInput(0), 
DataSetConvention.INSTANCE)
-  val convRight: RelNode = RelOptRule.convert(union.getInput(1), 
DataSetConvention.INSTANCE)
+val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+val convLeft: RelNode = RelOptRule.convert(union.getInput(0), 
DataSetConvention.INSTANCE)
+val convRight: RelNode = RelOptRule.convert(union.getInput(1), 
DataSetConvention.INSTANCE)
 
-  new DataSetUnion(
-rel.getCluster,
-traitSet,
-convLeft,
-convRight,
-rel.getRowType)
-}
+new DataSetUnion(
+  rel.getCluster,
+  traitSet,
+  convLeft,
+  convRight,
+  rel.getRowType)
   }
+}
 
 object DataSetUnionRule {
   val INSTANCE: RelOptRule = new DataSetUnionRule



flink git commit: [FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() will pass before JUnit timeout

2016-04-13 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.0 ff38202ba -> ea50ed348


[FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() 
will pass before JUnit timeout

This closes #1864


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea50ed34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea50ed34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea50ed34

Branch: refs/heads/release-1.0
Commit: ea50ed348ccd34a343ebc501f989593253179774
Parents: ff38202
Author: Todd Lisonbee 
Authored: Thu Apr 7 17:47:19 2016 -0700
Committer: Stephan Ewen 
Committed: Wed Apr 13 10:48:39 2016 +0200

--
 .../flink/streaming/connectors/kafka/KafkaConsumerTestBase.java | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ea50ed34/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 9377cee..f31e4f1 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -147,6 +147,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
properties.setProperty("zookeeper.connect", 
"localhost:80");
properties.setProperty("group.id", "test");
properties.setProperty("request.timeout.ms", "3000"); 
// let the test fail fast
+   properties.setProperty("socket.timeout.ms", "3000");
properties.setProperty("session.timeout.ms", "2000");
properties.setProperty("fetch.max.wait.ms", "2000");
properties.setProperty("heartbeat.interval.ms", "1000");



flink git commit: [FLINK-3730] Fix RocksDB Local Directory Initialization

2016-04-13 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.0 4f9c19808 -> ff38202ba


[FLINK-3730] Fix RocksDB Local Directory Initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff38202b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff38202b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff38202b

Branch: refs/heads/release-1.0
Commit: ff38202bacfbe07e91bc1b46d44e5eb46e991e79
Parents: 4f9c198
Author: Aljoscha Krettek 
Authored: Tue Apr 12 10:46:59 2016 +0200
Committer: Aljoscha Krettek 
Committed: Wed Apr 13 10:38:11 2016 +0200

--
 .../contrib/streaming/state/RocksDBStateBackend.java  |  9 ++---
 .../streaming/state/RocksDBStateBackendConfigTest.java| 10 +-
 2 files changed, 15 insertions(+), 4 deletions(-)
--
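
The fix in the diff below replaces the exists()/mkdirs() check, which passes for an existing but unusable directory, with a writability probe: create a uniquely named subdirectory and treat the base directory as usable only if that succeeds. As a standalone hedged sketch (deleting the probe again is an addition here, not part of the original fix):

    import java.io.File;
    import java.util.UUID;

    static boolean isUsableStorageDir(File base) {
        File probe = new File(base, UUID.randomUUID().toString());
        boolean usable = probe.mkdirs();   // creates missing parents, fails if not writable
        if (usable) {
            probe.delete();                // tidy up the probe again (sketch-only step)
        }
        return usable;
    }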


http://git-wip-us.apache.org/repos/asf/flink/blob/ff38202b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index e3b4f4d..8f846da 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
@@ -181,13 +182,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {

for (Path path : configuredDbBasePaths) {
File f = new File(path.toUri().getPath());
-   if (!f.exists() && !f.mkdirs()) {
-   String msg = "Local DB files directory 
'" + f.getAbsolutePath()
+   File testDir = new File(f, 
UUID.randomUUID().toString());
+   if (!testDir.mkdirs()) {
+   String msg = "Local DB files directory 
'" + path
+ "' does not exist and 
cannot be created. ";
LOG.error(msg);
errorMessage += msg;
+   } else {
+   dirs.add(f);
}
-   dirs.add(f);
}

if (dirs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ff38202b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8e0993b..42ba275 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -133,14 +135,17 @@ public class RocksDBStateBackendConfigTest {

RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);

rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
-   
+
+   

flink git commit: [FLINK-3730] Fix RocksDB Local Directory Initialization

2016-04-13 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master db85f3858 -> db6528be0


[FLINK-3730] Fix RocksDB Local Directory Initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db6528be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db6528be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db6528be

Branch: refs/heads/master
Commit: db6528be00d624d6a389d5b303e565ca6b1c0f40
Parents: db85f38
Author: Aljoscha Krettek 
Authored: Tue Apr 12 10:46:59 2016 +0200
Committer: Aljoscha Krettek 
Committed: Wed Apr 13 10:36:18 2016 +0200

--
 .../contrib/streaming/state/RocksDBStateBackend.java  |  9 ++---
 .../streaming/state/RocksDBStateBackendConfigTest.java| 10 +-
 2 files changed, 15 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index e3b4f4d..8f846da 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
@@ -181,13 +182,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {

for (Path path : configuredDbBasePaths) {
File f = new File(path.toUri().getPath());
-   if (!f.exists() && !f.mkdirs()) {
-   String msg = "Local DB files directory 
'" + f.getAbsolutePath()
+   File testDir = new File(f, 
UUID.randomUUID().toString());
+   if (!testDir.mkdirs()) {
+   String msg = "Local DB files directory 
'" + path
+ "' does not exist and 
cannot be created. ";
LOG.error(msg);
errorMessage += msg;
+   } else {
+   dirs.add(f);
}
-   dirs.add(f);
}

if (dirs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8e0993b..42ba275 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -133,14 +135,17 @@ public class RocksDBStateBackendConfigTest {

RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);

rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
-   
+
+   boolean 

[06/14] flink git commit: [FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09

2016-04-13 Thread sewen
[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09

This closes #1846
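
The cleanup works because KafkaConsumer is Closeable, so try-with-resources can replace the manual retry loop and the null-check-and-close in the finally block. The distilled shape of the change (a hedged sketch using only the calls visible in the diff):

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
        for (String topic : topics) {
            List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
            if (partitionsForTopic != null) {   // null for non-existing topics
                partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
            }
        }
    }   // no finally block needed; close() runs on every exit path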


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/693d5ab0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/693d5ab0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/693d5ab0

Branch: refs/heads/master
Commit: 693d5ab09efef8732b857437bf1089f841b5e864
Parents: d20eda1
Author: Tianji Li 
Authored: Fri Apr 1 00:35:39 2016 -0400
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 30 ++--
 1 file changed, 2 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/693d5ab0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index d34cd2f..bc2904c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -171,43 +171,17 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
// read the partitions that belong to the listed topics
 final List<KafkaTopicPartition> partitions = new ArrayList<>();
-   KafkaConsumer<byte[], byte[]> consumer = null;
 
-   try {
-   consumer = new KafkaConsumer<>(this.properties);
+   try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
for (final String topic: topics) {
// get partitions for each topic
-   List<PartitionInfo> partitionsForTopic = null;
-   for(int tri = 0; tri < 10; tri++) {
-   LOG.info("Trying to get partitions for 
topic {}", topic);
-   try {
-   partitionsForTopic = 
consumer.partitionsFor(topic);
-   if(partitionsForTopic != null 
&& partitionsForTopic.size() > 0) {
-   break; // it worked
-   }
-   } catch (NullPointerException npe) {
-   // workaround for KAFKA-2880: 
Fetcher.getTopicMetadata NullPointerException when broker cannot be reached
-   // we ignore the NPE.
-   }
-   // create a new consumer
-   consumer.close();
-   try {
-   Thread.sleep(1000);
-   } catch (InterruptedException ignored) 
{}
-   
-   consumer = new 
KafkaConsumer<>(properties);
-   }
+   List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
// for non existing topics, the list might be 
null.
if (partitionsForTopic != null) {

partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
}
}
}
-   finally {
-   if(consumer != null) {
-   consumer.close();
-   }
-   }
 
if (partitions.isEmpty()) {
throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);



[05/14] flink git commit: [FLINK-3375] [kafka connector] Add per-Kafka-partition watermark generation to the FlinkKafkaConsumer

2016-04-13 Thread sewen
[FLINK-3375] [kafka connector] Add per-Kafka-partition watermark generation to 
the FlinkKafkaConsumer

This closes #1839


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac1549e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac1549e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac1549e

Branch: refs/heads/master
Commit: 0ac1549ec58c1737a79e5770a171a8b14bed56dc
Parents: 885d543
Author: kl0u 
Authored: Tue Mar 8 17:35:14 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  56 +--
 .../connectors/kafka/internals/Fetcher.java |  12 +-
 .../kafka/internals/LegacyFetcher.java  |  51 ++-
 .../kafka/internals/ZookeeperOffsetHandler.java |   4 +-
 .../connectors/kafka/Kafka08ITCase.java |  16 +-
 .../connectors/kafka/KafkaConsumerTest.java |  12 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  13 +-
 .../connectors/kafka/Kafka09ITCase.java |  11 +-
 .../kafka/FlinkKafkaConsumerBase.java   | 357 ++-
 .../kafka/internals/KafkaPartitionState.java|  65 
 .../connectors/kafka/KafkaConsumerTestBase.java | 234 +++-
 .../AssignerWithPeriodicWatermarks.java |   2 +-
 12 files changed, 744 insertions(+), 89 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 865fe36..4748781 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
@@ -108,11 +109,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {

private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer08.class);
 
-   /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
-* and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
-   public static final long OFFSET_NOT_SET = -915623761776L;
-
-
/** Configuration key for the number of retries for getting the 
partition info */
public static final String GET_PARTITIONS_RETRIES_KEY = 
"flink.get-partitions.retry";
 
@@ -252,17 +248,19 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
this.fetcher = null; // fetcher remains null
return;
}
-   
 
// offset handling
offsetHandler = new ZookeeperOffsetHandler(props);
 
committedOffsets = new HashMap<>();
 
-   Map<KafkaTopicPartition, Long> subscribedPartitionsWithOffsets = new HashMap<>(subscribedPartitions.size());
-   // initially load the map with "offset not set"
+   // initially load the map with "offset not set", last max read 
timestamp set to Long.MIN_VALUE
+   // and mark the partition as in-active, until we receive the 
first element
+   Map<KafkaTopicPartition, KafkaPartitionState> subscribedPartitionsWithOffsets =
+   new HashMap<>(subscribedPartitions.size());
for(KafkaTopicPartition ktp: subscribedPartitions) {
-   subscribedPartitionsWithOffsets.put(ktp, 
FlinkKafkaConsumer08.OFFSET_NOT_SET);
+   subscribedPartitionsWithOffsets.put(ktp,
+   new KafkaPartitionState(ktp.getPartition(), 

[08/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition

2016-04-13 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 000..99c5d69
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a periodic watermark generator (and timestamp extractor) per partition.
+ * 
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+   
+   /** The timestamp assigner and watermark generator for the partition */
+   private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+   
+   /** The last watermark timestamp generated by this partition */
+   private long partitionWatermark;
+
+   // 

+   
+   public KafkaTopicPartitionStateWithPeriodicWatermarks(
+   KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+   AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
+   {
+   super(partition, kafkaPartitionHandle);
+   
+   this.timestampsAndWatermarks = timestampsAndWatermarks;
+   this.partitionWatermark = Long.MIN_VALUE;
+   }
+
+   // 

+   
+   public long getTimestampForRecord (T record) {
+   return timestampsAndWatermarks.extractTimestamp(record, 
Long.MIN_VALUE);
+   }
+   
+   public long getCurrentWatermarkTimestamp() {
+   Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
+   if (wm != null) {
+   partitionWatermark = Math.max(partitionWatermark, 
wm.getTimestamp());
+   }
+   return partitionWatermark;
+   }
+
+   // 

+   
+   @Override
+   public String toString() {
+		return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
+				+ ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+   }
+}
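
For orientation, a minimal sketch of how this state object is meant to be driven (the toy assigner and the Long partition handle are assumptions for illustration, not the committed fetcher logic):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPeriodicWatermarks;

// Hedged sketch: extract a timestamp per record, then periodically poll the
// per-partition watermark. The assigner below is a toy; real code would use
// event-time fields of the records.
public class PartitionWatermarkSketch {

	public static void main(String[] args) {
		AssignerWithPeriodicWatermarks<String> assigner = new AssignerWithPeriodicWatermarks<String>() {
			private long maxTimestamp = Long.MIN_VALUE;

			@Override
			public long extractTimestamp(String element, long previousElementTimestamp) {
				long ts = element.length(); // toy timestamp
				maxTimestamp = Math.max(maxTimestamp, ts);
				return ts;
			}

			@Override
			public Watermark getCurrentWatermark() {
				return new Watermark(maxTimestamp - 1);
			}
		};

		KafkaTopicPartitionStateWithPeriodicWatermarks<String, Long> state =
				new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
						new KafkaTopicPartition("example-topic", 0), 42L, assigner);

		long ts = state.getTimestampForRecord("hello");   // timestamp for one record
		long wm = state.getCurrentWatermarkTimestamp();   // partition-local watermark
		System.out.println("ts=" + ts + ", watermark=" + wm);
	}
}

A fetcher would keep one such state per assigned partition and emit the minimum over all partition watermarks as the overall watermark.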

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 000..b265990
--- /dev/null
+++ 

[13/14] flink git commit: [FLINK-3644] [web monitor] Add new config option to set web monitor tmp dir

2016-04-13 Thread sewen
[FLINK-3644] [web monitor] Add new config option to set web monitor tmp dir

This closes #1824


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b188637b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b188637b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b188637b

Branch: refs/heads/master
Commit: b188637b9016116728f0d16f94e213584b2abfd6
Parents: 6bb085e
Author: xueyan.li 
Authored: Tue Apr 5 14:10:48 2016 +0800
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:55 2016 +0200

--
 .../apache/flink/configuration/ConfigConstants.java  |  5 +
 .../flink/runtime/webmonitor/HttpRequestHandler.java | 15 ---
 .../flink/runtime/webmonitor/WebRuntimeMonitor.java  | 10 +-
 .../runtime/webmonitor/WebRuntimeMonitorITCase.java  |  4 ++--
 4 files changed, 20 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b188637b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index ba2d880..53d9a37 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -435,6 +435,11 @@ public final class ConfigConstants {
	public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
 
	/**
+	 * The config parameter defining the flink web directory to be used by the webmonitor.
+	 */
+	public static final String JOB_MANAGER_WEB_TMPDIR_KEY = "jobmanager.web.tmpdir";
+	
+	/**
	 * The config parameter defining the number of archived jobs for the jobmanager
	 */
	public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";

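As a usage note rather than part of the commit: the new key can be set in flink-conf.yaml (e.g. "jobmanager.web.tmpdir: /data/flink-web-tmp") or programmatically. A minimal, hedged sketch with a placeholder path:

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

// Sketch: pointing the web monitor's upload/tmp directory at a custom path.
// The path is a placeholder; the fallback mirrors the old java.io.tmpdir default.
public class WebTmpDirConfigSketch {
	public static void main(String[] args) {
		Configuration config = new Configuration();
		config.setString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, "/data/flink-web-tmp");
		System.out.println(config.getString(
				ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY,
				System.getProperty("java.io.tmpdir")));
	}
}
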
http://git-wip-us.apache.org/repos/asf/flink/blob/b188637b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index c9190c9..bbd29fa 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -69,17 +69,18 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject>

/** A decoder factory that always stores POST chunks on disk */
	private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
-   
-	private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
-   
-   
+
+   private final File tmpDir;
+
private HttpRequest currentRequest;
 
private HttpPostRequestDecoder currentDecoder;
-   
private String currentRequestPath;
 
-
+   public HttpRequestHandler(File tmpDir) {
+   this.tmpDir = tmpDir;
+   }
+   
@Override
	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (currentDecoder != null) {
@@ -130,7 +131,7 @@ public class HttpRequestHandler extends 
SimpleChannelInboundHandler
						if (file.isCompleted()) {
							String name = file.getFilename();
 
-							File target = new File(TMP_DIR, UUID.randomUUID() + "_" + name);
+							File target = new File(tmpDir, UUID.randomUUID() + "_" + name);

file.renameTo(target);


QueryStringEncoder encoder = new QueryStringEncoder(currentRequestPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/b188637b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 

[03/14] flink git commit: [FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() will pass before JUnit timeout

2016-04-13 Thread sewen
[FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() 
will pass before JUnit timeout

This closes #1864


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af799883
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af799883
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af799883

Branch: refs/heads/master
Commit: af799883371b7a78c48f227b25b7a0b099dabf93
Parents: 3c93103
Author: Todd Lisonbee 
Authored: Thu Apr 7 17:47:19 2016 -0700
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 .../flink/streaming/connectors/kafka/KafkaConsumerTestBase.java | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/af799883/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index aa5344b..dd468a4 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -152,6 +152,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
		properties.setProperty("zookeeper.connect", "localhost:80");
		properties.setProperty("group.id", "test");
		properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast
+   properties.setProperty("socket.timeout.ms", "3000");
properties.setProperty("session.timeout.ms", "2000");
properties.setProperty("fetch.max.wait.ms", "2000");
properties.setProperty("heartbeat.interval.ms", "1000");
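
Pulled out of the diff for readability, a hedged sketch of the complete fail-fast property set this test builds (values as in the diff; the surrounding class is illustrative):

import java.util.Properties;

// Sketch: timeouts tuned so that a consumer pointed at a dead broker fails
// within the JUnit timeout instead of hanging on the defaults. The
// socket.timeout.ms line is the setting added by this commit.
public class FailFastPropertiesSketch {
	public static Properties failFastProperties() {
		Properties properties = new Properties();
		properties.setProperty("zookeeper.connect", "localhost:80"); // deliberately nothing listening here
		properties.setProperty("group.id", "test");
		properties.setProperty("request.timeout.ms", "3000");
		properties.setProperty("socket.timeout.ms", "3000");
		properties.setProperty("session.timeout.ms", "2000");
		properties.setProperty("fetch.max.wait.ms", "2000");
		properties.setProperty("heartbeat.interval.ms", "1000");
		return properties;
	}
}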



[10/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition

2016-04-13 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
deleted file mode 100644
index a38c3bd..000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties - map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
- * This is set in the key-value (java.util.Properties) map.
- * In addition to that, we use Properties.put(Object, Object) to store the instance of the (serializable) partitioner.
- * This is a hack because the put() method is called on the underlying Hashtable.
- *
- * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
- *
- * The serializable PartitionerWrapper is serialized into the Properties Hashtable and also deserialized from there.
- */
-public class PartitionerWrapper implements Partitioner {
-	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-   private Partitioner wrapped;
-   public PartitionerWrapper(VerifiableProperties properties) {
-		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-   }
-
-   @Override
-   public int partition(Object value, int numberOfPartitions) {
-   return wrapped.partition(value, numberOfPartitions);
-   }
-}
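
The trick described in the removed javadoc can be shown in isolation: java.util.Properties extends Hashtable<Object, Object>, so put(Object, Object) can carry a live object past a String-typed configuration surface. A hedged, Kafka-free sketch:

import java.util.Properties;

// Sketch of the instance-through-Properties hack: store an arbitrary object
// under a well-known key via Hashtable.put (not setProperty), and fish it
// back out on the receiving side. All names here are illustrative.
public class PropertiesSmugglingSketch {
	static final String WRAPPED_INSTANCE_KEY = "example.wrapped.instance";

	public static void main(String[] args) {
		Runnable instance = new Runnable() {
			@Override
			public void run() {
				System.out.println("hello from the wrapped instance");
			}
		};

		Properties props = new Properties();
		props.put(WRAPPED_INSTANCE_KEY, instance); // the underlying Hashtable accepts any Object

		// ... props travels through configuration plumbing ...

		Runnable unwrapped = (Runnable) props.get(WRAPPED_INSTANCE_KEY);
		unwrapped.run();
	}
}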

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
new file mode 100644
index 000..6aaeca9
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static 
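
A rough, hypothetical sketch of the periodic-commit pattern a class with this name implements; the interval, callback, and all identifiers below are assumptions, not the committed code:

// Hypothetical sketch of a periodic offset committer: a daemon thread that
// wakes up at a fixed interval and commits the current offsets, e.g. to
// ZooKeeper. The commit callback and names are illustrative assumptions.
public class PeriodicCommitterSketch extends Thread {
	private final long commitIntervalMillis;
	private final Runnable commitAction;
	private volatile boolean running = true;

	public PeriodicCommitterSketch(long commitIntervalMillis, Runnable commitAction) {
		this.commitIntervalMillis = commitIntervalMillis;
		this.commitAction = commitAction;
		setDaemon(true);
	}

	@Override
	public void run() {
		try {
			while (running) {
				Thread.sleep(commitIntervalMillis);
				commitAction.run(); // e.g. write current offsets to ZooKeeper
			}
		} catch (InterruptedException e) {
			// shutdown path: fall through and exit
		}
	}

	public void shutdown() {
		running = false;
		interrupt();
	}
}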

[04/14] flink git commit: [FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a core dependency.

2016-04-13 Thread sewen
[FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a core 
dependency.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/272fd12b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/272fd12b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/272fd12b

Branch: refs/heads/master
Commit: 272fd12b41a6e85f0e1825ad4f4593a01f9062aa
Parents: f315c57
Author: Stephan Ewen 
Authored: Tue Apr 5 12:37:33 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 pom.xml | 13 +
 1 file changed, 13 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/272fd12b/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 82b1567..bdf32d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,12 @@ under the License.
		<version>1.1-SNAPSHOT</version>
 
+		<dependency>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
+		</dependency>
+
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
@@ -191,6 +197,13 @@ under the License.
	-->
 
+			<dependency>
+				<groupId>com.google.code.findbugs</groupId>
+				<artifactId>jsr305</artifactId>
+				<version>1.3.9</version>
+			</dependency>
+
		<dependency>
			<groupId>org.apache.avro</groupId>
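
What the dependency buys, in one hedged example: the javax.annotation nullability annotations shipped in the jsr305 artifact become available throughout the code base (the class below is purely illustrative):

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

// Illustrative only: jsr305 provides these annotations, which static
// analysis tools (FindBugs, IDEs) use to flag nullability bugs.
public class NullabilityExample {

	public static String describe(@Nonnull String name, @Nullable String detail) {
		// 'detail' may legitimately be null; 'name' must not be
		return detail == null ? name : name + " (" + detail + ")";
	}

	public static void main(String[] args) {
		System.out.println(describe("flink", null));
		System.out.println(describe("flink", "streaming"));
	}
}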



[01/14] flink git commit: [FLINK-3444] [APIs] Add fromElements method with based class type to avoid the exception.

2016-04-13 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master f315c5700 -> db85f3858


[FLINK-3444] [APIs] Add fromElements method with based class type to avoid the 
exception.

This closes #1857


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb085ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb085ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb085ec

Branch: refs/heads/master
Commit: 6bb085ec6d70e196b7b61bec5f6dc3f924ca7906
Parents: 693d5ab
Author: gallenvara 
Authored: Wed Apr 6 16:04:32 2016 +0800
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 .../flink/api/java/ExecutionEnvironment.java| 45 -
 .../flink/api/java/io/FromElementsTest.java | 51 
 .../environment/StreamExecutionEnvironment.java | 33 +
 .../api/StreamExecutionEnvironmentTest.java | 27 +++
 4 files changed, 155 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1363e26..89c817d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -777,7 +777,50 @@ public abstract class ExecutionEnvironment {
			throw new IllegalArgumentException("The number of elements must not be zero.");
		}
 
-		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
+		TypeInformation<X> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForObject(data[0]);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+					+ "; please specify the TypeInformation manually via "
+					+ "ExecutionEnvironment#fromElements(Collection, TypeInformation)");
+		}
+
+		return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
+	}
+   
+   /**
+	 * Creates a new data set that contains the given elements. The framework will determine the type
+	 * according to the base class type supplied by the user. The elements must be of the base type
+	 * or a subclass of it. The sequence of elements must not be empty.
+	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
+	 * a parallelism of one.
+	 *
+	 * @param type The base class type for every element in the collection.
+	 * @param data The elements to make up the data set.
+	 * @return A DataSet representing the given list of elements.
+	 */
+	@SafeVarargs
+	public final <X> DataSource<X> fromElements(Class<X> type, X... data) {
+		if (data == null) {
+			throw new IllegalArgumentException("The data must not be null.");
+		}
+		if (data.length == 0) {
+			throw new IllegalArgumentException("The number of elements must not be zero.");
+		}
+		
+		TypeInformation<X> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForClass(type);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+					+ "; please specify the TypeInformation manually via "
+					+ "ExecutionEnvironment#fromElements(Collection, TypeInformation)");
+		}
+
+		return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
	}
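
A hedged usage sketch of the new overload (the Event/Click/View hierarchy is invented): passing the base class avoids the TypeExtractor deriving the type from the first element and then rejecting elements of sibling subclasses.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Illustrative sketch: without the explicit base class,
// fromElements(new Click(), new View()) would extract the type from the
// first element (Click) and fail on the View element.
public class FromElementsSketch {
	public static class Event {}
	public static class Click extends Event {}
	public static class View extends Event {}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Event> events = env.fromElements(Event.class, new Click(), new View());
		events.print();
	}
}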



http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
--
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
new file mode 100644
index 000..2f403aa
--- /dev/null
+++ 

[02/14] flink git commit: [hotfix] [kafka consumer] Increase Kafka test stability by validating written data before consuming

2016-04-13 Thread sewen
[hotfix] [kafka consumer] Increase Kafka test stability by validating written 
data before consuming


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20eda1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20eda1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20eda1b

Branch: refs/heads/master
Commit: d20eda1b5252d888189af29a2b493023b4621a88
Parents: af79988
Author: Stephan Ewen 
Authored: Tue Apr 12 20:14:50 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 .../connectors/kafka/Kafka08ITCase.java |  72 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 217 +++
 .../connectors/kafka/KafkaTestBase.java |   4 -
 .../testutils/JobManagerCommunicationUtils.java |  24 +-
 .../kafka/testutils/Tuple2Partitioner.java  |   7 +-
 .../org/apache/flink/test/util/TestUtils.java   |   4 +-
 6 files changed, 217 insertions(+), 111 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 0aef3bd..530c032 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -18,9 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -33,7 +31,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -91,16 +88,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
	@Test(timeout = 60000)
public void testInvalidOffset() throws Exception {
-   final String topic = "invalidOffsetTopic";
+   
final int parallelism = 1;
-
-   // create topic
-   createTestTopic(topic, parallelism, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
+   
// write 20 messages into topic:
-   writeSequence(env, topic, 20, parallelism);
+		final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
 
// set invalid offset:
		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
@@ -110,6 +102,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
// read from topic
final int valuesCount = 20;
final int startFrom = 0;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		
		readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
 
deleteTestTopic(topic);
@@ -193,10 +189,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 */
	@Test(timeout = 60000)
public void testOffsetInZookeeper() throws Exception {
-   final String topicName = "testOffsetInZK";
final int parallelism = 3;
 
-   createTestTopic(topicName, parallelism, 1);
+   // write a sequence from 0 to 99 to each of the 3 partitions.
+		final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1);
 
		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env1.getConfig().disableSysoutLogging();
@@ -210,16 +206,7 @@ public class 

[14/14] flink git commit: [FLINK-3737] [tests] Adding comment about SOCKS proxy server for WikipediaEditsSourceTest

2016-04-13 Thread sewen
[FLINK-3737] [tests] Adding comment about SOCKS proxy server for 
WikipediaEditsSourceTest

This closes #1872


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/342db48b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/342db48b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/342db48b

Branch: refs/heads/master
Commit: 342db48b03eceee20d9ec54d97f33054fa0627e8
Parents: b188637
Author: Todd Lisonbee 
Authored: Mon Apr 11 00:41:34 2016 -0700
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:55 2016 +0200

--
 .../connectors/wikiedits/WikipediaEditsSourceTest.java  | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/342db48b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
--
diff --git 
a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
 
b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
index 73ab8bf..0c5d93a 100644
--- 
a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
+++ 
b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
@@ -29,6 +29,11 @@ import static org.junit.Assert.fail;
 
 public class WikipediaEditsSourceTest {
 
+   /**
+	 * NOTE: if you are behind a firewall you may need to use a SOCKS proxy for this test.
+	 *
+	 * @see <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html">Socks Proxy</a>
+	 */
@Test(timeout = 120 * 1000)
public void testWikipediaEditsSource() throws Exception {
 



[07/14] flink git commit: [FLINK-3700] [core] Add 'Preconditions' utility class.

2016-04-13 Thread sewen
[FLINK-3700] [core] Add 'Preconditions' utility class.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/885d543b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/885d543b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/885d543b

Branch: refs/heads/master
Commit: 885d543be8a8c0d1acdffa0003e394347d5376ef
Parents: 272fd12
Author: Stephan Ewen 
Authored: Tue Apr 5 13:23:14 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 .../org/apache/flink/util/Preconditions.java| 213 +++
 1 file changed, 213 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/885d543b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java 
b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
new file mode 100644
index 000..135038b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// 
+//  This class is largely adapted from "com.google.common.base.Preconditions",
+//  which is part of the "Guava" library.
+//
+//  Because of frequent issues with dependency conflicts, this class was
+//  added to the Flink code base to reduce dependency on Guava.
+// 
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+/**
+ * A collection of static utility methods to validate input.
+ * 
+ * This class is modelled after Google Guava's Preconditions class, and partly takes code
+ * from that class. We add this code to the Flink code base in order to reduce external
+ * dependencies.
+ */
+@Internal
+public final class Preconditions {
+
+   // 

+   //  Null checks
+   // 

+   
+   /**
+* Ensures that the given object reference is not null.
+	 * Upon violation, a {@code NullPointerException} with no message is thrown.
+	 * 
+	 * @param reference The object reference
+	 * @return The object reference itself (generically typed).
+	 * 
+	 * @throws NullPointerException Thrown, if the passed reference was null.
+	 */
+	public static <T> T checkNotNull(T reference) {
+   if (reference == null) {
+   throw new NullPointerException();
+   }
+   return reference;
+   }
+   
+   /**
+* Ensures that the given object reference is not null.
+	 * Upon violation, a {@code NullPointerException} with the given message is thrown.
+	 * 
+	 * @param reference The object reference
+	 * @param errorMessage The message for the {@code NullPointerException} that is thrown if the check fails.
+	 * @return The object reference itself (generically typed).
+	 *
+	 * @throws NullPointerException Thrown, if the passed reference was null.
+	 */
+	public static <T> T checkNotNull(T reference, @Nullable String errorMessage) {
+		if (reference == null) {
+			throw new NullPointerException(String.valueOf(errorMessage));
+   }
+   return reference;
+   }
+
+   /**
+* Ensures that the given object reference is not null.
+	 * Upon violation, a {@code NullPointerException} with the given message is thrown.
+* 
+	 * The error message is constructed from a template and an arguments array, after
+* a similar fashion as {@link String#format(String, Object...)}, but 
supporting 
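
A hedged usage sketch of the new utility (the Job class and the message are invented for illustration):

import static org.apache.flink.util.Preconditions.checkNotNull;

// Illustrative usage of the new Preconditions utility in a constructor.
public class Job {
	private final String name;

	public Job(String name) {
		// fails fast with a descriptive NullPointerException
		this.name = checkNotNull(name, "job name must not be null");
	}

	public static void main(String[] args) {
		System.out.println(new Job("wordcount").name);
	}
}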

[11/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition

2016-04-13 Thread sewen
[FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a 
WatermarkExtractor object per partition


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c93103d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c93103d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c93103d

Branch: refs/heads/master
Commit: 3c93103d1476cb05ec0c018bfa6876e4ecad38e8
Parents: 0ac1549
Author: Stephan Ewen 
Authored: Wed Apr 6 23:21:17 2016 +0200
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:54 2016 +0200

--
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 358 ++--
 .../connectors/kafka/internals/Fetcher.java |  72 --
 .../kafka/internals/Kafka08Fetcher.java | 446 +
 .../kafka/internals/KillerWatchDog.java |  62 ++
 .../kafka/internals/LegacyFetcher.java  | 896 ---
 .../kafka/internals/OffsetHandler.java  |  55 --
 .../kafka/internals/PartitionInfoFetcher.java   |  66 ++
 .../kafka/internals/PartitionerWrapper.java |  49 -
 .../internals/PeriodicOffsetCommitter.java  |  85 ++
 .../kafka/internals/SimpleConsumerThread.java   | 504 +++
 .../kafka/internals/ZookeeperOffsetHandler.java |  58 +-
 .../connectors/kafka/Kafka08ITCase.java | 176 ++--
 .../connectors/kafka/KafkaConsumer08Test.java   |  90 ++
 .../connectors/kafka/KafkaConsumerTest.java | 156 
 .../kafka/KafkaShortRetention08ITCase.java  |   3 +-
 .../internals/ZookeeperOffsetHandlerTest.java   |  56 --
 .../src/test/resources/log4j-test.properties|   5 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 398 ++--
 .../kafka/internal/Kafka09Fetcher.java  | 311 +++
 .../connectors/kafka/Kafka09ITCase.java |  21 +-
 .../connectors/kafka/KafkaProducerTest.java |   8 +-
 .../kafka/KafkaShortRetention09ITCase.java  |   1 +
 .../src/test/resources/log4j-test.properties|   5 +-
 .../kafka/FlinkKafkaConsumerBase.java   | 668 ++
 .../kafka/internals/AbstractFetcher.java| 439 +
 .../kafka/internals/ExceptionProxy.java |  73 ++
 .../kafka/internals/KafkaPartitionState.java|  65 --
 .../kafka/internals/KafkaTopicPartition.java|  36 +-
 .../internals/KafkaTopicPartitionState.java | 105 +++
 ...picPartitionStateWithPeriodicWatermarks.java |  71 ++
 ...cPartitionStateWithPunctuatedWatermarks.java |  84 ++
 .../kafka/partitioner/KafkaPartitioner.java |   2 +-
 .../connectors/kafka/util/KafkaUtils.java   |  13 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java   | 222 +
 .../KafkaConsumerPartitionAssignmentTest.java   |  96 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 175 ++--
 .../connectors/kafka/KafkaProducerTestBase.java |   8 +-
 .../kafka/KafkaShortRetentionTestBase.java  |  45 +-
 .../internals/KafkaTopicPartitionTest.java  |  57 ++
 .../testutils/JobManagerCommunicationUtils.java |  49 +-
 .../kafka/testutils/MockRuntimeContext.java |  26 +-
 .../AssignerWithPeriodicWatermarks.java |   3 +
 .../AssignerWithPunctuatedWatermarks.java   |   3 +
 43 files changed, 3407 insertions(+), 2714 deletions(-)
--
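
In user-facing terms, the rework lets a watermark assigner be attached directly to the consumer so that it can be evaluated per Kafka partition. A hedged sketch, presumably along these lines (topic, group id, and the toy assigner are placeholders):

import java.util.Properties;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Hedged sketch: a periodic watermark assigner attached to the consumer,
// to be tracked per Kafka partition. Connection values are placeholders.
public class PerPartitionWatermarksSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		FlinkKafkaConsumer08<String> consumer =
				new FlinkKafkaConsumer08<>("example-topic", new SimpleStringSchema(), props);

		consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
			private long maxSeen = Long.MIN_VALUE;

			@Override
			public long extractTimestamp(String element, long previousElementTimestamp) {
				long ts = element.hashCode(); // toy timestamp extraction
				maxSeen = Math.max(maxSeen, ts);
				return ts;
			}

			@Override
			public Watermark getCurrentWatermark() {
				return new Watermark(maxSeen - 1);
			}
		});
	}
}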


http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 4748781..48cc461 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.common.ErrorMapping;
 import kafka.javaapi.PartitionMetadata;
@@ -24,40 +25,32 @@ import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;

[12/14] flink git commit: [FLINK-3126] [core] Remove accumulator type from "value" in web frontend

2016-04-13 Thread sewen
[FLINK-3126] [core] Remove accumulator type from "value" in web frontend

This closes #1868


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db85f385
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db85f385
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db85f385

Branch: refs/heads/master
Commit: db85f385846a6541c0707ed1ba6fed78446423b5
Parents: 342db48
Author: Zack Pierce 
Authored: Mon Apr 11 10:17:34 2016 -0700
Committer: Stephan Ewen 
Committed: Wed Apr 13 01:10:55 2016 +0200

--
 .../StringifiedAccumulatorResult.java   |  14 +-
 .../StringifiedAccumulatorResultTest.java   | 138 +++
 2 files changed, 149 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
index a0d1eda..c4faad1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java
@@ -55,6 +55,9 @@ public class StringifiedAccumulatorResult implements java.io.Serializable {
//  Utilities
// 

 
+   /**
+	 * Flatten a map of accumulator names to Accumulator instances into an array of StringifiedAccumulatorResult values.
+	 */
	public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map<String, Accumulator<?, ?>> accs) {
if (accs == null || accs.isEmpty()) {
return new StringifiedAccumulatorResult[0];
@@ -65,9 +68,14 @@ public class StringifiedAccumulatorResult implements java.io.Serializable {
int i = 0;
		for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
			StringifiedAccumulatorResult result;
-			Accumulator<?, ?> value = entry.getValue();
-			if (value != null) {
-				result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
+			Accumulator<?, ?> accumulator = entry.getValue();
+			if (accumulator != null) {
+				Object localValue = accumulator.getLocalValue();
+				if (localValue != null) {
+					result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString());
+				} else {
+					result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null");
+				}
			} else {
				result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
			}
}
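
A hedged sketch of the resulting behavior (IntCounter stands in for any accumulator; the getters follow the class's existing accessors):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;

// Illustrative: the stringified "value" is now the accumulator's local value
// ("42"), not a string that leaks the accumulator type into the web frontend.
public class StringifySketch {
	public static void main(String[] args) {
		Map<String, Accumulator<?, ?>> accs = new HashMap<>();
		IntCounter counter = new IntCounter();
		counter.add(42);
		accs.put("records", counter);

		for (StringifiedAccumulatorResult r :
				StringifiedAccumulatorResult.stringifyAccumulatorResults(accs)) {
			System.out.println(r.getName() + " [" + r.getType() + "] = " + r.getValue());
		}
	}
}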

http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
new file mode 100644
index 000..e6d637b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR