??????????????????????row????????????????????event time????????????????????????????????????
create table my_source ( database varchar, maxwell_ts bigint, table varchar, data row< transaction_sn varchar, parent_id int, user_id int, amount int, reference_id varchar, status int, transaction_type int, merchant_id int, update_time int, create_time int ts AS CAST(FROM_UNIXTIME(create_time) AS TIMESTAMP(3)), // ???????????? WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE > ) with ( ... ) ???????????? ------------------ ???????? ------------------ ??????: "Benchao Li"<[email protected]>; ????????: 2020??5??26??(??????) ????9:55 ??????: "user-zh"<[email protected]>; ????: Re: Flink SQL ???? nested Json ???? Hi?? ????????????????????DDL??????source??format??????????????????????????DDL ?????????????????? create table my_source ( database varchar, maxwell_ts bigint, table varchar, data row< transaction_sn varchar, parent_id int, user_id int, amount int, reference_id varchar, status int, transaction_type int, merchant_id int, update_time int, create_time int > ) with ( ... ) macia kk <[email protected]> ??2020??5??26?????? ????9:36?????? > Flink version: 1.10 > > Json: > > { > "database":"main_db", > "maxwell_ts":1590416550358000, > "table":"transaction_tab", > "data":{ > "transaction_sn":"8888", > "parent_id":0, > "user_id":333, > "amount":555, > "reference_id":"666", > "status":3, > "transaction_type":3, > "merchant_id":2, > "update_time":1590416550, > "create_time":1590416550 > }} > > > ????????????????????json???????? jsonSchema ??????Sechame > > > macia kk <[email protected]> ??2020??5??26?????? ????9:34?????? > > > Flink version: 1.10 > > > > Json: > > ```j > > { > > "database":"main_db", > > "maxwell_ts":1590416550358000, > > "table":"transaction_tab", > > "data":{ > > "transaction_sn":"8888", > > "parent_id":0, > > "user_id":333, > > "amount":555, > > "reference_id":"666", > > "status":3, > > "transaction_type":3, > > "merchant_id":2, > > "update_time":1590416550, > > "create_time":1590416550 > > } > > } > > ``` > > > > ????????????????????json???????? jsonSchema ??????Sechame > > > > > > > > Leonard Xu <[email protected]> ??2020??5??26?????? ????8:58?????? > > > >> Hi, kk > >> > >> ??????flink????????????1.10????????????format????????????????json?????????????????? > >> > >> > >> ?????? > >> Leonard Xu > >> > >> > >> > ?? 2020??5??26????01:26??macia kk <[email protected]> ?????? > >> > > >> > ???????????????????????? > >> > > >> > > >> > ??????????????????????????????????Json > >> > > >> > Error > >> > > >> > Caused by: org.apache.flink.table.api.ValidationException: SQL > >> > validation failed. From line 4, column 9 to line 4, column 31: Column > >> > 'data.transaction_type' not found in any table > >> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > >> > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) > >> > at > >> > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > >> > at > >> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > >> > at > >> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > >> > at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39) > >> > at > >> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >> > at > >> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >> > at > >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >> > at java.lang.reflect.Method.invoke(Method.java:498) > >> > at > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > >> > > >> > > >> > ????Json ?????? format ?? schema ?????? > >> > > >> > .withFormat(new Json() > >> > .jsonSchema( > >> > """{type: 'object', > >> > | properties: { > >> > | database: { > >> > | type: 'string' > >> > | }, > >> > | table: { > >> > | type: 'string' > >> > | }, > >> > | maxwell_ts: { > >> > | type: 'integer' > >> > | }, > >> > | data: { > >> > | type: 'object', > >> > | properties :{ > >> > | reference_id :{ > >> > | type: 'string' > >> > | }, > >> > | transaction_type :{ > >> > | type: 'integer' > >> > | }, > >> > | merchant_id :{ > >> > | type: 'integer' > >> > | }, > >> > | create_time :{ > >> > | type: 'integer' > >> > | }, > >> > | status :{ > >> > | type: 'integer' > >> > | } > >> > | } > >> > | } > >> > | } > >> > | } > >> > """.stripMargin.replaceAll("\n", " ") > >> > ) > >> > ) > >> > .withSchema(new Schema() > >> > .field("table", STRING()) > >> > .field("database", STRING()) > >> > .field("data", ROW(FIELD("reference_id",STRING()), > >> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()), > >> > FIELD("status",INT()))) > >> > //.field("event_time", BIGINT()) > >> > // .from("maxwell_ts") > >> > //.rowtime(new Rowtime() > >> > // //.timestampsFromField("ts" * 1000) > >> > // .timestampsFromField("ts") > >> > // .watermarksPeriodicBounded(60000) > >> > //) > >> > ) > >> > > >> > > >> > bsTableEnv.sqlUpdate("""INSERT INTO yyyyy > >> > | SELECT `table`, `database` > >> > | `data.reference_id`, > >> > | `data.transaction_type`, > >> > | `data.merchant_id`, > >> > | `data.create_time`, > >> > | `data.status` > >> > | FROM xxxx""".stripMargin) > >> > >> > -- Best, Benchao Li
