我用的是 Flink SQL。不过按我的理解，一个节点不是也应该有输入和输出吗？想问一下这方面有对应的文档吗？
详细代码如下：
// Planner/mode settings for the table environment: built via
// useBlinkPlanner() and inStreamingMode().
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

// Underlying DataStream execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Table environment that all SQL statements below are submitted through.
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
tableEnvironment.executeSql("CREATE FUNCTION ColumnWithType AS
'com.cxydevelop.flinkdemo_1_11.tablesql.function.udtf.ColumnWithType'");
// Declares the source table "sourceTable" with two STRING columns
// (request_uri and `time`), using the 'sls' connector and 'sls' format.
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" request_uri STRING,\n" +
// `time` is back-quoted in the DDL.
" `time` STRING\n" +
") WITH (\n" +
" 'connector' = 'sls',\n" +
// NOTE(review): the next line is a literal "..." in the posted snippet;
// presumably further connector options were omitted here. It is not valid
// Java as written and must be replaced with the real options before compiling.
...
" 'consumer.beginposition' = 'end_cursor',\n" +
" 'format' = 'sls'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
" media_type_id STRING" +
" , platform STRING\n" +
" ,game_id STRING\n" +
" ,agent_id STRING\n" +
" ,site_id STRING\n" +
" ,idfa STRING\n" +
" ,os STRING\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select media_type_id, platform, game_id, agent_id,
site_id, idfa, os " +
"from sourceTable,LATERAL
TABLE(ColumnWithType(request_uri,'game_id,agent_id,site_id,idfa,os')) " +
"where POSITION('log.gif' in request_uri) > 0");
--
Sent from: http://apache-flink.147419.n8.nabble.com/