[1/2] flink git commit: [FLINK-3247] Remove * exclude from quickstarts
Repository: flink Updated Branches: refs/heads/master 79058edb6 -> 360f02b1f [FLINK-3247] Remove * exclude from quickstarts This closes #1573 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/360f02b1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/360f02b1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/360f02b1 Branch: refs/heads/master Commit: 360f02b1f1f5ae4e9c3b3c43a9af033b66ecdcef Parents: cb1a5ec Author: Robert MetzgerAuthored: Tue Feb 2 12:04:26 2016 +0100 Committer: Robert Metzger Committed: Tue Feb 2 15:38:46 2016 +0100 -- .../src/main/resources/archetype-resources/pom.xml| 7 +-- .../src/main/resources/archetype-resources/pom.xml| 7 +-- 2 files changed, 10 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/360f02b1/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index eb8bee1..bd1e723 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -112,7 +112,9 @@ under the License. Everything else will be packaged into the fat-jar --> org.apache.flink:flink-annotations - org.apache.flink:flink-shaded-* + org.apache.flink:flink-shaded-hadoop1 + org.apache.flink:flink-shaded-hadoop2 + org.apache.flink:flink-shaded-curator-recipes org.apache.flink:flink-core org.apache.flink:flink-java org.apache.flink:flink-scala_2.10 @@ -184,7 +186,8 @@ under the License. 
org.apache.flink:* - org/apache/flink/shaded/** + + org/apache/flink/shaded/com/** web-docs/** http://git-wip-us.apache.org/repos/asf/flink/blob/360f02b1/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml index e575181..df98b95 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml @@ -116,7 +116,9 @@ under the License. Everything else will be packaged into the fat-jar --> org.apache.flink:flink-annotations - org.apache.flink:flink-shaded-* + org.apache.flink:flink-shaded-hadoop1 + org.apache.flink:flink-shaded-hadoop2 + org.apache.flink:flink-shaded-curator-recipes
[2/2] flink git commit: [FLINK-3287] Shade Curator dependency into flink-connector-kafka-0.8
[FLINK-3287] Shade Curator dependency into flink-connector-kafka-0.8 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb1a5ecb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb1a5ecb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb1a5ecb Branch: refs/heads/master Commit: cb1a5ecbca9b8df924968904d87b6ff4ac4d7be7 Parents: 79058ed Author: Robert MetzgerAuthored: Thu Jan 28 15:16:16 2016 +0100 Committer: Robert Metzger Committed: Tue Feb 2 15:38:46 2016 +0100 -- .../flink-connector-kafka-0.8/pom.xml | 27 1 file changed, 27 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/cb1a5ecb/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml index 0b23994..3672c81 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml @@ -154,6 +154,33 @@ under the License. 1 + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + org.apache.flink:flink-shaded-curator-recipes + + + + + org.apache.curator + org.apache.flink.shaded.org.apache.curator + + + + + +
flink-web git commit: [FLINK-3316] Fix broken links in first paragraph of landing page
Repository: flink-web Updated Branches: refs/heads/asf-site b68e1f48b -> ffe31bf53 [FLINK-3316] Fix broken links in first paragraph of landing page Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/ffe31bf5 Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/ffe31bf5 Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/ffe31bf5 Branch: refs/heads/asf-site Commit: ffe31bf53ca28a842d4a352a568f288251d56490 Parents: b68e1f4 Author: Till Rohrmann Authored: Tue Feb 2 16:19:53 2016 +0100 Committer: Till Rohrmann Committed: Tue Feb 2 16:19:53 2016 +0100 -- index.md | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink-web/blob/ffe31bf5/index.md -- diff --git a/index.md b/index.md index 5ff5583..c675b4c 100644 --- a/index.md +++ b/index.md @@ -10,18 +10,18 @@ layout: base -**Flink’s core** is a [streaming dataflow engine](features.html#unified-stream-amp-batch-processing) that provides data distribution, communication, and fault tolerance for distributed computations over data streams. +**Flink’s core** is a [streaming dataflow engine](features.html) that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink includes **several APIs** for creating applications that use the Flink engine: -1. [DataSet API](features.html#dataset-api) for static data embedded in Java, Scala, and Python, -2. [DataStream API](features.html#datastream-api) for unbounded streams embedded in Java and Scala, and -3. [Table API](features.html#table-api) with a SQL-like expression language embedded in Java and Scala. +1. [DataStream API]({{ site.docs-snapshot }}/apis/streaming/index.html) for unbounded streams embedded in Java and Scala, and +2. [DataSet API]({{ site.docs-snapshot }}/apis/batch/index.html) for static data embedded in Java, Scala, and Python, +3. 
[Table API]({{ site.docs-snapshot }}/libs/table.html) with a SQL-like expression language embedded in Java and Scala. Flink also bundles **libraries for domain-specific use cases**: -1. [Machine Learning library](features.html#machine-learning-library), and -2. [Gelly](features.html#graph-api-amp-library-gelly), a graph processing API and library. +1. [Machine Learning library]({{ site.docs-snapshot }}/libs/ml/index.html), and +2. [Gelly]({{ site.docs-snapshot }}/libs/gelly_guide.html), a graph processing API and library. You can **integrate** Flink easily with other well-known open source systems both for [data input and output](features.html#deployment-and-integration) as well as [deployment](features.html#deployment-and-integration).
[13/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java new file mode 100644 index 000..a196984 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.typeutils.runtime; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; + +import org.apache.flink.api.common.typeutils.SerializerTestInstance; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.junit.Assert; + +public class TupleSerializerTestInstance extends SerializerTestInstance { + + public TupleSerializerTestInstance(TypeSerializer serializer, Class typeClass, int length, T[] testData) { + super(serializer, typeClass, length, testData); + } + + protected void deepEquals(String message, T shouldTuple, T isTuple) { + Assert.assertEquals(shouldTuple.getArity(), isTuple.getArity()); + + for (int i = 0; i < shouldTuple.getArity(); i++) { + Object should = shouldTuple.getField(i); + Object is = isTuple.getField(i); + + if (should.getClass().isArray()) { + if (should instanceof boolean[]) { + Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is)); + } + else if (should instanceof byte[]) { + assertArrayEquals(message, (byte[]) should, (byte[]) is); + } + else if (should instanceof short[]) { + assertArrayEquals(message, (short[]) should, (short[]) is); + } + else if (should instanceof int[]) { + assertArrayEquals(message, (int[]) should, (int[]) is); + } + else if (should instanceof long[]) { + assertArrayEquals(message, (long[]) should, (long[]) is); + } + else if (should instanceof float[]) { + assertArrayEquals(message, (float[]) should, (float[]) is, 0.0f); + } + else if (should instanceof double[]) { + assertArrayEquals(message, (double[]) should, (double[]) is, 0.0); + } + else if (should instanceof char[]) { + assertArrayEquals(message, (char[]) should, (char[]) is); + } + else { + assertArrayEquals(message, (Object[]) should, (Object[]) is); + } + } + else { + assertEquals(message, should, is); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
[01/38] flink git commit: [hotfix] Reduce the heavy sysout verbosity for certain tests
Repository: flink Updated Branches: refs/heads/master 360f02b1f -> 8fc7e7af2 [hotfix] Reduce the heavy sysout verbosity for certain tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f042e78 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f042e78 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f042e78 Branch: refs/heads/master Commit: 6f042e7894be388fa8e400a08002584c10781e60 Parents: 21a7158 Author: Stephan EwenAuthored: Mon Feb 1 16:46:03 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 16:55:44 2016 +0100 -- .../jar/CheckpointedStreamingProgram.java | 10 -- .../flink/test/recovery/FastFailuresITCase.java | 18 +++--- 2 files changed, 15 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6f042e78/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java index 47253da..cda5a7b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java @@ -27,8 +27,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.lang.RuntimeException; -import java.net.URL; -import java.net.URLClassLoader; /** * A simple streaming program, which is using the state checkpointing of Flink. 
@@ -40,14 +38,6 @@ public class CheckpointedStreamingProgram { private static final int CHECKPOINT_INTERVALL = 100; public static void main(String[] args) throws Exception { - ClassLoader cl = ClassLoader.getSystemClassLoader(); - URL[] urls = ((URLClassLoader)cl).getURLs(); - - for(URL url: urls){ - System.out.println(url.getFile()); - } - System.out.println("CheckpointedStreamingProgram classpath: "); - final String jarFile = args[0]; final String host = args[1]; final int port = Integer.parseInt(args[2]); http://git-wip-us.apache.org/repos/asf/flink/blob/6f042e78/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java index 0684fde..2a139c7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java @@ -21,11 +21,13 @@ package org.apache.flink.test.recovery; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.junit.Test; @@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.fail; +@SuppressWarnings("serial") public class FastFailuresITCase { static final AtomicInteger FAILURES_SO_FAR = new AtomicInteger(); @@ -40,12 +43,21 @@ public class FastFailuresITCase { @Test public void testThis() { - 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + + ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false); + cluster.start(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + +
[05/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java deleted file mode 100644 index cd405bc..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; -import org.junit.Test; - -public class CompositeTypeTest { - - private final TupleTypeInfo tupleTypeInfo = new TupleTypeInfo>( - BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); - - - private final TupleTypeInfo > inNestedTuple1 = new TupleTypeInfo >( - BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); - - private final TupleTypeInfo > inNestedTuple2 = new TupleTypeInfo >( - BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO); - - private final TupleTypeInfo nestedTypeInfo = new TupleTypeInfo , Integer, Tuple2 >>( - BasicTypeInfo.INT_TYPE_INFO, - inNestedTuple1, - BasicTypeInfo.INT_TYPE_INFO, - inNestedTuple2); - - private final TupleTypeInfo >> inNestedTuple3 = new TupleTypeInfo >>( - BasicTypeInfo.INT_TYPE_INFO, - new TupleTypeInfo >(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); - - private final TupleTypeInfo deepNestedTupleTypeInfo = new TupleTypeInfo >, Integer>>( - BasicTypeInfo.INT_TYPE_INFO, - inNestedTuple3, - BasicTypeInfo.INT_TYPE_INFO ); - - private final PojoTypeInfo pojoTypeInfo = ((PojoTypeInfo) TypeExtractor.getForClass - (MyPojo.class)); - - private final TupleTypeInfo pojoInTupleTypeInfo = new TupleTypeInfo >(BasicTypeInfo.INT_TYPE_INFO, pojoTypeInfo); - - @Test - public void testGetFlatFields() { - assertEquals(0, tupleTypeInfo.getFlatFields("0").get(0).getPosition()); - assertEquals(1, tupleTypeInfo.getFlatFields("1").get(0).getPosition()); - assertEquals(2, tupleTypeInfo.getFlatFields("2").get(0).getPosition()); - assertEquals(3, tupleTypeInfo.getFlatFields("3").get(0).getPosition()); - assertEquals(0, 
tupleTypeInfo.getFlatFields("f0").get(0).getPosition()); - assertEquals(1, tupleTypeInfo.getFlatFields("f1").get(0).getPosition()); - assertEquals(2, tupleTypeInfo.getFlatFields("f2").get(0).getPosition()); - assertEquals(3, tupleTypeInfo.getFlatFields("f3").get(0).getPosition()); - - assertEquals(0, nestedTypeInfo.getFlatFields("0").get(0).getPosition()); - assertEquals(1, nestedTypeInfo.getFlatFields("1.0").get(0).getPosition()); - assertEquals(2, nestedTypeInfo.getFlatFields("1.1").get(0).getPosition()); - assertEquals(3, nestedTypeInfo.getFlatFields("1.2").get(0).getPosition()); - assertEquals(4, nestedTypeInfo.getFlatFields("2").get(0).getPosition()); -
[30/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java new file mode 100644 index 000..8116121 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple20.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// -- +// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! +// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. +// -- + + +package org.apache.flink.api.java.tuple; + +import org.apache.flink.util.StringUtils; + +/** + * A tuple with 20 fields. Tuples are strongly typed; each field may be of a separate type. + * The fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position + * through the {@link #getField(int)} method. The tuple field positions start at zero. + * + * Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions that work + * with Tuples to reuse objects in order to reduce pressure on the garbage collector. 
+ * + * @see Tuple + * + * @param The type of field 0 + * @param The type of field 1 + * @param The type of field 2 + * @param The type of field 3 + * @param The type of field 4 + * @param The type of field 5 + * @param The type of field 6 + * @param The type of field 7 + * @param The type of field 8 + * @param The type of field 9 + * @param The type of field 10 + * @param The type of field 11 + * @param The type of field 12 + * @param The type of field 13 + * @param The type of field 14 + * @param The type of field 15 + * @param The type of field 16 + * @param The type of field 17 + * @param The type of field 18 + * @param The type of field 19 + */ +public class Tuple20extends Tuple { + + private static final long serialVersionUID = 1L; + + /** Field 0 of the tuple. */ + public T0 f0; + /** Field 1 of the tuple. */ + public T1 f1; + /** Field 2 of the tuple. */ + public T2 f2; + /** Field 3 of the tuple. */ + public T3 f3; + /** Field 4 of the tuple. */ + public T4 f4; + /** Field 5 of the tuple. */ + public T5 f5; + /** Field 6 of the tuple. */ + public T6 f6; + /** Field 7 of the tuple. */ + public T7 f7; + /** Field 8 of the tuple. */ + public T8 f8; + /** Field 9 of the tuple. */ + public T9 f9; + /** Field 10 of the tuple. */ + public T10 f10; + /** Field 11 of the tuple. */ + public T11 f11; + /** Field 12 of the tuple. */ + public T12 f12; + /** Field 13 of the tuple. */ + public T13 f13; + /** Field 14 of the tuple. */ + public T14 f14; + /** Field 15 of the tuple. */ + public T15 f15; + /** Field 16 of the tuple. */ + public T16 f16; + /** Field 17 of the tuple. */ + public T17 f17; + /** Field 18 of the tuple. */ + public T18 f18; + /** Field 19 of the tuple. */ + public T19 f19; + + /** +* Creates a new tuple where all fields are null. +*/ + public Tuple20() {} + + /** +* Creates a new tuple and assigns the given values to the tuple's fields. 
+* +* @param value0 The value for field 0 +* @param value1 The value for field 1 +* @param value2 The value for field 2 +* @param value3 The value for field 3 +* @param value4 The value for field 4 +* @param value5 The value for field 5 +* @param value6 The value for field 6 +* @param value7 The value for field 7 +* @param value8 The value for field 8 +* @param value9 The value for field 9 +* @param value10 The value for field 10 +
[08/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java deleted file mode 100644 index de24956..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ /dev/null @@ -1,592 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - - -public final class PojoSerializer extends TypeSerializer { - - // Flags for the header - private static byte IS_NULL = 1; - private static byte NO_SUBCLASS = 2; - private static byte IS_SUBCLASS = 4; - private static byte IS_TAGGED_SUBCLASS = 8; - - private static final long serialVersionUID = 1L; - - private final Class clazz; - - private final TypeSerializer[] fieldSerializers; - - private final int numFields; - - private final MapregisteredClasses; - - private final TypeSerializer[] registeredSerializers; - - private final ExecutionConfig executionConfig; - - private transient Map subclassSerializerCache; - private transient ClassLoader cl; - // We need to handle these ourselves in writeObject()/readObject() - private transient Field[] fields; - - @SuppressWarnings("unchecked") - public PojoSerializer( - Class clazz, - TypeSerializer[] fieldSerializers, - Field[] fields, - ExecutionConfig executionConfig) { - - this.clazz = Preconditions.checkNotNull(clazz); - this.fieldSerializers = (TypeSerializer[]) Preconditions.checkNotNull(fieldSerializers); - this.fields = Preconditions.checkNotNull(fields); - this.numFields = 
fieldSerializers.length; - this.executionConfig = Preconditions.checkNotNull(executionConfig); - - LinkedHashSet registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); - - for (int i = 0; i < numFields; i++) { - this.fields[i].setAccessible(true); - } - - cl = Thread.currentThread().getContextClassLoader(); - - subclassSerializerCache = new HashMap (); - - // We only want those classes that are not our own class and are actually sub-classes. - List cleanedTaggedClasses = new ArrayList (registeredPojoTypes.size()); - for (Class registeredClass: registeredPojoTypes) { - if (registeredClass.equals(clazz)) { - continue; - } - if (!clazz.isAssignableFrom(registeredClass)) { - continue; - } - cleanedTaggedClasses.add(registeredClass); - - } - this.registeredClasses = new LinkedHashMap (cleanedTaggedClasses.size()); - registeredSerializers = new
[23/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java deleted file mode 100644 index b423a3b..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -// -- -// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! -// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. 
-// -- - - -package org.apache.flink.api.java.tuple.builder; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.tuple.Tuple15; - -public class Tuple15Builder{ - - private List > tuples = new ArrayList >(); - - public Tuple15Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14){ - tuples.add(new Tuple15 (value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14)); - return this; - } - - @SuppressWarnings("unchecked") - public Tuple15 [] build(){ - return tuples.toArray(new Tuple15[tuples.size()]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java deleted file mode 100644 index c698730..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -// -- -// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! -// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. -// -- - - -package org.apache.flink.api.java.tuple.builder; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.tuple.Tuple16; - -public class Tuple16Builder { - - private List > tuples = new ArrayList
[04/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java deleted file mode 100644 index 19fac43..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { - @Override - protected TypeSerializer createSerializer(Class type) { - return new AvroSerializer(type); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java deleted file mode 100644 index df1ff60..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - - @Override - protected TypeSerializer createSerializer(Class type) { - return new AvroSerializer(type); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java deleted file mode 100644 index 8a89410..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
[09/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java deleted file mode 100644 index 5187de7..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; -import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; -import org.apache.flink.api.java.typeutils.runtime.ValueComparator; -import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; -import org.apache.flink.types.BooleanValue; -import org.apache.flink.types.ByteValue; -import org.apache.flink.types.CharValue; -import org.apache.flink.types.CopyableValue; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.ShortValue; -import org.apache.flink.types.StringValue; -import org.apache.flink.types.Value; - -/** - * Type information for data types that extend the {@link Value} interface. The value - * interface allows types to define their custom serialization and deserialization routines. - * - * @param The type of the class represented by this type information. 
- */ -public class ValueTypeInfo extends TypeInformation implements AtomicType { - - private static final long serialVersionUID = 1L; - - public static final ValueTypeInfo BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class); - public static final ValueTypeInfo BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class); - public static final ValueTypeInfo CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class); - public static final ValueTypeInfo DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class); - public static final ValueTypeInfo FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class); - public static final ValueTypeInfo INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class); - public static final ValueTypeInfo LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class); - public static final ValueTypeInfo NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class); - public static final ValueTypeInfo SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class); - public static final ValueTypeInfo STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class); - - private final Class type; - - public ValueTypeInfo(Class type) { - this.type = Preconditions.checkNotNull(type); - - Preconditions.checkArgument( - Value.class.isAssignableFrom(type) || type.equals(Value.class), - "ValueTypeInfo can only be used for subclasses of " + Value.class.getName()); - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @Override - public Class getTypeClass() { - return this.type; - } - - @Override - public boolean isBasicType() { - return false; - } - - public boolean isBasicValueType() { - return
[38/38] flink git commit: [FLINK-3314] [streaming] Fix case where early cancel messages do not properly cancel a stream operator.
[FLINK-3314] [streaming] Fix case where early cancel messages do not properly cancel a stream operator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fc7e7af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fc7e7af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fc7e7af Branch: refs/heads/master Commit: 8fc7e7af2c7decb8e531b76e3edcc2601f73fe9d Parents: c356ef3 Author: Stephan Ewen Authored: Tue Feb 2 12:44:42 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 17:11:50 2016 +0100 -- .../streaming/runtime/tasks/StreamTask.java | 20 +- .../streaming/runtime/tasks/StreamTaskTest.java | 254 +++ 2 files changed, 273 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8fc7e7af/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 9ab6c10..c9624fc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -146,6 +146,9 @@ public abstract class StreamTask /** Flag to mark the task "in operation", in which case check * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */ private volatile boolean isRunning; + + /** Flag to mark this task as canceled */ + private volatile boolean canceled; private long recoveryTimestamp; @@ -191,6 +194,11 @@ public abstract class StreamTask // task specific initialization init(); + // save the work of reloadig state, etc, if the task is already canceled + if (canceled) { + throw new CancelTaskException(); + } + // Invoke LOG.debug("Invoking {}", getName()); @@ -205,7 +213,12 @@ public abstract
class StreamTask openAllOperators(); } - // let the task do its work + // final check to exit early before starting to run + if (canceled) { + throw new CancelTaskException(); + } + + // let the task do its work isRunning = true; run(); isRunning = false; @@ -290,6 +303,7 @@ public abstract class StreamTask @Override public final void cancel() throws Exception { isRunning = false; + canceled = true; cancelTask(); } @@ -297,6 +311,10 @@ public abstract class StreamTask return isRunning; } + public final boolean isCanceled() { + return canceled; + } + private void openAllOperators() throws Exception { for (StreamOperator operator : operatorChain.getAllOperators()) { if (operator != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/8fc7e7af/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java new file mode 100644 index 000..c18d150 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain
[03/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java deleted file mode 100644 index 8cdee9b..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorISD1Test extends TupleComparatorTestBase> { - - @SuppressWarnings("unchecked") - Tuple3 [] dataISD = new Tuple3[]{ - new Tuple3 (4, "hello", 20.0), - new Tuple3 (5, "hello", 23.2), - new Tuple3 (6, "world", 20.0), - new Tuple3 (7, "hello", 20.0), - new Tuple3 (8, "hello", 23.2), - new Tuple3 (9, "world", 20.0), - new Tuple3 (10, "hello", 20.0), - new Tuple3 (11, "hello", 23.2) - }; - - @Override - protected TupleComparator > createComparator(boolean ascending) { - return new TupleComparator >( - new int[]{0}, - new TypeComparator[]{ new IntComparator(ascending) }, - new TypeSerializer[]{ IntSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer > createSerializer() { - return new TupleSerializer >( - (Class >) (Class) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new StringSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3 [] getSortedTestData() { - return dataISD; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java deleted file mode 
100644 index 06c292f..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
[22/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
[FLINK-3303] [core] Move all type utilities to flink-core Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21a71586 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21a71586 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21a71586 Branch: refs/heads/master Commit: 21a715867d655bb61df9a9f7eef37e42b99e206a Parents: 7081836 Author: Stephan Ewen Authored: Sun Jan 31 23:28:32 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 16:55:44 2016 +0100 -- flink-core/pom.xml | 47 +- .../apache/flink/api/common/operators/Keys.java | 459 + .../flink/api/java/functions/KeySelector.java | 63 + .../flink/api/java/typeutils/AvroTypeInfo.java | 78 + .../api/java/typeutils/EitherTypeInfo.java | 122 ++ .../flink/api/java/typeutils/EnumTypeInfo.java | 122 ++ .../api/java/typeutils/GenericTypeInfo.java | 116 ++ .../java/typeutils/InputTypeConfigurable.java | 42 + .../api/java/typeutils/MissingTypeInfo.java | 121 ++ .../api/java/typeutils/ObjectArrayTypeInfo.java | 141 ++ .../flink/api/java/typeutils/PojoField.java | 108 + .../flink/api/java/typeutils/PojoTypeInfo.java | 406 .../api/java/typeutils/ResultTypeQueryable.java | 37 + .../flink/api/java/typeutils/TupleTypeInfo.java | 248 +++ .../api/java/typeutils/TupleTypeInfoBase.java | 252 +++ .../flink/api/java/typeutils/TypeExtractor.java | 1692 +++ .../api/java/typeutils/TypeInfoParser.java | 383 .../flink/api/java/typeutils/ValueTypeInfo.java | 183 ++ .../api/java/typeutils/WritableTypeInfo.java| 139 ++ .../java/typeutils/runtime/AvroSerializer.java | 201 ++ .../runtime/CopyableValueComparator.java| 167 ++ .../runtime/CopyableValueSerializer.java| 129 ++ .../typeutils/runtime/DataInputDecoder.java | 229 +++ .../typeutils/runtime/DataInputViewStream.java | 71 + .../typeutils/runtime/DataOutputEncoder.java| 190 ++ .../typeutils/runtime/DataOutputViewStream.java | 41 + .../typeutils/runtime/EitherSerializer.java | 193 ++
.../runtime/GenericTypeComparator.java | 177 ++ .../api/java/typeutils/runtime/KryoUtils.java | 87 + .../java/typeutils/runtime/NoFetchingInput.java | 141 ++ .../java/typeutils/runtime/PojoComparator.java | 354 .../java/typeutils/runtime/PojoSerializer.java | 592 ++ .../runtime/RuntimeComparatorFactory.java | 75 + .../runtime/RuntimePairComparatorFactory.java | 44 + .../runtime/RuntimeSerializerFactory.java | 124 ++ .../typeutils/runtime/Tuple0Serializer.java | 121 ++ .../java/typeutils/runtime/TupleComparator.java | 157 ++ .../typeutils/runtime/TupleComparatorBase.java | 279 +++ .../java/typeutils/runtime/TupleSerializer.java | 158 ++ .../typeutils/runtime/TupleSerializerBase.java | 102 + .../java/typeutils/runtime/ValueComparator.java | 183 ++ .../java/typeutils/runtime/ValueSerializer.java | 152 ++ .../typeutils/runtime/WritableComparator.java | 189 ++ .../typeutils/runtime/WritableSerializer.java | 153 ++ .../typeutils/runtime/kryo/KryoSerializer.java | 382 .../typeutils/runtime/kryo/Serializers.java | 227 ++ .../common/operators/ExpressionKeysTest.java| 481 + .../operators/SelectorFunctionKeysTest.java | 154 ++ .../apache/flink/api/java/tuple/Tuple2Test.java | 44 + .../api/java/typeutils/CompositeTypeTest.java | 179 ++ .../api/java/typeutils/EitherTypeInfoTest.java | 61 + .../api/java/typeutils/EnumTypeInfoTest.java| 51 + .../api/java/typeutils/GenericTypeInfoTest.java | 47 + .../api/java/typeutils/MissingTypeInfoTest.java | 47 + .../java/typeutils/ObjectArrayTypeInfoTest.java | 58 + .../java/typeutils/PojoTypeExtractionTest.java | 812 .../api/java/typeutils/PojoTypeInfoTest.java| 153 ++ .../java/typeutils/PojoTypeInformationTest.java | 98 + .../api/java/typeutils/TupleTypeInfoTest.java | 96 + .../TypeExtractorInputFormatsTest.java | 231 +++ .../api/java/typeutils/TypeExtractorTest.java | 1907 + .../api/java/typeutils/TypeInfoParserTest.java | 338 +++ .../api/java/typeutils/ValueTypeInfoTest.java | 87 + .../java/typeutils/WritableTypeInfoTest.java| 74 + 
.../AbstractGenericArraySerializerTest.java | 187 ++ .../AbstractGenericTypeComparatorTest.java | 376 .../AbstractGenericTypeSerializerTest.java | 364 .../runtime/AvroGenericArraySerializerTest.java | 28 + .../runtime/AvroGenericTypeComparatorTest.java | 28 + .../runtime/AvroGenericTypeSerializerTest.java | 29 + .../runtime/AvroSerializerEmptyArrayTest.java | 189 ++ .../runtime/CopyableValueComparatorTest.java| 53 + .../typeutils/runtime/EitherSerializerTest.java | 113 +
[29/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java new file mode 100644 index 000..901b838 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple25.java @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// -- +// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! +// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. +// -- + + +package org.apache.flink.api.java.tuple; + +import org.apache.flink.util.StringUtils; + +/** + * A tuple with 25 fields. Tuples are strongly typed; each field may be of a separate type. + * The fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position + * through the {@link #getField(int)} method. The tuple field positions start at zero. + * + * Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions that work + * with Tuples to reuse objects in order to reduce pressure on the garbage collector. 
+ * + * @see Tuple + * + * @param The type of field 0 + * @param The type of field 1 + * @param The type of field 2 + * @param The type of field 3 + * @param The type of field 4 + * @param The type of field 5 + * @param The type of field 6 + * @param The type of field 7 + * @param The type of field 8 + * @param The type of field 9 + * @param The type of field 10 + * @param The type of field 11 + * @param The type of field 12 + * @param The type of field 13 + * @param The type of field 14 + * @param The type of field 15 + * @param The type of field 16 + * @param The type of field 17 + * @param The type of field 18 + * @param The type of field 19 + * @param The type of field 20 + * @param The type of field 21 + * @param The type of field 22 + * @param The type of field 23 + * @param The type of field 24 + */ +public class Tuple25extends Tuple { + + private static final long serialVersionUID = 1L; + + /** Field 0 of the tuple. */ + public T0 f0; + /** Field 1 of the tuple. */ + public T1 f1; + /** Field 2 of the tuple. */ + public T2 f2; + /** Field 3 of the tuple. */ + public T3 f3; + /** Field 4 of the tuple. */ + public T4 f4; + /** Field 5 of the tuple. */ + public T5 f5; + /** Field 6 of the tuple. */ + public T6 f6; + /** Field 7 of the tuple. */ + public T7 f7; + /** Field 8 of the tuple. */ + public T8 f8; + /** Field 9 of the tuple. */ + public T9 f9; + /** Field 10 of the tuple. */ + public T10 f10; + /** Field 11 of the tuple. */ + public T11 f11; + /** Field 12 of the tuple. */ + public T12 f12; + /** Field 13 of the tuple. */ + public T13 f13; + /** Field 14 of the tuple. */ + public T14 f14; + /** Field 15 of the tuple. */ + public T15 f15; + /** Field 16 of the tuple. */ + public T16 f16; + /** Field 17 of the tuple. */ + public T17 f17; + /** Field 18 of the tuple. */ + public T18 f18; + /** Field 19 of the tuple. */ + public T19 f19; + /** Field 20 of the tuple. */ + public T20 f20; + /** Field 21 of the tuple. 
*/ + public T21 f21; + /** Field 22 of the tuple. */ + public T22 f22; + /** Field 23 of the tuple. */ + public T23 f23; + /** Field 24 of the tuple. */ + public T24 f24; + + /** +* Creates a new tuple where all fields are null. +*/ + public Tuple25() {} + + /** +* Creates a new tuple and assigns the given values to the tuple's fields. +* +* @param
[33/38] flink git commit: [FLINK-3049] [api breaking] Move 'Either' type to 'flink-core / org.apache.flink.types'
[FLINK-3049] [api breaking] Move 'Either' type to 'flink-core / org.apache.flink.types' Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54743866 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54743866 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54743866 Branch: refs/heads/master Commit: 54743866e86cbe7689ae1dcf001deb559629747b Parents: 360f02b Author: Stephan Ewen Authored: Sun Jan 31 23:27:36 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 16:55:44 2016 +0100 -- .../java/org/apache/flink/types/Either.java | 185 +++ 1 file changed, 185 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/54743866/flink-core/src/main/java/org/apache/flink/types/Either.java -- diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java b/flink-core/src/main/java/org/apache/flink/types/Either.java new file mode 100644 index 000..361802b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/Either.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.types; + +/** + * This type represents a value of one two possible types, Left or Right (a + * disjoint union), inspired by Scala's Either type. + * + * @param + *the type of Left + * @param + *the type of Right + */ +public abstract class Either { + + /** +* Create a Left value of Either +*/ + public static Either Left(L value) { + return new Left (value); + } + + /** +* Create a Right value of Either +*/ + public static Either Right(R value) { + return new Right (value); + } + + /** +* Retrieve the Left value of Either. +* +* @return the Left value +* @throws IllegalStateException +* if called on a Right +*/ + public abstract L left() throws IllegalStateException; + + /** +* Retrieve the Right value of Either. +* +* @return the Right value +* @throws IllegalStateException +* if called on a Left +*/ + public abstract R right() throws IllegalStateException; + + /** +* +* @return true if this is a Left value, false if this is a Right value +*/ + public final boolean isLeft() { + return getClass() == Left.class; + } + + /** +* +* @return true if this is a Right value, false if this is a Left value +*/ + public final boolean isRight() { + return getClass() == Right.class; + } + + /** +* A left value of {@link Either} +* +* @param +*the type of Left +* @param +*the type of Right +*/ + public static class Left extends Either { + private final L value; + + public Left(L value) { + this.value = java.util.Objects.requireNonNull(value); + } + + @Override + public L left() { + return value; + } + + @Override + public R right() { + throw new IllegalStateException("Cannot retrieve Right value on a Left"); + } + + @Override + public boolean equals(Object object) { + if (object instanceof Left) { + final Left other = (Left) object; + return value.equals(other.value); + } + return false; + } + + @Override + public int hashCode() { + return value.hashCode(); +
[06/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java deleted file mode 100644 index 7f5ef25..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.java.type.extractor; - -import static org.junit.Assert.assertTrue; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.junit.Test; - -public class PojoTypeInformationTest { - - public static class SimplePojo { - public String str; - public Boolean Bl; - public boolean bl; - public Byte Bt; - public byte bt; - public Short Shrt; - public short shrt; - public Integer Intgr; - public int intgr; - public Long Lng; - public long lng; - public Float Flt; - public float flt; - public Double Dbl; - public double dbl; - public Character Ch; - public char ch; - public int[] primIntArray; - public Integer[] intWrapperArray; - } - - @Test - public void testSimplePojoTypeExtraction() { - TypeInformation type = TypeExtractor.getForClass(SimplePojo.class); - assertTrue("Extracted type is not a composite/pojo type but should be.", type instanceof CompositeType); - } - - public static class NestedPojoInner { - public String field; - } - - public static class NestedPojoOuter { - public Integer intField; - public NestedPojoInner inner; - } - - @Test - public void testNestedPojoTypeExtraction() { - TypeInformation type = TypeExtractor.getForClass(NestedPojoOuter.class); - assertTrue("Extracted type is not a Pojo type but should be.", type instanceof CompositeType); - } - - public static class Recursive1Pojo { - public Integer intField; - public Recursive2Pojo rec; - } - - public static class Recursive2Pojo { - public String strField; - public Recursive1Pojo rec; - } - - @Test - public void testRecursivePojoTypeExtraction() { - // This one tests whether a recursive pojo is detected using the set of visited - // types in the type extractor. The recursive field will be handled using the generic serializer. 
- TypeInformation type = TypeExtractor.getForClass(Recursive1Pojo.class); - assertTrue("Extracted type is not a Pojo type but should be.", type instanceof CompositeType); - } - - @Test - public void testRecursivePojoObjectTypeExtraction() { - TypeInformation type = TypeExtractor.getForObject(new Recursive1Pojo()); - assertTrue("Extracted type is not a Pojo type but should be.", type instanceof CompositeType); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java deleted file mode 100644 index bbf5148..000 ---
[26/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java deleted file mode 100644 index 70da5bb..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple17.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -// -- -// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! -// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. -// -- - - -package org.apache.flink.api.java.tuple; - -import org.apache.flink.util.StringUtils; - -/** - * A tuple with 17 fields. Tuples are strongly typed; each field may be of a separate type. - * The fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position - * through the {@link #getField(int)} method. The tuple field positions start at zero. - * - * Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions that work - * with Tuples to reuse objects in order to reduce pressure on the garbage collector. 
- * - * @see Tuple - * - * @param The type of field 0 - * @param The type of field 1 - * @param The type of field 2 - * @param The type of field 3 - * @param The type of field 4 - * @param The type of field 5 - * @param The type of field 6 - * @param The type of field 7 - * @param The type of field 8 - * @param The type of field 9 - * @param The type of field 10 - * @param The type of field 11 - * @param The type of field 12 - * @param The type of field 13 - * @param The type of field 14 - * @param The type of field 15 - * @param The type of field 16 - */ -public class Tuple17extends Tuple { - - private static final long serialVersionUID = 1L; - - /** Field 0 of the tuple. */ - public T0 f0; - /** Field 1 of the tuple. */ - public T1 f1; - /** Field 2 of the tuple. */ - public T2 f2; - /** Field 3 of the tuple. */ - public T3 f3; - /** Field 4 of the tuple. */ - public T4 f4; - /** Field 5 of the tuple. */ - public T5 f5; - /** Field 6 of the tuple. */ - public T6 f6; - /** Field 7 of the tuple. */ - public T7 f7; - /** Field 8 of the tuple. */ - public T8 f8; - /** Field 9 of the tuple. */ - public T9 f9; - /** Field 10 of the tuple. */ - public T10 f10; - /** Field 11 of the tuple. */ - public T11 f11; - /** Field 12 of the tuple. */ - public T12 f12; - /** Field 13 of the tuple. */ - public T13 f13; - /** Field 14 of the tuple. */ - public T14 f14; - /** Field 15 of the tuple. */ - public T15 f15; - /** Field 16 of the tuple. */ - public T16 f16; - - /** -* Creates a new tuple where all fields are null. -*/ - public Tuple17() {} - - /** -* Creates a new tuple and assigns the given values to the tuple's fields. 
-* -* @param value0 The value for field 0 -* @param value1 The value for field 1 -* @param value2 The value for field 2 -* @param value3 The value for field 3 -* @param value4 The value for field 4 -* @param value5 The value for field 5 -* @param value6 The value for field 6 -* @param value7 The value for field 7 -* @param value8 The value for field 8 -* @param value9 The value for field 9 -* @param value10 The value for field 10 -* @param value11 The value for field 11 -* @param value12 The value for field 12 -* @param value13 The value for field 13 -* @param value14 The value for field 14 -* @param value15 The value for field 15 -* @param value16 The value for field 16 -
[36/38] flink git commit: [hotfix] Remove old sysout debug message in UdfAnalyzerTest
[hotfix] Remove old sysout debug message in UdfAnalyzerTest Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b90542e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b90542e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b90542e8 Branch: refs/heads/master Commit: b90542e80b2f9f5fb8ffe6e6325400b3cdd9301f Parents: 168532e Author: Stephan Ewen Authored: Mon Feb 1 21:46:09 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 17:11:50 2016 +0100 -- .../test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java| 1 - 1 file changed, 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b90542e8/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java index a1ccfe4..dc2d1db 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java @@ -366,7 +366,6 @@ public class UdfAnalyzerTest { @Test public void testForwardWithGetMethod() { - System.out.println("HERE"); compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class, "Tuple4 ", "Tuple4 "); }
[37/38] flink git commit: [hotfix] Fix warnings concerning joda-time in "flink-tests"
[hotfix] Fix warnings concerning joda-time in "flink-tests" Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c356ef33 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c356ef33 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c356ef33 Branch: refs/heads/master Commit: c356ef33bc0fdd77296061679699487ab16e8132 Parents: b90542e Author: Stephan Ewen Authored: Tue Feb 2 12:10:01 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 17:11:50 2016 +0100 -- flink-core/pom.xml | 2 -- flink-tests/pom.xml | 6 ++ 2 files changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c356ef33/flink-core/pom.xml -- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index ba1050c..694f629 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -94,14 +94,12 @@ under the License. joda-time joda-time - 2.5 test org.joda joda-convert - 1.7 test http://git-wip-us.apache.org/repos/asf/flink/blob/c356ef33/flink-tests/pom.xml -- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index fe34aea..996f5e0 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -164,6 +164,12 @@ under the License. + org.joda + joda-convert + test + + + com.google.guava guava ${guava.version}
[20/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java new file mode 100644 index 000..b89a830 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -0,0 +1,383 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.typeutils; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.types.Value; + + +public class TypeInfoParser { + private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple"; + private static final String VALUE_PACKAGE = "org.apache.flink.types"; + private static final String WRITABLE_PACKAGE = "org.apache.hadoop.io"; + + private static final Pattern tuplePattern = Pattern.compile("^(" + TUPLE_PACKAGE.replaceAll("\\.", ".") + "\\.)?((Tuple[1-9][0-9]?)<|(Tuple0))"); + private static final Pattern writablePattern = Pattern.compile("^((" + WRITABLE_PACKAGE.replaceAll("\\.", ".") + "\\.)?Writable)<([^\\s,>]*)(,|>|$|\\[)"); + private static final Pattern enumPattern = Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$|\\[)"); + private static final Pattern basicTypePattern = Pattern + .compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean|Void))(,|>|$|\\[)"); + private static final Pattern basicTypeDatePattern = Pattern.compile("^((java\\.util\\.)?Date)(,|>|$|\\[)"); + private static final Pattern primitiveTypePattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean|void)(,|>|$|\\[)"); + private static final Pattern valueTypePattern = Pattern.compile("^((" + VALUE_PACKAGE.replaceAll("\\.", ".") + + "\\.)?(String|Int|Byte|Short|Char|Double|Float|Long|Boolean|List|Map|Null))Value(,|>|$|\\[)"); + private static final Pattern pojoGenericObjectPattern = Pattern.compile("^([^\\s,<>\\[]+)(<)?"); + private static final Pattern fieldPattern = Pattern.compile("^([^\\s,<>\\[]+)="); + + /** +* Generates 
an instance of TypeInformation by parsing a type +* information string. A type information string can contain the following +* types: +* +* +* Basic types such as Integer, String, etc. +* Basic type arrays such as Integer[], +* String[], etc. +* Tuple types such as Tuple1TYPE0, +* Tuple2TYPE0, TYPE1, etc. +* Pojo types such as org.my.MyPojomyFieldName=TYPE0,myFieldName2=TYPE1, etc. +* Generic types such as java.lang.Class, etc. +* Custom type arrays such as org.my.CustomClass[], +* org.my.CustomClass$StaticInnerClass[], etc. +* Value types such as DoubleValue, +* StringValue, IntegerValue, etc. +* Tuple array types such as Tuple2TYPE0,TYPE1[], etc. +* Writable types such as Writableorg.my.CustomWritable +* Enum types such as Enumorg.my.CustomEnum +* +* +* Example: +* "Tuple2String,Tuple2Integer,org.my.MyJob$Pojoword=String" +* +* @param infoString +*type information string to be parsed +* @return TypeInformation representation of the string +*/ + @SuppressWarnings("unchecked") + public static TypeInformation parse(String infoString) { + try { + if (infoString == null) { + throw new
[18/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java new file mode 100644 index 000..258d92c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +import com.esotericsoftware.kryo.Kryo; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.IOException; + +public class WritableSerializer extends TypeSerializer { + + private static final long serialVersionUID = 1L; + + private final Class typeClass; + + private transient Kryo kryo; + + private transient T copyInstance; + + public WritableSerializer(Class typeClass) { + this.typeClass = typeClass; + } + + @SuppressWarnings("unchecked") + @Override + public T createInstance() { + if(typeClass == NullWritable.class) { + return (T) NullWritable.get(); + } + return InstantiationUtil.instantiate(typeClass); + } + + + + @Override + public T copy(T from) { + checkKryoInitialized(); + + return KryoUtils.copy(from, kryo, this); + } + + @Override + public T copy(T from, T reuse) { + checkKryoInitialized(); + + return KryoUtils.copy(from, reuse, kryo, this); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + record.write(target); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + return deserialize(createInstance(), source); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + reuse.readFields(source); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + ensureInstanceInstantiated(); + copyInstance.readFields(source); + copyInstance.write(target); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public 
WritableSerializer duplicate() { + return new WritableSerializer(typeClass); + } + + // + + private void ensureInstanceInstantiated() { + if (copyInstance == null) { + copyInstance = createInstance(); + } + } + + private void checkKryoInitialized() { + if (this.kryo == null) { + this.kryo = new Kryo(); + + Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); + instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.setInstantiatorStrategy(instantiatorStrategy); + + this.kryo.setAsmEnabled(true); + this.kryo.register(typeClass); + } + } +
[10/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java deleted file mode 100644 index b8f2075..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ /dev/null @@ -1,1687 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils; - -import java.lang.reflect.Constructor; -import java.lang.reflect.Field; -import java.lang.reflect.GenericArrayType; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.lang.reflect.TypeVariable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.commons.lang3.ClassUtils; -import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.functions.CrossFunction; -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GroupCombineFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.MapPartitionFunction; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.types.Value; -import org.apache.flink.util.Collector; 
-import org.apache.hadoop.io.Writable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -/** - * A utility for reflection analysis on classes, to determine the return type of implementations of transformation - * functions. - */ -public class TypeExtractor { - - /* -* NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy". -* The type hierarchy describes all types (Classes, ParameterizedTypes, TypeVariables etc. ) and intermediate -* types from a given type of a function or type (e.g. MyMapper, Tuple2) until a current type -* (depends on the method, e.g. MyPojoFieldType). -* -* Thus, it fully qualifies types until tuple/POJO field level. -* -* A typical typeHierarchy could look like: -* -* UDF: MyMapFunction.class -* top-level UDF: MyMapFunctionBase.class -* RichMapFunction: RichMapFunction.class -* MapFunction: MapFunction.class -* Function's OUT: Tuple1 -* user-defined POJO: MyPojo.class -* user-defined top-level POJO: MyPojoBase.class -* POJO field: Tuple1 -* Field type: String.class -* -*/ - - private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); - - protected TypeExtractor() { - // only create instances for special use cases - } - - // - // Function specific methods - //
[14/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java new file mode 100644 index 000..8c61a19 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import com.google.common.base.Objects; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.junit.Test; + +import java.util.Random; + +/** + * A test for the {@link PojoSerializer}. 
+ */ +public class PojoSubclassSerializerTest extends SerializerTestBase { + private TypeInformation type = TypeExtractor.getForClass(TestUserClassBase.class); + + @Override + protected TypeSerializer createSerializer() { + // only register one of the three child classes, the third child class is NO POJO + ExecutionConfig conf = new ExecutionConfig(); + conf.registerPojoType(TestUserClass1.class); + TypeSerializer serializer = type.createSerializer(conf); + assert(serializer instanceof PojoSerializer); + return serializer; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class getTypeClass() { + return TestUserClassBase.class; + } + + @Override + protected TestUserClassBase[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserClassBase[]{ + new TestUserClass1(rnd.nextInt(), "foo", rnd.nextLong()), + new TestUserClass2(rnd.nextInt(), "bar", rnd.nextFloat()), + new TestUserClass3(rnd.nextInt(), "bar", rnd.nextFloat()) + }; + + } + + @Override + @Test + public void testInstantiate() { + // don't do anything, since the PojoSerializer with subclass will return null + } + + // User code class for testing the serializer + public static abstract class TestUserClassBase { + public int dumm1; + public String dumm2; + + + public TestUserClassBase() { + } + + public TestUserClassBase(int dumm1, String dumm2) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + } + + @Override + public int hashCode() { + return Objects.hashCode(dumm1, dumm2); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestUserClassBase)) { + return false; + } + TestUserClassBase otherTUC = (TestUserClassBase) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + return true; + } + } + + public static class TestUserClass1 extends TestUserClassBase { + public long dumm3; + + public TestUserClass1() { + } + + public TestUserClass1(int 
dumm1, String dumm2, long dumm3) { + super(dumm1, dumm2); +
[34/38] flink git commit: [FLINK-2348] Fix unstable IterateExampleITCase.
[FLINK-2348] Fix unstable IterateExampleITCase. This deactivates the validation of results, which is not reliably possible under the current model (timeout on feedback). This test for now only checks that the job executes properly. Also adds proper logging property files for the examples projects. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfff86c8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfff86c8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfff86c8 Branch: refs/heads/master Commit: bfff86c841f0b873f25367419db0b3dd504a1197 Parents: 6f042e7 Author: Stephan Ewen Authored: Mon Feb 1 17:09:38 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 16:55:45 2016 +0100 -- .../src/main/resources/log4j-test.properties| 23 .../src/main/resources/log4j.properties | 23 .../src/main/resources/logback.xml | 29 .../iteration/IterateExampleITCase.java | 4 ++- .../src/test/resources/log4j-test.properties| 23 5 files changed, 78 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/bfff86c8/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties -- diff --git a/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties b/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties deleted file mode 100644 index 65bd0b8..000 --- a/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties +++ /dev/null @@ -1,23 +0,0 @@ - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -log4j.rootLogger=INFO, console - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/bfff86c8/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties -- diff --git a/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties b/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties new file mode 100644 index 000..da32ea0 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
[02/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java deleted file mode 100644 index 8ff0b1b..000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import com.esotericsoftware.kryo.Kryo; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; -import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Random; - -import static org.junit.Assert.*; - -@SuppressWarnings("unchecked") -public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - - ExecutionConfig ec = new ExecutionConfig(); - - @Test - public void testJavaList(){ - Collection a = new ArrayList<>(); - - fillCollection(a); - - runTests(a); - } - - @Test - public void testJavaSet(){ - Collection b = new HashSet<>(); - - fillCollection(b); - - runTests(b); - } - - - - @Test - public void testJavaDequeue(){ - Collection c = new LinkedList<>(); - fillCollection(c); - runTests(c); - } - - private void fillCollection(Collection coll) { - coll.add(42); - coll.add(1337); - coll.add(49); - coll.add(1); - } - - @Override - protected TypeSerializer createSerializer(Class type) { - return new KryoSerializer(type, ec); - } - - /** -* Make sure that the kryo serializer forwards EOF exceptions properly when serializing -*/ - @Test - public void testForwardEOFExceptionWhileSerializing() { - try { - // construct a long string - String str; - { - char[] charData = new char[4]; - Random rnd = new Random(); - - for (int i = 0; i < charData.length; i++) { - charData[i] = (char) rnd.nextInt(1); - } - - str = new String(charData); - } - - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(1, 3); - KryoSerializer 
serializer = new KryoSerializer(String.class, new ExecutionConfig()); - - try { - serializer.serialize(str, target); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { -
[17/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java new file mode 100644 index 000..bc11848 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java @@ -0,0 +1,812 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.HashMultiset; + +/** + * Pojo Type tests + * + * A Pojo is a bean-style class with getters, setters and empty ctor + * OR a class with all fields public (or for every private field, there has to be a public getter/setter) + * everything else is a generic type (that can't be used for field selection) + */ +public class PojoTypeExtractionTest { + + public static class HasDuplicateField extends WC { + @SuppressWarnings("unused") + private int count; // duplicate + } + + @Test(expected=RuntimeException.class) + public void testDuplicateFieldException() { + TypeExtractor.createTypeInfo(HasDuplicateField.class); + } + + // test with correct pojo types + public static class WC { // is a pojo + public ComplexNestedClass complex; // is a pojo + private int count; // is a BasicType + + public WC() { + } + public int getCount() { + return count; + } + public void setCount(int c) { + this.count = c; + } + } + public static class ComplexNestedClass { // pojo + public static int ignoreStaticField; + public transient int ignoreTransientField; + public Date date; // generic type + public Integer someNumberWithÜnicödeNäme; // BasicType + public float someFloat; // BasicType 
+ public Tuple3word; //Tuple Type with three basic types + public Object nothing; // generic type + public MyWritable hadoopCitizen; // writableType + public List collection; + } + + // all public test + public static class AllPublic extends ComplexNestedClass { + public ArrayList somethingFancy; // generic type + public HashMultiset fancyIds; // generic type + public String[] fancyArray; // generic type + } + + public static class ParentSettingGenerics extends PojoWithGenerics { + public String field3; + } + public static class PojoWithGenerics { + public int key; + public T1 field1; + public T2 field2; + } + + public static class ComplexHierarchyTop extends ComplexHierarchy {} + public static class ComplexHierarchy extends PojoWithGenerics {} + + // extends from Tuple and adds a field + public static class FromTuple extends Tuple3 { + private static final long serialVersionUID = 1L; +
[25/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java deleted file mode 100644 index 5f7194b..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple22.java +++ /dev/null @@ -1,469 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -// -- -// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! -// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. -// -- - - -package org.apache.flink.api.java.tuple; - -import org.apache.flink.util.StringUtils; - -/** - * A tuple with 22 fields. Tuples are strongly typed; each field may be of a separate type. - * The fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position - * through the {@link #getField(int)} method. The tuple field positions start at zero. - * - * Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions that work - * with Tuples to reuse objects in order to reduce pressure on the garbage collector. 
- * - * @see Tuple - * - * @param The type of field 0 - * @param The type of field 1 - * @param The type of field 2 - * @param The type of field 3 - * @param The type of field 4 - * @param The type of field 5 - * @param The type of field 6 - * @param The type of field 7 - * @param The type of field 8 - * @param The type of field 9 - * @param The type of field 10 - * @param The type of field 11 - * @param The type of field 12 - * @param The type of field 13 - * @param The type of field 14 - * @param The type of field 15 - * @param The type of field 16 - * @param The type of field 17 - * @param The type of field 18 - * @param The type of field 19 - * @param The type of field 20 - * @param The type of field 21 - */ -public class Tuple22extends Tuple { - - private static final long serialVersionUID = 1L; - - /** Field 0 of the tuple. */ - public T0 f0; - /** Field 1 of the tuple. */ - public T1 f1; - /** Field 2 of the tuple. */ - public T2 f2; - /** Field 3 of the tuple. */ - public T3 f3; - /** Field 4 of the tuple. */ - public T4 f4; - /** Field 5 of the tuple. */ - public T5 f5; - /** Field 6 of the tuple. */ - public T6 f6; - /** Field 7 of the tuple. */ - public T7 f7; - /** Field 8 of the tuple. */ - public T8 f8; - /** Field 9 of the tuple. */ - public T9 f9; - /** Field 10 of the tuple. */ - public T10 f10; - /** Field 11 of the tuple. */ - public T11 f11; - /** Field 12 of the tuple. */ - public T12 f12; - /** Field 13 of the tuple. */ - public T13 f13; - /** Field 14 of the tuple. */ - public T14 f14; - /** Field 15 of the tuple. */ - public T15 f15; - /** Field 16 of the tuple. */ - public T16 f16; - /** Field 17 of the tuple. */ - public T17 f17; - /** Field 18 of the tuple. */ - public T18 f18; - /** Field 19 of the tuple. */ - public T19 f19; - /** Field 20 of the tuple. */ - public T20 f20; - /** Field 21 of the tuple. */ - public T21 f21; - - /** -* Creates a new tuple where all fields are null. 
-*/ - public Tuple22() {} - - /** -* Creates a new tuple and assigns the given values to the tuple's fields. -* -* @param value0 The value for field 0 -* @param value1 The value for field 1 -* @param value2 The value for field 2 -* @param value3 The value for field 3 -* @param value4 The value for field 4 -* @param value5 The value for field 5 -* @param value6 The value
[31/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java new file mode 100644 index 000..0d45352 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple15.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// -- +// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! +// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. +// -- + + +package org.apache.flink.api.java.tuple; + +import org.apache.flink.util.StringUtils; + +/** + * A tuple with 15 fields. Tuples are strongly typed; each field may be of a separate type. + * The fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position + * through the {@link #getField(int)} method. The tuple field positions start at zero. + * + * Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions that work + * with Tuples to reuse objects in order to reduce pressure on the garbage collector. 
+ * + * @see Tuple + * + * @param The type of field 0 + * @param The type of field 1 + * @param The type of field 2 + * @param The type of field 3 + * @param The type of field 4 + * @param The type of field 5 + * @param The type of field 6 + * @param The type of field 7 + * @param The type of field 8 + * @param The type of field 9 + * @param The type of field 10 + * @param The type of field 11 + * @param The type of field 12 + * @param The type of field 13 + * @param The type of field 14 + */ +public class Tuple15extends Tuple { + + private static final long serialVersionUID = 1L; + + /** Field 0 of the tuple. */ + public T0 f0; + /** Field 1 of the tuple. */ + public T1 f1; + /** Field 2 of the tuple. */ + public T2 f2; + /** Field 3 of the tuple. */ + public T3 f3; + /** Field 4 of the tuple. */ + public T4 f4; + /** Field 5 of the tuple. */ + public T5 f5; + /** Field 6 of the tuple. */ + public T6 f6; + /** Field 7 of the tuple. */ + public T7 f7; + /** Field 8 of the tuple. */ + public T8 f8; + /** Field 9 of the tuple. */ + public T9 f9; + /** Field 10 of the tuple. */ + public T10 f10; + /** Field 11 of the tuple. */ + public T11 f11; + /** Field 12 of the tuple. */ + public T12 f12; + /** Field 13 of the tuple. */ + public T13 f13; + /** Field 14 of the tuple. */ + public T14 f14; + + /** +* Creates a new tuple where all fields are null. +*/ + public Tuple15() {} + + /** +* Creates a new tuple and assigns the given values to the tuple's fields. 
+* +* @param value0 The value for field 0 +* @param value1 The value for field 1 +* @param value2 The value for field 2 +* @param value3 The value for field 3 +* @param value4 The value for field 4 +* @param value5 The value for field 5 +* @param value6 The value for field 6 +* @param value7 The value for field 7 +* @param value8 The value for field 8 +* @param value9 The value for field 9 +* @param value10 The value for field 10 +* @param value11 The value for field 11 +* @param value12 The value for field 12 +* @param value13 The value for field 13 +* @param value14 The value for field 14 +*/ + public Tuple15(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14) { + this.f0 = value0; + this.f1 = value1; + this.f2
[15/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java new file mode 100644 index 000..4a579c8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.types.Record; +import org.apache.flink.types.Value; +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.*; + +public class ValueTypeInfoTest extends TestLogger { + + public static class TestClass implements Value { + private static final long serialVersionUID = -492760806806568285L; + + @Override + public void write(DataOutputView out) throws IOException { + + } + + @Override + public void read(DataInputView in) throws IOException { + + } + } + + public static class AlternativeClass implements Value { + + private static final long serialVersionUID = -163437084575260172L; + + @Override + public void write(DataOutputView out) throws IOException { + + } + + @Override + public void read(DataInputView in) throws IOException { + + } + } + + @Test + public void testValueTypeInfoEquality() { + ValueTypeInfo tpeInfo1 = new ValueTypeInfo<>(TestClass.class); + ValueTypeInfo tpeInfo2 = new ValueTypeInfo<>(TestClass.class); + + assertEquals(tpeInfo1, tpeInfo2); + assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode()); + } + + @Test + public void testValueTyepInfoInequality() { + ValueTypeInfo tpeInfo1 = new ValueTypeInfo<>(TestClass.class); + ValueTypeInfo tpeInfo2 = new ValueTypeInfo<>(AlternativeClass.class); + + assertNotEquals(tpeInfo1, tpeInfo2); + } + + @Test + public void testValueTypeEqualsWithNull() throws Exception { + ValueTypeInfo tpeInfo = new ValueTypeInfo<>(Record.class); + + Assert.assertFalse(tpeInfo.equals(null)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java new file mode 100644 index 000..2ab0021 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import
[11/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java deleted file mode 100644 index f7e4e42..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; -import org.apache.flink.api.java.typeutils.runtime.PojoComparator; -import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; - -import com.google.common.base.Joiner; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; - -/** - * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs, - * since the conditions are slightly different from Java Beans. - * A type is considered a FLink POJO type, if it fulfills the conditions below. - * - * It is a public class, and standalone (not a non-static inner class) - * It has a public no-argument constructor. - * All fields are either public, or have public getters and setters. - * - * - * @param The type represented by this type information. 
- */ -public class PojoTypeInfo extends CompositeType { - - private static final long serialVersionUID = 1L; - - private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; - private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; - private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS - +"|\\"+ExpressionKeys.SELECT_ALL_CHAR - +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; - - private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); - private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); - - private final PojoField[] fields; - - private final int totalFields; - - public PojoTypeInfo(Class typeClass, List fields) { - super(typeClass); - - Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()), - "POJO " + typeClass + " is not public"); - - this.fields = fields.toArray(new PojoField[fields.size()]); - - Arrays.sort(this.fields, new Comparator() { - @Override - public int compare(PojoField o1, PojoField o2) { - return o1.getField().getName().compareTo(o2.getField().getName()); - } - }); - - int counterFields = 0; - - for(PojoField field : fields) { - counterFields += field.getTypeInformation().getTotalFields(); - } - - totalFields = counterFields; - } - - @Override - public boolean isBasicType() { - return false; - } - - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return fields.length; - } - - @Override - public int
[12/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 0c0b710..c02d365 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RichFlatJoinFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase; @@ -42,9 +43,9 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase; import org.apache.flink.api.java.operators.join.JoinType; import org.apache.flink.api.java.operators.join.JoinFunctionAssigner; @@ -394,8 +395,8 @@ 
public abstract class JoinOperatorextends TwoInputUdfOperator keys1 = (SelectorFunctionKeys )rawKeys1; - TypeInformation > typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1); - Operator > keyMapper1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1); + TypeInformation > typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1); + Operator > keyMapper1 = KeyFunctions.appendKeyExtractor(input1, keys1); return this.withInput1(keyMapper1, typeInfoWithKey1, rawKeys1); } @@ -406,8 +407,8 @@ public abstract class JoinOperator extends TwoInputUdfOperator keys2 = (SelectorFunctionKeys )rawKeys2; - TypeInformation > typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2); - Operator > keyMapper2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2); + TypeInformation > typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2); + Operator > keyMapper2 = KeyFunctions.appendKeyExtractor(input2, keys2); return withInput2(keyMapper2, typeInfoWithKey2, rawKeys2); } http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java new file mode 100644 index 000..49d598a --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is
[35/38] flink git commit: [hotfix] [streaming] Various cleanups in StreamTask - Clean up generics - Clean and safe disposal of initialized resources - Add names to asynchronous materialization threads
[hotfix] [streaming] Various cleanups in StreamTask - Clean up generics - Clean and safe disposal of initialized resources - Add names to asynchronous materialization threads - Fix concurrent modification of materialization threads set Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/168532ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/168532ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/168532ec Branch: refs/heads/master Commit: 168532ece09b4be972f3ccad509a1e3376a1ac3a Parents: bfff86c Author: Stephan Ewen Authored: Thu Jan 28 18:58:11 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 17:11:46 2016 +0100 -- .../streaming/runtime/tasks/StreamTask.java | 129 +-- .../runtime/state/StateBackendITCase.java | 1 - 2 files changed, 63 insertions(+), 67 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/168532ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index e4b6b6e..9ab6c10 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.runtime.tasks; +import java.io.Serializable; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -28,13 +30,11 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import 
org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.state.AsynchronousStateHandle; -import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; @@ -140,14 +140,14 @@ public abstract class StreamTask * actual execution Thread. */ private volatile AsynchronousException asyncException; - protected Set asyncCheckpointThreads; + /** The currently active background materialization threads */ + private final Set asyncCheckpointThreads = Collections.synchronizedSet(new HashSet()); /** Flag to mark the task "in operation", in which case check * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */ private volatile boolean isRunning; private long recoveryTimestamp; - // // Life cycle methods for specific implementations @@ -167,21 +167,19 @@ public abstract class StreamTask @Override public final void invoke() throws Exception { - // - // Initialize - // - LOG.debug("Initializing {}", getName()); - boolean initializationCompleted = false; + boolean disposed = false; try { - AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); + // Initialize - + LOG.debug("Initializing {}", getName()); userClassLoader = getUserCodeClassLoader(); configuration = new StreamConfig(getTaskConfiguration()); - accumulatorMap = accumulatorRegistry.getUserMap(); + accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); headOperator = configuration.getStreamOperator(userClassLoader); - operatorChain = new OperatorChain<>(this, headOperator,
[21/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java new file mode 100644 index 000..30710e5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.*; +import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer; +//CHECKSTYLE.ON: AvoidStarImport +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.types.Value; + +/** + * A {@link TypeInformation} for the tuple types of the Java API. + * + * @param The type of the tuple. + */ +public final class TupleTypeInfo extends TupleTypeInfoBase { + + private static final long serialVersionUID = 1L; + + protected final String[] fieldNames; + + @SuppressWarnings("unchecked") + public TupleTypeInfo(TypeInformation... types) { + this((Class) Tuple.getTupleClass(types.length), types); + } + + public TupleTypeInfo(Class tupleType, TypeInformation... 
types) { + super(tupleType, types); + + Preconditions.checkArgument( + types.length <= Tuple.MAX_ARITY, + "The tuple type exceeds the maximum supported arity."); + + this.fieldNames = new String[types.length]; + + for (int i = 0; i < types.length; i++) { + fieldNames[i] = "f" + i; + } + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public int getFieldIndex(String fieldName) { + int fieldIndex = Integer.parseInt(fieldName.substring(1)); + if (fieldIndex >= getArity()) { + return -1; + } + return fieldIndex; + } + + @SuppressWarnings("unchecked") + @Override + public TupleSerializer createSerializer(ExecutionConfig executionConfig) { + if (getTypeClass() == Tuple0.class) { + return (TupleSerializer) Tuple0Serializer.INSTANCE; + } + + TypeSerializer[] fieldSerializers = new TypeSerializer[getArity()]; + for (int i = 0; i < types.length; i++) { + fieldSerializers[i] = types[i].createSerializer(executionConfig); + } + + Class tupleClass = getTypeClass(); + + return new TupleSerializer(tupleClass, fieldSerializers); + } + + @Override + protected TypeComparatorBuilder createTypeComparatorBuilder() { + return new TupleTypeComparatorBuilder(); + } + + private class TupleTypeComparatorBuilder implements TypeComparatorBuilder { + + private final ArrayList fieldComparators = new ArrayList(); + private final ArrayList logicalKeyFields = new ArrayList(); + + @Override + public void initializeTypeComparatorBuilder(int size) { +
[19/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java new file mode 100644 index 000..c0c7797 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.util.List; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NullKeyFieldException; +import org.apache.flink.util.InstantiationUtil; + + +public final class PojoComparator extends CompositeTypeComparator implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + // Reflection fields for the comp fields + private transient Field[] keyFields; + + private final TypeComparator[] comparators; + + private final int[] normalizedKeyLengths; + + private final int numLeadingNormalizableKeys; + + private final int normalizableKeyPrefixLen; + + private final boolean invertNormKey; + + private TypeSerializer serializer; + + private final Class type; + + @SuppressWarnings("unchecked") + public PojoComparator(Field[] keyFields, TypeComparator[] comparators, TypeSerializer serializer, Class type) { + this.keyFields = keyFields; + this.comparators = (TypeComparator[]) comparators; + + this.type = type; + this.serializer = serializer; + + // set up auxiliary fields for normalized key support + this.normalizedKeyLengths = new int[keyFields.length]; + int nKeys = 0; + int nKeyLen = 0; + boolean inverted = false; + + for (int i = 0; i < this.comparators.length; i++) { + TypeComparator k = this.comparators[i]; + if(k == null) { + throw new IllegalArgumentException("One of the passed comparators is null"); + } + if(keyFields[i] == null) { + throw new IllegalArgumentException("One of the passed reflection fields is null"); 
+ } + + // as long as the leading keys support normalized keys, we can build up the composite key + if (k.supportsNormalizedKey()) { + if (i == 0) { + // the first comparator decides whether we need to invert the key direction + inverted = k.invertNormalizedKey(); + } + else if (k.invertNormalizedKey() != inverted) { + // if a successor does not agree on the inversion direction, it cannot be part of the normalized key + break; + } + + nKeys++; + final int len = k.getNormalizeKeyLen(); + if (len < 0) { + throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len); + } + this.normalizedKeyLengths[i] = len; +
[32/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
[FLINK-3303] [core] Move Tuple classes to flink-core Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7081836e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7081836e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7081836e Branch: refs/heads/master Commit: 7081836e0d640ee640687606bd73b6673d3f2a07 Parents: 5474386 Author: Stephan EwenAuthored: Sun Jan 31 23:08:18 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 2 16:55:44 2016 +0100 -- .../org/apache/flink/api/java/tuple/Tuple.java | 118 +++ .../org/apache/flink/api/java/tuple/Tuple0.java | 98 +++ .../org/apache/flink/api/java/tuple/Tuple1.java | 154 .../apache/flink/api/java/tuple/Tuple10.java| 289 ++ .../apache/flink/api/java/tuple/Tuple11.java| 304 +++ .../apache/flink/api/java/tuple/Tuple12.java| 319 +++ .../apache/flink/api/java/tuple/Tuple13.java| 334 +++ .../apache/flink/api/java/tuple/Tuple14.java| 349 .../apache/flink/api/java/tuple/Tuple15.java| 364 .../apache/flink/api/java/tuple/Tuple16.java| 379 .../apache/flink/api/java/tuple/Tuple17.java| 394 + .../apache/flink/api/java/tuple/Tuple18.java| 409 + .../apache/flink/api/java/tuple/Tuple19.java| 424 + .../org/apache/flink/api/java/tuple/Tuple2.java | 177 .../apache/flink/api/java/tuple/Tuple20.java| 439 + .../apache/flink/api/java/tuple/Tuple21.java| 454 ++ .../apache/flink/api/java/tuple/Tuple22.java| 469 ++ .../apache/flink/api/java/tuple/Tuple23.java| 484 ++ .../apache/flink/api/java/tuple/Tuple24.java| 499 +++ .../apache/flink/api/java/tuple/Tuple25.java| 514 +++ .../org/apache/flink/api/java/tuple/Tuple3.java | 184 .../org/apache/flink/api/java/tuple/Tuple4.java | 199 + .../org/apache/flink/api/java/tuple/Tuple5.java | 214 + .../org/apache/flink/api/java/tuple/Tuple6.java | 229 + .../org/apache/flink/api/java/tuple/Tuple7.java | 244 + .../org/apache/flink/api/java/tuple/Tuple8.java | 259 ++ .../org/apache/flink/api/java/tuple/Tuple9.java | 274 ++ 
.../api/java/tuple/builder/Tuple0Builder.java | 46 + .../api/java/tuple/builder/Tuple10Builder.java | 46 + .../api/java/tuple/builder/Tuple11Builder.java | 46 + .../api/java/tuple/builder/Tuple12Builder.java | 46 + .../api/java/tuple/builder/Tuple13Builder.java | 46 + .../api/java/tuple/builder/Tuple14Builder.java | 46 + .../api/java/tuple/builder/Tuple15Builder.java | 46 + .../api/java/tuple/builder/Tuple16Builder.java | 46 + .../api/java/tuple/builder/Tuple17Builder.java | 46 + .../api/java/tuple/builder/Tuple18Builder.java | 46 + .../api/java/tuple/builder/Tuple19Builder.java | 46 + .../api/java/tuple/builder/Tuple1Builder.java | 46 + .../api/java/tuple/builder/Tuple20Builder.java | 46 + .../api/java/tuple/builder/Tuple21Builder.java | 46 + .../api/java/tuple/builder/Tuple22Builder.java | 46 + .../api/java/tuple/builder/Tuple23Builder.java | 46 + .../api/java/tuple/builder/Tuple24Builder.java | 46 + .../api/java/tuple/builder/Tuple25Builder.java | 46 + .../api/java/tuple/builder/Tuple2Builder.java | 46 + .../api/java/tuple/builder/Tuple3Builder.java | 46 + .../api/java/tuple/builder/Tuple4Builder.java | 46 + .../api/java/tuple/builder/Tuple5Builder.java | 46 + .../api/java/tuple/builder/Tuple6Builder.java | 46 + .../api/java/tuple/builder/Tuple7Builder.java | 46 + .../api/java/tuple/builder/Tuple8Builder.java | 46 + .../api/java/tuple/builder/Tuple9Builder.java | 46 + .../flink/api/java/tuple/TupleGenerator.java| 521 +++ .../org/apache/flink/api/java/tuple/Tuple.java | 118 --- .../org/apache/flink/api/java/tuple/Tuple0.java | 98 --- .../org/apache/flink/api/java/tuple/Tuple1.java | 154 .../apache/flink/api/java/tuple/Tuple10.java| 289 -- .../apache/flink/api/java/tuple/Tuple11.java| 304 --- .../apache/flink/api/java/tuple/Tuple12.java| 319 --- .../apache/flink/api/java/tuple/Tuple13.java| 334 --- .../apache/flink/api/java/tuple/Tuple14.java| 349 .../apache/flink/api/java/tuple/Tuple15.java| 364 .../apache/flink/api/java/tuple/Tuple16.java| 379 
.../apache/flink/api/java/tuple/Tuple17.java| 394 - .../apache/flink/api/java/tuple/Tuple18.java| 409 - .../apache/flink/api/java/tuple/Tuple19.java| 424 - .../org/apache/flink/api/java/tuple/Tuple2.java | 177 .../apache/flink/api/java/tuple/Tuple20.java| 439 - .../apache/flink/api/java/tuple/Tuple21.java| 454 -- .../apache/flink/api/java/tuple/Tuple22.java| 469 --
[07/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java deleted file mode 100644 index 276ffc4..000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Preconditions; - -import org.apache.avro.generic.GenericData; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; -import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; -import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.objenesis.strategy.StdInstantiatorStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Objects; - -/** - * A type serializer that serializes its type using the Kryo serialization - * framework (https://github.com/EsotericSoftware/kryo). - * - * This serializer is intended as a fallback serializer for the cases that are - * not covered by the basic types, tuples, and POJOs. - * - * @param The type to be serialized. 
- */ -public class KryoSerializer extends TypeSerializer { - - private static final long serialVersionUID = 3L; - - private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class); - - // - - private final LinkedHashMapregisteredTypesWithSerializers; - private final LinkedHashMap > registeredTypesWithSerializerClasses; - private final LinkedHashMap defaultSerializers; - private final LinkedHashMap > defaultSerializerClasses; - private final LinkedHashSet registeredTypes; - - private final Class type; - - // - // The fields below are lazily initialized after duplication or deserialization. - - private transient Kryo kryo; - private transient T copyInstance; - - private transient DataOutputView previousOut; - private transient DataInputView previousIn; - - private transient Input input; - private transient Output output; - - // - - public KryoSerializer(Class type, ExecutionConfig executionConfig){ - this.type = Preconditions.checkNotNull(type); - - this.defaultSerializers = executionConfig.getDefaultKryoSerializers(); - this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses(); -
[28/38] flink git commit: [FLINK-3303] [core] Move Tuple classes to flink-core
http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java new file mode 100644 index 000..2ab1d0a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// -- +// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! +// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. 
+// -- + + +package org.apache.flink.api.java.tuple.builder; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.tuple.Tuple15; + +public class Tuple15Builder{ + + private List > tuples = new ArrayList<>(); + + public Tuple15Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14){ + tuples.add(new Tuple15<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14)); + return this; + } + + @SuppressWarnings("unchecked") + public Tuple15 [] build(){ + return tuples.toArray(new Tuple15[tuples.size()]); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7081836e/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java new file mode 100644 index 000..5b4fba2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// -- +// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! +// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. +// -- + + +package org.apache.flink.api.java.tuple.builder; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.tuple.Tuple16; + +public class Tuple16Builder { + + private List > tuples = new ArrayList<>(); + + public Tuple16Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8,
flink git commit: [FLINK-3175][Kafka 0.8] Relax test condition
Repository: flink Updated Branches: refs/heads/master f8677464b -> 682d8d5e2 [FLINK-3175][Kafka 0.8] Relax test condition Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/682d8d5e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/682d8d5e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/682d8d5e Branch: refs/heads/master Commit: 682d8d5e2ce9758ae67276f4584dd8ec20f4e8ad Parents: f867746 Author: Robert Metzger Authored: Tue Feb 2 11:32:30 2016 +0100 Committer: Robert Metzger Committed: Tue Feb 2 11:32:30 2016 +0100 -- .../flink/streaming/connectors/kafka/Kafka08ITCase.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/682d8d5e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 26e31f5..6a2fa27 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -257,9 +257,13 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); // ensure that the offset has been committed - assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 && o1 <= 100); - assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 && o2 <= 100); - assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 && o3 <= 100); + boolean atLeastOneOffsetSet = (o1 > 0 && o1 <= 100) || + (o2 > 0 
&& o2 <= 100) || + (o3 > 0 && o3 <= 100); + assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet); + //assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 && o1 <= 100); + //assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 && o2 <= 100); + //assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 && o3 <= 100); deleteTestTopic(topicName); }
[3/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
[FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition Implements NFA using the SharedBuffer Implements NFACompiler to compile a Pattern into a NFA Add CEP operator Makes NFA and SharedBuffer serializable Add serializability support to SharedBuffer and NFA Add keyed cep pattern operator Adds CEP documentation Adds online documentation for the CEP library Copies sequence events before giving them to the UDF Fix correct scala type suffixes This closes #1557. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79058edb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79058edb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79058edb Branch: refs/heads/master Commit: 79058edb67095120558add534ba37304425fa602 Parents: 682d8d5 Author: Till RohrmannAuthored: Thu Jan 14 10:04:23 2016 +0100 Committer: Till Rohrmann Committed: Tue Feb 2 15:04:08 2016 +0100 -- docs/libs/cep/index.md | 300 +++ flink-libraries/flink-cep/pom.xml | 77 ++ .../src/main/java/org/apache/flink/cep/CEP.java | 100 +++ .../flink/cep/NonDuplicatingTypeSerializer.java | 195 + .../flink/cep/PatternFlatSelectFunction.java| 54 ++ .../apache/flink/cep/PatternSelectFunction.java | 54 ++ .../org/apache/flink/cep/PatternStream.java | 151 .../apache/flink/cep/nfa/ComputationState.java | 84 ++ .../org/apache/flink/cep/nfa/DeweyNumber.java | 163 .../main/java/org/apache/flink/cep/nfa/NFA.java | 406 + .../org/apache/flink/cep/nfa/SharedBuffer.java | 858 +++ .../java/org/apache/flink/cep/nfa/State.java| 109 +++ .../apache/flink/cep/nfa/StateTransition.java | 84 ++ .../flink/cep/nfa/StateTransitionAction.java| 28 + .../flink/cep/nfa/compiler/NFACompiler.java | 187 .../operator/AbstractCEPPatternOperator.java| 108 +++ .../flink/cep/operator/CEPPatternOperator.java | 137 +++ .../cep/operator/KeyedCEPPatternOperator.java | 331 +++ .../cep/operator/StreamRecordComparator.java| 44 + 
.../flink/cep/pattern/AndFilterFunction.java| 44 + .../flink/cep/pattern/FollowedByPattern.java| 33 + .../org/apache/flink/cep/pattern/Pattern.java | 168 .../cep/pattern/SubtypeFilterFunction.java | 43 + .../java/org/apache/flink/cep/CEPITCase.java| 406 + .../test/java/org/apache/flink/cep/Event.java | 77 ++ .../java/org/apache/flink/cep/StreamEvent.java | 41 + .../java/org/apache/flink/cep/SubEvent.java | 49 ++ .../apache/flink/cep/nfa/DeweyNumberTest.java | 54 ++ .../org/apache/flink/cep/nfa/NFAITCase.java | 160 .../java/org/apache/flink/cep/nfa/NFATest.java | 261 ++ .../apache/flink/cep/nfa/SharedBufferTest.java | 136 +++ .../flink/cep/nfa/compiler/NFACompilerTest.java | 129 +++ .../apache/flink/cep/pattern/PatternTest.java | 145 flink-libraries/pom.xml | 1 + .../streaming/api/operators/StreamOperator.java | 2 +- 35 files changed, 5218 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/docs/libs/cep/index.md -- diff --git a/docs/libs/cep/index.md b/docs/libs/cep/index.md new file mode 100644 index 000..04e2b73 --- /dev/null +++ b/docs/libs/cep/index.md @@ -0,0 +1,300 @@ +--- +title: "FlinkCEP - Complex event processing for Flink" +# Top navigation +top-nav-group: libs +top-nav-pos: 2 +top-nav-title: CEP +# Sub navigation +sub-nav-group: batch +sub-nav-id: flinkcep +sub-nav-pos: 2 +sub-nav-parent: libs +sub-nav-title: CEP +--- + + +FlinkCEP is the complex event processing library for Flink. +It allows you to easily detect complex event patterns in a stream of endless data. +Complex events can then be constructed from matching sequences. +This gives you the opportunity to quickly get hold of what's really important in your data. + +## Getting Started + +If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/apis/batch/index.html#linking-with-flink). +Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project. 
+ +{% highlight xml %} + + org.apache.flink + flink-cep{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that FlinkCEP is currently not part of the binary distribution. +See linking with it for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +Now you can start writing your first CEP program using the pattern API. + +{% highlight java %} +DataStream input = ... + +Pattern pattern = Pattern.begin("start").where(evt -> evt.getId()
[2/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java new file mode 100644 index 000..e1c0099 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -0,0 +1,858 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa; + +import com.google.common.collect.LinkedHashMultimap; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.Stack; + +/** + * A shared buffer implementation which stores values under a key. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each + * buffer page maintains a collection of the inserted values. + * + * The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + * + * @param <K> Type of the keys + * @param <V> Type of the values + */ +public class SharedBuffer implements Serializable { + private static final long serialVersionUID = 9213251042562206495L; + + private final TypeSerializer valueSerializer; + + private transient Map> pages; + + public SharedBuffer(final TypeSerializer valueSerializer) { + this.valueSerializer = valueSerializer; + pages = new HashMap<>(); + } + + /** +* Stores given value (value + timestamp) under the given key. It assigns a preceding element +* relation to the entry which is defined by the previous key, value (value + timestamp). +* +* @param key Key of the current value +* @param value Current value +* @param timestamp Timestamp of the current value (a value always requires a timestamp to make it uniquely referable) +* @param previousKey Key of the value for the previous relation +* @param previousValue Value for the previous relation +* @param previousTimestamp Timestamp of the value for the previous relation +* @param version Version of the previous relation +*/ + public void put( + final K key, + final V value, + final long timestamp, + final K previousKey, + final V previousValue, + final long previousTimestamp, + final DeweyNumber version) { + SharedBufferPage page; + + if (!pages.containsKey(key)) { + page = new SharedBufferPage (key); + pages.put(key, page); + } else { + page = pages.get(key); + } + + final SharedBufferEntry previousSharedBufferEntry = get(previousKey, previousValue, previousTimestamp); + + page.add(
[1/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
Repository: flink Updated Branches: refs/heads/master 682d8d5e2 -> 79058edb6 http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java new file mode 100644 index 000..7dcda4c --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Map; + +public class CEPITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + expected = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } + + /** +* Checks that a certain event sequence is recognized +* @throws Exception +*/ + @Test + public void testSimplePatternCEP() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new Event(1, "barfoo", 1.0), + new Event(2, "start", 2.0), + new Event(3, "foobar", 3.0), + new SubEvent(4, "foo", 4.0, 1.0), + new Event(5, "middle", 5.0), + new SubEvent(6, "middle", 6.0, 2.0), + new SubEvent(7, "bar", 3.0, 3.0), + new Event(42, "42", 42.0), + new Event(8, "end", 1.0) + ); + + Patternpattern = Pattern.begin("start").where(new FilterFunction() { + private static final long 
serialVersionUID = 5681493970790509488L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where( + new FilterFunction() { + private static final long serialVersionUID = 448591738315698540L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("middle"); + } + } + ) + .followedBy("end").where(new FilterFunction() { + private static final long serialVersionUID = 6080276591060431966L; + + @Override + public boolean filter(Event value) throws Exception { +