[1/2] flink git commit: [FLINK-3247] Remove * exclude from quickstarts

2016-02-02 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master 79058edb6 -> 360f02b1f


[FLINK-3247] Remove * exclude from quickstarts

This closes #1573


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/360f02b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/360f02b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/360f02b1

Branch: refs/heads/master
Commit: 360f02b1f1f5ae4e9c3b3c43a9af033b66ecdcef
Parents: cb1a5ec
Author: Robert Metzger 
Authored: Tue Feb 2 12:04:26 2016 +0100
Committer: Robert Metzger 
Committed: Tue Feb 2 15:38:46 2016 +0100

--
 .../src/main/resources/archetype-resources/pom.xml| 7 +--
 .../src/main/resources/archetype-resources/pom.xml| 7 +--
 2 files changed, 10 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/360f02b1/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index eb8bee1..bd1e723 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -112,7 +112,9 @@ under the License.

Everything else will be packaged into the fat-jar
-->

org.apache.flink:flink-annotations
-   org.apache.flink:flink-shaded-*
+   org.apache.flink:flink-shaded-hadoop1
+   org.apache.flink:flink-shaded-hadoop2
+   org.apache.flink:flink-shaded-curator-recipes

org.apache.flink:flink-core

org.apache.flink:flink-java

org.apache.flink:flink-scala_2.10
@@ -184,7 +186,8 @@ under the License.


org.apache.flink:*


-   org/apache/flink/shaded/**
+   
+   org/apache/flink/shaded/com/**

web-docs/**




http://git-wip-us.apache.org/repos/asf/flink/blob/360f02b1/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
--
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index e575181..df98b95 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -116,7 +116,9 @@ under the License.

Everything else will be packaged into the fat-jar
-->

org.apache.flink:flink-annotations
-   org.apache.flink:flink-shaded-*
+   org.apache.flink:flink-shaded-hadoop1
+   org.apache.flink:flink-shaded-hadoop2
+   org.apache.flink:flink-shaded-curator-recipes
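
The change above replaces the wildcard org.apache.flink:flink-shaded-* exclude with an explicit list, so only the named shaded artifacts stay out of the quickstart fat jar. A small, hypothetical check (not part of the commit; the jar path is an assumption) is to list which shaded Flink classes actually end up in the built fat jar:

import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical check, not part of the commit: list the shaded Flink classes
// bundled into the quickstart fat jar after the exclude-list change.
// The jar path below is an assumption.
public class FatJarContentsCheck {
    public static void main(String[] args) throws Exception {
        try (JarFile jar = new JarFile("target/quickstart-0.1-SNAPSHOT.jar")) {
            jar.stream()
               .map(JarEntry::getName)
               .filter(name -> name.startsWith("org/apache/flink/shaded/"))
               .forEach(System.out::println);
        }
    }
}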

[2/2] flink git commit: [FLINK-3287] Shade Curator dependency into flink-connector-kafka-0.8

2016-02-02 Thread rmetzger
[FLINK-3287] Shade Curator dependency into flink-connector-kafka-0.8


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb1a5ecb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb1a5ecb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb1a5ecb

Branch: refs/heads/master
Commit: cb1a5ecbca9b8df924968904d87b6ff4ac4d7be7
Parents: 79058ed
Author: Robert Metzger 
Authored: Thu Jan 28 15:16:16 2016 +0100
Committer: Robert Metzger 
Committed: Tue Feb 2 15:38:46 2016 +0100

--
 .../flink-connector-kafka-0.8/pom.xml   | 27 
 1 file changed, 27 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/cb1a5ecb/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
--
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index 0b23994..3672c81 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -154,6 +154,33 @@ under the License.
1


+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   org.apache.flink:flink-shaded-curator-recipes
+   
+   
+   
+   
+   org.apache.curator
+   org.apache.flink.shaded.org.apache.curator
+   
+   
+   
+   
+   
+   
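
The shade execution above bundles flink-shaded-curator-recipes into the Kafka 0.8 connector and relocates org.apache.curator to org.apache.flink.shaded.org.apache.curator. A hypothetical sanity check (not part of the commit; the concrete class name is an assumption derived from the relocation pattern) is to resolve a relocated Curator class from the shaded connector jar's classpath:

// Hypothetical sanity check: after shading, Curator should be loadable under
// the relocated package. The class name is an assumption based on the
// relocation pattern shown above.
public class CuratorRelocationCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> relocated = Class.forName(
            "org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework");
        System.out.println("Relocated Curator class found: " + relocated.getName());
    }
}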






flink-web git commit: [FLINK-3316] Fix broken links in first paragraph of landing page

2016-02-02 Thread trohrmann
Repository: flink-web
Updated Branches:
  refs/heads/asf-site b68e1f48b -> ffe31bf53


[FLINK-3316] Fix broken links in first paragraph of landing page


Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/ffe31bf5
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/ffe31bf5
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/ffe31bf5

Branch: refs/heads/asf-site
Commit: ffe31bf53ca28a842d4a352a568f288251d56490
Parents: b68e1f4
Author: Till Rohrmann 
Authored: Tue Feb 2 16:19:53 2016 +0100
Committer: Till Rohrmann 
Committed: Tue Feb 2 16:19:53 2016 +0100

--
 index.md | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink-web/blob/ffe31bf5/index.md
--
diff --git a/index.md b/index.md
index 5ff5583..c675b4c 100644
--- a/index.md
+++ b/index.md
@@ -10,18 +10,18 @@ layout: base
 
   
 
-**Flink’s core** is a [streaming dataflow 
engine](features.html#unified-stream-amp-batch-processing) that provides data 
distribution, communication, and fault tolerance for distributed computations 
over data streams.
+**Flink’s core** is a [streaming dataflow engine](features.html) that 
provides data distribution, communication, and fault tolerance for distributed 
computations over data streams.
 
 Flink includes **several APIs** for creating applications that use the Flink 
engine:
 
-1. [DataSet API](features.html#dataset-api) for static data embedded in Java, 
Scala, and Python,
-2. [DataStream API](features.html#datastream-api) for unbounded streams 
embedded in Java and Scala, and
-3. [Table API](features.html#table-api) with a SQL-like expression language 
embedded in Java and Scala.
+1. [DataStream API]({{ site.docs-snapshot }}/apis/streaming/index.html) for 
unbounded streams embedded in Java and Scala, and
+2. [DataSet API]({{ site.docs-snapshot }}/apis/batch/index.html) for static 
data embedded in Java, Scala, and Python,
+3. [Table API]({{ site.docs-snapshot }}/libs/table.html) with a SQL-like 
expression language embedded in Java and Scala.
 
 Flink also bundles **libraries for domain-specific use cases**:
 
-1. [Machine Learning library](features.html#machine-learning-library), and
-2. [Gelly](features.html#graph-api-amp-library-gelly), a graph processing API 
and library.
+1. [Machine Learning library]({{ site.docs-snapshot }}/libs/ml/index.html), and
+2. [Gelly]({{ site.docs-snapshot }}/libs/gelly_guide.html), a graph processing 
API and library.
 
 You can **integrate** Flink easily with other well-known open source systems 
both for [data input and output](features.html#deployment-and-integration) as 
well as [deployment](features.html#deployment-and-integration).
   



[13/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
--
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
new file mode 100644
index 000..a196984
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.junit.Assert;
+
+public class TupleSerializerTestInstance<T extends Tuple> extends SerializerTestInstance<T> {
+
+   public TupleSerializerTestInstance(TypeSerializer<T> serializer, Class<T> typeClass, int length, T[] testData) {
+   super(serializer, typeClass, length, testData);
+   }
+   
+   protected void deepEquals(String message, T shouldTuple, T isTuple) {
+   Assert.assertEquals(shouldTuple.getArity(), isTuple.getArity());
+   
+   for (int i = 0; i < shouldTuple.getArity(); i++) {
+   Object should = shouldTuple.getField(i);
+   Object is = isTuple.getField(i);
+   
+   if (should.getClass().isArray()) {
+   if (should instanceof boolean[]) {
+   Assert.assertTrue(message, 
Arrays.equals((boolean[]) should, (boolean[]) is));
+   }
+   else if (should instanceof byte[]) {
+   assertArrayEquals(message, (byte[]) 
should, (byte[]) is);
+   }
+   else if (should instanceof short[]) {
+   assertArrayEquals(message, (short[]) 
should, (short[]) is);
+   }
+   else if (should instanceof int[]) {
+   assertArrayEquals(message, (int[]) 
should, (int[]) is);
+   }
+   else if (should instanceof long[]) {
+   assertArrayEquals(message, (long[]) 
should, (long[]) is);
+   }
+   else if (should instanceof float[]) {
+   assertArrayEquals(message, (float[]) 
should, (float[]) is, 0.0f);
+   }
+   else if (should instanceof double[]) {
+   assertArrayEquals(message, (double[]) 
should, (double[]) is, 0.0);
+   }
+   else if (should instanceof char[]) {
+   assertArrayEquals(message, (char[]) 
should, (char[]) is);
+   }
+   else {
+   assertArrayEquals(message, (Object[]) 
should, (Object[]) is);
+   }
+   }
+   else {
+   assertEquals(message,  should, is);
+   }
+   }
+   }
+}
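
For reference, a minimal usage sketch of the test helper added above. It assumes, as in other Flink serializer tests, that the SerializerTestInstance base class exposes a testAll() method; the generic parameters follow the constructor shown in the diff.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerTestInstance;

// Minimal usage sketch: run the standard serializer checks against a
// Tuple2<Integer, String> serializer. testAll() on the base class is assumed.
public class TupleSerializerTestInstanceExample {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
                TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class);
        TypeSerializer<Tuple2<Integer, String>> serializer =
                typeInfo.createSerializer(new ExecutionConfig());

        Tuple2<Integer, String>[] testData = new Tuple2[] {
                new Tuple2<>(1, "one"),
                new Tuple2<>(2, "two")
        };

        // -1 marks a variable serialized length
        new TupleSerializerTestInstance<>(serializer, typeInfo.getTypeClass(), -1, testData)
                .testAll();
    }
}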

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
--
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java 

[01/38] flink git commit: [hotfix] Reduce the heavy sysout verbosity for certain tests

2016-02-02 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 360f02b1f -> 8fc7e7af2


[hotfix] Reduce the heavy sysout verbosity for certain tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f042e78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f042e78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f042e78

Branch: refs/heads/master
Commit: 6f042e7894be388fa8e400a08002584c10781e60
Parents: 21a7158
Author: Stephan Ewen 
Authored: Mon Feb 1 16:46:03 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 16:55:44 2016 +0100

--
 .../jar/CheckpointedStreamingProgram.java | 10 --
 .../flink/test/recovery/FastFailuresITCase.java   | 18 +++---
 2 files changed, 15 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6f042e78/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
--
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 47253da..cda5a7b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -27,8 +27,6 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.lang.RuntimeException;
-import java.net.URL;
-import java.net.URLClassLoader;
 
 /**
  * A simple streaming program, which is using the state checkpointing of Flink.
@@ -40,14 +38,6 @@ public class CheckpointedStreamingProgram {
private static final int CHECKPOINT_INTERVALL = 100;

public static void main(String[] args) throws Exception {
-   ClassLoader cl = ClassLoader.getSystemClassLoader();
-   URL[] urls = ((URLClassLoader)cl).getURLs();
-
-   for(URL url: urls){
-   System.out.println(url.getFile());
-   }
-   System.out.println("CheckpointedStreamingProgram classpath: ");
-
final String jarFile = args[0];
final String host = args[1];
final int port = Integer.parseInt(args[2]);

http://git-wip-us.apache.org/repos/asf/flink/blob/6f042e78/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
--
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index 0684fde..2a139c7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -21,11 +21,13 @@ package org.apache.flink.test.recovery;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.Test;
 
@@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("serial")
 public class FastFailuresITCase {
 
static final AtomicInteger FAILURES_SO_FAR = new AtomicInteger();
@@ -40,12 +43,21 @@ public class FastFailuresITCase {

@Test
public void testThis() {
-   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
+   Configuration config = new Configuration();
+   config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
2);
+   
+   ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(config, false);
+   cluster.start();
+   
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+   "localhost", cluster.getLeaderRPCPort());
+
+   
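
Reassembled from the wrapped diff lines above, the pattern the test now uses is: start a local ForkableFlinkMiniCluster with two TaskManagers and two slots each, then submit against it through a remote environment instead of getExecutionEnvironment(). A condensed sketch of that setup (job definition and cluster shutdown omitted):

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;

// Condensed sketch of the setup introduced in FastFailuresITCase above;
// the streaming job itself and the cluster shutdown are omitted.
public class MiniClusterSetupSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);

        ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
        cluster.start();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "localhost", cluster.getLeaderRPCPort());
        // ... define and execute the streaming job against 'env' ...
    }
}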

[05/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
deleted file mode 100644
index cd405bc..000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.junit.Test;
-
-public class CompositeTypeTest {
-
-   private final TupleTypeInfo tupleTypeInfo = new 
TupleTypeInfo>(
-   BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO);
-
-
-   private final TupleTypeInfo> 
inNestedTuple1 = new TupleTypeInfo>(
-   BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-
-   private final TupleTypeInfo> inNestedTuple2 = 
new TupleTypeInfo>(
-   BasicTypeInfo.DOUBLE_TYPE_INFO, 
BasicTypeInfo.DOUBLE_TYPE_INFO);
-
-   private final TupleTypeInfo nestedTypeInfo = new 
TupleTypeInfo, Integer, 
Tuple2>>(
-   BasicTypeInfo.INT_TYPE_INFO,
-   inNestedTuple1,
-   BasicTypeInfo.INT_TYPE_INFO,
-   inNestedTuple2);
-
-   private final TupleTypeInfo>> 
inNestedTuple3 = new TupleTypeInfo>>(
-   BasicTypeInfo.INT_TYPE_INFO,
-   new TupleTypeInfo>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
-
-   private final TupleTypeInfo deepNestedTupleTypeInfo = new 
TupleTypeInfo>, 
Integer>>(
-   BasicTypeInfo.INT_TYPE_INFO,
-   inNestedTuple3,
-   BasicTypeInfo.INT_TYPE_INFO );
-
-   private final PojoTypeInfo pojoTypeInfo = ((PojoTypeInfo) 
TypeExtractor.getForClass
-   (MyPojo.class));
-
-   private final TupleTypeInfo pojoInTupleTypeInfo = new 
TupleTypeInfo>(BasicTypeInfo.INT_TYPE_INFO, 
pojoTypeInfo);
-
-   @Test
-   public void testGetFlatFields() {
-   assertEquals(0, 
tupleTypeInfo.getFlatFields("0").get(0).getPosition());
-   assertEquals(1, 
tupleTypeInfo.getFlatFields("1").get(0).getPosition());
-   assertEquals(2, 
tupleTypeInfo.getFlatFields("2").get(0).getPosition());
-   assertEquals(3, 
tupleTypeInfo.getFlatFields("3").get(0).getPosition());
-   assertEquals(0, 
tupleTypeInfo.getFlatFields("f0").get(0).getPosition());
-   assertEquals(1, 
tupleTypeInfo.getFlatFields("f1").get(0).getPosition());
-   assertEquals(2, 
tupleTypeInfo.getFlatFields("f2").get(0).getPosition());
-   assertEquals(3, 
tupleTypeInfo.getFlatFields("f3").get(0).getPosition());
-
-   assertEquals(0, 
nestedTypeInfo.getFlatFields("0").get(0).getPosition());
-   assertEquals(1, 
nestedTypeInfo.getFlatFields("1.0").get(0).getPosition());
-   assertEquals(2, 
nestedTypeInfo.getFlatFields("1.1").get(0).getPosition());
-   assertEquals(3, 
nestedTypeInfo.getFlatFields("1.2").get(0).getPosition());
-   assertEquals(4, 
nestedTypeInfo.getFlatFields("2").get(0).getPosition());
- 
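
The assertions above exercise getFlatFields(), which maps a field expression such as "f1" or "1.0" to flat field positions of a composite type. A minimal sketch of the same API on a simple tuple type (the getBasicTupleTypeInfo shortcut is used here for brevity and is not part of the removed test):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// Minimal sketch of getFlatFields(), the API exercised by the removed test.
public class FlatFieldsSketch {
    public static void main(String[] args) {
        TupleTypeInfo<Tuple2<Integer, String>> info =
                TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class);

        // "f1" (or "1") resolves to flat position 1
        int position = info.getFlatFields("f1").get(0).getPosition();
        System.out.println(position);
    }
}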

[30/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java
new file mode 100644
index 000..8116121
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// --
+//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
+//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+// --
+
+
+package org.apache.flink.api.java.tuple;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * A tuple with 20 fields. Tuples are strongly typed; each field may be of a 
separate type.
+ * The fields of the tuple can be accessed directly as public fields (f0, f1, 
...) or via their position
+ * through the {@link #getField(int)} method. The tuple field positions start 
at zero.
+ * 
+ * Tuples are mutable types, meaning that their fields can be re-assigned. 
This allows functions that work
+ * with Tuples to reuse objects in order to reduce pressure on the garbage 
collector.
+ *
+ * @see Tuple
+ *
+ * @param <T0> The type of field 0
+ * @param <T1> The type of field 1
+ * @param <T2> The type of field 2
+ * @param <T3> The type of field 3
+ * @param <T4> The type of field 4
+ * @param <T5> The type of field 5
+ * @param <T6> The type of field 6
+ * @param <T7> The type of field 7
+ * @param <T8> The type of field 8
+ * @param <T9> The type of field 9
+ * @param <T10> The type of field 10
+ * @param <T11> The type of field 11
+ * @param <T12> The type of field 12
+ * @param <T13> The type of field 13
+ * @param <T14> The type of field 14
+ * @param <T15> The type of field 15
+ * @param <T16> The type of field 16
+ * @param <T17> The type of field 17
+ * @param <T18> The type of field 18
+ * @param <T19> The type of field 19
+ */
+public class Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> extends Tuple {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Field 0 of the tuple. */
+   public T0 f0;
+   /** Field 1 of the tuple. */
+   public T1 f1;
+   /** Field 2 of the tuple. */
+   public T2 f2;
+   /** Field 3 of the tuple. */
+   public T3 f3;
+   /** Field 4 of the tuple. */
+   public T4 f4;
+   /** Field 5 of the tuple. */
+   public T5 f5;
+   /** Field 6 of the tuple. */
+   public T6 f6;
+   /** Field 7 of the tuple. */
+   public T7 f7;
+   /** Field 8 of the tuple. */
+   public T8 f8;
+   /** Field 9 of the tuple. */
+   public T9 f9;
+   /** Field 10 of the tuple. */
+   public T10 f10;
+   /** Field 11 of the tuple. */
+   public T11 f11;
+   /** Field 12 of the tuple. */
+   public T12 f12;
+   /** Field 13 of the tuple. */
+   public T13 f13;
+   /** Field 14 of the tuple. */
+   public T14 f14;
+   /** Field 15 of the tuple. */
+   public T15 f15;
+   /** Field 16 of the tuple. */
+   public T16 f16;
+   /** Field 17 of the tuple. */
+   public T17 f17;
+   /** Field 18 of the tuple. */
+   public T18 f18;
+   /** Field 19 of the tuple. */
+   public T19 f19;
+
+   /**
+* Creates a new tuple where all fields are null.
+*/
+   public Tuple20() {}
+
+   /**
+* Creates a new tuple and assigns the given values to the tuple's 
fields.
+*
+* @param value0 The value for field 0
+* @param value1 The value for field 1
+* @param value2 The value for field 2
+* @param value3 The value for field 3
+* @param value4 The value for field 4
+* @param value5 The value for field 5
+* @param value6 The value for field 6
+* @param value7 The value for field 7
+* @param value8 The value for field 8
+* @param value9 The value for field 9
+* @param value10 The value for field 10
+ 
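
As the Javadoc above explains, tuple fields can be read and written either through the public fields f0, f1, ... or positionally through getField(int), with positions starting at zero. A short illustration (using Tuple2 rather than Tuple20 for brevity):

import org.apache.flink.api.java.tuple.Tuple2;

// Field access as described in the Tuple Javadoc above, shown on Tuple2.
public class TupleFieldAccess {
    public static void main(String[] args) {
        Tuple2<Integer, String> t = new Tuple2<>(1, "one");

        int id = t.f0;               // direct public-field access
        String name = t.getField(1); // positional access, zero-based
        t.f1 = "uno";                // tuples are mutable

        System.out.println(id + " -> " + t.f1 + " (" + name + ")");
    }
}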

[08/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
deleted file mode 100644
index de24956..000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-
-public final class PojoSerializer extends TypeSerializer {
-
-   // Flags for the header
-   private static byte IS_NULL = 1;
-   private static byte NO_SUBCLASS = 2;
-   private static byte IS_SUBCLASS = 4;
-   private static byte IS_TAGGED_SUBCLASS = 8;
-
-   private static final long serialVersionUID = 1L;
-
-   private final Class clazz;
-
-   private final TypeSerializer[] fieldSerializers;
-
-   private final int numFields;
-
-   private final Map registeredClasses;
-
-   private final TypeSerializer[] registeredSerializers;
-
-   private final ExecutionConfig executionConfig;
-
-   private transient Map 
subclassSerializerCache;
-   private transient ClassLoader cl;
-   // We need to handle these ourselves in writeObject()/readObject()
-   private transient Field[] fields;
-
-   @SuppressWarnings("unchecked")
-   public PojoSerializer(
-   Class clazz,
-   TypeSerializer[] fieldSerializers,
-   Field[] fields,
-   ExecutionConfig executionConfig) {
-
-   this.clazz = Preconditions.checkNotNull(clazz);
-   this.fieldSerializers = (TypeSerializer[]) 
Preconditions.checkNotNull(fieldSerializers);
-   this.fields = Preconditions.checkNotNull(fields);
-   this.numFields = fieldSerializers.length;
-   this.executionConfig = 
Preconditions.checkNotNull(executionConfig);
-
-   LinkedHashSet registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
-
-   for (int i = 0; i < numFields; i++) {
-   this.fields[i].setAccessible(true);
-   }
-
-   cl = Thread.currentThread().getContextClassLoader();
-
-   subclassSerializerCache = new HashMap();
-
-   // We only want those classes that are not our own class and 
are actually sub-classes.
-   List cleanedTaggedClasses = new 
ArrayList(registeredPojoTypes.size());
-   for (Class registeredClass: registeredPojoTypes) {
-   if (registeredClass.equals(clazz)) {
-   continue;
-   }
-   if (!clazz.isAssignableFrom(registeredClass)) {
-   continue;
-   }
-   cleanedTaggedClasses.add(registeredClass);
-
-   }
-   this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
-   registeredSerializers = new 

[23/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java
deleted file mode 100644
index b423a3b..000
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-// --
-//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
-//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
-// --
-
-
-package org.apache.flink.api.java.tuple.builder;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple15;
-
-public class Tuple15Builder {
-
-   private List> tuples = new ArrayList>();
-
-   public Tuple15Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 
value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, 
T12 value12, T13 value13, T14 value14){
-   tuples.add(new Tuple15(value0, value1, value2, value3, value4, value5, 
value6, value7, value8, value9, value10, value11, value12, value13, value14));
-   return this;
-   }
-
-   @SuppressWarnings("unchecked")
-   public Tuple15[] build(){
-   return tuples.toArray(new Tuple15[tuples.size()]);
-   }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java
deleted file mode 100644
index c698730..000
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-// --
-//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
-//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
-// --
-
-
-package org.apache.flink.api.java.tuple.builder;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple16;
-
-public class Tuple16Builder {
-
-   private List> tuples = new ArrayList

[04/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
deleted file mode 100644
index 19fac43..000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
-   @Override
-   protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-   return new AvroSerializer<T>(type);
-   }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
deleted file mode 100644
index df1ff60..000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-
-   @Override
-   protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-   return new AvroSerializer<T>(type);
-   }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
deleted file mode 100644
index 8a89410..000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 

[09/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
deleted file mode 100644
index 5187de7..000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Type information for data types that extend the {@link Value} interface. 
The value
- * interface allows types to define their custom serialization and 
deserialization routines.
- *
- * @param  The type of the class represented by this type information.
- */
-public class ValueTypeInfo extends TypeInformation 
implements AtomicType {
-
-   private static final long serialVersionUID = 1L;
-
-   public static final ValueTypeInfo BOOLEAN_VALUE_TYPE_INFO 
= new ValueTypeInfo<>(BooleanValue.class);
-   public static final ValueTypeInfo BYTE_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(ByteValue.class);
-   public static final ValueTypeInfo CHAR_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(CharValue.class);
-   public static final ValueTypeInfo DOUBLE_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(DoubleValue.class);
-   public static final ValueTypeInfo FLOAT_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(FloatValue.class);
-   public static final ValueTypeInfo INT_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(IntValue.class);
-   public static final ValueTypeInfo LONG_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(LongValue.class);
-   public static final ValueTypeInfo NULL_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(NullValue.class);
-   public static final ValueTypeInfo SHORT_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(ShortValue.class);
-   public static final ValueTypeInfo STRING_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(StringValue.class);
-
-   private final Class type;
-   
-   public ValueTypeInfo(Class type) {
-   this.type = Preconditions.checkNotNull(type);
-
-   Preconditions.checkArgument(
-   Value.class.isAssignableFrom(type) || 
type.equals(Value.class),
-   "ValueTypeInfo can only be used for subclasses of " + 
Value.class.getName());
-   }
-   
-   @Override
-   public int getArity() {
-   return 1;
-   }
-
-   @Override
-   public int getTotalFields() {
-   return 1;
-   }
-   
-   @Override
-   public Class getTypeClass() {
-   return this.type;
-   }
-
-   @Override
-   public boolean isBasicType() {
-   return false;
-   }
-
-   public boolean isBasicValueType() {
-   return 
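
ValueTypeInfo (moved, not changed, by this commit) describes types that implement the Value interface, which, as its Javadoc above says, lets a type define its own serialization and deserialization routines. A minimal sketch of such a type (the class and field names are illustrative only):

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

// Illustrative Value implementation: the type defines its own binary format
// via write()/read(), which is what ValueTypeInfo and ValueSerializer build on.
public class Temperature implements Value {

    private static final long serialVersionUID = 1L;

    public double celsius;

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeDouble(celsius);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        celsius = in.readDouble();
    }
}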

[38/38] flink git commit: [FLINK-3314] [streaming] Fix case where early cancel messages do not properly cancel a stream operator.

2016-02-02 Thread sewen
[FLINK-3314] [streaming] Fix case where early cancel messages do not properly 
cancel a stream operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fc7e7af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fc7e7af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fc7e7af

Branch: refs/heads/master
Commit: 8fc7e7af2c7decb8e531b76e3edcc2601f73fe9d
Parents: c356ef3
Author: Stephan Ewen 
Authored: Tue Feb 2 12:44:42 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 17:11:50 2016 +0100

--
 .../streaming/runtime/tasks/StreamTask.java |  20 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 254 +++
 2 files changed, 273 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8fc7e7af/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
--
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ab6c10..c9624fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -146,6 +146,9 @@ public abstract class StreamTask
/** Flag to mark the task "in operation", in which case check
 * needs to be initialized to true, so that early cancel() before 
invoke() behaves correctly */
private volatile boolean isRunning;
+   
+   /** Flag to mark this task as canceled */
+   private volatile boolean canceled;
 
private long recoveryTimestamp;
 
@@ -191,6 +194,11 @@ public abstract class StreamTask
// task specific initialization
init();

+   // save the work of reloading state, etc., if the task is already canceled
+   if (canceled) {
+   throw new CancelTaskException();
+   }
+   
//  Invoke 
LOG.debug("Invoking {}", getName());

@@ -205,7 +213,12 @@ public abstract class StreamTask
openAllOperators();
}
 
-   // let the task do its work
+   // final check to exit early before starting to run
+   if (canceled) {
+   throw new CancelTaskException();
+   }
+
+   // let the task do its work
isRunning = true;
run();
isRunning = false;
@@ -290,6 +303,7 @@ public abstract class StreamTask
@Override
public final void cancel() throws Exception {
isRunning = false;
+   canceled = true;
cancelTask();
}
 
@@ -297,6 +311,10 @@ public abstract class StreamTask
return isRunning;
}

+   public final boolean isCanceled() {
+   return canceled;
+   }
+   
private void openAllOperators() throws Exception {
for (StreamOperator operator : 
operatorChain.getAllOperators()) {
if (operator != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8fc7e7af/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
--
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
new file mode 100644
index 000..c18d150
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain 
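
The StreamTask changes above boil down to one pattern: a volatile canceled flag that cancel() sets and that invoke() re-checks before each expensive phase, so a cancel message arriving before or during initialization is not lost. A stripped-down illustration of that pattern (a hypothetical class, not Flink's actual StreamTask API):

// Stripped-down illustration of the early-cancel pattern from the diff above;
// hypothetical class, not Flink's StreamTask.
public abstract class CancellableTask {

    private volatile boolean canceled;

    public final void cancel() throws Exception {
        canceled = true;
        cancelTask();
    }

    public final boolean isCanceled() {
        return canceled;
    }

    public final void invoke() throws Exception {
        init();                 // task specific initialization
        if (canceled) {
            return;             // cancel() arrived before or during init
        }
        // ... restore state, open operators ...
        if (canceled) {
            return;             // final check before starting to run
        }
        run();
    }

    protected abstract void init() throws Exception;
    protected abstract void run() throws Exception;
    protected abstract void cancelTask() throws Exception;
}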

[03/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
deleted file mode 100644
index 8cdee9b..000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorISD1Test extends 
TupleComparatorTestBase> {
-
-   @SuppressWarnings("unchecked")
-   Tuple3[] dataISD = new Tuple3[]{
-   new Tuple3(4, "hello", 20.0),
-   new Tuple3(5, "hello", 23.2),
-   new Tuple3(6, "world", 20.0),
-   new Tuple3(7, "hello", 20.0),
-   new Tuple3(8, "hello", 23.2),
-   new Tuple3(9, "world", 20.0),
-   new Tuple3(10, "hello", 20.0),
-   new Tuple3(11, "hello", 23.2)
-   };
-
-   @Override
-   protected TupleComparator> 
createComparator(boolean ascending) {
-   return new TupleComparator>(
-   new int[]{0},
-   new TypeComparator[]{ new 
IntComparator(ascending) },
-   new TypeSerializer[]{ IntSerializer.INSTANCE });
-   }
-
-   @SuppressWarnings("unchecked")
-   @Override
-   protected TupleSerializer> 
createSerializer() {
-   return new TupleSerializer>(
-   (Class>) 
(Class) Tuple3.class,
-   new TypeSerializer[]{
-   new IntSerializer(),
-   new StringSerializer(),
-   new DoubleSerializer()});
-   }
-
-   @Override
-   protected Tuple3[] getSortedTestData() {
-   return dataISD;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
--
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
deleted file mode 100644
index 06c292f..000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  

[22/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
[FLINK-3303] [core] Move all type utilities to flink-core


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21a71586
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21a71586
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21a71586

Branch: refs/heads/master
Commit: 21a715867d655bb61df9a9f7eef37e42b99e206a
Parents: 7081836
Author: Stephan Ewen 
Authored: Sun Jan 31 23:28:32 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 16:55:44 2016 +0100

--
 flink-core/pom.xml  |   47 +-
 .../apache/flink/api/common/operators/Keys.java |  459 +
 .../flink/api/java/functions/KeySelector.java   |   63 +
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   78 +
 .../api/java/typeutils/EitherTypeInfo.java  |  122 ++
 .../flink/api/java/typeutils/EnumTypeInfo.java  |  122 ++
 .../api/java/typeutils/GenericTypeInfo.java |  116 ++
 .../java/typeutils/InputTypeConfigurable.java   |   42 +
 .../api/java/typeutils/MissingTypeInfo.java |  121 ++
 .../api/java/typeutils/ObjectArrayTypeInfo.java |  141 ++
 .../flink/api/java/typeutils/PojoField.java |  108 +
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  406 
 .../api/java/typeutils/ResultTypeQueryable.java |   37 +
 .../flink/api/java/typeutils/TupleTypeInfo.java |  248 +++
 .../api/java/typeutils/TupleTypeInfoBase.java   |  252 +++
 .../flink/api/java/typeutils/TypeExtractor.java | 1692 +++
 .../api/java/typeutils/TypeInfoParser.java  |  383 
 .../flink/api/java/typeutils/ValueTypeInfo.java |  183 ++
 .../api/java/typeutils/WritableTypeInfo.java|  139 ++
 .../java/typeutils/runtime/AvroSerializer.java  |  201 ++
 .../runtime/CopyableValueComparator.java|  167 ++
 .../runtime/CopyableValueSerializer.java|  129 ++
 .../typeutils/runtime/DataInputDecoder.java |  229 +++
 .../typeutils/runtime/DataInputViewStream.java  |   71 +
 .../typeutils/runtime/DataOutputEncoder.java|  190 ++
 .../typeutils/runtime/DataOutputViewStream.java |   41 +
 .../typeutils/runtime/EitherSerializer.java |  193 ++
 .../runtime/GenericTypeComparator.java  |  177 ++
 .../api/java/typeutils/runtime/KryoUtils.java   |   87 +
 .../java/typeutils/runtime/NoFetchingInput.java |  141 ++
 .../java/typeutils/runtime/PojoComparator.java  |  354 
 .../java/typeutils/runtime/PojoSerializer.java  |  592 ++
 .../runtime/RuntimeComparatorFactory.java   |   75 +
 .../runtime/RuntimePairComparatorFactory.java   |   44 +
 .../runtime/RuntimeSerializerFactory.java   |  124 ++
 .../typeutils/runtime/Tuple0Serializer.java |  121 ++
 .../java/typeutils/runtime/TupleComparator.java |  157 ++
 .../typeutils/runtime/TupleComparatorBase.java  |  279 +++
 .../java/typeutils/runtime/TupleSerializer.java |  158 ++
 .../typeutils/runtime/TupleSerializerBase.java  |  102 +
 .../java/typeutils/runtime/ValueComparator.java |  183 ++
 .../java/typeutils/runtime/ValueSerializer.java |  152 ++
 .../typeutils/runtime/WritableComparator.java   |  189 ++
 .../typeutils/runtime/WritableSerializer.java   |  153 ++
 .../typeutils/runtime/kryo/KryoSerializer.java  |  382 
 .../typeutils/runtime/kryo/Serializers.java |  227 ++
 .../common/operators/ExpressionKeysTest.java|  481 +
 .../operators/SelectorFunctionKeysTest.java |  154 ++
 .../apache/flink/api/java/tuple/Tuple2Test.java |   44 +
 .../api/java/typeutils/CompositeTypeTest.java   |  179 ++
 .../api/java/typeutils/EitherTypeInfoTest.java  |   61 +
 .../api/java/typeutils/EnumTypeInfoTest.java|   51 +
 .../api/java/typeutils/GenericTypeInfoTest.java |   47 +
 .../api/java/typeutils/MissingTypeInfoTest.java |   47 +
 .../java/typeutils/ObjectArrayTypeInfoTest.java |   58 +
 .../java/typeutils/PojoTypeExtractionTest.java  |  812 
 .../api/java/typeutils/PojoTypeInfoTest.java|  153 ++
 .../java/typeutils/PojoTypeInformationTest.java |   98 +
 .../api/java/typeutils/TupleTypeInfoTest.java   |   96 +
 .../TypeExtractorInputFormatsTest.java  |  231 +++
 .../api/java/typeutils/TypeExtractorTest.java   | 1907 +
 .../api/java/typeutils/TypeInfoParserTest.java  |  338 +++
 .../api/java/typeutils/ValueTypeInfoTest.java   |   87 +
 .../java/typeutils/WritableTypeInfoTest.java|   74 +
 .../AbstractGenericArraySerializerTest.java |  187 ++
 .../AbstractGenericTypeComparatorTest.java  |  376 
 .../AbstractGenericTypeSerializerTest.java  |  364 
 .../runtime/AvroGenericArraySerializerTest.java |   28 +
 .../runtime/AvroGenericTypeComparatorTest.java  |   28 +
 .../runtime/AvroGenericTypeSerializerTest.java  |   29 +
 .../runtime/AvroSerializerEmptyArrayTest.java   |  189 ++
 .../runtime/CopyableValueComparatorTest.java|   53 +
 .../typeutils/runtime/EitherSerializerTest.java |  113 +
 

[29/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java
new file mode 100644
index 000..901b838
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// --
+//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
+//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+// --
+
+
+package org.apache.flink.api.java.tuple;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * A tuple with 25 fields. Tuples are strongly typed; each field may be of a 
separate type.
+ * The fields of the tuple can be accessed directly as public fields (f0, f1, 
...) or via their position
+ * through the {@link #getField(int)} method. The tuple field positions start 
at zero.
+ * 
+ * Tuples are mutable types, meaning that their fields can be re-assigned. 
This allows functions that work
+ * with Tuples to reuse objects in order to reduce pressure on the garbage 
collector.
+ *
+ * @see Tuple
+ *
+ * @param <T0> The type of field 0
+ * @param <T1> The type of field 1
+ * @param <T2> The type of field 2
+ * @param <T3> The type of field 3
+ * @param <T4> The type of field 4
+ * @param <T5> The type of field 5
+ * @param <T6> The type of field 6
+ * @param <T7> The type of field 7
+ * @param <T8> The type of field 8
+ * @param <T9> The type of field 9
+ * @param <T10> The type of field 10
+ * @param <T11> The type of field 11
+ * @param <T12> The type of field 12
+ * @param <T13> The type of field 13
+ * @param <T14> The type of field 14
+ * @param <T15> The type of field 15
+ * @param <T16> The type of field 16
+ * @param <T17> The type of field 17
+ * @param <T18> The type of field 18
+ * @param <T19> The type of field 19
+ * @param <T20> The type of field 20
+ * @param <T21> The type of field 21
+ * @param <T22> The type of field 22
+ * @param <T23> The type of field 23
+ * @param <T24> The type of field 24
+ */
+public class Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> extends Tuple {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Field 0 of the tuple. */
+   public T0 f0;
+   /** Field 1 of the tuple. */
+   public T1 f1;
+   /** Field 2 of the tuple. */
+   public T2 f2;
+   /** Field 3 of the tuple. */
+   public T3 f3;
+   /** Field 4 of the tuple. */
+   public T4 f4;
+   /** Field 5 of the tuple. */
+   public T5 f5;
+   /** Field 6 of the tuple. */
+   public T6 f6;
+   /** Field 7 of the tuple. */
+   public T7 f7;
+   /** Field 8 of the tuple. */
+   public T8 f8;
+   /** Field 9 of the tuple. */
+   public T9 f9;
+   /** Field 10 of the tuple. */
+   public T10 f10;
+   /** Field 11 of the tuple. */
+   public T11 f11;
+   /** Field 12 of the tuple. */
+   public T12 f12;
+   /** Field 13 of the tuple. */
+   public T13 f13;
+   /** Field 14 of the tuple. */
+   public T14 f14;
+   /** Field 15 of the tuple. */
+   public T15 f15;
+   /** Field 16 of the tuple. */
+   public T16 f16;
+   /** Field 17 of the tuple. */
+   public T17 f17;
+   /** Field 18 of the tuple. */
+   public T18 f18;
+   /** Field 19 of the tuple. */
+   public T19 f19;
+   /** Field 20 of the tuple. */
+   public T20 f20;
+   /** Field 21 of the tuple. */
+   public T21 f21;
+   /** Field 22 of the tuple. */
+   public T22 f22;
+   /** Field 23 of the tuple. */
+   public T23 f23;
+   /** Field 24 of the tuple. */
+   public T24 f24;
+
+   /**
+* Creates a new tuple where all fields are null.
+*/
+   public Tuple25() {}
+
+   /**
+* Creates a new tuple and assigns the given values to the tuple's 
fields.
+*
+* @param 
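For readers following the move, the generated TupleN classes above are all used the same way; the following is a minimal sketch against the public Tuple API described in the javadoc (the field values are made up):

import org.apache.flink.api.java.tuple.Tuple2;

public class TupleExample {
    public static void main(String[] args) {
        // Fields are strongly typed and publicly accessible as f0, f1, ...
        Tuple2<String, Integer> wordCount = new Tuple2<>("flink", 1);
        System.out.println(wordCount.f0 + " -> " + wordCount.f1);

        // ... or reachable via their zero-based position through getField(int).
        String word = wordCount.getField(0);

        // Tuples are mutable, so a function may reuse the same instance.
        wordCount.setField(wordCount.f1 + 1, 1);
        System.out.println(word + " -> " + wordCount.getField(1));
    }
}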

[33/38] flink git commit: [FLINK-3049] [api breaking] Move 'Either' type to 'flink-core / org.apache.flink.types'

2016-02-02 Thread sewen
[FLINK-3049] [api breaking] Move 'Either' type to 'flink-core / 
org.apache.flink.types'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54743866
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54743866
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54743866

Branch: refs/heads/master
Commit: 54743866e86cbe7689ae1dcf001deb559629747b
Parents: 360f02b
Author: Stephan Ewen 
Authored: Sun Jan 31 23:27:36 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 16:55:44 2016 +0100

--
 .../java/org/apache/flink/types/Either.java | 185 +++
 1 file changed, 185 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/54743866/flink-core/src/main/java/org/apache/flink/types/Either.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java 
b/flink-core/src/main/java/org/apache/flink/types/Either.java
new file mode 100644
index 000..361802b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/Either.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+/**
+ * This type represents a value of one of two possible types, Left or Right (a
+ * disjoint union), inspired by Scala's Either type.
+ *
+ * @param <L>
+ *the type of Left
+ * @param <R>
+ *the type of Right
+ */
+public abstract class Either<L, R> {
+
+   /**
+* Create a Left value of Either
+*/
+   public static <L, R> Either<L, R> Left(L value) {
+   return new Left<L, R>(value);
+   }
+
+   /**
+* Create a Right value of Either
+*/
+   public static <L, R> Either<L, R> Right(R value) {
+   return new Right<L, R>(value);
+   }
+
+   /**
+* Retrieve the Left value of Either.
+* 
+* @return the Left value
+* @throws IllegalStateException
+* if called on a Right
+*/
+   public abstract L left() throws IllegalStateException;
+
+   /**
+* Retrieve the Right value of Either.
+* 
+* @return the Right value
+* @throws IllegalStateException
+* if called on a Left
+*/
+   public abstract R right() throws IllegalStateException;
+
+   /**
+* 
+* @return true if this is a Left value, false if this is a Right value
+*/
+   public final boolean isLeft() {
+   return getClass() == Left.class;
+   }
+
+   /**
+* 
+* @return true if this is a Right value, false if this is a Left value
+*/
+   public final boolean isRight() {
+   return getClass() == Right.class;
+   }
+
+   /**
+* A left value of {@link Either}
+*
+* @param <L>
+*the type of Left
+* @param <R>
+*the type of Right
+*/
+   public static class Left<L, R> extends Either<L, R> {
+   private final L value;
+
+   public Left(L value) {
+   this.value = java.util.Objects.requireNonNull(value);
+   }
+
+   @Override
+   public L left() {
+   return value;
+   }
+
+   @Override
+   public R right() {
+   throw new IllegalStateException("Cannot retrieve Right 
value on a Left");
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object instanceof Left) {
+   final Left<?, ?> other = (Left<?, ?>) object;
+   return value.equals(other.value);
+   }
+   return false;
+   }
+
+   @Override
+   public int hashCode() {
+   return value.hashCode();
+ 
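As a quick orientation for the new type, the intended call pattern looks roughly like the sketch below, based on the factory methods and accessors above; the parsing scenario itself is invented:

import org.apache.flink.types.Either;

public class EitherExample {

    // Either models "result or error description" without throwing exceptions.
    static Either<String, Integer> parsePort(String raw) {
        try {
            return Either.Right(Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            return Either.Left("not a number: " + raw);
        }
    }

    public static void main(String[] args) {
        Either<String, Integer> result = parsePort("8081");
        if (result.isRight()) {
            System.out.println("port = " + result.right());
        } else {
            System.out.println("error: " + result.left());  // left() would throw on a Right
        }
    }
}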

[06/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
--
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
deleted file mode 100644
index 7f5ef25..000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.type.extractor;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Test;
-
-public class PojoTypeInformationTest {
-
-   public static class SimplePojo {
-   public String str;
-   public Boolean Bl;
-   public boolean bl;
-   public Byte Bt;
-   public byte bt;
-   public Short Shrt;
-   public short shrt;
-   public Integer Intgr;
-   public int intgr;
-   public Long Lng;
-   public long lng;
-   public Float Flt;
-   public float flt;
-   public Double Dbl;
-   public double dbl;
-   public Character Ch;
-   public char ch;
-   public int[] primIntArray;
-   public Integer[] intWrapperArray;
-   }
-
-   @Test
-   public void testSimplePojoTypeExtraction() {
-   TypeInformation type = 
TypeExtractor.getForClass(SimplePojo.class);
-   assertTrue("Extracted type is not a composite/pojo type but 
should be.", type instanceof CompositeType);
-   }
-
-   public static class NestedPojoInner {
-   public String field;
-   }
-
-   public static class NestedPojoOuter {
-   public Integer intField;
-   public NestedPojoInner inner;
-   }
-
-   @Test
-   public void testNestedPojoTypeExtraction() {
-   TypeInformation type = 
TypeExtractor.getForClass(NestedPojoOuter.class);
-   assertTrue("Extracted type is not a Pojo type but should be.", 
type instanceof CompositeType);
-   }
-
-   public static class Recursive1Pojo {
-   public Integer intField;
-   public Recursive2Pojo rec;
-   }
-
-   public static class Recursive2Pojo {
-   public String strField;
-   public Recursive1Pojo rec;
-   }
-
-   @Test
-   public void testRecursivePojoTypeExtraction() {
-   // This one tests whether a recursive pojo is detected using 
the set of visited
-   // types in the type extractor. The recursive field will be 
handled using the generic serializer.
-   TypeInformation type = 
TypeExtractor.getForClass(Recursive1Pojo.class);
-   assertTrue("Extracted type is not a Pojo type but should be.", 
type instanceof CompositeType);
-   }
-   
-   @Test
-   public void testRecursivePojoObjectTypeExtraction() {
-   TypeInformation type = 
TypeExtractor.getForObject(new Recursive1Pojo());
-   assertTrue("Extracted type is not a Pojo type but should be.", 
type instanceof CompositeType);
-   }
-   
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
--
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
deleted file mode 100644
index bbf5148..000
--- 

[26/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java 
b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java
deleted file mode 100644
index 70da5bb..000
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-// --
-//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
-//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
-// --
-
-
-package org.apache.flink.api.java.tuple;
-
-import org.apache.flink.util.StringUtils;
-
-/**
- * A tuple with 17 fields. Tuples are strongly typed; each field may be of a 
separate type.
- * The fields of the tuple can be accessed directly as public fields (f0, f1, 
...) or via their position
- * through the {@link #getField(int)} method. The tuple field positions start 
at zero.
- * 
- * Tuples are mutable types, meaning that their fields can be re-assigned. 
This allows functions that work
- * with Tuples to reuse objects in order to reduce pressure on the garbage 
collector.
- *
- * @see Tuple
- *
- * @param <T0> The type of field 0
- * @param <T1> The type of field 1
- * @param <T2> The type of field 2
- * @param <T3> The type of field 3
- * @param <T4> The type of field 4
- * @param <T5> The type of field 5
- * @param <T6> The type of field 6
- * @param <T7> The type of field 7
- * @param <T8> The type of field 8
- * @param <T9> The type of field 9
- * @param <T10> The type of field 10
- * @param <T11> The type of field 11
- * @param <T12> The type of field 12
- * @param <T13> The type of field 13
- * @param <T14> The type of field 14
- * @param <T15> The type of field 15
- * @param <T16> The type of field 16
- */
-public class Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> extends Tuple {
-
-   private static final long serialVersionUID = 1L;
-
-   /** Field 0 of the tuple. */
-   public T0 f0;
-   /** Field 1 of the tuple. */
-   public T1 f1;
-   /** Field 2 of the tuple. */
-   public T2 f2;
-   /** Field 3 of the tuple. */
-   public T3 f3;
-   /** Field 4 of the tuple. */
-   public T4 f4;
-   /** Field 5 of the tuple. */
-   public T5 f5;
-   /** Field 6 of the tuple. */
-   public T6 f6;
-   /** Field 7 of the tuple. */
-   public T7 f7;
-   /** Field 8 of the tuple. */
-   public T8 f8;
-   /** Field 9 of the tuple. */
-   public T9 f9;
-   /** Field 10 of the tuple. */
-   public T10 f10;
-   /** Field 11 of the tuple. */
-   public T11 f11;
-   /** Field 12 of the tuple. */
-   public T12 f12;
-   /** Field 13 of the tuple. */
-   public T13 f13;
-   /** Field 14 of the tuple. */
-   public T14 f14;
-   /** Field 15 of the tuple. */
-   public T15 f15;
-   /** Field 16 of the tuple. */
-   public T16 f16;
-
-   /**
-* Creates a new tuple where all fields are null.
-*/
-   public Tuple17() {}
-
-   /**
-* Creates a new tuple and assigns the given values to the tuple's 
fields.
-*
-* @param value0 The value for field 0
-* @param value1 The value for field 1
-* @param value2 The value for field 2
-* @param value3 The value for field 3
-* @param value4 The value for field 4
-* @param value5 The value for field 5
-* @param value6 The value for field 6
-* @param value7 The value for field 7
-* @param value8 The value for field 8
-* @param value9 The value for field 9
-* @param value10 The value for field 10
-* @param value11 The value for field 11
-* @param value12 The value for field 12
-* @param value13 The value for field 13
-* @param value14 The value for field 14
-* @param value15 The value for field 15
-* @param value16 The value for field 16
-   

[36/38] flink git commit: [hotfix] Remove old sysout debug message in UdfAnalyzerTest

2016-02-02 Thread sewen
[hotfix] Remove old sysout debug message in UdfAnalyzerTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b90542e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b90542e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b90542e8

Branch: refs/heads/master
Commit: b90542e80b2f9f5fb8ffe6e6325400b3cdd9301f
Parents: 168532e
Author: Stephan Ewen 
Authored: Mon Feb 1 21:46:09 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 17:11:50 2016 +0100

--
 .../test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java| 1 -
 1 file changed, 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b90542e8/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
--
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index a1ccfe4..dc2d1db 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -366,7 +366,6 @@ public class UdfAnalyzerTest {
 
@Test
public void testForwardWithGetMethod() {
-   System.out.println("HERE");

compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class,
"Tuple4", 
"Tuple4");
}



[37/38] flink git commit: [hotfix] Fix warnings concerning joda-time in "flink-tests"

2016-02-02 Thread sewen
[hotfix] Fix warnings concerning joda-time in "flink-tests"


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c356ef33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c356ef33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c356ef33

Branch: refs/heads/master
Commit: c356ef33bc0fdd77296061679699487ab16e8132
Parents: b90542e
Author: Stephan Ewen 
Authored: Tue Feb 2 12:10:01 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 17:11:50 2016 +0100

--
 flink-core/pom.xml  | 2 --
 flink-tests/pom.xml | 6 ++
 2 files changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c356ef33/flink-core/pom.xml
--
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ba1050c..694f629 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -94,14 +94,12 @@ under the License.

joda-time
joda-time
-   2.5
test

 

org.joda
joda-convert
-   1.7
test

 

http://git-wip-us.apache.org/repos/asf/flink/blob/c356ef33/flink-tests/pom.xml
--
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index fe34aea..996f5e0 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -164,6 +164,12 @@ under the License.

 

+   org.joda
+   joda-convert
+   test
+   
+
+   
com.google.guava
guava
${guava.version}



[20/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
new file mode 100644
index 000..b89a830
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -0,0 +1,383 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+   
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Value;
+
+
+public class TypeInfoParser {
+   private static final String TUPLE_PACKAGE = 
"org.apache.flink.api.java.tuple";
+   private static final String VALUE_PACKAGE = "org.apache.flink.types";
+   private static final String WRITABLE_PACKAGE = "org.apache.hadoop.io";
+
+   private static final Pattern tuplePattern = Pattern.compile("^(" + 
TUPLE_PACKAGE.replaceAll("\\.", ".") + 
"\\.)?((Tuple[1-9][0-9]?)<|(Tuple0))");
+   private static final Pattern writablePattern = Pattern.compile("^((" + 
WRITABLE_PACKAGE.replaceAll("\\.", ".") + 
"\\.)?Writable)<([^\\s,>]*)(,|>|$|\\[)");
+   private static final Pattern enumPattern = 
Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$|\\[)");
+   private static final Pattern basicTypePattern = Pattern
+   
.compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean|Void))(,|>|$|\\[)");
+   private static final Pattern basicTypeDatePattern = 
Pattern.compile("^((java\\.util\\.)?Date)(,|>|$|\\[)");
+   private static final Pattern primitiveTypePattern = 
Pattern.compile("^(int|byte|short|char|double|float|long|boolean|void)(,|>|$|\\[)");
+   private static final Pattern valueTypePattern = Pattern.compile("^((" + 
VALUE_PACKAGE.replaceAll("\\.", ".")
+   + 
"\\.)?(String|Int|Byte|Short|Char|Double|Float|Long|Boolean|List|Map|Null))Value(,|>|$|\\[)");
+   private static final Pattern pojoGenericObjectPattern = 
Pattern.compile("^([^\\s,<>\\[]+)(<)?");
+   private static final Pattern fieldPattern = 
Pattern.compile("^([^\\s,<>\\[]+)=");
+
+   /**
+* Generates an instance of TypeInformation by parsing a type
+* information string. A type information string can contain the following
+* types:
+*
+* <ul>
+* <li>Basic types such as Integer, String, etc.
+* <li>Basic type arrays such as Integer[], String[], etc.
+* <li>Tuple types such as Tuple1<TYPE0>, Tuple2<TYPE0, TYPE1>, etc.
+* <li>Pojo types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>, etc.
+* <li>Generic types such as java.lang.Class, etc.
+* <li>Custom type arrays such as org.my.CustomClass[], org.my.CustomClass$StaticInnerClass[], etc.
+* <li>Value types such as DoubleValue, StringValue, IntegerValue, etc.
+* <li>Tuple array types such as Tuple2<TYPE0,TYPE1>[], etc.
+* <li>Writable types such as Writable<org.my.CustomWritable>
+* <li>Enum types such as Enum<org.my.CustomEnum>
+* </ul>
+*
+* Example:
+* "Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>"
+*
+* @param infoString
+*type information string to be parsed
+* @return TypeInformation representation of the string
+*/
+   @SuppressWarnings("unchecked")
+   public static <X> TypeInformation<X> parse(String infoString) {
+   try {
+   if (infoString == null) {
+   throw new 
+   try {
+   if (infoString == null) {
+   throw new 
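To illustrate the string syntax described in the javadoc, a call to the parser could look like the following small sketch; the concrete type string is just an example:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;

public class TypeInfoParserExample {
    public static void main(String[] args) {
        // Parse a type information string into a TypeInformation instance.
        TypeInformation<Tuple2<String, Integer>> info =
                TypeInfoParser.parse("Tuple2<String, Integer>");

        System.out.println(info);            // e.g. Java Tuple2<String, Integer>
        System.out.println(info.getArity()); // 2
    }
}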

[18/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
new file mode 100644
index 000..258d92c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+
+public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
+   
+   private static final long serialVersionUID = 1L;
+   
+   private final Class<T> typeClass;
+   
+   private transient Kryo kryo;
+   
+   private transient T copyInstance;
+   
+   public WritableSerializer(Class<T> typeClass) {
+   this.typeClass = typeClass;
+   }
+   
+   @SuppressWarnings("unchecked")
+   @Override
+   public T createInstance() {
+   if(typeClass == NullWritable.class) {
+   return (T) NullWritable.get();
+   }
+   return InstantiationUtil.instantiate(typeClass);
+   }
+
+
+   
+   @Override
+   public T copy(T from) {
+   checkKryoInitialized();
+
+   return KryoUtils.copy(from, kryo, this);
+   }
+   
+   @Override
+   public T copy(T from, T reuse) {
+   checkKryoInitialized();
+
+   return KryoUtils.copy(from, reuse, kryo, this);
+   }
+   
+   @Override
+   public int getLength() {
+   return -1;
+   }
+   
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   record.write(target);
+   }
+   
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   return deserialize(createInstance(), source);
+   }
+   
+   @Override
+   public T deserialize(T reuse, DataInputView source) throws IOException {
+   reuse.readFields(source);
+   return reuse;
+   }
+   
+   @Override
+   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+   ensureInstanceInstantiated();
+   copyInstance.readFields(source);
+   copyInstance.write(target);
+   }
+   
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+   
+   @Override
+   public WritableSerializer<T> duplicate() {
+   return new WritableSerializer<T>(typeClass);
+   }
+   
+   // 

+   
+   private void ensureInstanceInstantiated() {
+   if (copyInstance == null) {
+   copyInstance = createInstance();
+   }
+   }
+   
+   private void checkKryoInitialized() {
+   if (this.kryo == null) {
+   this.kryo = new Kryo();
+
+   Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
+   
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
+   kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+   this.kryo.setAsmEnabled(true);
+   this.kryo.register(typeClass);
+   }
+   }
+ 
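For orientation, this serializer is normally obtained through Flink's type system rather than instantiated by hand, but a direct use would look roughly like the sketch below (it assumes Hadoop's IntWritable is on the classpath):

import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.hadoop.io.IntWritable;

public class WritableSerializerExample {
    public static void main(String[] args) {
        WritableSerializer<IntWritable> serializer = new WritableSerializer<>(IntWritable.class);

        IntWritable original = new IntWritable(42);
        // copy() goes through Kryo internally (see checkKryoInitialized above).
        IntWritable copy = serializer.copy(original);

        System.out.println(copy.get());        // 42
        System.out.println(copy != original);  // true: a distinct instance
    }
}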

[10/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
deleted file mode 100644
index b8f2075..000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ /dev/null
@@ -1,1687 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A utility for reflection analysis on classes, to determine the return type 
of implementations of transformation
- * functions.
- */
-public class TypeExtractor {
-
-   /*
-* NOTE: Most methods of the TypeExtractor work with a so-called 
"typeHierarchy".
-* The type hierarchy describes all types (Classes, ParameterizedTypes, 
TypeVariables etc. ) and intermediate
-* types from a given type of a function or type (e.g. MyMapper, 
Tuple2) until a current type
-* (depends on the method, e.g. MyPojoFieldType).
-*
-* Thus, it fully qualifies types until tuple/POJO field level.
-*
-* A typical typeHierarchy could look like:
-*
-* UDF: MyMapFunction.class
-* top-level UDF: MyMapFunctionBase.class
-* RichMapFunction: RichMapFunction.class
-* MapFunction: MapFunction.class
-* Function's OUT: Tuple1
-* user-defined POJO: MyPojo.class
-* user-defined top-level POJO: MyPojoBase.class
-* POJO field: Tuple1
-* Field type: String.class
-*
-*/
-   
-   private static final Logger LOG = 
LoggerFactory.getLogger(TypeExtractor.class);
-
-   protected TypeExtractor() {
-   // only create instances for special use cases
-   }
-
-   // 

-   //  Function specific methods
-   // 
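As a reminder of what the class being moved actually does, a typical call hands it a user function and the input type, and it walks the function's type hierarchy to produce the output TypeInformation. The following is a minimal sketch; LengthMapper is an invented function:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class TypeExtractorExample {

    // A hypothetical user function; TypeExtractor walks its type hierarchy
    // (MapFunction -> IN = String, OUT = Integer).
    static class LengthMapper implements MapFunction<String, Integer> {
        @Override
        public Integer map(String value) {
            return value.length();
        }
    }

    public static void main(String[] args) {
        TypeInformation<Integer> out =
                TypeExtractor.getMapReturnTypes(new LengthMapper(), BasicTypeInfo.STRING_TYPE_INFO);
        System.out.println(out);  // expected: the basic Integer type info
    }
}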

[14/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
--
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
new file mode 100644
index 000..8c61a19
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link PojoSerializer}.
+ */
+public class PojoSubclassSerializerTest extends SerializerTestBase<PojoSubclassSerializerTest.TestUserClassBase> {
+   private TypeInformation<TestUserClassBase> type = TypeExtractor.getForClass(TestUserClassBase.class);
+
+   @Override
+   protected TypeSerializer<TestUserClassBase> createSerializer() {
+   // only register one of the three child classes, the third child class is NO POJO
+   ExecutionConfig conf = new ExecutionConfig();
+   conf.registerPojoType(TestUserClass1.class);
+   TypeSerializer<TestUserClassBase> serializer = type.createSerializer(conf);
+   assert(serializer instanceof PojoSerializer);
+   return serializer;
+   }
+
+   @Override
+   protected int getLength() {
+   return -1;
+   }
+
+   @Override
+   protected Class<TestUserClassBase> getTypeClass() {
+   return TestUserClassBase.class;
+   }
+
+   @Override
+   protected TestUserClassBase[] getTestData() {
+   Random rnd = new Random(874597969123412341L);
+
+   return new TestUserClassBase[]{
+   new TestUserClass1(rnd.nextInt(), "foo", 
rnd.nextLong()),
+   new TestUserClass2(rnd.nextInt(), "bar", 
rnd.nextFloat()),
+   new TestUserClass3(rnd.nextInt(), "bar", 
rnd.nextFloat())
+   };
+
+   }
+
+   @Override
+   @Test
+   public void testInstantiate() {
+   // don't do anything, since the PojoSerializer with subclass 
will return null
+   }
+
+   // User code class for testing the serializer
+   public static abstract class TestUserClassBase {
+   public int dumm1;
+   public String dumm2;
+
+
+   public TestUserClassBase() {
+   }
+
+   public TestUserClassBase(int dumm1, String dumm2) {
+   this.dumm1 = dumm1;
+   this.dumm2 = dumm2;
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hashCode(dumm1, dumm2);
+   }
+
+   @Override
+   public boolean equals(Object other) {
+   if (!(other instanceof TestUserClassBase)) {
+   return false;
+   }
+   TestUserClassBase otherTUC = (TestUserClassBase) other;
+   if (dumm1 != otherTUC.dumm1) {
+   return false;
+   }
+   if (!dumm2.equals(otherTUC.dumm2)) {
+   return false;
+   }
+   return true;
+   }
+   }
+
+   public static class TestUserClass1 extends TestUserClassBase {
+   public long dumm3;
+
+   public TestUserClass1() {
+   }
+
+   public TestUserClass1(int dumm1, String dumm2, long dumm3) {
+   super(dumm1, dumm2);
+ 
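The pattern exercised by this test, registering POJO subclasses up front so the PojoSerializer can handle them efficiently, would be applied in a job roughly like the sketch below; the config object and the subclass name are placeholders:

import org.apache.flink.api.common.ExecutionConfig;

public class RegisterSubclassExample {
    public static void main(String[] args) {
        // Normally obtained via env.getConfig() from an ExecutionEnvironment.
        ExecutionConfig config = new ExecutionConfig();

        // Register known subclasses of a POJO base type ahead of time, so the
        // serializer can identify them by a registered tag instead of writing
        // the full class name with every record.
        config.registerPojoType(MySubclass.class);  // hypothetical subclass
    }

    // Placeholder standing in for a user-defined POJO subclass.
    public static class MySubclass {
        public int id;
        public String name;
    }
}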

[34/38] flink git commit: [FLINK-2348] Fix unstable IterateExampleITCase.

2016-02-02 Thread sewen
[FLINK-2348] Fix unstable IterateExampleITCase.

This deactivates the validation of results, which is not reliably possible under the current model (timeout on feedback).
For now, this test only checks that the job executes properly.

Also adds proper logging property files for the examples projects.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfff86c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfff86c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfff86c8

Branch: refs/heads/master
Commit: bfff86c841f0b873f25367419db0b3dd504a1197
Parents: 6f042e7
Author: Stephan Ewen 
Authored: Mon Feb 1 17:09:38 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 16:55:45 2016 +0100

--
 .../src/main/resources/log4j-test.properties| 23 
 .../src/main/resources/log4j.properties | 23 
 .../src/main/resources/logback.xml  | 29 
 .../iteration/IterateExampleITCase.java |  4 ++-
 .../src/test/resources/log4j-test.properties| 23 
 5 files changed, 78 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/bfff86c8/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties
--
diff --git 
a/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties 
b/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties
deleted file mode 100644
index 65bd0b8..000
--- 
a/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-log4j.rootLogger=INFO, console
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bfff86c8/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties
--
diff --git 
a/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties 
b/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties
new file mode 100644
index 000..da32ea0
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n


[02/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
--
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
deleted file mode 100644
index 8ff0b1b..000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
-import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("unchecked")
-public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializerTest {
-
-   ExecutionConfig ec = new ExecutionConfig();
-   
-   @Test
-   public void testJavaList(){
-   Collection a = new ArrayList<>();
-
-   fillCollection(a);
-
-   runTests(a);
-   }
-
-   @Test
-   public void testJavaSet(){
-   Collection b = new HashSet<>();
-
-   fillCollection(b);
-
-   runTests(b);
-   }
-
-
-
-   @Test
-   public void testJavaDequeue(){
-   Collection c = new LinkedList<>();
-   fillCollection(c);
-   runTests(c);
-   }
-
-   private void fillCollection(Collection coll) {
-   coll.add(42);
-   coll.add(1337);
-   coll.add(49);
-   coll.add(1);
-   }
-
-   @Override
-   protected  TypeSerializer createSerializer(Class type) {
-   return new KryoSerializer(type, ec);
-   }
-   
-   /**
-* Make sure that the kryo serializer forwards EOF exceptions properly 
when serializing
-*/
-   @Test
-   public void testForwardEOFExceptionWhileSerializing() {
-   try {
-   // construct a long string
-   String str;
-   {
-   char[] charData = new char[4];
-   Random rnd = new Random();
-   
-   for (int i = 0; i < charData.length; i++) {
-   charData[i] = (char) rnd.nextInt(1);
-   }
-   
-   str = new String(charData);
-   }
-   
-   // construct a memory target that is too small for the 
string
-   TestDataOutputSerializer target = new 
TestDataOutputSerializer(1, 3);
-   KryoSerializer serializer = new 
KryoSerializer(String.class, new ExecutionConfig());
-   
-   try {
-   serializer.serialize(str, target);
-   fail("should throw a java.io.EOFException");
-   }
-   catch (java.io.EOFException e) {
-   // that is how we like it
-   }
-   catch (Exception e) {
-   fail("throws wrong exception: should throw a 
java.io.EOFException, has thrown a " + e.getClass().getName());
-   }
-   }
-   catch (Exception e) {
-   
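For context on what these tests cover: types without a dedicated Flink serializer fall back to the Kryo-based generic serializer, which can also be created directly. A minimal sketch, not taken from the commit:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

import java.util.ArrayList;
import java.util.Arrays;

public class KryoCopyExample {
    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();

        // ArrayList has no Flink-specific serializer here, so Kryo handles it.
        @SuppressWarnings({"unchecked", "rawtypes"})
        KryoSerializer<ArrayList> serializer = new KryoSerializer<>(ArrayList.class, config);

        ArrayList<Integer> original = new ArrayList<>(Arrays.asList(42, 1337, 49, 1));
        ArrayList copy = serializer.copy(original);   // deep copy via Kryo

        System.out.println(copy.equals(original));    // true: same contents
        System.out.println(copy != original);         // true: a distinct object
    }
}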

[17/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
--
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
new file mode 100644
index 000..bc11848
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultiset;
+
+/**
+ *  Pojo Type tests
+ *
+ *  A Pojo is a bean-style class with getters, setters and an empty ctor,
+ *   OR a class with all fields public (or, for every private field, a public getter/setter);
+ *   everything else is a generic type (which cannot be used for field selection).
+ */
+public class PojoTypeExtractionTest {
+
+   public static class HasDuplicateField extends WC {
+   @SuppressWarnings("unused")
+   private int count; // duplicate
+   }
+
+   @Test(expected=RuntimeException.class)
+   public void testDuplicateFieldException() {
+   TypeExtractor.createTypeInfo(HasDuplicateField.class);
+   }
+
+   // test with correct pojo types
+   public static class WC { // is a pojo
+   public ComplexNestedClass complex; // is a pojo
+   private int count; // is a BasicType
+
+   public WC() {
+   }
+   public int getCount() {
+   return count;
+   }
+   public void setCount(int c) {
+   this.count = c;
+   }
+   }
+   public static class ComplexNestedClass { // pojo
+   public static int ignoreStaticField;
+   public transient int ignoreTransientField;
+   public Date date; // generic type
+   public Integer someNumberWithÜnicödeNäme; // BasicType
+   public float someFloat; // BasicType
+   public Tuple3<Long, Long, String> word; //Tuple Type with three basic types
+   public Object nothing; // generic type
+   public MyWritable hadoopCitizen;  // writableType
+   public List<String> collection;
+   }
+
+   // all public test
+   public static class AllPublic extends ComplexNestedClass {
+   public ArrayList somethingFancy; // generic type
+   public HashMultiset fancyIds; // generic type
+   public String[] fancyArray;  // generic type
+   }
+
+   public static class ParentSettingGenerics extends 
PojoWithGenerics {
+   public String field3;
+   }
+   public static class PojoWithGenerics {
+   public int key;
+   public T1 field1;
+   public T2 field2;
+   }
+
+   public static class ComplexHierarchyTop extends 
ComplexHierarchy {}
+   public static class ComplexHierarchy extends 
PojoWithGenerics {}
+
+   // extends from Tuple and adds a field
+   public static class FromTuple extends Tuple3 {
+   private static final long serialVersionUID = 1L;
+  
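To make the POJO rules in the class comment above concrete, a type that the extractor accepts as a POJO could look like the sketch below; the class itself is invented:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class PojoRulesExample {

    // Public no-arg constructor, a public field, and a private field with a
    // public getter/setter pair: this qualifies as a Flink POJO.
    public static class WordWithCount {
        public String word;
        private long count;

        public WordWithCount() {}

        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    public static void main(String[] args) {
        TypeInformation<WordWithCount> type = TypeExtractor.getForClass(WordWithCount.class);
        // A POJO is analyzed as a composite type; otherwise it would fall back
        // to a generic type handled by the Kryo serializer.
        System.out.println(type instanceof CompositeType);  // expected: true
    }
}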

[25/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java 
b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java
deleted file mode 100644
index 5f7194b..000
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-// --
-//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
-//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
-// --
-
-
-package org.apache.flink.api.java.tuple;
-
-import org.apache.flink.util.StringUtils;
-
-/**
- * A tuple with 22 fields. Tuples are strongly typed; each field may be of a 
separate type.
- * The fields of the tuple can be accessed directly as public fields (f0, f1, 
...) or via their position
- * through the {@link #getField(int)} method. The tuple field positions start 
at zero.
- * 
- * Tuples are mutable types, meaning that their fields can be re-assigned. 
This allows functions that work
- * with Tuples to reuse objects in order to reduce pressure on the garbage 
collector.
- *
- * @see Tuple
- *
- * @param <T0> The type of field 0
- * @param <T1> The type of field 1
- * @param <T2> The type of field 2
- * @param <T3> The type of field 3
- * @param <T4> The type of field 4
- * @param <T5> The type of field 5
- * @param <T6> The type of field 6
- * @param <T7> The type of field 7
- * @param <T8> The type of field 8
- * @param <T9> The type of field 9
- * @param <T10> The type of field 10
- * @param <T11> The type of field 11
- * @param <T12> The type of field 12
- * @param <T13> The type of field 13
- * @param <T14> The type of field 14
- * @param <T15> The type of field 15
- * @param <T16> The type of field 16
- * @param <T17> The type of field 17
- * @param <T18> The type of field 18
- * @param <T19> The type of field 19
- * @param <T20> The type of field 20
- * @param <T21> The type of field 21
- */
-public class Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> extends Tuple {
-
-   private static final long serialVersionUID = 1L;
-
-   /** Field 0 of the tuple. */
-   public T0 f0;
-   /** Field 1 of the tuple. */
-   public T1 f1;
-   /** Field 2 of the tuple. */
-   public T2 f2;
-   /** Field 3 of the tuple. */
-   public T3 f3;
-   /** Field 4 of the tuple. */
-   public T4 f4;
-   /** Field 5 of the tuple. */
-   public T5 f5;
-   /** Field 6 of the tuple. */
-   public T6 f6;
-   /** Field 7 of the tuple. */
-   public T7 f7;
-   /** Field 8 of the tuple. */
-   public T8 f8;
-   /** Field 9 of the tuple. */
-   public T9 f9;
-   /** Field 10 of the tuple. */
-   public T10 f10;
-   /** Field 11 of the tuple. */
-   public T11 f11;
-   /** Field 12 of the tuple. */
-   public T12 f12;
-   /** Field 13 of the tuple. */
-   public T13 f13;
-   /** Field 14 of the tuple. */
-   public T14 f14;
-   /** Field 15 of the tuple. */
-   public T15 f15;
-   /** Field 16 of the tuple. */
-   public T16 f16;
-   /** Field 17 of the tuple. */
-   public T17 f17;
-   /** Field 18 of the tuple. */
-   public T18 f18;
-   /** Field 19 of the tuple. */
-   public T19 f19;
-   /** Field 20 of the tuple. */
-   public T20 f20;
-   /** Field 21 of the tuple. */
-   public T21 f21;
-
-   /**
-* Creates a new tuple where all fields are null.
-*/
-   public Tuple22() {}
-
-   /**
-* Creates a new tuple and assigns the given values to the tuple's 
fields.
-*
-* @param value0 The value for field 0
-* @param value1 The value for field 1
-* @param value2 The value for field 2
-* @param value3 The value for field 3
-* @param value4 The value for field 4
-* @param value5 The value for field 5
-* @param value6 The value 

[31/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java 
b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java
new file mode 100644
index 000..0d45352
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// --
+//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
+//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+// --
+
+
+package org.apache.flink.api.java.tuple;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * A tuple with 15 fields. Tuples are strongly typed; each field may be of a 
separate type.
+ * The fields of the tuple can be accessed directly as public fields (f0, f1, 
...) or via their position
+ * through the {@link #getField(int)} method. The tuple field positions start 
at zero.
+ * 
+ * Tuples are mutable types, meaning that their fields can be re-assigned. 
This allows functions that work
+ * with Tuples to reuse objects in order to reduce pressure on the garbage 
collector.
+ *
+ * @see Tuple
+ *
+ * @param <T0> The type of field 0
+ * @param <T1> The type of field 1
+ * @param <T2> The type of field 2
+ * @param <T3> The type of field 3
+ * @param <T4> The type of field 4
+ * @param <T5> The type of field 5
+ * @param <T6> The type of field 6
+ * @param <T7> The type of field 7
+ * @param <T8> The type of field 8
+ * @param <T9> The type of field 9
+ * @param <T10> The type of field 10
+ * @param <T11> The type of field 11
+ * @param <T12> The type of field 12
+ * @param <T13> The type of field 13
+ * @param <T14> The type of field 14
+ */
+public class Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> extends Tuple {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Field 0 of the tuple. */
+   public T0 f0;
+   /** Field 1 of the tuple. */
+   public T1 f1;
+   /** Field 2 of the tuple. */
+   public T2 f2;
+   /** Field 3 of the tuple. */
+   public T3 f3;
+   /** Field 4 of the tuple. */
+   public T4 f4;
+   /** Field 5 of the tuple. */
+   public T5 f5;
+   /** Field 6 of the tuple. */
+   public T6 f6;
+   /** Field 7 of the tuple. */
+   public T7 f7;
+   /** Field 8 of the tuple. */
+   public T8 f8;
+   /** Field 9 of the tuple. */
+   public T9 f9;
+   /** Field 10 of the tuple. */
+   public T10 f10;
+   /** Field 11 of the tuple. */
+   public T11 f11;
+   /** Field 12 of the tuple. */
+   public T12 f12;
+   /** Field 13 of the tuple. */
+   public T13 f13;
+   /** Field 14 of the tuple. */
+   public T14 f14;
+
+   /**
+* Creates a new tuple where all fields are null.
+*/
+   public Tuple15() {}
+
+   /**
+* Creates a new tuple and assigns the given values to the tuple's 
fields.
+*
+* @param value0 The value for field 0
+* @param value1 The value for field 1
+* @param value2 The value for field 2
+* @param value3 The value for field 3
+* @param value4 The value for field 4
+* @param value5 The value for field 5
+* @param value6 The value for field 6
+* @param value7 The value for field 7
+* @param value8 The value for field 8
+* @param value9 The value for field 9
+* @param value10 The value for field 10
+* @param value11 The value for field 11
+* @param value12 The value for field 12
+* @param value13 The value for field 13
+* @param value14 The value for field 14
+*/
+   public Tuple15(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, 
T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 
value11, T12 value12, T13 value13, T14 value14) {
+   this.f0 = value0;
+   this.f1 = value1;
+   this.f2 
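
As a quick illustration of the tuple API described in the Javadoc above (strongly typed public fields f0, f1, ..., positional access through getField(int), and mutability so instances can be reused), here is a minimal, self-contained sketch; Tuple2 is used for brevity, and the class name and values are illustrative only:

import org.apache.flink.api.java.tuple.Tuple2;

public class TupleSketch {

	public static void main(String[] args) {
		// Strongly typed access through the public fields f0, f1, ...
		Tuple2<String, Integer> counter = new Tuple2<>("flink", 1);
		String word = counter.f0;

		// Positional access through getField(int); positions start at zero.
		Integer count = counter.getField(1);

		// Tuples are mutable, so the same instance can be reused to reduce
		// pressure on the garbage collector.
		counter.f1 = count + 1;

		System.out.println(word + " -> " + counter);
	}
}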

[15/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
--
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
new file mode 100644
index 000..4a579c8
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+public class ValueTypeInfoTest extends TestLogger {
+
+   public static class TestClass implements Value {
+   private static final long serialVersionUID = 
-492760806806568285L;
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+
+   }
+
+   @Override
+   public void read(DataInputView in) throws IOException {
+
+   }
+   }
+
+   public static class AlternativeClass implements Value {
+
+   private static final long serialVersionUID = 
-163437084575260172L;
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+
+   }
+
+   @Override
+   public void read(DataInputView in) throws IOException {
+
+   }
+   }
+
+   @Test
+   public void testValueTypeInfoEquality() {
+   ValueTypeInfo tpeInfo1 = new 
ValueTypeInfo<>(TestClass.class);
+   ValueTypeInfo tpeInfo2 = new 
ValueTypeInfo<>(TestClass.class);
+
+   assertEquals(tpeInfo1, tpeInfo2);
+   assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+   }
+
+   @Test
+   public void testValueTyepInfoInequality() {
+   ValueTypeInfo tpeInfo1 = new 
ValueTypeInfo<>(TestClass.class);
+   ValueTypeInfo tpeInfo2 = new 
ValueTypeInfo<>(AlternativeClass.class);
+
+   assertNotEquals(tpeInfo1, tpeInfo2);
+   }
+
+   @Test
+   public void testValueTypeEqualsWithNull() throws Exception {
+   ValueTypeInfo tpeInfo = new 
ValueTypeInfo<>(Record.class);
+
+   Assert.assertFalse(tpeInfo.equals(null));
+   }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
--
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
new file mode 100644
index 000..2ab0021
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import 

[11/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
deleted file mode 100644
index f7e4e42..000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-
-import com.google.common.base.Joiner;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-
-/**
- * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
- * since the conditions are slightly different from Java Beans.
- * A type is considered a FLink POJO type, if it fulfills the conditions below.
- * 
- *   It is a public class, and standalone (not a non-static inner 
class)
- *   It has a public no-argument constructor.
- *   All fields are either public, or have public getters and setters.
- * 
- * 
- * @param <T> The type represented by this type information.
- */
-public class PojoTypeInfo<T> extends CompositeType<T> {
-   
-   private static final long serialVersionUID = 1L;
-
-   private final static String REGEX_FIELD = 
"[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
-   private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
-   private final static String REGEX_NESTED_FIELDS_WILDCARD = 
REGEX_NESTED_FIELDS
-   +"|\\"+ExpressionKeys.SELECT_ALL_CHAR
-   +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
-
-   private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
-   private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
-
-   private final PojoField[] fields;
-   
-   private final int totalFields;
-
-   public PojoTypeInfo(Class typeClass, List fields) {
-   super(typeClass);
-
-   
Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
-   "POJO " + typeClass + " is not public");
-
-   this.fields = fields.toArray(new PojoField[fields.size()]);
-
-   Arrays.sort(this.fields, new Comparator() {
-   @Override
-   public int compare(PojoField o1, PojoField o2) {
-   return 
o1.getField().getName().compareTo(o2.getField().getName());
-   }
-   });
-
-   int counterFields = 0;
-
-   for(PojoField field : fields) {
-   counterFields += 
field.getTypeInformation().getTotalFields();
-   }
-
-   totalFields = counterFields;
-   }
-
-   @Override
-   public boolean isBasicType() {
-   return false;
-   }
-
-
-   @Override
-   public boolean isTupleType() {
-   return false;
-   }
-
-   @Override
-   public int getArity() {
-   return fields.length;
-   }
-   
-   @Override
-   public int 
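
The Javadoc above lists the conditions under which Flink treats a class as a POJO. A minimal sketch of a class that satisfies them (the name and fields are illustrative and not part of this commit):

// Public, standalone class: first condition from the Javadoc above.
public class WordWithCount {

	// Public field: accessible directly, no accessor methods needed.
	public String word;

	// Non-public field: acceptable because it has a public getter and setter.
	private long count;

	// Public no-argument constructor: second condition.
	public WordWithCount() {}

	public WordWithCount(String word, long count) {
		this.word = word;
		this.count = count;
	}

	public long getCount() {
		return count;
	}

	public void setCount(long count) {
		this.count = count;
	}
}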

[12/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 0c0b710..c02d365 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
@@ -42,9 +43,9 @@ import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import 
org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;
 import org.apache.flink.api.java.operators.join.JoinType;
 import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
@@ -394,8 +395,8 @@ public abstract class JoinOperator extends 
TwoInputUdfOperator keys1 = 
(SelectorFunctionKeys)rawKeys1;
-   TypeInformation> typeInfoWithKey1 
= SelectorFunctionKeys.createTypeWithKey(keys1);
-   Operator> keyMapper1 = 
SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
+   TypeInformation> typeInfoWithKey1 
= KeyFunctions.createTypeWithKey(keys1);
+   Operator> keyMapper1 = 
KeyFunctions.appendKeyExtractor(input1, keys1);
 
return this.withInput1(keyMapper1, 
typeInfoWithKey1, rawKeys1);
}
@@ -406,8 +407,8 @@ public abstract class JoinOperator extends 
TwoInputUdfOperator keys2 = 
(SelectorFunctionKeys)rawKeys2;
-   TypeInformation> typeInfoWithKey2 
= SelectorFunctionKeys.createTypeWithKey(keys2);
-   Operator> keyMapper2 = 
SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
+   TypeInformation> typeInfoWithKey2 
= KeyFunctions.createTypeWithKey(keys2);
+   Operator> keyMapper2 = 
KeyFunctions.appendKeyExtractor(input2, keys2);
 
return withInput2(keyMapper2, typeInfoWithKey2, 
rawKeys2);
}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
new file mode 100644
index 000..49d598a
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is 

[35/38] flink git commit: [hotfix] [streaming] Various cleanups in StreamTask - Clean up generics - Clean and safe disposal of initialized resources - Add names to asynchronous materialization threads

2016-02-02 Thread sewen
[hotfix] [streaming] Various cleanups in StreamTask
  - Clean up generics
  - Clean and safe disposal of initialized resources
  - Add names to asynchronous materialization threads
  - Fix concurrent modification of  materialization threads set


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/168532ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/168532ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/168532ec

Branch: refs/heads/master
Commit: 168532ece09b4be972f3ccad509a1e3376a1ac3a
Parents: bfff86c
Author: Stephan Ewen 
Authored: Thu Jan 28 18:58:11 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 17:11:46 2016 +0100

--
 .../streaming/runtime/tasks/StreamTask.java | 129 +--
 .../runtime/state/StateBackendITCase.java   |   1 -
 2 files changed, 63 insertions(+), 67 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/168532ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e4b6b6e..9ab6c10 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -28,13 +30,11 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AsynchronousStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -140,14 +140,14 @@ public abstract class StreamTask
 * actual execution Thread. */
private volatile AsynchronousException asyncException;
 
-   protected Set asyncCheckpointThreads;
+   /** The currently active background materialization threads */
+   private final Set asyncCheckpointThreads = 
Collections.synchronizedSet(new HashSet());

/** Flag to mark the task "in operation", in which case check
 * needs to be initialized to true, so that early cancel() before 
invoke() behaves correctly */
private volatile boolean isRunning;
 
private long recoveryTimestamp;
-   
 
// 

//  Life cycle methods for specific implementations
@@ -167,21 +167,19 @@ public abstract class StreamTask

@Override
public final void invoke() throws Exception {
-   // 

-   // Initialize
-   // 

-   LOG.debug("Initializing {}", getName());
 
-   boolean initializationCompleted = false;
+   boolean disposed = false;
try {
-   AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
+   //  Initialize -
+   LOG.debug("Initializing {}", getName());
 
userClassLoader = getUserCodeClassLoader();
configuration = new 
StreamConfig(getTaskConfiguration());
-   accumulatorMap = accumulatorRegistry.getUserMap();
+   accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
 
headOperator = 
configuration.getStreamOperator(userClassLoader);
-   operatorChain = new OperatorChain<>(this, headOperator, 
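
The hotfix notes above mention naming the asynchronous materialization threads and fixing concurrent modification of the thread set; the hunk switches that set to Collections.synchronizedSet for the latter. A standalone sketch of that general pattern, not the actual StreamTask code (which is cut off in this message):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MaterializationThreadsSketch {

	// Wrapping the HashSet makes concurrent add/remove calls thread-safe,
	// which is what the "concurrent modification" fix addresses.
	private final Set<Thread> asyncThreads =
			Collections.synchronizedSet(new HashSet<Thread>());

	public void startMaterialization(final String taskName, final long checkpointId, final Runnable work) {
		Thread thread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					work.run();
				} finally {
					asyncThreads.remove(Thread.currentThread());
				}
			}
		});
		// A descriptive name makes the thread identifiable in logs and thread dumps.
		thread.setName("Materialization for " + taskName + " (checkpoint " + checkpointId + ")");
		asyncThreads.add(thread);
		thread.start();
	}

	public void cancelAll() {
		// Iterating over a synchronized set still requires an explicit lock.
		synchronized (asyncThreads) {
			for (Thread thread : asyncThreads) {
				thread.interrupt();
			}
		}
	}
}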

[21/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
new file mode 100644
index 000..30710e5
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer;
+//CHECKSTYLE.ON: AvoidStarImport
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.types.Value;
+
+/**
+ * A {@link TypeInformation} for the tuple types of the Java API.
+ *
+ * @param <T> The type of the tuple.
+ */
+public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
+   
+   private static final long serialVersionUID = 1L;
+
+   protected final String[] fieldNames;
+
+   @SuppressWarnings("unchecked")
+   public TupleTypeInfo(TypeInformation... types) {
+   this((Class) Tuple.getTupleClass(types.length), types);
+   }
+
+   public TupleTypeInfo(Class tupleType, TypeInformation... types) {
+   super(tupleType, types);
+
+   Preconditions.checkArgument(
+   types.length <= Tuple.MAX_ARITY,
+   "The tuple type exceeds the maximum supported arity.");
+
+   this.fieldNames = new String[types.length];
+
+   for (int i = 0; i < types.length; i++) {
+   fieldNames[i] = "f" + i;
+   }
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public int getFieldIndex(String fieldName) {
+   int fieldIndex = Integer.parseInt(fieldName.substring(1));
+   if (fieldIndex >= getArity()) {
+   return -1;
+   }
+   return fieldIndex;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public TupleSerializer createSerializer(ExecutionConfig 
executionConfig) {
+   if (getTypeClass() == Tuple0.class) {
+   return (TupleSerializer) Tuple0Serializer.INSTANCE;
+   }
+
+   TypeSerializer[] fieldSerializers = new 
TypeSerializer[getArity()];
+   for (int i = 0; i < types.length; i++) {
+   fieldSerializers[i] = 
types[i].createSerializer(executionConfig);
+   }
+   
+   Class tupleClass = getTypeClass();
+   
+   return new TupleSerializer(tupleClass, fieldSerializers);
+   }
+
+   @Override
+   protected TypeComparatorBuilder createTypeComparatorBuilder() {
+   return new TupleTypeComparatorBuilder();
+   }
+
+   private class TupleTypeComparatorBuilder implements 
TypeComparatorBuilder {
+
+   private final ArrayList fieldComparators = new 
ArrayList();
+   private final ArrayList logicalKeyFields = new 
ArrayList();
+
+   @Override
+   public void initializeTypeComparatorBuilder(int size) {
+   

[19/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
new file mode 100644
index 000..c0c7797
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.InstantiationUtil;
+
+
+public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+   
+   private static final long serialVersionUID = 1L;
+
+   // Reflection fields for the comp fields
+   private transient Field[] keyFields;
+
+   private final TypeComparator[] comparators;
+
+   private final int[] normalizedKeyLengths;
+
+   private final int numLeadingNormalizableKeys;
+
+   private final int normalizableKeyPrefixLen;
+
+   private final boolean invertNormKey;
+
+   private TypeSerializer serializer;
+
+   private final Class type;
+
+   @SuppressWarnings("unchecked")
+   public PojoComparator(Field[] keyFields, TypeComparator[] 
comparators, TypeSerializer serializer, Class type) {
+   this.keyFields = keyFields;
+   this.comparators = (TypeComparator[]) comparators;
+
+   this.type = type;
+   this.serializer = serializer;
+
+   // set up auxiliary fields for normalized key support
+   this.normalizedKeyLengths = new int[keyFields.length];
+   int nKeys = 0;
+   int nKeyLen = 0;
+   boolean inverted = false;
+
+   for (int i = 0; i < this.comparators.length; i++) {
+   TypeComparator k = this.comparators[i];
+   if(k == null) {
+   throw new IllegalArgumentException("One of the 
passed comparators is null");
+   }
+   if(keyFields[i] == null) {
+   throw new IllegalArgumentException("One of the 
passed reflection fields is null");
+   }
+
+   // as long as the leading keys support normalized keys, 
we can build up the composite key
+   if (k.supportsNormalizedKey()) {
+   if (i == 0) {
+   // the first comparator decides whether 
we need to invert the key direction
+   inverted = k.invertNormalizedKey();
+   }
+   else if (k.invertNormalizedKey() != inverted) {
+   // if a successor does not agree on the 
invertion direction, it cannot be part of the normalized key
+   break;
+   }
+
+   nKeys++;
+   final int len = k.getNormalizeKeyLen();
+   if (len < 0) {
+   throw new RuntimeException("Comparator 
" + k.getClass().getName() + " specifies an invalid length for the normalized 
key: " + len);
+   }
+   this.normalizedKeyLengths[i] = len;
+ 

[32/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
[FLINK-3303] [core] Move Tuple classes to flink-core


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7081836e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7081836e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7081836e

Branch: refs/heads/master
Commit: 7081836e0d640ee640687606bd73b6673d3f2a07
Parents: 5474386
Author: Stephan Ewen 
Authored: Sun Jan 31 23:08:18 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 2 16:55:44 2016 +0100

--
 .../org/apache/flink/api/java/tuple/Tuple.java  | 118 +++
 .../org/apache/flink/api/java/tuple/Tuple0.java |  98 +++
 .../org/apache/flink/api/java/tuple/Tuple1.java | 154 
 .../apache/flink/api/java/tuple/Tuple10.java| 289 ++
 .../apache/flink/api/java/tuple/Tuple11.java| 304 +++
 .../apache/flink/api/java/tuple/Tuple12.java| 319 +++
 .../apache/flink/api/java/tuple/Tuple13.java| 334 +++
 .../apache/flink/api/java/tuple/Tuple14.java| 349 
 .../apache/flink/api/java/tuple/Tuple15.java| 364 
 .../apache/flink/api/java/tuple/Tuple16.java| 379 
 .../apache/flink/api/java/tuple/Tuple17.java| 394 +
 .../apache/flink/api/java/tuple/Tuple18.java| 409 +
 .../apache/flink/api/java/tuple/Tuple19.java| 424 +
 .../org/apache/flink/api/java/tuple/Tuple2.java | 177 
 .../apache/flink/api/java/tuple/Tuple20.java| 439 +
 .../apache/flink/api/java/tuple/Tuple21.java| 454 ++
 .../apache/flink/api/java/tuple/Tuple22.java| 469 ++
 .../apache/flink/api/java/tuple/Tuple23.java| 484 ++
 .../apache/flink/api/java/tuple/Tuple24.java| 499 +++
 .../apache/flink/api/java/tuple/Tuple25.java| 514 +++
 .../org/apache/flink/api/java/tuple/Tuple3.java | 184 
 .../org/apache/flink/api/java/tuple/Tuple4.java | 199 +
 .../org/apache/flink/api/java/tuple/Tuple5.java | 214 +
 .../org/apache/flink/api/java/tuple/Tuple6.java | 229 +
 .../org/apache/flink/api/java/tuple/Tuple7.java | 244 +
 .../org/apache/flink/api/java/tuple/Tuple8.java | 259 ++
 .../org/apache/flink/api/java/tuple/Tuple9.java | 274 ++
 .../api/java/tuple/builder/Tuple0Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple10Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple11Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple12Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple13Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple14Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple15Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple16Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple17Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple18Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple19Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple1Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple20Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple21Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple22Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple23Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple24Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple25Builder.java  |  46 +
 .../api/java/tuple/builder/Tuple2Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple3Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple4Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple5Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple6Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple7Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple8Builder.java   |  46 +
 .../api/java/tuple/builder/Tuple9Builder.java   |  46 +
 .../flink/api/java/tuple/TupleGenerator.java| 521 +++
 .../org/apache/flink/api/java/tuple/Tuple.java  | 118 ---
 .../org/apache/flink/api/java/tuple/Tuple0.java |  98 ---
 .../org/apache/flink/api/java/tuple/Tuple1.java | 154 
 .../apache/flink/api/java/tuple/Tuple10.java| 289 --
 .../apache/flink/api/java/tuple/Tuple11.java| 304 ---
 .../apache/flink/api/java/tuple/Tuple12.java| 319 ---
 .../apache/flink/api/java/tuple/Tuple13.java| 334 ---
 .../apache/flink/api/java/tuple/Tuple14.java| 349 
 .../apache/flink/api/java/tuple/Tuple15.java| 364 
 .../apache/flink/api/java/tuple/Tuple16.java| 379 
 .../apache/flink/api/java/tuple/Tuple17.java| 394 -
 .../apache/flink/api/java/tuple/Tuple18.java| 409 -
 .../apache/flink/api/java/tuple/Tuple19.java| 424 -
 .../org/apache/flink/api/java/tuple/Tuple2.java | 177 
 .../apache/flink/api/java/tuple/Tuple20.java| 439 -
 .../apache/flink/api/java/tuple/Tuple21.java| 454 --
 .../apache/flink/api/java/tuple/Tuple22.java| 469 --
 

[07/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
deleted file mode 100644
index 276ffc4..000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
-import 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * A type serializer that serializes its type using the Kryo serialization
- * framework (https://github.com/EsotericSoftware/kryo).
- * 
- * This serializer is intended as a fallback serializer for the cases that are
- * not covered by the basic types, tuples, and POJOs.
- *
- * @param <T> The type to be serialized.
- */
-public class KryoSerializer<T> extends TypeSerializer<T> {
-
-   private static final long serialVersionUID = 3L;
-
-   private static final Logger LOG = 
LoggerFactory.getLogger(KryoSerializer.class);
-
-   // 

-
-   private final LinkedHashMap registeredTypesWithSerializers;
-   private final LinkedHashMap> 
registeredTypesWithSerializerClasses;
-   private final LinkedHashMap defaultSerializers;
-   private final LinkedHashMap> 
defaultSerializerClasses;
-   private final LinkedHashSet registeredTypes;
-
-   private final Class type;
-   
-   // 

-   // The fields below are lazily initialized after duplication or 
deserialization.
-
-   private transient Kryo kryo;
-   private transient T copyInstance;
-   
-   private transient DataOutputView previousOut;
-   private transient DataInputView previousIn;
-   
-   private transient Input input;
-   private transient Output output;
-
-   // 

-
-   public KryoSerializer(Class type, ExecutionConfig executionConfig){
-   this.type = Preconditions.checkNotNull(type);
-
-   this.defaultSerializers = 
executionConfig.getDefaultKryoSerializers();
-   this.defaultSerializerClasses = 
executionConfig.getDefaultKryoSerializerClasses();
-

[28/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core

2016-02-02 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java
new file mode 100644
index 000..2ab1d0a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// --
+//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
+//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+// --
+
+
+package org.apache.flink.api.java.tuple.builder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple15;
+
+public class Tuple15Builder {
+
+   private List> tuples = new ArrayList<>();
+
+   public Tuple15Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 
value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, 
T12 value12, T13 value13, T14 value14){
+   tuples.add(new Tuple15<>(value0, value1, value2, value3, 
value4, value5, value6, value7, value8, value9, value10, value11, value12, 
value13, value14));
+   return this;
+   }
+
+   @SuppressWarnings("unchecked")
+   public Tuple15[] build(){
+   return tuples.toArray(new Tuple15[tuples.size()]);
+   }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java
new file mode 100644
index 000..5b4fba2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// --
+//  THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!
+//  GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+// --
+
+
+package org.apache.flink.api.java.tuple.builder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple16;
+
+public class Tuple16Builder {
+
+   private List> tuples = new ArrayList<>();
+
+   public Tuple16Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, 
T5 value5, T6 value6, T7 value7, T8 value8, 

flink git commit: [FLINK-3175][Kafka 0.8] Relax test condition

2016-02-02 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master f8677464b -> 682d8d5e2


[FLINK-3175][Kafka 0.8] Relax test condition


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/682d8d5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/682d8d5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/682d8d5e

Branch: refs/heads/master
Commit: 682d8d5e2ce9758ae67276f4584dd8ec20f4e8ad
Parents: f867746
Author: Robert Metzger 
Authored: Tue Feb 2 11:32:30 2016 +0100
Committer: Robert Metzger 
Committed: Tue Feb 2 11:32:30 2016 +0100

--
 .../flink/streaming/connectors/kafka/Kafka08ITCase.java   | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/682d8d5e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 26e31f5..6a2fa27 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -257,9 +257,13 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
LOG.info("Got final offsets from zookeeper o1={}, o2={}, 
o3={}", o1, o2, o3);
 
// ensure that the offset has been committed
-   assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 
&& o1 <= 100);
-   assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 
&& o2 <= 100);
-   assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 
&& o3 <= 100);
+   boolean atLeastOneOffsetSet = (o1 > 0 && o1 <= 100) ||
+   (o2 > 0 && o2 <= 100) ||
+   (o3 > 0 && o3 <= 100);
+   assertTrue("Expecting at least one offset to be set o1="+o1+" 
o2="+o2+" o3="+o3, atLeastOneOffsetSet);
+   //assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 
&& o1 <= 100);
+   //assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 
&& o2 <= 100);
+   //assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 
&& o3 <= 100);
 
deleteTestTopic(topicName);
}



[3/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition

2016-02-02 Thread trohrmann
[FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition

Implements NFA using the SharedBuffer

Implements NFACompiler to compile a Pattern into a NFA

Add CEP operator

Makes NFA and SharedBuffer serializable

Add serializability support to SharedBuffer and NFA

Add keyed cep pattern operator

Adds CEP documentation

Adds online documentation for the CEP library

Copies sequence events before giving them to the UDF

Fix correct scala type suffixes

This closes #1557.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79058edb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79058edb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79058edb

Branch: refs/heads/master
Commit: 79058edb67095120558add534ba37304425fa602
Parents: 682d8d5
Author: Till Rohrmann 
Authored: Thu Jan 14 10:04:23 2016 +0100
Committer: Till Rohrmann 
Committed: Tue Feb 2 15:04:08 2016 +0100

--
 docs/libs/cep/index.md  | 300 +++
 flink-libraries/flink-cep/pom.xml   |  77 ++
 .../src/main/java/org/apache/flink/cep/CEP.java | 100 +++
 .../flink/cep/NonDuplicatingTypeSerializer.java | 195 +
 .../flink/cep/PatternFlatSelectFunction.java|  54 ++
 .../apache/flink/cep/PatternSelectFunction.java |  54 ++
 .../org/apache/flink/cep/PatternStream.java | 151 
 .../apache/flink/cep/nfa/ComputationState.java  |  84 ++
 .../org/apache/flink/cep/nfa/DeweyNumber.java   | 163 
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 406 +
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 858 +++
 .../java/org/apache/flink/cep/nfa/State.java| 109 +++
 .../apache/flink/cep/nfa/StateTransition.java   |  84 ++
 .../flink/cep/nfa/StateTransitionAction.java|  28 +
 .../flink/cep/nfa/compiler/NFACompiler.java | 187 
 .../operator/AbstractCEPPatternOperator.java| 108 +++
 .../flink/cep/operator/CEPPatternOperator.java  | 137 +++
 .../cep/operator/KeyedCEPPatternOperator.java   | 331 +++
 .../cep/operator/StreamRecordComparator.java|  44 +
 .../flink/cep/pattern/AndFilterFunction.java|  44 +
 .../flink/cep/pattern/FollowedByPattern.java|  33 +
 .../org/apache/flink/cep/pattern/Pattern.java   | 168 
 .../cep/pattern/SubtypeFilterFunction.java  |  43 +
 .../java/org/apache/flink/cep/CEPITCase.java| 406 +
 .../test/java/org/apache/flink/cep/Event.java   |  77 ++
 .../java/org/apache/flink/cep/StreamEvent.java  |  41 +
 .../java/org/apache/flink/cep/SubEvent.java |  49 ++
 .../apache/flink/cep/nfa/DeweyNumberTest.java   |  54 ++
 .../org/apache/flink/cep/nfa/NFAITCase.java | 160 
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 261 ++
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 136 +++
 .../flink/cep/nfa/compiler/NFACompilerTest.java | 129 +++
 .../apache/flink/cep/pattern/PatternTest.java   | 145 
 flink-libraries/pom.xml |   1 +
 .../streaming/api/operators/StreamOperator.java |   2 +-
 35 files changed, 5218 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/docs/libs/cep/index.md
--
diff --git a/docs/libs/cep/index.md b/docs/libs/cep/index.md
new file mode 100644
index 000..04e2b73
--- /dev/null
+++ b/docs/libs/cep/index.md
@@ -0,0 +1,300 @@
+---
+title: "FlinkCEP - Complex event processing for Flink"
+# Top navigation
+top-nav-group: libs
+top-nav-pos: 2
+top-nav-title: CEP
+# Sub navigation
+sub-nav-group: batch
+sub-nav-id: flinkcep
+sub-nav-pos: 2
+sub-nav-parent: libs
+sub-nav-title: CEP
+---
+
+
+FlinkCEP is the complex event processing library for Flink.
+It allows you to easily detect complex event patterns in a stream of endless 
data.
+Complex events can then be constructed from matching sequences.
+This gives you the opportunity to quickly get hold of what's really important 
in your data.
+
+## Getting Started
+
+If you want to jump right in, you have to [set up a Flink program]({{ 
site.baseurl }}/apis/batch/index.html#linking-with-flink).
+Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-cep{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that FlinkCEP is currently not part of the binary distribution.
+See linking with it for cluster execution 
[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+Now you can start writing your first CEP program using the pattern API.
+
+{% highlight java %}
+DataStream input = ...
+
+Pattern pattern = Pattern.begin("start").where(evt -> evt.getId() 
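
The quickstart above introduces the pattern API; its snippet is cut off here, so the following is a minimal sketch of the three steps it implies, assembled from the classes added in this commit (Pattern, CEP, PatternStream, PatternSelectFunction) and the Event type from its test sources. Treat it as an illustration rather than the documentation's full example:

import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CepQuickstartSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// A small finite stream, mirroring the style of the CEPITCase further below.
		DataStream<Event> input = env.fromElements(
				new Event(1, "start", 1.0),
				new Event(2, "middle", 2.0),
				new Event(3, "end", 3.0));

		// Step 1: define a pattern, "start" followed by "end".
		Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
				.where(new FilterFunction<Event>() {
					@Override
					public boolean filter(Event value) throws Exception {
						return value.getName().equals("start");
					}
				})
				.followedBy("end")
				.where(new FilterFunction<Event>() {
					@Override
					public boolean filter(Event value) throws Exception {
						return value.getName().equals("end");
					}
				});

		// Step 2: apply the pattern to the input stream.
		PatternStream<Event> patternStream = CEP.pattern(input, pattern);

		// Step 3: turn every detected sequence into a result.
		patternStream.select(new PatternSelectFunction<Event, String>() {
			@Override
			public String select(Map<String, Event> match) {
				return match.get("start") + " -> " + match.get("end");
			}
		}).print();

		env.execute("CEP quickstart sketch");
	}
}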

[2/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition

2016-02-02 Thread trohrmann
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
new file mode 100644
index 000..e1c0099
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -0,0 +1,858 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import com.google.common.collect.LinkedHashMultimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * A shared buffer implementation which stores values under a key. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ * 
+ * The idea of the implementation is to have for each key a dedicated {@link 
SharedBufferPage}. Each
+ * buffer page maintains a collection of the inserted values.
+ *
+ * The values are wrapped in a {@link SharedBufferEntry}. The shared buffer 
entry allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ *
+ * @param <K> Type of the keys
+ * @param <V> Type of the values
+ */
+public class SharedBuffer implements Serializable {
+   private static final long serialVersionUID = 9213251042562206495L;
+
+   private final TypeSerializer valueSerializer;
+
+   private transient Map> pages;
+
+   public SharedBuffer(final TypeSerializer valueSerializer) {
+   this.valueSerializer = valueSerializer;
+   pages = new HashMap<>();
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given key. It 
assigns a preceding element
+* relation to the entry which is defined by the previous key, value 
(value + timestamp).
+*
+* @param key Key of the current value
+* @param value Current value
+* @param timestamp Timestamp of the current value (a value requires 
always a timestamp to make it uniquely referable))
+* @param previousKey Key of the value for the previous relation
+* @param previousValue Value for the previous relation
+* @param previousTimestamp Timestamp of the value for the previous 
relation
+* @param version Version of the previous relation
+*/
+   public void put(
+   final K key,
+   final V value,
+   final long timestamp,
+   final K previousKey,
+   final V previousValue,
+   final long previousTimestamp,
+   final DeweyNumber version) {
+   SharedBufferPage page;
+
+   if (!pages.containsKey(key)) {
+   page = new SharedBufferPage(key);
+   pages.put(key, page);
+   } else {
+   page = pages.get(key);
+   }
+
+   final SharedBufferEntry previousSharedBufferEntry = 
get(previousKey, previousValue, previousTimestamp);
+
+   page.add(

[1/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition

2016-02-02 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 682d8d5e2 -> 79058edb6


http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
new file mode 100644
index 000..7dcda4c
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Map;
+
+public class CEPITCase extends StreamingMultipleProgramsTestBase {
+
+   private String resultPath;
+   private String expected;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   expected = "";
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   }
+
+   /**
+* Checks that a certain event sequence is recognized
+* @throws Exception
+*/
+   @Test
+   public void testSimplePatternCEP() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new Event(1, "barfoo", 1.0),
+   new Event(2, "start", 2.0),
+   new Event(3, "foobar", 3.0),
+   new SubEvent(4, "foo", 4.0, 1.0),
+   new Event(5, "middle", 5.0),
+   new SubEvent(6, "middle", 6.0, 2.0),
+   new SubEvent(7, "bar", 3.0, 3.0),
+   new Event(42, "42", 42.0),
+   new Event(8, "end", 1.0)
+   );
+
+   Pattern pattern = 
Pattern.begin("start").where(new FilterFunction() {
+   private static final long serialVersionUID = 
5681493970790509488L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("start");
+   }
+   })
+   .followedBy("middle").subtype(SubEvent.class).where(
+   new FilterFunction() {
+   private static final long 
serialVersionUID = 448591738315698540L;
+
+   @Override
+   public boolean filter(SubEvent value) 
throws Exception {
+   return 
value.getName().equals("middle");
+   }
+   }
+   )
+   .followedBy("end").where(new FilterFunction() {
+   private static final long serialVersionUID = 
6080276591060431966L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+