??????????
??????????????
1??????1.9??????flinksql??????????kafka??????????????????????Csv 
format??????????????????????????????????????????Schema??????????????????????????????????json
 format??????????????????????

tableEnv.connect(


new Kafka()


.version("0.10")


.topic("topic1")


.property("bootstrap.servers", "node1:9092")


.property("group.id","test")


.startFromLatest()


)

.withFormat(


new Csv()


.schema(Types.ROW(??????????))        

#??????.schema(Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.INT)) 
??????????Caused by: org.apache.flink.table.api.ValidationException: Could not map the schema field 'fruit' to a field from source. Please specify the source field from which it can be derived.??





.fieldDelimiter(',')


.lineDelimiter("\r\n")


)

.withSchema(


new Schema()


.field("rowtime",Types.SQL_TIMESTAMP)


.rowtime(new Rowtime()


.timestampsFromField("eventtime")


.watermarksPeriodicBounded(0)


)


.field("fruit", Types.STRING)


.field("number", Types.INT)


)

.inAppendMode()

.registerTableSource("sourceTable")




??????????????????????

回复