[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5619


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-22 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189914232
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,143 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 
 class FloatExplodeTableFunc extends TableFunction[Float] {
   def eval(arr: Array[Float]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Float, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 
 class ShortExplodeTableFunc extends TableFunction[Short] {
   def eval(arr: Array[Short]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Short, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 class IntExplodeTableFunc extends TableFunction[Int] {
   def eval(arr: Array[Int]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Int, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 
 class LongExplodeTableFunc extends TableFunction[Long] {
   def eval(arr: Array[Long]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Long, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 
 class DoubleExplodeTableFunc extends TableFunction[Double] {
   def eval(arr: Array[Double]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Double, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 
 class ByteExplodeTableFunc extends TableFunction[Byte] {
   def eval(arr: Array[Byte]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Byte, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 
 class BooleanExplodeTableFunc extends TableFunction[Boolean] {
   def eval(arr: Array[Boolean]): Unit = {
-arr.foreach(collect)
+CommonCollect.collectArray(arr, collect)
+  }
+
+  def eval(map: util.Map[Boolean, Integer]): Unit = {
+CommonCollect.collect(map, collect)
   }
 }
 
-object ExplodeFunctionUtil {
-  def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = {
-ti match {
-  case pat: PrimitiveArrayTypeInfo[_] => {
-pat.getComponentType match {
-  case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
-  case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
-  case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
-  case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
-  case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
-  case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
-  case BasicTypeInfo.BOOLEAN_TYPE_INFO => new 
BooleanExplodeTableFunc
+object CommonCollect {
+
+  def collectArray[T](array: Array[T], collectFunc: (T) => Unit): Unit = {
--- End diff --

`collectFunc: (T) => Unit` is another Scala magic where we don't know how 
this is translated into byte code.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189167881
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,244 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-arr.foreach(collect)
+if (null != arr) {
+  var i = 0
--- End diff --

Make sense. The JVM might not be smart enough to inline foreach().


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189159027
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,244 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-arr.foreach(collect)
+if (null != arr) {
+  var i = 0
+  while (i < arr.length) {
+collect(arr(i))
+i += 1
+  }
+}
+  }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+if (null != map) {
--- End diff --

I revert to the last version that use generics param and convert Scala 
foreach to plain while loop.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189156884
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,244 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-arr.foreach(collect)
+if (null != arr) {
+  var i = 0
--- End diff --

As Timo mentioned above, "We should not use any Scala magic in runtime 
code." So I converted the foreach loops in all the methods of this class.



---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189144224
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,244 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-arr.foreach(collect)
+if (null != arr) {
+  var i = 0
--- End diff --

Can't you just do the following?

` if (null != arr) {
 arr.foreach(collect)
}`


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189144701
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,244 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-arr.foreach(collect)
+if (null != arr) {
+  var i = 0
+  while (i < arr.length) {
+collect(arr(i))
+i += 1
+  }
+}
+  }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+if (null != map) {
--- End diff --

Can we use generics here to remove the duplicate code below?


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189144263
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,244 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-arr.foreach(collect)
+if (null != arr) {
+  var i = 0
+  while (i < arr.length) {
+collect(arr(i))
+i += 1
+  }
+}
+  }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+if (null != map) {
+  val it = map.entrySet().iterator()
+  while(it.hasNext) {
+val item = it.next()
+var i = 0
+while (i < item.getValue) {
+  collect(item.getKey)
+  i += 1
+}
+  }
+}
   }
 }
 
 class FloatExplodeTableFunc extends TableFunction[Float] {
   def eval(arr: Array[Float]): Unit = {
-arr.foreach(collect)
+if (null != arr) {
--- End diff --

same above and for the rest.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r189145367
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -367,6 +367,36 @@ class AggregateITCase(
 TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testTumbleWindowAggregateWithCollectUnnest(): Unit = {
--- End diff --

Can we just not use COLLECT to construct the test samples? In general, a 
unittest should test one thing a time, not multiple. And also for the streaming 
case.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r188990005
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,129 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
+import scala.collection.JavaConverters._
+
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class FloatExplodeTableFunc extends TableFunction[Float] {
   def eval(arr: Array[Float]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Float, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class ShortExplodeTableFunc extends TableFunction[Short] {
   def eval(arr: Array[Short]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Short, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 class IntExplodeTableFunc extends TableFunction[Int] {
   def eval(arr: Array[Int]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Int, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class LongExplodeTableFunc extends TableFunction[Long] {
   def eval(arr: Array[Long]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Long, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class DoubleExplodeTableFunc extends TableFunction[Double] {
   def eval(arr: Array[Double]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Double, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class ByteExplodeTableFunc extends TableFunction[Byte] {
   def eval(arr: Array[Byte]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Byte, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class BooleanExplodeTableFunc extends TableFunction[Boolean] {
   def eval(arr: Array[Boolean]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Boolean, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
+}
+
+object CommonCollect {
+  def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): 
Unit = {
+map.asScala.foreach{ e =>
--- End diff --

Originally I want to reduce the similarly code since this util is written 
in scala. Of course, we should keep the runtime code's efficiency. I'll update 
it.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r188988362
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
 ---
@@ -104,7 +109,7 @@ class LogicalUnnestRule(
 ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, 
componentType)))
 case _: RelRecordType => componentType
 case _ => throw TableException(
-  s"Unsupported array component type in UNNEST: 
${componentType.toString}")
--- End diff --

Yes,  `multiset` shouldn't  appear.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r188927749
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
 ---
@@ -104,7 +109,7 @@ class LogicalUnnestRule(
 ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, 
componentType)))
 case _: RelRecordType => componentType
 case _ => throw TableException(
-  s"Unsupported array component type in UNNEST: 
${componentType.toString}")
--- End diff --

Shouldn't this be just `component type` without `multiset`?


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r188908643
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,129 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
+import scala.collection.JavaConverters._
+
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class FloatExplodeTableFunc extends TableFunction[Float] {
   def eval(arr: Array[Float]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Float, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class ShortExplodeTableFunc extends TableFunction[Short] {
   def eval(arr: Array[Short]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Short, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 class IntExplodeTableFunc extends TableFunction[Int] {
   def eval(arr: Array[Int]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Int, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class LongExplodeTableFunc extends TableFunction[Long] {
   def eval(arr: Array[Long]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Long, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class DoubleExplodeTableFunc extends TableFunction[Double] {
   def eval(arr: Array[Double]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Double, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class ByteExplodeTableFunc extends TableFunction[Byte] {
   def eval(arr: Array[Byte]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Byte, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class BooleanExplodeTableFunc extends TableFunction[Boolean] {
   def eval(arr: Array[Boolean]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Boolean, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
+}
+
+object CommonCollect {
+  def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): 
Unit = {
+map.asScala.foreach{ e =>
+  for (i <- 0 until e._2) {
+collectFunc(e._1)
+  }
+}
+  }
 }
 
 object ExplodeFunctionUtil {
-  def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = {
+  def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = 
{
 ti match {
-  case pat: PrimitiveArrayTypeInfo[_] => {
-pat.getComponentType match {
-  case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
-  case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
-  case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
-  case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
-  case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
-  case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
-  case BasicTypeInfo.BOOLEAN_TYPE_INFO => new 
BooleanExplodeTableFunc
-}
-  }
+  case pat: PrimitiveArrayTypeInfo[_] => 
createTableFuncByType(pat.getComponentType)
+
   case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+
   case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
-  case _ => throw new UnsupportedOperationException(ti.toString + "IS 
NOT supported")
+
+  case mt: MultisetTypeInfo[_] => 
createTableFuncByType(mt.getElementTypeInfo)
+
+  case _ => throw new UnsupportedOperationException(ti.toString + " IS 
NOT supported")
+}
+  }
+
+  def createTableFuncByType(typeInfo: TypeInformation[_]): 
TableFunction[_] = {
+typeInfo match {
+  case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
+  case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
+  case BasicTypeIn

[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r188910728
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -367,6 +367,36 @@ class AggregateITCase(
 TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testUnnestMultisetFromTumbleWindowAggregateCollectResult(): Unit = {
--- End diff --

Rename to `testTumbleWindowAggregateWithCollectUnnest`


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r188906074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
@@ -18,74 +18,129 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.functions.TableFunction
 
+import scala.collection.JavaConverters._
+
 class ObjectExplodeTableFunc extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class FloatExplodeTableFunc extends TableFunction[Float] {
   def eval(arr: Array[Float]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Float, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class ShortExplodeTableFunc extends TableFunction[Short] {
   def eval(arr: Array[Short]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Short, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 class IntExplodeTableFunc extends TableFunction[Int] {
   def eval(arr: Array[Int]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Int, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class LongExplodeTableFunc extends TableFunction[Long] {
   def eval(arr: Array[Long]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Long, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class DoubleExplodeTableFunc extends TableFunction[Double] {
   def eval(arr: Array[Double]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Double, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class ByteExplodeTableFunc extends TableFunction[Byte] {
   def eval(arr: Array[Byte]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Byte, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
 }
 
 class BooleanExplodeTableFunc extends TableFunction[Boolean] {
   def eval(arr: Array[Boolean]): Unit = {
 arr.foreach(collect)
   }
+
+  def eval(map: util.Map[Boolean, Integer]): Unit = {
+CommonCollect.collect(map, collect)
+  }
+}
+
+object CommonCollect {
+  def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): 
Unit = {
+map.asScala.foreach{ e =>
--- End diff --

We should not use any Scala magic in runtime code. Can you convert it to a 
plain `while` loop? Would be great if you could also convert the `foreach` 
loops in the other methods of this class.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5619#discussion_r188931486
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -367,6 +367,36 @@ class AggregateITCase(
 TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testUnnestMultisetFromTumbleWindowAggregateCollectResult(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery =
+  "SELECT b, COLLECT(b) as `set`" +
+"FROM T " +
+"GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND)"
+
+val ds = CollectionDataSets.get3TupleDataSet(env)
+  // create timestamps
+  .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
+tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+val view1 = tEnv.sqlQuery(sqlQuery)
+tEnv.registerTable("v1", view1)
+
+val sqlQuery1 = "SELECT b, s FROM v1 t1, unnest(t1.`set`) AS A(s) 
where b < 3"
--- End diff --

Remove the renaming of `v1` to `t1`.


---


[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-03-02 Thread lincoln-lil
GitHub user lincoln-lil opened a pull request:

https://github.com/apache/flink/pull/5619

[FLINK-8838] [table] Add Support for UNNEST a MultiSet type field.

## What is the purpose of the change
*This PR adds support for UNNEST of a MultiSet type field, according to the SQL 
standard's UNNEST of a collection value
( <collection value expression> ::= <array value expression> | <multiset value expression> )

## Brief change log
- *Add support for UNNEST a MultiSet type field
## Verifying this change
- *See added unit tests.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with 
@Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lincoln-lil/flink FLINK-8838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5619.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5619


commit 961582b28ef20bfab1ea47ceb548b6e6e104e1f7
Author: lincoln-lil 
Date:   2018-03-02T12:05:44Z

[FLINK-8838] [table] Add Support for UNNEST a MultiSet type field.




---