[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-10 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r291923298
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -147,12 +171,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory
 *
 * @param fieldNames field names
-* @param fieldTypes field types, every element is Flink's [[InternalType]]
+* @param fieldTypes field types, every element is Flink's [[LogicalType]]
 * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
 */
   def buildLogicalRowType(
 
 Review comment:
   @KurtYoung I have done some refactoring of this in d5f0cb4. What do you think?




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r291888066
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
 ##
 @@ -55,4 +57,8 @@ class GenericRelDataType(
   override def hashCode(): Int = {
 genericType.hashCode()
   }
+
+  override def generateTypeString(sb: lang.StringBuilder, withDetail: 
Boolean): Unit = {
 
 Review comment:
   Now we delete `ArrayRelDataType` and use `ArraySqlType`, etc.
   NOTE: `equals` of `ArraySqlType` is:
   ```
   @Override public boolean equals(Object obj) {
     if (obj instanceof RelDataTypeImpl) {
       final RelDataTypeImpl that = (RelDataTypeImpl) obj;
       return this.digest.equals(that.digest);
     }
     return false;
   }
   ```
   So we must override `generateTypeString` in every Calcite type we extend.
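   
   A minimal, self-contained sketch of why the digest matters (toy classes, not the PR's actual ones): `RelDataTypeImpl.equals` only compares the string produced by `generateTypeString`, so a wrapper type must put its payload into that string or all of its instances collapse into one.
   ```
   // Toy reimplementation of the digest-equality rule quoted above.
   abstract class TypeSketch {
     protected def generateTypeString(sb: java.lang.StringBuilder, withDetail: Boolean): Unit

     // The digest is exactly what generateTypeString writes.
     final lazy val digest: String = {
       val sb = new java.lang.StringBuilder
       generateTypeString(sb, withDetail = true)
       sb.toString
     }

     // Same rule as RelDataTypeImpl: equality is digest equality.
     override def equals(obj: Any): Boolean = obj match {
       case that: TypeSketch => this.digest == that.digest
       case _ => false
     }

     override def hashCode(): Int = digest.hashCode
   }

   class AnyTypeSketch(payload: String) extends TypeSketch {
     // Without appending the payload, every AnyTypeSketch would be "equal".
     override protected def generateTypeString(sb: java.lang.StringBuilder, withDetail: Boolean): Unit =
       sb.append(s"ANY('$payload')")
   }
   ```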




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r291015856
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java
 ##
 @@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
+import org.apache.flink.table.typeutils.BinaryArraySerializer;
+import org.apache.flink.table.typeutils.BinaryGenericSerializer;
+import org.apache.flink.table.typeutils.BinaryMapSerializer;
+import org.apache.flink.table.typeutils.BinaryStringSerializer;
+import org.apache.flink.table.typeutils.DecimalSerializer;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT;
+
+/**
+ * Utilities for {@link LogicalType} and {@link DataType}..
+ */
+public class PlannerTypeUtils {
 
 Review comment:
   ok




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r291015847
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeConversions.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.table.typeutils.BigDecimalTypeInfo;
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
+import org.apache.flink.table.typeutils.DecimalTypeInfo;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Converters for {@link LogicalType} and {@link DataType}.
+ */
+public class PlannerTypeConversions {
 
 Review comment:
   OK




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290641194
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
+  case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH)
 
 Review comment:
   I'll keep precision.
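   
   For reference, a hedged sketch of what keeping the precision could look like for the character and binary branches; the constructors exist on the new LogicalType classes, but the exact cases in the final PR may differ (e.g. how zero-length types are handled).
   ```
   import org.apache.calcite.rel.`type`.RelDataType
   import org.apache.calcite.sql.`type`.SqlTypeName._
   import org.apache.flink.table.types.logical._

   // Map CHAR/VARCHAR/BINARY/VARBINARY while preserving the declared length
   // (Calcite stores it as the type's precision) instead of using MAX_LENGTH.
   def toLogicalTypeKeepingLength(relDataType: RelDataType): LogicalType =
     relDataType.getSqlTypeName match {
       case CHAR      => new CharType(relDataType.getPrecision)
       case VARCHAR   => new VarCharType(relDataType.getPrecision)
       case BINARY    => new BinaryType(relDataType.getPrecision)
       case VARBINARY => new VarBinaryType(relDataType.getPrecision)
       case other     => throw new UnsupportedOperationException(s"Not covered in this sketch: $other")
     }
   ```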




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290611122
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
 
 Review comment:
   OK, but we should make `VARCHAR(A size)` and `VARCHAR(B size)` interoperable.
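   
   A sketch of what "interoperable" could mean here; the predicate name and placement are illustrative, not the planner's actual API.
   ```
   import org.apache.flink.table.types.logical.LogicalType
   import org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR

   // Two types are assignment-compatible if they are equal, or if both are
   // VARCHAR and differ only in their declared length.
   def isInteroperable(a: LogicalType, b: LogicalType): Boolean =
     (a.getTypeRoot == VARCHAR && b.getTypeRoot == VARCHAR) || a == b
   ```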




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290611122
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
 
 Review comment:
   OK, but we need to modify `ExprCodeGenerator.generateResultExpression` to support assigning `VARCHAR(A size)` to `VARCHAR(B size)`.
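   
   A hedged sketch of the relaxation that check would need; the method and exception below are illustrative, while the real change would live inside `ExprCodeGenerator.generateResultExpression`.
   ```
   import org.apache.flink.table.types.logical.LogicalType
   import org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR

   // Accept assigning a VARCHAR field expression to a VARCHAR result field of a
   // different declared length, instead of requiring exact type equality.
   def checkFieldAssignable(exprType: LogicalType, resultType: LogicalType): Unit = {
     val bothVarChar = exprType.getTypeRoot == VARCHAR && resultType.getTypeRoot == VARCHAR
     if (!bothVarChar && exprType != resultType) {
       throw new IllegalArgumentException(
         s"Incompatible types of expression and result type: $exprType != $resultType")
     }
   }
   ```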




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290596646
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -147,12 +171,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory
 *
 * @param fieldNames field names
-* @param fieldTypes field types, every element is Flink's [[InternalType]]
+* @param fieldTypes field types, every element is Flink's [[LogicalType]]
 * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
 */
   def buildLogicalRowType(
 
 Review comment:
   `buildLogicalRowType` is for the SQL row of a `table`.
   `createTypeFromLogicalType` is only for field types; it produces a nested row when the type is `RowType`.
   I think the biggest difference is the parameters.
   Do you have a good idea for distinguishing them?




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290590844
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
 
 Review comment:
   >I remember you want to disable all CHAR support
   
   I have removed Java CHAR support, not SQL CHAR.
   SQL CHAR will still be used in some cases, such as literals.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290591671
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: 
Option[Boolean]): RelDataType = {
 
 Review comment:
   I'll remove it.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290590844
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
 
 Review comment:
   >I remember you want to disable all CHAR support
   
   I have removed Java CHAR support, not SQL CHAR.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290590844
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
 
 Review comment:
   >I remember you want to disable all CHAR support
   
   I have removed Java CHAR support, not SQL CHAR.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290590636
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -164,19 +188,19 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 * using FlinkTypeFactory
 *
 * @param fieldNames field names
-* @param fieldTypes field types, every element is Flink's 
[[InternalType]]
+* @param fieldTypes field types, every element is Flink's 
[[LogicalType]]
 * @param fieldNullables field nullable properties
 * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
 */
   def buildLogicalRowType(
   fieldNames: Seq[String],
-  fieldTypes: Seq[InternalType],
+  fieldTypes: Seq[LogicalType],
   fieldNullables: Seq[Boolean]): RelDataType = {
 
 Review comment:
   OK




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290590585
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -147,12 +171,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory
 *
 * @param fieldNames field names
-* @param fieldTypes field types, every element is Flink's [[InternalType]]
+* @param fieldTypes field types, every element is Flink's [[LogicalType]]
 * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
 */
   def buildLogicalRowType(
 
 Review comment:
   `buildLogicalRowType` returns a row type with `StructKind.FULLY_QUALIFIED`.
   `createTypeFromLogicalType` returns a row type with `StructKind.PEEK_FIELDS_NO_EXPAND`.
   They have different semantics.
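   
   A small sketch of that distinction: Calcite's `StructKind` controls how the validator resolves the struct's fields and whether a star query expands into them. The helper below is illustrative only; the PR builds these types through `FlinkTypeFactory`.
   ```
   import org.apache.calcite.rel.`type`.StructKind

   // Top-level table rows are fully qualified; nested RowType fields keep
   // PEEK_FIELDS_NO_EXPAND so that `SELECT *` does not expand into them.
   def structKindFor(topLevelTableRow: Boolean): StructKind =
     if (topLevelTableRow) StructKind.FULLY_QUALIFIED        // buildLogicalRowType
     else StructKind.PEEK_FIELDS_NO_EXPAND                   // createTypeFromLogicalType
   ```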




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290590321
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
 ##
 @@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.schema
 
-import org.apache.flink.table.`type`.ArrayType
+import org.apache.flink.table.types.logical.ArrayType
 
 Review comment:
   NOTE: we should use `RelRecordType` with `StructKind.PEEK_FIELDS_NO_EXPAND` to replace `RowRelDataType`.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-05 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290589983
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
 ##
 @@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.schema
 
-import org.apache.flink.table.`type`.ArrayType
+import org.apache.flink.table.types.logical.ArrayType
 
 Review comment:
   Nice catch, yes. Previously, the main purpose was to distinguish `BasicArrayTypeInfo`, `ObjectArrayTypeInfo`, `PojoTypeInfo`, `RowTypeInfo`, etc. Now the `LogicalType`s are SQL types, so I think we don't need `Array/Map/Multiset RelDataType`.
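   
   A hedged sketch of the replacement path using standard Calcite factory calls; the real code builds these inside `FlinkTypeFactory`.
   ```
   import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}

   // createArrayType returns Calcite's own ArraySqlType, so no dedicated
   // ArrayRelDataType wrapper is needed; -1 means "no maximum cardinality".
   def arrayOf(factory: RelDataTypeFactory, elementType: RelDataType, nullable: Boolean): RelDataType = {
     val arrayType = factory.createArrayType(elementType, -1)
     factory.createTypeWithNullability(arrayType, nullable)
   }
   ```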




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290584736
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: 
Option[Boolean]): RelDataType = {
 
 Review comment:
   The original intention here is to convert all time indicators to a regular `TimestampType` in batch mode.
   Do you mean this conversion should happen outside?




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290584058
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: 
Option[Boolean]): RelDataType = {
 buildRelDataType(
   tableSchema.getFieldNames.toSeq,
-  tableSchema.getFieldTypes map {
-case _: TimeIndicatorTypeInfo if isStreaming.isDefined && 
!isStreaming.get =>
-  InternalTypes.TIMESTAMP
-case tpe: TypeInformation[_] => createInternalTypeFromTypeInfo(tpe)
+  tableSchema.getFieldDataTypes.map(_.getLogicalType).map {
+case t: TimestampType if isStreaming.isDefined && !isStreaming.get =>
+  if (t.getKind == TimestampKind.REGULAR) t else new TimestampType(3)
+case t => t
   })
   }
 
   def buildRelDataType(
 
 Review comment:
   I'll change the name to `buildLogicalRowType`.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290582852
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
+  case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH)
+  case DECIMAL => new DecimalType(relDataType.getPrecision, 
relDataType.getScale)
+
+  // time indicators
+  case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
+val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
+if (indicator.isEventTime) {
+  new TimestampType(true, TimestampKind.ROWTIME, 3)
+} else {
+  new TimestampType(true, TimestampKind.PROCTIME, 3)
+}
 
-case _@t =>
-  throw new TableException(s"Type is not supported: $t")
+  // temporal types
+  case DATE => new DateType()
+  case TIME => new TimeType()
+  case TIMESTAMP => new TimestampType(3)
+  case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
+DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
+  case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
+DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType
+
+  case NULL =>
+throw new TableException(
+  "Type NULL is not supported. Null values must have a supported 
type.")
+
+  // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+  // are represented as Enum
+  case SYMBOL => 
toPlannerLogicalType(PlannerTypeConversions.toDataType(classOf[Enum[_]]))
+
+  // extract encapsulated Type
+  case ANY if 

[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290579108
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java
 ##
 @@ -47,7 +44,7 @@
private UnresolvedReferenceExpression sum = new 
UnresolvedReferenceExpression("sum");
private UnresolvedReferenceExpression count = new 
UnresolvedReferenceExpression("count");
 
-   public abstract TypeInformation getSumType();
+   public abstract DataType getSumType();
 
 Review comment:
   See @twalthr's response in @wuchong's first thread:
   
   >The user should not be confronted with two different type stacks. That's why the description in DataType calls it "hints"; a user should always use DataType. If he adds a hint where it is not appropriate, we just don't need to use those hints.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290577508
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -41,87 +40,112 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
-  * Flink specific type factory that represents the interface between Flink's 
[[InternalType]]
+  * Flink specific type factory that represents the interface between Flink's 
[[LogicalType]]
   * and Calcite's [[RelDataType]].
   */
 class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends 
JavaTypeFactoryImpl(typeSystem) {
 
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  private val seenTypes = mutable.HashMap[(InternalType, Boolean), 
RelDataType]()
+  private val seenTypes = mutable.HashMap[LogicalType, RelDataType]()
 
-  def createTypeFromInternalType(
-  tp: InternalType,
-  isNullable: Boolean): RelDataType = {
+  def createTypeFromLogicalType(t: LogicalType, isNullable: Boolean): 
RelDataType = {
 
 Review comment:
   OK, I'll replace it with `createTypeFromLogicalType(t.copy(nullable))`
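   
   For illustration, a sketch of that call-site change. It assumes the single-argument `createTypeFromLogicalType` overload this comment implies, which reads nullability from the type itself.
   ```
   import org.apache.flink.table.calcite.FlinkTypeFactory
   import org.apache.flink.table.types.logical.TimestampType

   // Copy the LogicalType with the desired nullability instead of passing a
   // separate isNullable flag (single-argument overload assumed from this comment).
   def notNullTimestamp(factory: FlinkTypeFactory) =
     factory.createTypeFromLogicalType(new TimestampType(3).copy(false))
   ```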




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-15 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r284531614
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
 ##
 @@ -98,30 +108,30 @@ protected Expression orderKeyEqualsExpression() {
return ret.orElseGet(() -> literal(true));
}
 
-   protected Expression generateInitLiteral(InternalType orderType) {
-   if (orderType.equals(InternalTypes.BOOLEAN)) {
+   protected Expression generateInitLiteral(LogicalType orderType) {
+   if (orderType instanceof BooleanType) {
 
 Review comment:
   Glad to see the new visitors in FLINK-12254.
   I looked at the code again and found that checking the type instance directly is not so robust.
   The code below is not bad:
   ```
   t.getTypeRoot match {
     case BOOLEAN => s"$rowTerm.getBoolean($indexTerm)"
     case TINYINT => s"$rowTerm.getByte($indexTerm)"
     case SMALLINT => s"$rowTerm.getShort($indexTerm)"
     ..
     case ROW => s"$rowTerm.getRow($indexTerm, ${t.asInstanceOf[RowType].getFieldCount})"
   }
   ```
   So I changed all LogicalType instance matches to logical-type-root matches.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283691935
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LongHashJoinGenerator.scala
 ##
 @@ -44,11 +44,12 @@ object LongHashJoinGenerator {
 joinType == HashJoinType.ANTI ||
 joinType == HashJoinType.PROBE_OUTER) &&
 filterNulls.forall(b => b) &&
-keyType.getFieldTypes.length == 1 && {
-  val t = keyType.getTypeAt(0)
-  t == InternalTypes.LONG || t == InternalTypes.INT || t == 
InternalTypes.SHORT ||
-  t == InternalTypes.BYTE || t == InternalTypes.FLOAT || t == 
InternalTypes.DOUBLE ||
-  t.isInstanceOf[DateType] || t.isInstanceOf[TimestampType] || t == 
InternalTypes.TIME
+keyType.getFieldCount == 1 && {
+  keyType.getTypeAt(0) match {
+case _: BigIntType | _: IntType | _: SmallIntType | _: TinyIntType |
 
 Review comment:
   Decimal is not supported here, so type families cannot be used (just because we haven't supported it yet).
   I think I can use type families in `TypeCheckUtils`.
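   
   A sketch of the kind of family-based check that could live in `TypeCheckUtils`; it assumes `LogicalTypeRoot#getFamilies()`, and the long-hash-join key whitelist above would stay explicit because it excludes DECIMAL.
   ```
   import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeFamily}

   // Generic numeric check via type families; unlike the join-key whitelist,
   // this deliberately includes DECIMAL as well.
   def isNumeric(t: LogicalType): Boolean =
     t.getTypeRoot.getFamilies.contains(LogicalTypeFamily.NUMERIC)
   ```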




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283689574
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
 ##
 @@ -68,6 +68,10 @@ public TypeInformationAnyType(TypeInformation typeInfo) {
this(true, typeInfo);
}
 
+   public TypeInformationAnyType(Class clazz) {
 
 Review comment:
   Good catch, I'll take it out.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283689154
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
 ##
 @@ -253,4 +257,12 @@ private static void validateFields(List fields) {
String.format("Field names must be unique. Found duplicates: %s", duplicates));
}
}
+
+   public static RowType of(LogicalType... types) {
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283688763
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -602,7 +602,7 @@ class RexTimeIndicatorMaterializerUtils(rexBuilder: 
RexBuilder) {
   private def timestamp(isNullable: Boolean): RelDataType = rexBuilder
 .getTypeFactory
 .asInstanceOf[FlinkTypeFactory]
-.createTypeFromInternalType(InternalTypes.TIMESTAMP, isNullable = 
isNullable)
+.createTypeFromLogicalType(new TimestampType(), isNullable = isNullable)
 
 Review comment:
   I think this PR is big enough already. My idea is to first cover the existing functionality of `InternalType`, and then extend the functionality of `LogicalType` (like nullability) in the next PR.
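   For context, a tiny sketch (not from this PR) of how nullability can be expressed on `LogicalType` itself via `copy(isNullable)`, which is roughly the kind of extension meant here:
   ```
   import org.apache.flink.table.types.logical.TimestampType

   // LogicalType instances are nullable by default; copy(false) derives the
   // NOT NULL variant, which could replace a separate isNullable flag.
   val timestamp = new TimestampType()
   val notNullTimestamp = timestamp.copy(false)
   println(timestamp.isNullable)        // true
   println(notNullTimestamp.isNullable) // false
   ```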


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283687864
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
 ##
 @@ -98,30 +108,30 @@ protected Expression orderKeyEqualsExpression() {
return ret.orElseGet(() -> literal(true));
}
 
-   protected Expression generateInitLiteral(InternalType orderType) {
-   if (orderType.equals(InternalTypes.BOOLEAN)) {
+   protected Expression generateInitLiteral(LogicalType orderType) {
+   if (orderType instanceof BooleanType) {
 
 Review comment:
   For simple code, or code in Java, I prefer to use the logical type root.
   But for complex type handling in Scala, I prefer to match on the type directly. Just like:
   ```
   t match {
 case rowType: RowType => // use rowType to do something
 case arrayType: ArrayType => // use arrayType to do something
   }
   ```
   I think it is better than:
   ```
   t.getTypeRoot match {
 case LogicalTypeRoot.ROW => t.asInstanceOf[RowType] // use the RowType to do something
 case LogicalTypeRoot.ARRAY => t.asInstanceOf[ArrayType] // use the ArrayType to do something
   }
   ```
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283684788
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java
 ##
 @@ -152,12 +152,14 @@ public boolean equals(Object o) {
if (!super.equals(o)) {
return false;
}
+
TimestampType that = (TimestampType) o;
-   return precision == that.precision;
+
+   return precision == that.precision && kind == that.kind;
 
 Review comment:
   Yes, `TimeIndicatorTypeInfo` and `SqlTimeTypeInfo.TIMESTAMP` also have the same behavior.
   But why should the kind not affect equality and hashCode?
   Is it so that `Timestamp(6) of ROWTIME` and `Timestamp(6) of REGULAR` can interoperate?
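   A small illustration of the question (assuming the public `TimestampType(isNullable, kind, precision)` constructor); with the change quoted above, the two instances below no longer compare equal:
   ```
   import org.apache.flink.table.types.logical.{TimestampKind, TimestampType}

   // Same precision and nullability, different kind (rowtime attribute vs. regular).
   val rowtime = new TimestampType(true, TimestampKind.ROWTIME, 6)
   val regular = new TimestampType(true, TimestampKind.REGULAR, 6)
   // If kind participates in equals/hashCode this prints false; if kind were
   // ignored, the two types would interoperate as plain TIMESTAMP(6).
   println(rowtime == regular)
   ```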


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283661779
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
 ##
 @@ -98,30 +108,30 @@ protected Expression orderKeyEqualsExpression() {
return ret.orElseGet(() -> literal(true));
}
 
-   protected Expression generateInitLiteral(InternalType orderType) {
-   if (orderType.equals(InternalTypes.BOOLEAN)) {
+   protected Expression generateInitLiteral(LogicalType orderType) {
+   if (orderType instanceof BooleanType) {
 
 Review comment:
   Yeah, and in Java we can use `switch`/`case`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283661498
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/logical/LogicalTypeUtils.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+/**
+ * Utilities for {@link LogicalType}.
+ */
+public class LogicalTypeUtils {
+
+   /**
+* Gets the arity of the type.
+*/
+   public static int getArity(LogicalType t) {
+   if (t instanceof RowType) {
+   return ((RowType) t).getFieldCount();
+   } else {
+   return 1;
+   }
+   }
+
+   public static Class getExternalClassForType(LogicalType type) {
+   return LogicalTypeConverters.createExternalTypeInfoFromLogicalType(type).getTypeClass();
+   }
+
+   public static Class getInternalClassForType(LogicalType type) {
+   return LogicalTypeConverters.createInternalTypeInfoFromLogicalType(type).getTypeClass();
+   }
+
+   public static RowType toRowType(LogicalType t) {
+   if (t instanceof RowType) {
+   return (RowType) t;
+   } else {
+   return RowType.of(t);
+   }
+   }
+
+   public static boolean isPrimitive(LogicalType type) {
+   return type instanceof BooleanType || type instanceof TinyIntType || type instanceof SmallIntType ||
+   type instanceof IntType || type instanceof BigIntType || type instanceof FloatType ||
+   type instanceof DoubleType;
+   }
+
+   public static boolean isAtomic(LogicalType type) {
+   return isPrimitive(type) || type instanceof DecimalType || type instanceof VarCharType ||
+   type instanceof TimeType || type instanceof TimestampType || type instanceof DateType ||
+   type instanceof BinaryType || type instanceof YearMonthIntervalType || type instanceof DayTimeIntervalType;
 
 Review comment:
   I'll remove this useless method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283653711
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
 ##
 @@ -62,7 +62,7 @@
/**
 * All types of the aggregate buffer.
 */
-   public abstract InternalType[] getAggBufferTypes();
+   public abstract LogicalType[] getAggBufferTypes();
 
 Review comment:
   Yes, I got it; it is better to let the user know just one single type stack.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-14 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283645950
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
 ##
 @@ -62,7 +62,7 @@
/**
 * All types of the aggregate buffer.
 */
-   public abstract InternalType[] getAggBufferTypes();
+   public abstract LogicalType[] getAggBufferTypes();
 
 Review comment:
   I have a different thought.
   The `DeclarativeAggregateFunction` does not expose a physical representation. It is the interface for users to describe the logical computation.
   
   >The DataType class has two responsibilities: declaring a logical type and giving hints about the physical representation of data to the optimizer. While the logical type is mandatory, hints are optional but useful at the edges to other APIs.
   
   So I think we should use `LogicalType` for `getAggBufferTypes` and `getResultType`.
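   To make the distinction concrete, a small sketch (not from this PR, assuming the FLIP-37 `DataTypes` API): a `LogicalType` only declares the logical type, while a `DataType` can additionally carry a physical conversion hint:
   ```
   import org.apache.flink.table.api.DataTypes
   import org.apache.flink.table.types.logical.BigIntType

   // Pure logical declaration, as an aggregate buffer type could be declared.
   val logicalOnly = new BigIntType()

   // DataType = logical type plus an optional physical hint at the API edges.
   val withDefaultConversion = DataTypes.BIGINT()
   val withExplicitHint = DataTypes.BIGINT().bridgedTo(classOf[java.lang.Long])
   ```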


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services