lindong28 commented on code in PR #188:
URL: https://github.com/apache/flink-ml/pull/188#discussion_r1061111236

##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/TableUtils.java:
##########
@@ -26,18 +26,20 @@
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
 import org.apache.flink.types.Row;
 
 /** Utility class for operations related to Table API. */
 public class TableUtils {
-    // Constructs a RowTypeInfo from the given schema.
+    // Constructs a RowTypeInfo from the given schema. Currently, this function does not support
+    // the case when the input contains DataTypes.TIMESTAMP_WITH_TIME_ZONE().
     public static RowTypeInfo getRowTypeInfo(ResolvedSchema schema) {
         TypeInformation<?>[] types = new TypeInformation<?>[schema.getColumnCount()];
         String[] names = new String[schema.getColumnCount()];
         for (int i = 0; i < schema.getColumnCount(); i++) {
             Column column = schema.getColumn(i).get();
-            types[i] = TypeInformation.of(column.getDataType().getConversionClass());
+            types[i] = ExternalTypeInfo.of(column.getDataType());

Review Comment:
   If the purpose of this change is to preserve the TIMESTAMP precision for fields with TIMESTAMP type, can we use `ExternalTypeInfo.of` only for fields with TIMESTAMP type?
   
   It would be nice if the other fields could still use `TypeInformation.of` without incurring unnecessary serialization overhead.
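
To make the suggestion in the review comment above concrete, here is a minimal, dependency-free sketch of the per-field branching it describes. It is not Flink code: `FieldKind`, `Factory`, `chooseFactory`, and `chooseFactories` are hypothetical stand-ins invented for illustration. In the real `getRowTypeInfo`, the two branches would presumably correspond to calling `ExternalTypeInfo.of(column.getDataType())` and `TypeInformation.of(column.getDataType().getConversionClass())`, and how a TIMESTAMP column is detected from its `DataType` is left open here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TypeInfoSelectionSketch {

    /** Hypothetical stand-in for a column's logical type; not a Flink class. */
    enum FieldKind { INT, DOUBLE, STRING, TIMESTAMP }

    /** Names the factory call the real code would make for a field (assumption). */
    enum Factory { TYPE_INFORMATION_OF, EXTERNAL_TYPE_INFO_OF }

    /** Only TIMESTAMP fields take the ExternalTypeInfo path; all others keep the old path. */
    static Factory chooseFactory(FieldKind kind) {
        return kind == FieldKind.TIMESTAMP
                ? Factory.EXTERNAL_TYPE_INFO_OF
                : Factory.TYPE_INFORMATION_OF;
    }

    /** Applies the per-field choice to every column of a schema, in order. */
    static List<Factory> chooseFactories(List<FieldKind> schema) {
        List<Factory> result = new ArrayList<>();
        for (FieldKind kind : schema) {
            result.add(chooseFactory(kind));
        }
        return result;
    }

    public static void main(String[] args) {
        List<FieldKind> schema =
                Arrays.asList(FieldKind.INT, FieldKind.TIMESTAMP, FieldKind.STRING);
        // prints [TYPE_INFORMATION_OF, EXTERNAL_TYPE_INFO_OF, TYPE_INFORMATION_OF]
        System.out.println(chooseFactories(schema));
    }
}
```

The point of the branch is that only columns whose precision would otherwise be lost pay for the `ExternalTypeInfo` path, while every other column keeps the existing behavior.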