[ https://issues.apache.org/jira/browse/FLINK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser reassigned FLINK-24359:
--------------------------------------

    Assignee: Francesco Guardiani

> Migrate FileSystem connector to ResolvedSchema
> ----------------------------------------------
>
>                 Key: FLINK-24359
>                 URL: https://issues.apache.org/jira/browse/FLINK-24359
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Ecosystem
>         Environment: Flink 1.14-SNAPSHOT
>            Reporter: Francesco Guardiani
>            Assignee: Francesco Guardiani
>            Priority: Major
>              Labels: pull-request-available
>
> Filesystem connector uses the TableSchema deprecated APIs. This causes issues
> with Table APIs, because TableSchema#fromResolvedSchema(ResolvedSchema)
> requires the expressions to be serializable strings
> (ResolvedExpression#asSerializableString).
> For example:
> {code:java}
> TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
>         .schema(
>                 Schema.newBuilder()
>                         .column("character", DataTypes.STRING())
>                         .column("latitude", DataTypes.STRING())
>                         .column("longitude", DataTypes.STRING())
>                         .column("time", DataTypes.TIMESTAMP(3))
>                         .watermark("time", $("time").minus(lit(2).seconds()))
>                         .build()
>         )
>         // Other options
>         .build();
> {code}
> When used in a table pipeline, throws the following exception:
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
>     at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
>     at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
>     at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
>     at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
>     at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
>     at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
>     at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
>     at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
> {code}
> The same table definition using SQL works fine:
> {code:java}
> CREATE TABLE IF NOT EXISTS LocationEvents (
>     `character` STRING,
>     `latitude` STRING,
>     `longitude` STRING,
>     `time` TIMESTAMP(3),
>     WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
> ) WITH (
>     -- Load from filesystem
>     'connector' = 'filesystem',
>     --- Other configs
> );
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)