Hi, Jun,

Thanks a lot for the detailed and thorough testing~

I'll try to reproduce and track this down next~
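
In the meantime, one thing worth ruling out is the partition-commit trigger configuration. Below is a minimal sketch of the sink DDL with the trigger and delay spelled out explicitly; these values are assumptions for narrowing the problem down, not a confirmed fix:

CREATE TABLE fs_table (
    appName STRING,
    appVersion STRING,
    uploadTime STRING,
    dt STRING,
    h STRING
) PARTITIONED BY (dt, h) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://localhost/tmp/',
    'format' = 'orc',
    -- commit based on processing time rather than watermarks
    'sink.partition-commit.trigger' = 'process-time',
    -- 0s: commit at the first checkpoint after the partition receives data
    'sink.partition-commit.delay' = '0s',
    -- write a _SUCCESS file when the partition is committed
    'sink.partition-commit.policy.kind' = 'success-file'
);

Note that ORC is a bulk format, so in-progress files are only finalized on checkpoint; with a 10s checkpoint interval, the _SUCCESS file should appear shortly after a checkpoint completes once the commit fires.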

Best,
Jingsong

On Fri, Jul 10, 2020 at 3:09 PM Jun Zhang <[email protected]> wrote:

> hi, Jingsong:
>
> In my test environment, with the code I posted:
>
> 1. If the source is bounded data, e.g. bsEnv.fromElements(...), a success file is generated; with Kafka data it is not.
> 2. If the job parallelism is set greater than 1, a success file is also generated.
> 3. If I write to a local file, e.g. file:///tmp/aaa , instead of HDFS, a success file is also generated.
>
>
> In summary: with parallelism set to 1, consuming a never-ending Kafka stream, writing to HDFS, and a checkpoint interval of 10s, I have tested many times and no success file is ever generated.
>
> On Fri, Jul 10, 2020 at 2:54 PM Jingsong Li <[email protected]> wrote:
>
> > Hi,
> >
> > By default with ORC, in theory a corresponding SUCCESS file should be produced as soon as a proper data file is generated. How did you confirm that there is no SUCCESS file?
> > I ran the same SQL in my environment and the SUCCESS file is there.
> >
> > Best,
> > Jingsong
> >
> > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <[email protected]> wrote:
> >
> > > Hi all:
> > > While using Flink 1.11 SQL to consume from Kafka and write to HDFS, I found that partitions are never committed automatically. What could be the reason? Thanks.
> > >
> > > My checkpoint interval is set to 10s. With the configuration below, there should normally be a _SUCCESS file under the corresponding HDFS partition every 10s, but even after a long time none appears, while the ORC result data itself is written correctly.
> > >
> > > public static void main(String[] args) throws Exception {
> > >     StreamExecutionEnvironment bsEnv =
> > >             StreamExecutionEnvironment.getExecutionEnvironment();
> > >     bsEnv.enableCheckpointing(10000); // checkpoint every 10s
> > >     bsEnv.setParallelism(1);
> > >     StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> > >
> > >     // Kafka source (legacy connector properties)
> > >     String sqlSource = "CREATE TABLE source_kafka (\n" +
> > >             "    appName STRING,\n" +
> > >             "    appVersion STRING,\n" +
> > >             "    uploadTime STRING\n" +
> > >             ") WITH (\n" +
> > >             "  'connector.type' = 'kafka',\n" +
> > >             "  'connector.version' = '0.10',\n" +
> > >             "  'connector.topic' = 'mytest',\n" +
> > >             "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
> > >             "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
> > >             "  'connector.properties.group.id' = 'testGroup',\n" +
> > >             "  'format.type' = 'json',\n" +
> > >             "  'update-mode' = 'append' )";
> > >     tEnv.executeSql(sqlSource);
> > >
> > >     // Filesystem sink partitioned by day/hour, with the success-file commit policy
> > >     String sql = "CREATE TABLE fs_table (\n" +
> > >             "    appName STRING,\n" +
> > >             "    appVersion STRING,\n" +
> > >             "    uploadTime STRING,\n" +
> > >             "    dt STRING,\n" +
> > >             "    h STRING\n" +
> > >             ") PARTITIONED BY (dt, h) WITH (\n" +
> > >             "  'connector' = 'filesystem',\n" +
> > >             "  'path' = 'hdfs://localhost/tmp/',\n" +
> > >             "  'sink.partition-commit.policy.kind' = 'success-file',\n" +
> > >             "  'format' = 'orc'\n" +
> > >             ")";
> > >     tEnv.executeSql(sql);
> > >
> > >     // Derive the dt/h partition columns from the current timestamp
> > >     String insertSql = "insert into fs_table SELECT appName, appVersion, uploadTime, " +
> > >             "DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), " +
> > >             "DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
> > >     tEnv.executeSql(insertSql);
> > > }
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee
