[FLINK-4108] [scala] Respect ResultTypeQueryable for InputFormats.

This closes #2619


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f726980
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f726980
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f726980

Branch: refs/heads/release-1.1
Commit: 9f7269808f3694815bba1e4dbf050db2a2dfe15f
Parents: 9591d50
Author: twalthr <twal...@apache.org>
Authored: Tue Oct 11 11:19:32 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Sat Oct 15 08:04:55 2016 +0200

----------------------------------------------------------------------
 .../flink/api/scala/ExecutionEnvironment.scala  |  6 +--
 .../org/apache/flink/api/scala/package.scala    | 11 +++-
 .../scala/typeutils/TypeExtractionTest.scala    | 53 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index f03cb84..4f9d569 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -110,7 +110,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     */
   @PublicEvolving
   def getRestartStrategy: RestartStrategyConfiguration = {
-    javaEnv.getRestartStrategy()
+    javaEnv.getRestartStrategy
   }
 
   /**
@@ -381,7 +381,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     require(inputFormat != null, "InputFormat must not be null.")
     require(filePath != null, "File path must not be null.")
     inputFormat.setFilePath(new Path(filePath))
-    createInput(inputFormat, implicitly[TypeInformation[T]])
+    createInput(inputFormat, explicitFirst(inputFormat, 
implicitly[TypeInformation[T]]))
   }
 
   /**
@@ -392,7 +392,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     if (inputFormat == null) {
       throw new IllegalArgumentException("InputFormat must not be null.")
     }
-    createInput(inputFormat, implicitly[TypeInformation[T]])
+    createInput(inputFormat, explicitFirst(inputFormat, 
implicitly[TypeInformation[T]]))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index e5ca465..6096388 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -21,8 +21,9 @@ package org.apache.flink.api
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo, TypeUtils, ScalaNothingTypeInfo}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo, ScalaNothingTypeInfo, TypeUtils}
 
 import _root_.scala.reflect.ClassTag
 import language.experimental.macros
@@ -52,6 +53,14 @@ package object scala {
   // We need to wrap Java DataSet because we need the scala operations
   private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]) = new 
DataSet[R](set)
 
+  // Prefers type info declared via ResultTypeQueryable over typeInfo; T is erased, hence the cast
+  private[flink] def explicitFirst[T](
+      funcOrInputFormat: AnyRef,
+      typeInfo: TypeInformation[T]): TypeInformation[T] = funcOrInputFormat match {
+    case rtq: ResultTypeQueryable[_] => rtq.asInstanceOf[ResultTypeQueryable[T]].getProducedType
+    case _ => typeInfo
+  }
+
   private[flink] def fieldNames2Indices(
       typeInfo: TypeInformation[_],
       fields: Array[String]): Array[Int] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
new file mode 100644
index 0000000..0462ffa
--- /dev/null
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.io.FileInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.scala._
+import 
org.apache.flink.api.scala.typeutils.TypeExtractionTest.CustomTypeInputFormat
+import org.apache.flink.util.TestLogger
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+
+class TypeExtractionTest extends TestLogger with JUnitSuiteLike {
+  // createInput must report the type an input format declares via ResultTypeQueryable.
+  @Test
+  def testResultTypeQueryable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val producedType = env.createInput(new CustomTypeInputFormat).getType()
+    assertEquals(BasicTypeInfo.LONG_TYPE_INFO, producedType)
+  }
+
+}
+
+object TypeExtractionTest {
+  class CustomTypeInputFormat extends FileInputFormat[String] with 
ResultTypeQueryable[Long] {
+    // Element type is String but the declared produced type is Long, so a test can tell them apart.
+    override def getProducedType: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    // Record reading is not implemented: both methods below throw.
+    override def reachedEnd(): Boolean = throw new 
UnsupportedOperationException()
+
+    override def nextRecord(reuse: String): String = throw new 
UnsupportedOperationException()
+  }
+}

Reply via email to