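My understanding: these operators all have the same parallelism, so they are chained into a single operator whose head is the source and whose tail is the sink. The counters only see what happens inside the job's computation. Records the source reads in are not counted; only what the source emits to the downstream operator is. Likewise, records flowing into the sink are counted, but whatever the sink writes out is not.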
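To make the reasoning above concrete, here is a toy model in plain Java with no Flink dependency. It is only a sketch under simplifying assumptions: the class, method and operator names (`ChainMetricsSketch`, `chain`, `run`, "udtf", "filter") are hypothetical, the chaining rule is reduced to "neighbouring operators with equal parallelism are merged", every record passes every operator (the WHERE filter is ignored), and `recordsReceived`/`recordsSent` stand in for task-level counters that only see records crossing a task boundary. It is not Flink's actual API or chaining logic.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainMetricsSketch {

    // Hypothetical operator descriptor: a name plus its parallelism.
    static final class Op {
        final String name;
        final int parallelism;

        Op(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    // One chained task. Its counters only see records that cross a task boundary.
    static final class Task {
        final List<Op> ops = new ArrayList<>();
        long recordsReceived;
        long recordsSent;
    }

    // Simplified chaining rule (assumption): merge neighbours with equal parallelism.
    static List<Task> chain(List<Op> pipeline) {
        List<Task> tasks = new ArrayList<>();
        Task current = null;
        for (Op op : pipeline) {
            boolean startNewTask = current == null
                    || current.ops.get(current.ops.size() - 1).parallelism != op.parallelism;
            if (startNewTask) {
                current = new Task();
                tasks.add(current);
            }
            current.ops.add(op);
        }
        return tasks;
    }

    // Push n records through the pipeline. Hand-offs inside one task are plain
    // method calls and are not counted; only task-to-task transfers are.
    static void run(List<Task> tasks, long n) {
        for (int i = 0; i + 1 < tasks.size(); i++) {
            tasks.get(i).recordsSent += n;
            tasks.get(i + 1).recordsReceived += n;
        }
    }

    static void report(String title, List<Task> tasks) {
        System.out.println(title);
        for (Task t : tasks) {
            StringBuilder names = new StringBuilder();
            for (Op op : t.ops) {
                if (names.length() > 0) names.append(" -> ");
                names.append(op.name);
            }
            System.out.println("  [" + names + "] received=" + t.recordsReceived
                    + " sent=" + t.recordsSent);
        }
    }

    public static void main(String[] args) {
        // Everything at parallelism 1: a single chain, so both counters stay at 0.
        List<Task> chained = chain(List.of(
                new Op("source", 1), new Op("udtf", 1), new Op("filter", 1), new Op("sink", 1)));
        run(chained, 100);
        report("all operators chained:", chained);

        // Source at parallelism 1, the rest at 2: the chain is split after the source.
        List<Task> split = chain(List.of(
                new Op("source", 1), new Op("udtf", 2), new Op("filter", 2), new Op("sink", 2)));
        run(split, 100);
        report("chain split after the source:", split);
    }
}
```

In the first case the single chained task reports received=0 and sent=0. In the second, the source task reports only what it sent downstream (100) and the downstream task only what it received (100), which mirrors the "source output counted, sink input counted" behaviour described above.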
[email protected] wrote:
> I'm using Flink SQL, but as I understand it, shouldn't a single node also have its own input and output? Is there any documentation on this?
> Full code:
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnvironment =
>         StreamTableEnvironment.create(env, settings);
> tableEnvironment.executeSql("CREATE FUNCTION ColumnWithType AS " +
>         "'com.cxydevelop.flinkdemo_1_11.tablesql.function.udtf.ColumnWithType'");
> tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>         " request_uri STRING,\n" +
>         " `time` STRING\n" +
>         ") WITH (\n" +
>         " 'connector' = 'sls',\n" +
>         ...
>         " 'consumer.beginposition' = 'end_cursor',\n" +
>         " 'format' = 'sls'\n" +
>         ")");
> tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>         " media_type_id STRING" +
>         " , platform STRING\n" +
>         " ,game_id STRING\n" +
>         " ,agent_id STRING\n" +
>         " ,site_id STRING\n" +
>         " ,idfa STRING\n" +
>         " ,os STRING\n" +
>         ") WITH (\n" +
>         " 'connector' = 'print'\n" +
>         ")");
> tableEnvironment.executeSql(
>         "insert into sinktable " +
>         "select media_type_id, platform, game_id, agent_id, site_id, idfa, os " +
>         "from sourceTable, LATERAL TABLE(ColumnWithType(request_uri,'game_id,agent_id,site_id,idfa,os')) " +
>         "where POSITION('log.gif' in request_uri) > 0");
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
