[FLINK-4108] [scala] Respect ResultTypeQueryable for InputFormats. This closes #2619
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc99deb4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc99deb4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc99deb4 Branch: refs/heads/master Commit: dc99deb43fb876400615b0b77cf3471768482646 Parents: a85d8e5 Author: twalthr <twal...@apache.org> Authored: Tue Oct 11 11:19:32 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat Oct 15 07:59:51 2016 +0200 ---------------------------------------------------------------------- .../flink/api/scala/ExecutionEnvironment.scala | 6 +-- .../org/apache/flink/api/scala/package.scala | 11 +++- .../scala/typeutils/TypeExtractionTest.scala | 53 ++++++++++++++++++++ .../scala/typeutils/TypeInfoFactoryTest.scala | 2 +- 4 files changed, 67 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dc99deb4/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index f03cb84..4f9d569 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -110,7 +110,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { */ @PublicEvolving def getRestartStrategy: RestartStrategyConfiguration = { - javaEnv.getRestartStrategy() + javaEnv.getRestartStrategy } /** @@ -381,7 +381,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { require(inputFormat != null, "InputFormat must not be null.") require(filePath != null, "File path must not be null.") inputFormat.setFilePath(new Path(filePath)) - createInput(inputFormat, implicitly[TypeInformation[T]]) + createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]])) } /** @@ -392,7 +392,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { if (inputFormat == null) { throw new IllegalArgumentException("InputFormat must not be null.") } - createInput(inputFormat, implicitly[TypeInformation[T]]) + createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]])) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/dc99deb4/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala index e5ca465..6096388 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala @@ -21,8 +21,9 @@ package org.apache.flink.api import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.api.java.{DataSet => JavaDataSet} -import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, TypeUtils, ScalaNothingTypeInfo} +import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, ScalaNothingTypeInfo, TypeUtils} import _root_.scala.reflect.ClassTag import language.experimental.macros @@ -52,6 +53,14 @@ package object scala { // We need to wrap Java DataSet because we need the scala operations private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]) = new DataSet[R](set) + // Checks if object has explicit type information using ResultTypeQueryable + private[flink] def explicitFirst[T]( + funcOrInputFormat: AnyRef, + typeInfo: TypeInformation[T]): TypeInformation[T] = funcOrInputFormat match { + case rtq: ResultTypeQueryable[T] => rtq.getProducedType + case _ => typeInfo + } + private[flink] def fieldNames2Indices( typeInfo: TypeInformation[_], fields: Array[String]): Array[Int] = { http://git-wip-us.apache.org/repos/asf/flink/blob/dc99deb4/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala new file mode 100644 index 0000000..0462ffa --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.api.common.io.FileInputFormat +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.typeutils.TypeExtractionTest.CustomTypeInputFormat +import org.apache.flink.util.TestLogger +import org.junit.Assert.assertEquals +import org.junit.Test +import org.scalatest.junit.JUnitSuiteLike + + +class TypeExtractionTest extends TestLogger with JUnitSuiteLike { + + @Test + def testResultTypeQueryable(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val productedType = env.createInput(new CustomTypeInputFormat).getType() + assertEquals(productedType, BasicTypeInfo.LONG_TYPE_INFO) + } + +} + +object TypeExtractionTest { + class CustomTypeInputFormat extends FileInputFormat[String] with ResultTypeQueryable[Long] { + + override def getProducedType: TypeInformation[Long] = + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] + + override def reachedEnd(): Boolean = throw new UnsupportedOperationException() + + override def nextRecord(reuse: String): String = throw new UnsupportedOperationException() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dc99deb4/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala index 5873630..b0e6098 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala @@ -34,7 +34,7 @@ import org.junit.Assert._ import org.junit.Test import org.scalatest.junit.JUnitSuiteLike -class TypeInfoFactoryTest extends TestLogger with JUnitSuiteLike { +class TypeInfoFactoryTest extends TestLogger with JUnitSuiteLike { @Test def testSimpleType(): Unit = {