[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5619 ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189914232 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,143 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Object, Integer]): Unit = { +CommonCollect.collect(map, collect) } } class FloatExplodeTableFunc extends TableFunction[Float] { def eval(arr: Array[Float]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Float, Integer]): Unit = { +CommonCollect.collect(map, collect) } } class ShortExplodeTableFunc extends TableFunction[Short] { def eval(arr: Array[Short]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Short, Integer]): Unit = { +CommonCollect.collect(map, collect) } } class IntExplodeTableFunc extends TableFunction[Int] { def eval(arr: Array[Int]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Int, Integer]): Unit = { +CommonCollect.collect(map, collect) } } class LongExplodeTableFunc extends TableFunction[Long] { def eval(arr: Array[Long]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Long, Integer]): Unit = { +CommonCollect.collect(map, collect) } } class DoubleExplodeTableFunc extends TableFunction[Double] { 
def eval(arr: Array[Double]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Double, Integer]): Unit = { +CommonCollect.collect(map, collect) } } class ByteExplodeTableFunc extends TableFunction[Byte] { def eval(arr: Array[Byte]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Byte, Integer]): Unit = { +CommonCollect.collect(map, collect) } } class BooleanExplodeTableFunc extends TableFunction[Boolean] { def eval(arr: Array[Boolean]): Unit = { -arr.foreach(collect) +CommonCollect.collectArray(arr, collect) + } + + def eval(map: util.Map[Boolean, Integer]): Unit = { +CommonCollect.collect(map, collect) } } -object ExplodeFunctionUtil { - def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = { -ti match { - case pat: PrimitiveArrayTypeInfo[_] => { -pat.getComponentType match { - case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc - case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc - case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc - case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc - case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc - case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc - case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc +object CommonCollect { + + def collectArray[T](array: Array[T], collectFunc: (T) => Unit): Unit = { --- End diff -- `collectFunc: (T) => Unit` is another Scala magic where we don't know how this is translated into byte code. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189167881 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,244 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { -arr.foreach(collect) +if (null != arr) { + var i = 0 --- End diff -- Make sense. The JVM might not be smart enough to inline foreach(). ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189159027 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,244 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { -arr.foreach(collect) +if (null != arr) { + var i = 0 + while (i < arr.length) { +collect(arr(i)) +i += 1 + } +} + } + + def eval(map: util.Map[Object, Integer]): Unit = { +if (null != map) { --- End diff -- I reverted to the last version that uses a generics param and converted the Scala foreach loops to plain while loops. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189156884 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,244 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { -arr.foreach(collect) +if (null != arr) { + var i = 0 --- End diff -- As Timo mentioned above, "We should not use any Scala magic in runtime code." So I converted the foreach loops in all the methods of this class. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189144224 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,244 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { -arr.foreach(collect) +if (null != arr) { + var i = 0 --- End diff -- Can't you just do the following? ` if (null != arr) { arr.foreach(collect) }` ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189144701 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,244 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { -arr.foreach(collect) +if (null != arr) { + var i = 0 + while (i < arr.length) { +collect(arr(i)) +i += 1 + } +} + } + + def eval(map: util.Map[Object, Integer]): Unit = { +if (null != map) { --- End diff -- Can we use generics here to remove the duplicate code below? ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189144263 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,244 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { -arr.foreach(collect) +if (null != arr) { + var i = 0 + while (i < arr.length) { +collect(arr(i)) +i += 1 + } +} + } + + def eval(map: util.Map[Object, Integer]): Unit = { +if (null != map) { + val it = map.entrySet().iterator() + while(it.hasNext) { +val item = it.next() +var i = 0 +while (i < item.getValue) { + collect(item.getKey) + i += 1 +} + } +} } } class FloatExplodeTableFunc extends TableFunction[Float] { def eval(arr: Array[Float]): Unit = { -arr.foreach(collect) +if (null != arr) { --- End diff -- same above and for the rest. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r189145367 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala --- @@ -367,6 +367,36 @@ class AggregateITCase( TestBaseUtils.compareResultAsText(result.asJava, expected) } + @Test + def testTumbleWindowAggregateWithCollectUnnest(): Unit = { --- End diff -- Can we just not use COLLECT to construct the test samples? In general, a unit test should test one thing at a time, not multiple. And also for the streaming case. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188990005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,129 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction +import scala.collection.JavaConverters._ + class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Object, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class FloatExplodeTableFunc extends TableFunction[Float] { def eval(arr: Array[Float]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Float, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class ShortExplodeTableFunc extends TableFunction[Short] { def eval(arr: Array[Short]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Short, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class IntExplodeTableFunc extends TableFunction[Int] { def eval(arr: Array[Int]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Int, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class LongExplodeTableFunc extends TableFunction[Long] { def eval(arr: Array[Long]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Long, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class DoubleExplodeTableFunc extends TableFunction[Double] { def eval(arr: Array[Double]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Double, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class 
ByteExplodeTableFunc extends TableFunction[Byte] { def eval(arr: Array[Byte]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Byte, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class BooleanExplodeTableFunc extends TableFunction[Boolean] { def eval(arr: Array[Boolean]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Boolean, Integer]): Unit = { +CommonCollect.collect(map, collect) + } +} + +object CommonCollect { + def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): Unit = { +map.asScala.foreach{ e => --- End diff -- Originally I want to reduce the similarly code since this util is written in scala. Of course, we should keep the runtime code's efficiency. I'll update it. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188988362 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala --- @@ -104,7 +109,7 @@ class LogicalUnnestRule( ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, componentType))) case _: RelRecordType => componentType case _ => throw TableException( - s"Unsupported array component type in UNNEST: ${componentType.toString}") --- End diff -- Yes, `multiset` shouldn't appear. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188927749 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala --- @@ -104,7 +109,7 @@ class LogicalUnnestRule( ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, componentType))) case _: RelRecordType => componentType case _ => throw TableException( - s"Unsupported array component type in UNNEST: ${componentType.toString}") --- End diff -- Shouldn't this be just `component type` without `multiset`? ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188908643 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,129 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction +import scala.collection.JavaConverters._ + class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Object, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class FloatExplodeTableFunc extends TableFunction[Float] { def eval(arr: Array[Float]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Float, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class ShortExplodeTableFunc extends TableFunction[Short] { def eval(arr: Array[Short]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Short, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class IntExplodeTableFunc extends TableFunction[Int] { def eval(arr: Array[Int]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Int, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class LongExplodeTableFunc extends TableFunction[Long] { def eval(arr: Array[Long]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Long, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class DoubleExplodeTableFunc extends TableFunction[Double] { def eval(arr: Array[Double]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Double, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class 
ByteExplodeTableFunc extends TableFunction[Byte] { def eval(arr: Array[Byte]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Byte, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class BooleanExplodeTableFunc extends TableFunction[Boolean] { def eval(arr: Array[Boolean]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Boolean, Integer]): Unit = { +CommonCollect.collect(map, collect) + } +} + +object CommonCollect { + def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): Unit = { +map.asScala.foreach{ e => + for (i <- 0 until e._2) { +collectFunc(e._1) + } +} + } } object ExplodeFunctionUtil { - def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = { + def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = { ti match { - case pat: PrimitiveArrayTypeInfo[_] => { -pat.getComponentType match { - case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc - case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc - case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc - case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc - case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc - case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc - case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc -} - } + case pat: PrimitiveArrayTypeInfo[_] => createTableFuncByType(pat.getComponentType) + case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc + case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc - case _ => throw new UnsupportedOperationException(ti.toString + "IS NOT supported") + + case mt: MultisetTypeInfo[_] => createTableFuncByType(mt.getElementTypeInfo) + + case _ => throw new UnsupportedOperationException(ti.toString + " IS NOT supported") +} + } + + def createTableFuncByType(typeInfo: TypeInformation[_]): TableFunction[_] = { +typeInfo match { + case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc 
+ case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc + case BasicTypeIn
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188910728 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala --- @@ -367,6 +367,36 @@ class AggregateITCase( TestBaseUtils.compareResultAsText(result.asJava, expected) } + @Test + def testUnnestMultisetFromTumbleWindowAggregateCollectResult(): Unit = { --- End diff -- Rename to `testTumbleWindowAggregateWithCollectUnnest` ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188906074 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,129 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction +import scala.collection.JavaConverters._ + class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Object, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class FloatExplodeTableFunc extends TableFunction[Float] { def eval(arr: Array[Float]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Float, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class ShortExplodeTableFunc extends TableFunction[Short] { def eval(arr: Array[Short]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Short, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class IntExplodeTableFunc extends TableFunction[Int] { def eval(arr: Array[Int]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Int, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class LongExplodeTableFunc extends TableFunction[Long] { def eval(arr: Array[Long]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Long, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class DoubleExplodeTableFunc extends TableFunction[Double] { def eval(arr: Array[Double]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Double, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class 
ByteExplodeTableFunc extends TableFunction[Byte] { def eval(arr: Array[Byte]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Byte, Integer]): Unit = { +CommonCollect.collect(map, collect) + } } class BooleanExplodeTableFunc extends TableFunction[Boolean] { def eval(arr: Array[Boolean]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Boolean, Integer]): Unit = { +CommonCollect.collect(map, collect) + } +} + +object CommonCollect { + def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): Unit = { +map.asScala.foreach{ e => --- End diff -- We should not use any Scala magic in runtime code. Can you convert it to a plain `while` loop? Would be great if you could also convert the `foreach` loops in the other methods of this class. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188931486 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala --- @@ -367,6 +367,36 @@ class AggregateITCase( TestBaseUtils.compareResultAsText(result.asJava, expected) } + @Test + def testUnnestMultisetFromTumbleWindowAggregateCollectResult(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val sqlQuery = + "SELECT b, COLLECT(b) as `set`" + +"FROM T " + +"GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND)" + +val ds = CollectionDataSets.get3TupleDataSet(env) + // create timestamps + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) +tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) + +val view1 = tEnv.sqlQuery(sqlQuery) +tEnv.registerTable("v1", view1) + +val sqlQuery1 = "SELECT b, s FROM v1 t1, unnest(t1.`set`) AS A(s) where b < 3" --- End diff -- Remove the renaming of `v1` to `t1`. ---
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
GitHub user lincoln-lil opened a pull request: https://github.com/apache/flink/pull/5619 [FLINK-8838] [table] Add Support for UNNEST a MultiSet type field. ## What is the purpose of the change *This PR add support for UNNEST a MultiSet type field according to SQL standard UNNEST a collection value ( ::= | ) ## Brief change log - *Add support for UNNEST a MultiSet type field ## Verifying this change - *See added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with @Public(Evolving): no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/lincoln-lil/flink FLINK-8838 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5619.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5619 commit 961582b28ef20bfab1ea47ceb548b6e6e104e1f7 Author: lincoln-lil Date: 2018-03-02T12:05:44Z [FLINK-8838] [table] Add Support for UNNEST a MultiSet type field. ---