[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5015


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151851733
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val entryRelDataTypes = elements
+      .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, isNullable = false))
+    val relDataType = SqlStdOperatorTable
--- End diff --

You are right, since both ARRAY and MAP require an explicit type match. 
This actually prompts the question: should we support implicit type casts 
like Calcite does?


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151851711
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -705,6 +705,14 @@ trait ImplicitExpressionOperations {
 */
   def element() = ArrayElement(expr)
 
+  /**
+    * Accesses the element of a map based on key.
+    *
+    * @param key key of the element
+    * @return value of the element
+    */
+  def getValue(key: Expression) = MapElementGetValue(expr, key)
--- End diff --

I think that makes perfect sense, since both of them utilize the ITEM operator.


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151401984
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -705,6 +705,14 @@ trait ImplicitExpressionOperations {
 */
   def element() = ArrayElement(expr)
 
+  /**
+    * Accesses the element of a map based on key.
+    *
+    * @param key key of the element
+    * @return value of the element
+    */
+  def getValue(key: Expression) = MapElementGetValue(expr, key)
--- End diff --

Can we use `at()` instead? I think this would make the API more consistent.
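For illustration only, the consistency argument can be sketched in plain Scala: both container kinds can share a single `at()`-style accessor, mirroring how array and map access both compile to Calcite's ITEM operator. The types `Container`/`ArrayLit`/`MapLit` and the function `at` below are hypothetical stand-ins, not Flink classes.

```scala
sealed trait Container
case class ArrayLit(values: Seq[Any]) extends Container
case class MapLit(entries: Map[Any, Any]) extends Container

// One accessor for both container kinds, as ITEM handles both in Calcite.
def at(container: Container, key: Any): Option[Any] = container match {
  // SQL arrays are 1-based, so at(arr, 1) is the first element.
  case ArrayLit(vs) => key match {
    case i: Int if i >= 1 && i <= vs.size => Some(vs(i - 1))
    case _ => None
  }
  case MapLit(es) => es.get(key)
}
```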


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151408469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
 ---
@@ -333,6 +337,10 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
 SqlStdOperatorTable.ITEM,
 SqlStdOperatorTable.CARDINALITY,
 SqlStdOperatorTable.ELEMENT,
+// MAP OPERATORS
+SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR,
+SqlStdOperatorTable.ITEM,
--- End diff --

ITEM and CARDINALITY are already present.


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151409600
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
 ---
@@ -156,6 +156,8 @@ class SqlExpressionTest extends ExpressionTestBase {
     // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
     testSqlApi("ARRAY[TRUE, FALSE][2]", "false")
     testSqlApi("ARRAY[TRUE, TRUE]", "[true, true]")
+    testSqlApi("MAP['k1', 'v1', 'k2', 'v2']['k2']", "v2")
--- End diff --

Please also update the documentation for both Table API and SQL. They need 
to be kept consistent.


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151408733
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
 ---
@@ -333,6 +337,10 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
 SqlStdOperatorTable.ITEM,
 SqlStdOperatorTable.CARDINALITY,
 SqlStdOperatorTable.ELEMENT,
+// MAP OPERATORS
+SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR,
+SqlStdOperatorTable.ITEM,
--- End diff --

Btw there is no code that implements CARDINALITY. Can you add this call?
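For reference, SQL's CARDINALITY returns the number of elements of a collection; for a map, that is the number of key-value entries. A minimal plain-Scala model of the expected semantics (not Flink's actual code generation) might look like:

```scala
// CARDINALITY over the two collection kinds discussed in this thread.
def cardinality(c: Any): Int = c match {
  case m: Map[_, _] => m.size // number of key-value entries
  case s: Seq[_]    => s.size // number of array elements
  case other        => throw new IllegalArgumentException(s"no CARDINALITY for $other")
}
```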


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151407692
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val entryRelDataTypes = elements
+      .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, isNullable = false))
+    val relDataType = SqlStdOperatorTable
+      .MAP_VALUE_CONSTRUCTOR
+      .inferReturnType(typeFactory, entryRelDataTypes.toList.asJava)
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values)
+  }
+
+  override def toString = s"map(${elements
+    .grouped(2)
+    .map(x => s"[${x.mkString(": ")}]").mkString(", ")})"
+
+  override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo(
+    elements.head.resultType,
+    elements.last.resultType
+  )
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty maps are not supported yet.")
+    }
+    if (elements.size % 2 != 0) {
+      return ValidationFailure("maps must have even number of elements to form key value pairs.")
--- End diff --

`Maps...`
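The validation rules in the diff above can be modeled stand-alone in plain Scala; `validateMapLiteral` below is a hypothetical mirror of `MapConstructor.validateInput()`, using runtime classes in place of Flink's `TypeInformation`:

```scala
// Mirrors the checks in MapConstructor.validateInput(): non-empty input,
// an even number of elements, and homogeneous key and value types.
def validateMapLiteral(elements: Seq[Any]): Either[String, Unit] = {
  if (elements.isEmpty)
    Left("Empty maps are not supported yet.")
  else if (elements.size % 2 != 0)
    Left("Maps must have an even number of elements to form key-value pairs.")
  else {
    // elements alternate key, value, key, value, ...
    val pairs = elements.grouped(2).toSeq
    if (!pairs.forall(_.head.getClass == elements.head.getClass))
      Left("Not all key elements of the map literal have the same type.")
    else if (!pairs.forall(_.last.getClass == elements(1).getClass))
      Left("Not all value elements of the map literal have the same type.")
    else
      Right(())
  }
}
```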


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151408024
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val entryRelDataTypes = elements
+      .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, isNullable = false))
+    val relDataType = SqlStdOperatorTable
+      .MAP_VALUE_CONSTRUCTOR
+      .inferReturnType(typeFactory, entryRelDataTypes.toList.asJava)
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values)
+  }
+
+  override def toString = s"map(${elements
+    .grouped(2)
+    .map(x => s"[${x.mkString(": ")}]").mkString(", ")})"
+
+  override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo(
+    elements.head.resultType,
+    elements.last.resultType
+  )
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty maps are not supported yet.")
+    }
+    if (elements.size % 2 != 0) {
+      return ValidationFailure("maps must have even number of elements to form key value pairs.")
+    }
+    if (!elements.grouped(2).forall(_.head.resultType == elements.head.resultType)) {
+      return ValidationFailure("Not all key elements of the map literal have the same type.")
+    }
+    if (!elements.grouped(2).forall(_.last.resultType == elements.last.resultType)) {
+      return ValidationFailure("Not all value elements of the map literal have the same type.")
+    }
+    ValidationSuccess
+  }
+}
+
+case class MapElementGetValue(map: Expression, key: Expression) extends Expression {
--- End diff --

`SqlStdOperatorTable.ITEM` is responsible for both arrays and maps. Maybe 
we can also merge the two classes here.


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151406004
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ---
@@ -141,6 +142,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val dataType: PackratParser[TypeInformation[_]] =
     PRIMITIVE_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.PRIMITIVE_ARRAY(ct) } |
     OBJECT_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.OBJECT_ARRAY(ct) } |
+    MAP ~ "(" ~> dataType ~ dataType <~ ")" ^^ { tt => Types.MAP(tt._1, tt._2)} |
--- End diff --

Comma is missing.
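The missing comma refers to the separator between the key-type and value-type parsers. The corrected rule would presumably read something like the following fragment (a sketch only, matching the parser-combinator style of the surrounding rules):

```scala
MAP ~ "(" ~> dataType ~ ("," ~> dataType) <~ ")" ^^ { tt => Types.MAP(tt._1, tt._2) } |
```

Without the `","` parser, input such as `MAP(STRING, INT)` would fail to parse, since nothing consumes the comma between the two type names.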


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151406114
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ---
@@ -141,6 +142,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val dataType: PackratParser[TypeInformation[_]] =
     PRIMITIVE_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.PRIMITIVE_ARRAY(ct) } |
     OBJECT_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.OBJECT_ARRAY(ct) } |
+    MAP ~ "(" ~> dataType ~ dataType <~ ")" ^^ { tt => Types.MAP(tt._1, tt._2)} |
--- End diff --

Maybe you can test an identity cast just to test the functionality.


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5015#discussion_r151407535
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val entryRelDataTypes = elements
+      .map(x => typeFactory.createTypeFromTypeInfo(x.resultType, isNullable = false))
+    val relDataType = SqlStdOperatorTable
--- End diff --

You can replace this by a call to the `FlinkTypeFactory.createMapType()`.


---


[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...

2017-11-14 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/5015

[FLINK-8038][Table API] Support MAP value constructor

## What is the purpose of the change

This pull request adds MAP value constructor support for the Table and SQL APIs. 
This enables creating MAP literals from constants or field combinations, such as:
```
MAP('a', '1', 'b', f4, 'c', intField.cast(STRING)) // Table API
MAP['a', '1', 'b', stringField, 'c', CAST(intField AS VARCHAR(65536))] // SQL API
```
It also supports accessing a particular value within a MAP object, such as:
```
MAP('foo', 'bar').getValue('foo') // Table API
MAP['foo', 'bar']['foo'] // SQL API, field access is already supported in FLINK-6377
```
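A plain-Scala sketch of the intended pairing semantics (a hypothetical helper, not part of the PR):

```scala
// Alternating key/value arguments are paired into a map,
// e.g. constructMap(Seq("a", "1", "b", "2")) yields Map("a" -> "1", "b" -> "2").
def constructMap(elements: Seq[Any]): Map[Any, Any] = {
  require(elements.nonEmpty, "empty maps are not supported yet")
  require(elements.size % 2 == 0, "MAP requires an even number of arguments")
  elements.grouped(2).map { case Seq(k, v) => k -> v }.toMap
}
```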

## Brief change log

Changes include:
  - Created a map case class in the flink-table expressions package to support map construction and get-value operations
  - Created `getValue` and `map` in ExpressionDsl
  - Added `generateMap` in CodeGenerator
  - Defined `generateOperator` and `generateMap` implementations in ScalarOperators
  - Added expression parsing logic to feed type information in ExpressionParser

## Verifying this change

Added various MAP operator tests in MapTypeTest and SqlExpressionTest.

## Does this pull request potentially affect one of the following parts:

Not that I know of.

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? (not documented, please advise)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8038

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5015.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5015


commit 64f37583b9fda9ecd691a7245be250f4f7531c04
Author: Rong Rong 
Date:   2017-11-14T18:48:16Z

initial support for MAP literals; several literal operations are not yet supported:
MAP('a', 1, 'b', 2).getValue('a') is supported, but .get('a') is not, as MapTypeInfo is not a CompositeType
MAP('a', 1, 'b', 2).cardinality() is not supported, as cardinality is currently only supported by ObjectArrayTypeInfo
MAP('a', 1, 'b', 2).keySet/valueSet is not supported yet
implicit type casting is not available yet, as it is not supported on ObjectArrayType either




---