flink git commit: [FLINK-3225] Enforce translation to DataSetNodes
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 7ecb70105 -> 297564646

[FLINK-3225] Enforce translation to DataSetNodes

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29756464
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29756464
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29756464

Branch: refs/heads/tableOnCalcite
Commit: 2975646469bcc6b29b36e5d8bb61663af6da8749
Parents: 7ecb701
Author: Fabian Hueske
Authored: Mon Feb 1 23:45:16 2016 +0100
Committer: Fabian Hueske
Committed: Mon Feb 1 23:47:08 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/table/JavaBatchTranslator.scala | 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/29756464/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 66bfbe7..7e91190 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -19,14 +19,14 @@ package org.apache.flink.api.java.table
 
 import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.Table
-import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
 
@@ -61,20 +61,19 @@ class JavaBatchTranslator extends PlanTranslator {
 
     // get the planner for the plan
     val planner = lPlan.getCluster.getPlanner
 
-    // we do not have any special requirements for the output
-    val outputProps = RelTraitSet.createEmpty()
 
     println("---")
     println("Input Plan:")
     println("---")
     println(RelOptUtil.toString(lPlan))
 
     // decorrelate
    val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
 
     // optimize the logical Flink plan
     val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-    val optPlan = optProgram.run(planner, decorPlan, outputProps)
+    val flinkOutputProps = RelTraitSet.createEmpty()
+    val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps)
 
     println("---")
     println("Optimized Plan:")
@@ -83,7 +82,10 @@ class JavaBatchTranslator extends PlanTranslator {
 
     // optimize the logical Flink plan
     val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES)
-    val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps)
+    val dataSetOutputProps = RelTraitSet.createEmpty()
+      .plus(DataSetConvention.INSTANCE)
+      .plus(RelCollations.of()).simplify()
+    val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps)
 
     println("-")
     println("DataSet Plan:")
flink git commit: [FLINK-3300] fix concurrency bug in YarnJobManager
Repository: flink
Updated Branches:
  refs/heads/master 086acf681 -> 2a49eaaf3

[FLINK-3300] fix concurrency bug in YarnJobManager

Adds message passing between Hadoop's async resource manager client and
the YarnJobManager actor.

This closes #1561.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a49eaaf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a49eaaf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a49eaaf

Branch: refs/heads/master
Commit: 2a49eaaf3c949864457aee0ffd99343a50ac7285
Parents: 086acf6
Author: Maximilian Michels
Authored: Fri Jan 29 11:21:08 2016 +0100
Committer: Maximilian Michels
Committed: Mon Feb 1 10:09:37 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/yarn/YarnJobManager.scala  | 425 +--
 .../org/apache/flink/yarn/YarnMessages.scala    |  21 +-
 2 files changed, 231 insertions(+), 215 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a49eaaf/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 135f87e..92eb4be 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -180,6 +180,7 @@ class YarnJobManager(
         _ ! decorateMessage(JobManagerStopped)
       }
 
+      // Shutdown and discard all queued messages
       context.system.shutdown()
 
     case RegisterApplicationClient =>
@@ -220,6 +221,85 @@ class YarnJobManager(
     case StartYarnSession(hadoopConfig, webServerPort) =>
       startYarnSession(hadoopConfig, webServerPort)
 
+    case YarnContainersAllocated(containers: JavaList[Container]) =>
+      val newlyAllocatedContainers = containers.asScala
+
+      newlyAllocatedContainers.foreach {
+        container => log.info(s"Got new container for allocation: ${container.getId}")
+      }
+
+      allocatedContainersList ++= containers.asScala
+      numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
+
+      allocateContainers()
+
+      if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
+        log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+          s"are not needed right now. Returning them")
+        for (container <- allocatedContainersList) {
+          rmClientOption match {
+            case Some(client) => client.releaseAssignedContainer(container.getId)
+            case None =>
+          }
+        }
+        allocatedContainersList = List()
+      }
+
+    case YarnContainersCompleted(statuses: JavaList[ContainerStatus]) =>
+
+      val completedContainerStatuses = statuses.asScala
+      val idStatusMap = completedContainerStatuses
+        .map(status => (status.getContainerId, status)).toMap
+
+      completedContainerStatuses.foreach {
+        status => log.info(s"Container ${status.getContainerId} is completed " +
+          s"with diagnostics: ${status.getDiagnostics}")
+      }
+
+      // get failed containers (returned containers are also completed, so we have to
+      // distinguish if it was running before).
+      val (completedContainers, remainingRunningContainers) = runningContainersList
+        .partition(idStatusMap contains _.getId)
+
+      completedContainers.foreach {
+        container =>
+          val status = idStatusMap(container.getId)
+          failedContainers += 1
+          runningContainers -= 1
+          log.info(s"Container ${status.getContainerId} was a running container. " +
+            s"Total failed containers $failedContainers.")
+          val detail = status.getExitStatus match {
+            case -103 => "Vmem limit exceeded"
+            case -104 => "Pmem limit exceeded"
+            case _ => ""
+          }
+          messageListener foreach {
+            _ ! decorateMessage(
+              YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
+                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
+            )
+          }
+      }
+
+      runningContainersList = remainingRunningContainers
+
+      // maxFailedContainers == -1 is infinite number of retries.
+      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
+        val msg = s"Stopping YARN session because the number of failed " +
+          s"containers ($failedContainers) exceeded the maximum failed container " +
+          s"count ($maxFailedContainers). This number is controlled by " +
+          s"the
flink git commit: [hotfix] Remove 'ByteArrayInputView' and replace deserialization in TypeInformationSerializationSchema with more efficient reusable buffers.
Repository: flink
Updated Branches:
  refs/heads/master 67b380d61 -> 92efcd34a

[hotfix] Remove 'ByteArrayInputView' and replace deserialization in
TypeInformationSerializationSchema with more efficient reusable buffers.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92efcd34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92efcd34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92efcd34

Branch: refs/heads/master
Commit: 92efcd34a5da2bccb07666f2c647974ea3e7c94f
Parents: 67b380d
Author: Stephan Ewen
Authored: Mon Feb 1 14:39:24 2016 +0100
Committer: Stephan Ewen
Committed: Mon Feb 1 17:29:02 2016 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/ByteArrayInputView.java   | 40 ----
 .../runtime/kryo/KryoClearedBufferTest.java     |  8 +++-
 .../runtime/util/DataInputDeserializer.java     | 48 ++++
 .../runtime/util/DataOutputSerializer.java      | 25 ++
 ...eInformationKeyValueSerializationSchema.java | 44 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 17 +--
 .../TypeInformationSerializationSchema.java     | 14 --
 .../TypeInformationSerializationSchemaTest.java |  2 +-
 8 files changed, 109 insertions(+), 89 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
deleted file mode 100644
index 48d6a3d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-public class ByteArrayInputView extends DataInputStream implements DataInputView {
-
-	public ByteArrayInputView(byte[] data) {
-		super(new ByteArrayInputStream(data));
-	}
-
-	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
-		while (numBytes > 0) {
-			int skipped = skipBytes(numBytes);
-			numBytes -= skipped;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index ab2e45f..7572408 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -22,13 +22,16 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -69,7 +72,8 @@ public class KryoClearedBufferTest {
flink git commit: [fix] [docs] Fix typo in savepoints documentation
Repository: flink
Updated Branches:
  refs/heads/master 92efcd34a -> 6c0a83e4f

[fix] [docs] Fix typo in savepoints documentation

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c0a83e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c0a83e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c0a83e4

Branch: refs/heads/master
Commit: 6c0a83e4fa1ef3d68df31bf01618972c4a445b21
Parents: 92efcd3
Author: Till Rohrmann
Authored: Mon Feb 1 18:16:07 2016 +0100
Committer: Till Rohrmann
Committed: Mon Feb 1 18:16:07 2016 +0100

----------------------------------------------------------------------
 docs/apis/streaming/savepoints.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0a83e4/docs/apis/streaming/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/savepoints.md b/docs/apis/streaming/savepoints.md
index 910f503..f04845c 100644
--- a/docs/apis/streaming/savepoints.md
+++ b/docs/apis/streaming/savepoints.md
@@ -79,7 +79,7 @@ For savepoints **only stateful tasks matter**. In the above example, the source
 
 Each task is identified by its **generated task IDs** and **subtask index**. In the above example the state of the source (**s1**, **s2**) and map tasks (**m1**, **m2**) is identified by their respective task ID (*0xC322EC* for the source tasks and *0x27B3EF* for the map tasks) and subtask index. There is no state for the sinks (**t1**, **t2**). Their IDs therefore do not matter.
 
-Important The IDs are generated **deterministically** from your program structure. This means that as long as your program does not change, the IDs do not change. **The only allowed changes are within the user function, e.g. you can change the implemented `MapFunction` without changing the typology**. In this case, it is straight forward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but gets problematic as soon as you make changes to the topology, because they result in changed IDs and the savepoint state cannot be mapped to your program any more.
+Important The IDs are generated **deterministically** from your program structure. This means that as long as your program does not change, the IDs do not change. **The only allowed changes are within the user function, e.g. you can change the implemented `MapFunction` without changing the topology**. In this case, it is straight forward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but gets problematic as soon as you make changes to the topology, because they result in changed IDs and the savepoint state cannot be mapped to your program any more.
 
 Recommended In order to be able to change your program and **have fixed IDs**, the *DataStream* API provides a method to manually specify the task IDs. Each operator provides a **`uid(String)`** method to override the generated ID. The ID is a String, which will be deterministically hashed to a 16-byte hash value. It is **important** that the specified IDs are **unique per transformation and job**. If this is not the case, job submission will fail.
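For illustration, here is how the uid(String) hook recommended in the documentation above is used. StatefulSource and StatefulMapper are hypothetical user functions, and env is assumed to be a StreamExecutionEnvironment:

// Hypothetical job: fixed IDs survive topology changes, so savepoint state
// can still be mapped back after the program is modified.
env.addSource(new StatefulSource())
	.uid("my-source")           // fixed, unique ID for the source operator
	.map(new StatefulMapper())
	.uid("my-mapper")           // fixed, unique ID for the map operator
	.print();                   // the sink holds no state, so no uid is needed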
flink git commit: [hotfix] fix non-exhaustive match warning
Repository: flink
Updated Branches:
  refs/heads/master 2a49eaaf3 -> ef58cf302

[hotfix] fix non-exhaustive match warning

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef58cf30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef58cf30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef58cf30

Branch: refs/heads/master
Commit: ef58cf30291f49a5037248cbf10a0d9acd67b09e
Parents: 2a49eaa
Author: Maximilian Michels
Authored: Mon Feb 1 12:52:26 2016 +0100
Committer: Maximilian Michels
Committed: Mon Feb 1 12:54:13 2016 +0100

----------------------------------------------------------------------
 .../main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ef58cf30/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index dc17742..df7e6c6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1692,6 +1692,7 @@ object JobManager {
       result match {
         case scala.util.Failure(f) => throw f
+        case _ =>
       }
     }
flink git commit: [FLINK-3281] IndexOutOfBoundsException when range partition on empty DataSet
Repository: flink
Updated Branches:
  refs/heads/master 6c0a83e4f -> 233c0147b

[FLINK-3281] IndexOutOfBoundsException when range partition on empty DataSet

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/233c0147
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/233c0147
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/233c0147

Branch: refs/heads/master
Commit: 233c0147b1e9c18034d37f803e8974365a5a7d86
Parents: 6c0a83e
Author: chengxiang li
Authored: Tue Feb 2 14:09:52 2016 +0800
Committer: chengxiang li
Committed: Tue Feb 2 14:09:52 2016 +0800

----------------------------------------------------------------------
 .../operators/udf/RangeBoundaryBuilder.java     | 16 +-
 .../api/scala/operators/PartitionITCase.scala   | 32 
 2 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
index cd163d3..09bd42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RangeBoundaryBuilder.java
@@ -60,13 +60,15 @@ public class RangeBoundaryBuilder<T> extends RichMapPartitionFunction
-			double avgRange = sampledData.size() / (double) parallelism;
-			int numKey = comparator.getFlatComparators().length;
-			for (int i = 1; i < parallelism; i++) {
-				T record = sampledData.get((int) (i * avgRange));
-				Object[] keys = new Object[numKey];
-				comparator.extractKeys(record, keys, 0);
-				boundaries[i-1] = keys;
+			if (sampledData.size() > 0) {
+				double avgRange = sampledData.size() / (double) parallelism;
+				int numKey = comparator.getFlatComparators().length;
+				for (int i = 1; i < parallelism; i++) {
+					T record = sampledData.get((int) (i * avgRange));
+					Object[] keys = new Object[numKey];
+					comparator.extractKeys(record, keys, 0);
+					boundaries[i-1] = keys;
+				}
 			}
 
 			out.collect(boundaries);

http://git-wip-us.apache.org/repos/asf/flink/blob/233c0147/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index ca8bcd9..df13cd4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -50,6 +50,38 @@ class PartitionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
   }
 
   @Test
+  def testEmptyHashPartition(): Unit = {
+    /*
+     * Test hash partition on an empty DataSet
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(Seq[Tuple1[String]]())
+
+    val unique = ds.partitionByHash(0)
+
+    unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = ""
+  }
+
+  @Test
+  def testEmptyRangePartition(): Unit = {
+    /*
+     * Test range partition on an empty DataSet
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(Seq[Tuple1[String]]())
+
+    val unique = ds.partitionByRange(0)
+
+    unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = ""
+  }
+
+  @Test
   def testHashPartitionByTupleField(): Unit = {
     /*
      * Test hash partition by tuple field
[1/5] flink git commit: [FLINK-3306] [core] Fix auto-type registry util
Repository: flink
Updated Branches:
  refs/heads/master ef58cf302 -> 5e2fb3cb5

[FLINK-3306] [core] Fix auto-type registry util

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4bc47af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4bc47af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4bc47af

Branch: refs/heads/master
Commit: c4bc47afa6147dd25d8b579e764b30c9c13ee8ea
Parents: d902d16
Author: Stephan Ewen
Authored: Fri Jan 29 17:08:32 2016 +0100
Committer: Stephan Ewen
Committed: Mon Feb 1 14:45:55 2016 +0100

----------------------------------------------------------------------
 .../api/io/avro/AvroRecordInputFormatTest.java  | 115 -
 .../flink/api/java/ExecutionEnvironment.java    |  51 +++-
 .../flink/api/java/typeutils/TypeExtractor.java |   5 +
 .../typeutils/runtime/kryo/Serializers.java     | 128 +++
 .../kryo/KryoGenericTypeSerializerTest.java     |  31 ++---
 .../typeutils/runtime/kryo/SerializersTest.java |  61 +++--
 6 files changed, 217 insertions(+), 174 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index c04435c..42cbebe 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -22,10 +22,7 @@ import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
@@ -219,41 +216,43 @@ public class AvroRecordInputFormatTest {
 	 */
 	@Test
 	public void testDeserializeToGenericType() throws IOException {
-		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema);
-
-		FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader);
-		// initialize Record by reading it from disk (thats easier than creating it by hand)
-		GenericData.Record rec = new GenericData.Record(userSchema);
-		dataFileReader.next(rec);
-		// check if record has been read correctly
-		assertNotNull(rec);
-		assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-		assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
-
-		// now serialize it with our framework:
-
-		TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class);
-		ExecutionConfig ec = new ExecutionConfig();
-		Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-		Serializers.recursivelyRegisterType(((GenericTypeInfo<?>) te).getTypeClass(), ec);
-
-		TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
-		Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
-		Assert.assertTrue(
-				ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
-				ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
-		ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
-		tser.serialize(rec, target);
-
-		GenericData.Record newRec = tser.deserialize(target.getInputView());
-
-		// check if it is still the same
-		assertNotNull(newRec);
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
-		assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
-		assertEquals(null, newRec.get("type_long_test"));
-
+		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
+
+		try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+			// initialize Record by reading it from disk (thats easier than creating it by hand)
+			GenericData.Record rec = new
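For context, a hedged usage sketch of the utility being fixed, following the pre-change call visible in the test above (the exact signature may differ after this commit):

// Hedged sketch: registering a generic Avro type should install the special
// Avro Schema serializer in the ExecutionConfig, as the test above asserts.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;

public class RecursiveRegistrationExample {
	public static void main(String[] args) {
		ExecutionConfig ec = new ExecutionConfig();
		// walk GenericData.Record's fields and register what is found
		Serializers.recursivelyRegisterType(GenericData.Record.class, ec);

		// a GenericData.Record carries an Avro Schema, so a default Kryo
		// serializer (Serializers.AvroSchemaSerializer) gets attached for it
		boolean registered = ec.getDefaultKryoSerializerClasses().containsKey(Schema.class);
		System.out.println("Avro Schema serializer registered: " + registered);
	}
}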
[4/5] flink git commit: [FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time
[FLINK-3305] [core] Remove limited and inconsistent auto-magic for Joda Time

The auto-magic for Joda Time was limited to very few classes, and it was
not transparent which cases would be handled.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6110dc3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6110dc3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6110dc3

Branch: refs/heads/master
Commit: b6110dc35a17340653a39209038041a5e28054b4
Parents: c4bc47a
Author: Stephan Ewen
Authored: Fri Jan 29 18:51:03 2016 +0100
Committer: Stephan Ewen
Committed: Mon Feb 1 14:46:06 2016 +0100

----------------------------------------------------------------------
 flink-java/pom.xml                              | 30 +++---
 .../flink/api/java/ExecutionEnvironment.java    | 26 
 .../typeutils/runtime/kryo/Serializers.java     | 42 +++-
 .../api/operators/TimestampedCollector.java     | 12 +++---
 .../runtime/KryoGenericTypeSerializerTest.scala | 37 +++--
 .../api/scala/runtime/TupleSerializerTest.scala | 29 --
 6 files changed, 68 insertions(+), 108 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 8383a4a..a31e89d 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -70,22 +70,6 @@ under the License.
 
 		<dependency>
-			<groupId>de.javakaffee</groupId>
-			<artifactId>kryo-serializers</artifactId>
-			<version>0.27</version>
-			<exclusions>
-				<exclusion>
-					<groupId>joda-time</groupId>
-					<artifactId>joda-time</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.joda</groupId>
-					<artifactId>joda-convert</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
@@ -104,6 +88,20 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<version>2.5</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.joda</groupId>
+			<artifactId>joda-convert</artifactId>
+			<version>1.7</version>
+			<scope>test</scope>
+		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/b6110dc3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 10cb5e3..253ffa3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.api.java;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
+import com.esotericsoftware.kryo.Serializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -46,7 +47,10 @@ import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.*;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -54,14 +58,22 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
 import org.apache.flink.util.Visitor;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo.Serializer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import
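After this change, Joda types that travel through Flink's generic (Kryo) serialization need an explicitly registered serializer, since the auto-registration and the compile-scope kryo-serializers dependency are gone. A sketch of the manual setup; it assumes the user adds de.javakaffee:kryo-serializers to their own pom:

// Hedged sketch: explicit replacement for the removed Joda auto-registration.
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.joda.time.DateTime;

public class JodaRegistrationExample {
	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// tell Kryo how to handle DateTime wherever it appears in user types
		env.getConfig().addDefaultKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
	}
}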
[5/5] flink git commit: [hotfix] Add deprecation message to old Key interface
[hotfix] Add deprecation message to old Key interface

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e2fb3cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e2fb3cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e2fb3cb

Branch: refs/heads/master
Commit: 5e2fb3cb5eb0ab181d62e864bb0443d656de2f83
Parents: b6110dc
Author: Stephan Ewen
Authored: Sun Jan 31 23:19:12 2016 +0100
Committer: Stephan Ewen
Committed: Mon Feb 1 14:46:06 2016 +0100

----------------------------------------------------------------------
 flink-core/src/main/java/org/apache/flink/types/Key.java | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5e2fb3cb/flink-core/src/main/java/org/apache/flink/types/Key.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Key.java b/flink-core/src/main/java/org/apache/flink/types/Key.java
index ceeeb57..cc1978c 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Key.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Key.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
 /**
@@ -31,7 +30,11 @@ package org.apache.flink.types;
  * @see org.apache.flink.types.Value
  * @see org.apache.flink.core.io.IOReadableWritable
  * @see java.lang.Comparable
+ *
+ * @deprecated The Key type is a relict of a deprecated and removed API and will be removed
+ *             in future versions as well.
  */
+@Deprecated
 public interface Key<T> extends Value, Comparable<T> {
 
 	/**
[2/5] flink git commit: [hotfix] Clean up warnings in Serializers util class
[hotfix] Clean up warnings in Serializers util class

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d902d164
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d902d164
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d902d164

Branch: refs/heads/master
Commit: d902d164556e631d1c8edea475515901691c639c
Parents: 8c8f1c4
Author: Stephan Ewen
Authored: Fri Jan 29 16:29:31 2016 +0100
Committer: Stephan Ewen
Committed: Mon Feb 1 14:45:55 2016 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/kryo/Serializers.java | 44 +---
 1 file changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d902d164/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 76f8eb4..0ea8691 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -60,48 +61,51 @@ public class Serializers {
 	 * to Kryo.
 	 * It also watches for types which need special serializers.
 	 */
-	private static Set<Class<?>> alreadySeen = new HashSet<Class<?>>();
+	private static Set<Class<?>> alreadySeen = new HashSet<>();
 
 	public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config) {
 		alreadySeen.add(type);
-		if(type.isPrimitive()) {
+
+		if (type.isPrimitive()) {
 			return;
 		}
 		config.registerKryoType(type);
 		addSerializerForType(config, type);
 
 		Field[] fields = type.getDeclaredFields();
-		for(Field field : fields) {
+		for (Field field : fields) {
 			if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
 				continue;
 			}
 			Type fieldType = field.getGenericType();
-			if(fieldType instanceof ParameterizedType) { // field has generics
+			if (fieldType instanceof ParameterizedType) { // field has generics
 				ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType;
-				for(Type t: parameterizedFieldType.getActualTypeArguments()) {
-					if(TypeExtractor.isClassType(t) ) {
-						Class<?> clazz = TypeExtractor.typeToClass(t);
-						if(!alreadySeen.contains(clazz)) {
+				for (Type t: parameterizedFieldType.getActualTypeArguments()) {
+					if (TypeExtractor.isClassType(t) ) {
+						Class<?> clazz = TypeExtractor.typeToClass(t);
+						if (!alreadySeen.contains(clazz)) {
 							recursivelyRegisterType(TypeExtractor.typeToClass(t), config);
 						}
 					}
 				}
 			}
 			Class<?> clazz = field.getType();
-			if(!alreadySeen.contains(clazz)) {
+			if (!alreadySeen.contains(clazz)) {
 				recursivelyRegisterType(clazz, config);
 			}
 		}
 	}
 
 	public static void addSerializerForType(ExecutionConfig reg, Class<?> type) {
-		if(GenericData.Record.class.isAssignableFrom(type)) {
+		if (GenericData.Record.class.isAssignableFrom(type)) {
 			registerGenericAvro(reg);
 		}
-		if(SpecificRecordBase.class.isAssignableFrom(type)) {
-			registerSpecificAvro(reg, (Class<? extends SpecificRecordBase>) type);
+		if (SpecificRecordBase.class.isAssignableFrom(type)) {
+			@SuppressWarnings("unchecked")
+			Class<? extends SpecificRecordBase> specRecordClass = (Class<? extends SpecificRecordBase>) type;
+
[3/5] flink git commit: [hotfix] [streaming] Tasks print subtask in log statements
[hotfix] [streaming] Tasks print subtask in log statements

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c8f1c45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c8f1c45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c8f1c45

Branch: refs/heads/master
Commit: 8c8f1c45face7105c9b60bcfd3f818eea19d42d1
Parents: ef58cf3
Author: Stephan Ewen
Authored: Thu Jan 28 16:32:17 2016 +0100
Committer: Stephan Ewen
Committed: Mon Feb 1 14:45:55 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8c8f1c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b91c570..e4b6b6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -379,7 +379,7 @@ public abstract class StreamTask
 	 * @return The name of the task.
 	 */
 	public String getName() {
-		return getEnvironment().getTaskInfo().getTaskName();
+		return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
 	}
 
 	/**
[2/2] flink git commit: [FLINK-3265][tests] adapt RMQSource checkpointing test to runtime behavior
[FLINK-3265][tests] adapt RMQSource checkpointing test to runtime behavior

The methods snapshotState() and notifyCheckpointComplete() are always called
mutually exclusively at runtime. The RMQSource relies on this, but the test
made a false assumption and called the two methods concurrently.

This closes #1569.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67b380d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67b380d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67b380d6

Branch: refs/heads/master
Commit: 67b380d617a942c11ab29a8d62d67b770245bb63
Parents: 83e6a2b
Author: Maximilian Michels
Authored: Mon Feb 1 16:30:52 2016 +0100
Committer: Maximilian Michels
Committed: Mon Feb 1 16:51:35 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/rabbitmq/RMQSourceTest.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/67b380d6/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index aa19e5d..e0eed70 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -140,7 +140,9 @@ public class RMQSourceTest {
 			}
 
 			// check if the messages are being acknowledged and the transaction committed
-			source.notifyCheckpointComplete(snapshotId);
+			synchronized (DummySourceContext.lock) {
+				source.notifyCheckpointComplete(snapshotId);
+			}
 			totalNumberOfAcks += numIds;
 		}
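The invariant behind the fix: the streaming runtime invokes snapshotState() and notifyCheckpointComplete() while holding the source's checkpoint lock, so the two can never interleave. A test that drives the source by hand has to emulate that, as sketched below (DummySourceContext.lock is the test's stand-in for the runtime's checkpoint lock; the snapshot id is illustrative):

// Hedged sketch: serialize the checkpoint calls on the same lock the source's
// run() loop uses, instead of adding locks inside the source itself.
long snapshotId = 42L; // illustrative checkpoint id

synchronized (DummySourceContext.lock) {
	source.snapshotState(snapshotId, System.currentTimeMillis());
}
// ... the source may emit and acknowledge messages in between ...
synchronized (DummySourceContext.lock) {
	source.notifyCheckpointComplete(snapshotId);
}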
[1/2] flink git commit: Revert "[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ"
Repository: flink
Updated Branches:
  refs/heads/master 5e2fb3cb5 -> 67b380d61

Revert "[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ"

This reverts commit 6b01a89020f2de3f7710cf72336291b1e8ca8562.

The introduced locks are not necessary. The checkpointing test case only
needs to be adapted to the checkpointing runtime behavior.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83e6a2b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83e6a2b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83e6a2b5

Branch: refs/heads/master
Commit: 83e6a2b5f7621a2bfd8797ab071a94492bbac3d5
Parents: 5e2fb3c
Author: Maximilian Michels
Authored: Mon Feb 1 16:28:07 2016 +0100
Committer: Maximilian Michels
Committed: Mon Feb 1 16:50:48 2016 +0100

----------------------------------------------------------------------
 .../connectors/rabbitmq/RMQSource.java          |  4 +-
 .../connectors/rabbitmq/RMQSourceTest.java      | 79 
 .../source/MessageAcknowledgingSourceBase.java  | 51 ++---
 ...ltipleIdsMessageAcknowledgingSourceBase.java | 24 +++---
 4 files changed, 34 insertions(+), 124 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/83e6a2b5/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 59bc057..09bb07c 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -196,9 +196,7 @@ public class RMQSource extends MultipleIdsMessageAcknowledgingSourceBase

http://git-wip-us.apache.org/repos/asf/flink/blob/83e6a2b5/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 0a3de84..aa19e5d 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -23,7 +23,6 @@ import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -32,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -105,83 +103,6 @@ public class RMQSourceTest {
 		sourceThread.join();
 	}
 
-	/**
-	 * Make sure concurrent access to snapshotState() and notifyCheckpointComplete() don't cause
-	 * an issue.
-	 *
-	 * Without proper synchronization, the test will fail with a concurrent modification exception
-	 */
-	@Test
-	public void testConcurrentAccess() throws Exception {
-		source.autoAck = false;
-		sourceThread.start();
-
-		final Tuple1<Exception> error = new Tuple1<>(null);
-
-		Thread.sleep(5);
-
-		Thread snapshotThread = new Thread(new Runnable() {
-			public long id = 0;
-
-			@Override
-			public void run() {
-				while (!Thread.interrupted()) {
-					try {
-						source.snapshotState(id++, 0);
-					} catch (Exception e) {