??????tableEnv.sqlUpdate(ddl);??????????
????????????????????????????????????rowtime????????????????????????rowtime?????????????????????? ??????flink??????????????????????????????????????? ??????????????csv????????????????kafka???????????????????????????????????????? sql?????????????? CREATE TABLE T_UserBehavior( userId BIGINT, itemId BIGINT, categoryId BIGINT, behavior VARCHAR, optime BIGINT ) WITH ( 'connector.type' = 'filesystem', -- required: specify to connector type 'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv', -- required: path to a file or directory 'format.type' = 'csv', 'format.fields.0.name' = 'userId', -- required: define the schema either by using type information 'format.fields.0.type' = 'BIGINT', 'format.fields.1.name' = 'itemId', 'format.fields.1.type' = 'BIGINT', 'format.fields.2.name' = 'categoryId', 'format.fields.2.type' = 'BIGINT', 'format.fields.3.name' = 'behavior', 'format.fields.3.type' = 'VARCHAR', 'format.fields.4.name' = 'optime', 'format.fields.4.type' = 'BIGINT' );