代码里定义了kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table
schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000))
kafka输入: {"eventTime": 100000, "id":1,"name":"hb"} 会报错,
输入 {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} 结果显示正确,
eventTime 字段怎么不支持数值输入呢.
错误提示:
```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON
object.
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '100000' could not be
parsed at index 0
at
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more
```
源码:
```
val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, conf)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val kafkaIn = new Kafka()
.version("0.11")
.topic("hbtest111")
.property("bootstrap.servers", "192.168.1.160:19092")
.property("group.id", "test2")
val json = new Json().deriveSchema()
val schema = new Schema()
.field("id", Types.INT())
.field("name", Types.STRING())
schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)
tEnv.connect(kafkaIn).withFormat(json).withSchema(schema).inAppendMode().registerTableSource("table_from_kafka")
val t = tEnv.sqlQuery("select * from table_from_kafka")
t.printSchema()
t.toRetractStream[Row].print()
tEnv.execute("")
```