Hi all, I want to use a field from the Kafka messages as the rowtime attribute, and I'm running into the problem below. I'm on Flink 1.9.1.
Here are the two variants I've tried; both throw an exception. Has anyone hit something similar, and how did you solve it? Thanks!
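(For context, the snippets below assume an event-time StreamTableEnvironment called tEnv. A minimal setup would look roughly like the following; this is just a sketch of what I believe to be a standard 1.9 setup, and the planner choice and details in my actual job may differ.)

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

    // Event-time processing, so a rowtime attribute can drive windows and watermarks.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // The planner choice here is an assumption on my part.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useOldPlanner()   // or .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);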
Code 1:
tEnv.connect(
        new Kafka()
            .version("universal")
            .topic("flink-test-dept-1")
            .startFromGroupOffsets()
            .property("bootstrap.servers", "192.168.129.101:9192")
            .property("group.id", "flink-test-consumer-group")
    ).withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema()
    ).withSchema(new Schema()
        .field("dept_id", Types.INT)
        .field("dept_name", Types.STRING)
        .field("crt_time", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
            .timestampsFromField("crt_time")
            .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000))
        )
        .field("proc_time", Types.SQL_TIMESTAMP).proctime()
    ).inAppendMode()
    .registerTableSource("dept");
Error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'crt_time' could not be resolved by the field mapping.
    at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
    at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
    at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
    at com.sean.TimeWindowExample.main(TimeWindowExample.java:47)
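(For completeness: the rowtime attribute is ultimately meant to drive an event-time window. Using the field names from Code 1, the kind of query I have in mind is roughly the following; the window size is only illustrative.)

    import org.apache.flink.table.api.Table;

    // Illustrative event-time tumbling-window aggregation over the registered table;
    // crt_time is the rowtime attribute from Code 1, the 1-minute window is arbitrary.
    Table deptCounts = tEnv.sqlQuery(
        "SELECT dept_id, COUNT(*) AS dept_cnt " +
        "FROM dept " +
        "GROUP BY dept_id, TUMBLE(crt_time, INTERVAL '1' MINUTE)");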
Code 2:
tEnv.connect(
        new Kafka()
            .version("universal")
            .topic("flink-test-dept-1")
            .startFromGroupOffsets()
            .property("bootstrap.servers", "192.168.129.101:9192")
            .property("group.id", "flink-test-consumer-group")
    ).withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema()
    ).withSchema(new Schema()
        .field("dept_id", Types.INT)
        .field("dept_name", Types.STRING)
        .field("crt_time", Types.SQL_TIMESTAMP)
        .field("row_time", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
            .timestampsFromField("crt_time")
            .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000))
        )
        .field("proc_time", Types.SQL_TIMESTAMP).proctime()
    ).inAppendMode()
    .registerTableSource("dept");
Error:
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.sean.TimeWindowExample.main(TimeWindowExample.java:48)
Caused by: org.apache.flink.table.api.TableException: Field names must be unique.
List of duplicate fields: [crt_time]
List of all fields: [dept_id, dept_name, crt_time, crt_time]
    at org.apache.flink.table.api.TableSchema.<init>(TableSchema.java:94)
    at org.apache.flink.table.api.TableSchema.<init>(TableSchema.java:49)
    at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:352)
    at org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema(TableFormatFactoryBase.java:163)
    at org.apache.flink.formats.json.JsonRowFormatFactory.createTypeInformation(JsonRowFormatFactory.java:88)
    at org.apache.flink.formats.json.JsonRowFormatFactory.createDeserializationSchema(JsonRowFormatFactory.java:62)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:263)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
    ... 3 more
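A third variant I'm unsure about, based on my (possibly wrong) reading of how deriveSchema() maps a rowtime attribute back to its origin field: declare only the rowtime attribute and let timestampsFromField point at the JSON field, without listing crt_time as a separate physical field. The schema part would then look roughly like the untested sketch below, with the Kafka and Json parts unchanged from Code 2. Is this the intended usage, or can deriveSchema() not be combined with a rowtime attribute at all?

    // Untested sketch: only the rowtime attribute row_time is declared; crt_time is
    // not a separate schema field, so (if I understand the derivation correctly) the
    // JSON format schema derived from this table schema would contain crt_time only once.
    .withSchema(new Schema()
        .field("dept_id", Types.INT)
        .field("dept_name", Types.STRING)
        .field("row_time", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
            .timestampsFromField("crt_time")
            .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000))
        )
        .field("proc_time", Types.SQL_TIMESTAMP).proctime()
    )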