????????:??????????????8??part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0 
...??????
????,??????????????csv????,????????????????????



Flink SQL:
String tw_smart_tag="CREATE TABLE tw_smart_tag (\n" +
        "  id STRING,\n" +
        "  tag_code STRING,\n"+
        "  parent_id STRING,\n"+
        "  name STRING,\n"+
        "  type STRING,\n"+
        "  tag_type STRING,\n"+
        "  data_type STRING,\n"+
        "  status STRING,\n"+
        "  valid_status STRING,\n"+
        "  opr_status STRING,\n"+
        "  online STRING,\n"+
        "  opr_type STRING,\n"+
        "  opr_time STRING,\n"+
        "  invalid_time STRING,\n"+
        "  auth_type STRING,\n"+
        "  remark STRING,\n"+
        "  sort STRING,\n"+
        "  batch_no STRING,\n"+
        "  created_by STRING,\n"+
        "  created_time STRING,\n"+
        "  updated_by STRING,\n"+
        "  updated_time STRING\n"+
        ") WITH (\n" +
        "  'connector' = 'filesystem',           -- ????: ??????????????\n" +
        "  'path' = 
'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag.csv',
  -- ????: ??????????????\n" +
        "  'format' = 'csv'                   -- ????: 
?????????????????????????????????? ?????? ??????????????????\n" +
        ")\n";

String tw_smart_tag_detail="CREATE TABLE tw_smart_tag_detail (\n" +
        "  id STRING,\n" +
        "  tag_id STRING,\n"+
        "  code STRING,\n"+
        "  name STRING,\n"+
        "  content STRING,\n"+
        "  status STRING,\n"+
        "  created_by STRING,\n"+
        "  created_time STRING,\n"+
        "  updated_by STRING,\n"+
        "  updated_time STRING\n"+
        ") WITH (\n" +
        "  'connector' = 'filesystem',           -- ????: ??????????????\n" +
        "  'path' = 
'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag_detail.csv',
  -- ????: ??????????????\n" +
        "  'format' = 'csv'                   -- ????: 
?????????????????????????????????? ?????? ??????????????????\n" +
        ")\n";

//??????????
String loaclhostFile="CREATE TABLE loaclhost_File (\n" +
        "  id STRING,\n" +
        "  tag_code STRING,\n"+
        "  name STRING,\n"+
        "  data_type STRING,\n"+
        "  detailID STRING,\n"+
        "  tag_id STRING,\n"+
        "  detailName STRING\n"+
        ") WITH (\n" +
        "  'connector' = 'filesystem',           -- ????: ??????????????\n" +
        "  'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/data/',  -- ????: 
??????????????\n" +
        "  'format' = 'csv',                   -- ????: 
?????????????????????????????????? ?????? ??????????????????\n" +
        ")\n";

String joinSQL = "insert into loaclhost_File\n" +
        "SELECT tw_smart_tag.id AS id,\n" +
        "tw_smart_tag.tag_code AS tag_code,\n" +
        "tw_smart_tag.name AS name,\n" +
        "tw_smart_tag.data_type AS data_type,\n" +
        "tw_smart_tag_detail.id AS detailID,\n" +
        "tw_smart_tag_detail.tag_id AS tag_id,\n" +
        "tw_smart_tag_detail.name AS detailName\n" +
                "FROM tw_smart_tag INNER JOIN tw_smart_tag_detail ON 
tw_smart_tag.id = tw_smart_tag_detail.tag_id";
// tw_smart_tag.id = tw_smart_tag_detail.tag_id
tenv.executeSql(tw_smart_tag).print();
tenv.executeSql(tw_smart_tag_detail).print();
tenv.executeSql(loaclhostFile).print();
tenv.executeSql(joinSQL).print();

??????


 

回复