??????????
??????????????
1??????1.9??????flinksql??????????kafka??????????????????????Csv
format??????????????????????????????????????????Schema??????????????????????????????????json
format??????????????????????
tableEnv.connect(
new Kafka()
.version("0.10")
.topic("topic1")
.property("bootstrap.servers", "node1:9092")
.property("group.id","test")
.startFromLatest()
)
.withFormat(
new Csv()
.schema(Types.ROW(??????????))
#??????.schema(Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.INT))
??????????Caused by: org.apache.flink.table.api.ValidationException: Could not map the schema field 'fruit' to a field from source. Please specify the source field from which it can be derived.??
.fieldDelimiter(',')
.lineDelimiter("\r\n")
)
.withSchema(
new Schema()
.field("rowtime",Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(0)
)
.field("fruit", Types.STRING)
.field("number", Types.INT)
)
.inAppendMode()
.registerTableSource("sourceTable")
??????????????????????