有哪位大佬帮我看下,谢谢
尝试了很久,还是无法解析嵌套结构的Json
Error
Caused by: org.apache.flink.table.api.ValidationException: SQL
validation failed. From line 4, column 9 to line 4, column 31: Column
'data.transaction_type' not found in any table
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
at
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
at
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
嵌套Json 定义的 format 和 schema 如下:
.withFormat(new Json()
.jsonSchema(
"""{type: 'object',
| properties: {
| database: {
| type: 'string'
| },
| table: {
| type: 'string'
| },
| maxwell_ts: {
| type: 'integer'
| },
| data: {
| type: 'object',
| properties :{
| reference_id :{
| type: 'string'
| },
| transaction_type :{
| type: 'integer'
| },
| merchant_id :{
| type: 'integer'
| },
| create_time :{
| type: 'integer'
| },
| status :{
| type: 'integer'
| }
| }
| }
| }
| }
""".stripMargin.replaceAll("\n", " ")
)
)
.withSchema(new Schema()
.field("table", STRING())
.field("database", STRING())
.field("data", ROW(FIELD("reference_id",STRING()),
FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
FIELD("status",INT())))
//.field("event_time", BIGINT())
// .from("maxwell_ts")
//.rowtime(new Rowtime()
// //.timestampsFromField("ts" * 1000)
// .timestampsFromField("ts")
// .watermarksPeriodicBounded(60000)
//)
)
bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
| SELECT `table`, `database`
| `data.reference_id`,
| `data.transaction_type`,
| `data.merchant_id`,
| `data.create_time`,
| `data.status`
| FROM xxxx""".stripMargin)