我用的是flinksql , 不过以我的理解, 一个节点不是应该也有输入输出吗, 想问下这有对应的文档吗
详细代码
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
        tableEnvironment.executeSql("CREATE FUNCTION ColumnWithType AS
'com.cxydevelop.flinkdemo_1_11.tablesql.function.udtf.ColumnWithType'");
        tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
                "    request_uri STRING,\n" +
                "    `time` STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'sls',\n" +
                ...
                "    'consumer.beginposition' = 'end_cursor',\n" +
                "    'format' = 'sls'\n" +
                ")");
        tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
                "    media_type_id STRING" +
                "    , platform STRING\n" +
                "    ,game_id STRING\n" +
                "    ,agent_id STRING\n" +
                "    ,site_id STRING\n" +
                "    ,idfa STRING\n" +
                "    ,os STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");
        tableEnvironment.executeSql(
                "insert into sinktable " +
                    "select media_type_id, platform, game_id, agent_id,
site_id, idfa, os " +
                    "from sourceTable,LATERAL
TABLE(ColumnWithType(request_uri,'game_id,agent_id,site_id,idfa,os')) " +
                    "where POSITION('log.gif' in request_uri) > 0");



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复