??????tableEnv.sqlUpdate(ddl);??????????

????????????????????????????????????rowtime????????????????????????rowtime??????????????????????
??????flink???????????????????????????????????????


??????????????csv????????????????kafka????????????????????????????????????????


sql??????????????
CREATE TABLE T_UserBehavior(
   userId BIGINT,
   itemId BIGINT,
   categoryId BIGINT,
   behavior VARCHAR,
   optime BIGINT
) WITH (
  'connector.type' = 'filesystem',               -- required: specify to 
connector type
  'connector.path' = 
'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv',
  -- required: path to a file or directory
  'format.type' = 'csv',
  'format.fields.0.name' = 'userId',         -- required: define the schema 
either by using type information
  'format.fields.0.type' = 'BIGINT',
  'format.fields.1.name' = 'itemId',
  'format.fields.1.type' = 'BIGINT',
  'format.fields.2.name' = 'categoryId',
  'format.fields.2.type' = 'BIGINT',
  'format.fields.3.name' = 'behavior',
  'format.fields.3.type' = 'VARCHAR',
  'format.fields.4.name' = 'optime',
  'format.fields.4.type' = 'BIGINT'
);

回复