Hi all,
I'm using Flink 1.11 SQL to consume from Kafka and write to HDFS, and I've found that partitions are never committed automatically. What could be the cause? Thanks.

My checkpoint interval is set to 10s. With the configuration below, a _SUCCESS file should normally appear under the corresponding HDFS partition every 10 seconds, but even after waiting a long time none shows up. The ORC result data itself is written correctly.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment bsEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    bsEnv.enableCheckpointing(10000); // checkpoint every 10s
    bsEnv.setParallelism(1);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

    // Kafka source (legacy connector options, JSON format, append mode)
    String sqlSource = "CREATE TABLE source_kafka (\n" +
            "  appName STRING,\n" +
            "  appVersion STRING,\n" +
            "  uploadTime STRING\n" +
            ") WITH (\n" +
            "  'connector.type' = 'kafka',\n" +
            "  'connector.version' = '0.10',\n" +
            "  'connector.topic' = 'mytest',\n" +
            "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
            "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'connector.properties.group.id' = 'testGroup',\n" +
            "  'format.type' = 'json',\n" +
            "  'update-mode' = 'append'\n" +
            ")";
    tEnv.executeSql(sqlSource);

    // Filesystem sink: ORC files partitioned by day/hour, with a
    // _SUCCESS file expected in each partition on commit
    String sql = "CREATE TABLE fs_table (\n" +
            "  appName STRING,\n" +
            "  appVersion STRING,\n" +
            "  uploadTime STRING,\n" +
            "  dt STRING,\n" +
            "  h STRING\n" +
            ") PARTITIONED BY (dt, h) WITH (\n" +
            "  'connector' = 'filesystem',\n" +
            "  'path' = 'hdfs://localhost/tmp/',\n" +
            "  'sink.partition-commit.policy.kind' = 'success-file',\n" +
            "  'format' = 'orc'\n" +
            ")";
    tEnv.executeSql(sql);

    // Derive the dt/h partition values from the local wall-clock time
    String insertSql = "INSERT INTO fs_table " +
            "SELECT appName, appVersion, uploadTime, " +
            "DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), " +
            "DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
    tEnv.executeSql(insertSql);
}
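For reference, my understanding from the Flink 1.11 filesystem connector docs is that the commit trigger defaults to 'process-time' with a '0s' delay, which should fire after each checkpoint. Below is a sketch of the same fs_table DDL with those trigger options written out explicitly; the values are just what I believe the defaults to be, not something I've confirmed changes the behavior:

CREATE TABLE fs_table (
  appName STRING,
  appVersion STRING,
  uploadTime STRING,
  dt STRING,
  h STRING
) PARTITIONED BY (dt, h) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://localhost/tmp/',
  'format' = 'orc',
  -- commit once processing time passes partition creation time + delay
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0s',
  -- write a _SUCCESS file into the partition directory on commit
  'sink.partition-commit.policy.kind' = 'success-file'
)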
