Hi, kk

Which Flink version are you using? In 1.10 you do not need to declare the format. Could you paste one sample JSON record? I can take a look.

Best regards,
Leonard Xu

> On May 26, 2020, at 01:26, macia kk <[email protected]> wrote:
>
> Could someone please take a look at this for me? Thanks.
>
> I have been trying for a long time and still cannot parse JSON with a nested structure.
>
> Error
>
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 9 to line 4, column 31: Column 'data.transaction_type' not found in any table
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>     at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
>     at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
>     at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
> The format and schema defined for the nested JSON are as follows:
>
> .withFormat(new Json()
>   .jsonSchema(
>     """{type: 'object',
>       |  properties: {
>       |    database: {
>       |      type: 'string'
>       |    },
>       |    table: {
>       |      type: 'string'
>       |    },
>       |    maxwell_ts: {
>       |      type: 'integer'
>       |    },
>       |    data: {
>       |      type: 'object',
>       |      properties :{
>       |        reference_id :{
>       |          type: 'string'
>       |        },
>       |        transaction_type :{
>       |          type: 'integer'
>       |        },
>       |        merchant_id :{
>       |          type: 'integer'
>       |        },
>       |        create_time :{
>       |          type: 'integer'
>       |        },
>       |        status :{
>       |          type: 'integer'
>       |        }
>       |      }
>       |    }
>       |  }
>       |}
>     """.stripMargin.replaceAll("\n", " ")
>   )
> )
> .withSchema(new Schema()
>   .field("table", STRING())
>   .field("database", STRING())
>   .field("data", ROW(FIELD("reference_id",STRING()),
>     FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
>     FIELD("status",INT())))
>   //.field("event_time", BIGINT())
>   //  .from("maxwell_ts")
>   //.rowtime(new Rowtime()
>   //  //.timestampsFromField("ts" * 1000)
>   //  .timestampsFromField("ts")
>   //  .watermarksPeriodicBounded(60000)
>   //)
> )
>
>
> bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
>   | SELECT `table`, `database`
>   |   `data.reference_id`,
>   |   `data.transaction_type`,
>   |   `data.merchant_id`,
>   |   `data.create_time`,
>   |   `data.status`
>   | FROM xxxx""".stripMargin)
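
A note on the quoted query, offered as a likely cause and not a confirmed diagnosis: a name written as `data.transaction_type` inside a single pair of backticks is read as one column whose name contains a dot, which matches the "not found in any table" message. The missing comma after `database` would also make the third line parse as an alias, which would explain why the validator first complains at line 4. The sketch below quotes each path segment separately. It assumes the table names xxxx and yyyyy from the thread are already registered and that the sink columns match the select list; the object name NestedFieldQuery and the submit helper are made up for illustration. `create_time` is left out because the ROW type in the quoted schema does not declare it.

```scala
import org.apache.flink.table.api.TableEnvironment

// Hypothetical wrapper object; not part of the original job.
object NestedFieldQuery {

  // Each path segment is quoted on its own, so `data` resolves to the ROW
  // column and the second segment to a field inside that row.
  // A comma is added after `database` so the next line is a column, not an alias.
  val insertSql: String =
    """INSERT INTO yyyyy
      |SELECT `table`, `database`,
      |       `data`.`reference_id`,
      |       `data`.`transaction_type`,
      |       `data`.`merchant_id`,
      |       `data`.`status`
      |FROM xxxx""".stripMargin

  // Assumes tableEnv already has the source xxxx and the sink yyyyy registered.
  def submit(tableEnv: TableEnvironment): Unit =
    tableEnv.sqlUpdate(insertSql)
}
```

To select `create_time` as well, the field would first need to be added to the ROW definition in `withSchema`.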
