????????:??????????????8??part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0
...??????
????,??????????????csv????,????????????????????
Flink SQL:
String tw_smart_tag="CREATE TABLE tw_smart_tag (\n" +
" id STRING,\n" +
" tag_code STRING,\n"+
" parent_id STRING,\n"+
" name STRING,\n"+
" type STRING,\n"+
" tag_type STRING,\n"+
" data_type STRING,\n"+
" status STRING,\n"+
" valid_status STRING,\n"+
" opr_status STRING,\n"+
" online STRING,\n"+
" opr_type STRING,\n"+
" opr_time STRING,\n"+
" invalid_time STRING,\n"+
" auth_type STRING,\n"+
" remark STRING,\n"+
" sort STRING,\n"+
" batch_no STRING,\n"+
" created_by STRING,\n"+
" created_time STRING,\n"+
" updated_by STRING,\n"+
" updated_time STRING\n"+
") WITH (\n" +
" 'connector' = 'filesystem', -- ????: ??????????????\n" +
" 'path' =
'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag.csv',
-- ????: ??????????????\n" +
" 'format' = 'csv' -- ????:
?????????????????????????????????? ?????? ??????????????????\n" +
")\n";
String tw_smart_tag_detail="CREATE TABLE tw_smart_tag_detail (\n" +
" id STRING,\n" +
" tag_id STRING,\n"+
" code STRING,\n"+
" name STRING,\n"+
" content STRING,\n"+
" status STRING,\n"+
" created_by STRING,\n"+
" created_time STRING,\n"+
" updated_by STRING,\n"+
" updated_time STRING\n"+
") WITH (\n" +
" 'connector' = 'filesystem', -- ????: ??????????????\n" +
" 'path' =
'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag_detail.csv',
-- ????: ??????????????\n" +
" 'format' = 'csv' -- ????:
?????????????????????????????????? ?????? ??????????????????\n" +
")\n";
//??????????
String loaclhostFile="CREATE TABLE loaclhost_File (\n" +
" id STRING,\n" +
" tag_code STRING,\n"+
" name STRING,\n"+
" data_type STRING,\n"+
" detailID STRING,\n"+
" tag_id STRING,\n"+
" detailName STRING\n"+
") WITH (\n" +
" 'connector' = 'filesystem', -- ????: ??????????????\n" +
" 'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/data/', -- ????:
??????????????\n" +
" 'format' = 'csv', -- ????:
?????????????????????????????????? ?????? ??????????????????\n" +
")\n";
String joinSQL = "insert into loaclhost_File\n" +
"SELECT tw_smart_tag.id AS id,\n" +
"tw_smart_tag.tag_code AS tag_code,\n" +
"tw_smart_tag.name AS name,\n" +
"tw_smart_tag.data_type AS data_type,\n" +
"tw_smart_tag_detail.id AS detailID,\n" +
"tw_smart_tag_detail.tag_id AS tag_id,\n" +
"tw_smart_tag_detail.name AS detailName\n" +
"FROM tw_smart_tag INNER JOIN tw_smart_tag_detail ON
tw_smart_tag.id = tw_smart_tag_detail.tag_id";
// tw_smart_tag.id = tw_smart_tag_detail.tag_id
tenv.executeSql(tw_smart_tag).print();
tenv.executeSql(tw_smart_tag_detail).print();
tenv.executeSql(loaclhostFile).print();
tenv.executeSql(joinSQL).print();
??????