This is a bug. It has already been fixed and is waiting to be released.

On Fri, Aug 14, 2020 at 6:05 PM flink小猪 <18579099...@163.com> wrote:
> In my tests in the IDE, when writing to the parquet table the program keeps running even without the code you sent, and it prints parquet-related logs every checkpoint interval. When writing to the orc table, however, there is no log output at all, although the program also keeps running. I also submitted the same job through the sql client: the parquet table still has no problem, but the orc table job restarts endlessly and reports this error:
>
> java.io.FileNotFoundException: File does not exist: hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at StreamExecCalc$4.processElement(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> This file does exist, but it cannot be closed. A new file is then started, which cannot be closed either, and this repeats indefinitely.
>
> Also, when using the sql client, the parallelism seems to be determined only by the number of files that need to be read. I have a partitioned table whose query requires a parallelism of 58, but my cluster only has 10, so the query returns no data. How can I solve this problem?
>
> At 2020-08-13 15:40:54, "Rui Li" <lirui.fu...@gmail.com> wrote:
> >If you run it in the IDE, tableEnv.executeSql returns immediately and then the program exits. You can wait for the job to finish with something like this:
> >
> >val tableResult = tEnv.executeSql(insert)
> >// wait to finish
> >tableResult.getJobClient.get
> >  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
> >  .get
> >
> >> Why does hive streaming need the flink-orc_2.11 jar to generate orc files, while parquet does not?
> >
> >The orc dependency is in fact missing here. In principle this should only happen when table.exec.hive.fallback-mapred-writer is set to false. I will fix it later.
> >
> >> Where should I set the checkpoint interval for the sql client?
> >
> >You can set execution.checkpointing.interval in flink-conf.yaml.
> >
> >
> >On Thu, Aug 13, 2020 at 10:23 AM flink小猪 <18579099...@163.com> wrote:
> >
> >> I can't add attachments, so I'll paste the code directly.
> >>
> >> import java.time.Duration
> >>
> >> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> >> import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
> >> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> >> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
> >> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> >> import org.apache.flink.table.catalog.hive.HiveCatalog
> >>
> >> /**
> >>  * author dinghh
> >>  * time 2020-08-11 17:03
> >>  */
> >> object WriteHiveStreaming {
> >>   def main(args: Array[String]): Unit = {
> >>
> >>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >>     streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>     streamEnv.setParallelism(3)
> >>
> >>     val tableEnvSettings = EnvironmentSettings.newInstance()
> >>       .useBlinkPlanner()
> >>       .inStreamingMode()
> >>       .build()
> >>     val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
> >>     tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
> >>     tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
> >>
> >>     val catalogName = "my_catalog"
> >>     val catalog = new HiveCatalog(
> >>       catalogName, // catalog name
> >>       "default", // default database
> >>       "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources", // Hive config (hive-site.xml) directory
> >>       "1.1.0" // Hive version
> >>     )
> >>     tableEnv.registerCatalog(catalogName, catalog)
> >>     tableEnv.useCatalog(catalogName)
> >>
> >>     // drop the stream table
> >>     tableEnv.executeSql(
> >>       """
> >>         |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
> >>       """.stripMargin)
> >>
> >>     // create the stream table
> >>     tableEnv.executeSql(
> >>       """
> >>         |CREATE TABLE `stream_db`.`datagen_user` (
> >>         | id INT,
> >>         | name STRING,
> >>         | dt AS localtimestamp,
> >>         | WATERMARK FOR dt AS dt
> >>         |) WITH (
> >>         | 'connector' = 'datagen',
> >>         | 'rows-per-second'='10',
> >>         | 'fields.id.kind'='random',
> >>         | 'fields.id.min'='1',
> >>         | 'fields.id.max'='1000',
> >>         | 'fields.name.length'='5'
> >>         |)
> >>       """.stripMargin)
> >>
> >>     // switch to the hive dialect
> >>     tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >>
> >>     // drop the hive orc table
> >>     tableEnv.executeSql(
> >>       """
> >>         |DROP TABLE IF EXISTS `default`.`hive_user_orc`
> >>         |
> >>       """.stripMargin)
> >>
> >>     // create the hive orc table
> >>     tableEnv.executeSql(
> >>       """
> >>         |CREATE TABLE `default`.`hive_user_orc` (
> >>         | id INT,
> >>         | name STRING
> >>         |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING ) STORED AS ORC TBLPROPERTIES (
> >>         | 'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
> >>         | 'sink.partition-commit.trigger'='partition-time',
> >>         | 'sink.partition-commit.delay'='1 min',
> >>         | 'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         |)
> >>       """.stripMargin)
> >>
> >>     // drop the hive parquet table
> >>     tableEnv.executeSql(
> >>       """
> >>         |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
> >>       """.stripMargin)
> >>     // create the hive parquet table
> >>     tableEnv.executeSql(
> >>       """
> >>         |CREATE TABLE `default`.`hive_user_parquet` (
> >>         | id INT,
> >>         | name STRING
> >>         |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING) STORED AS PARQUET TBLPROPERTIES (
> >>         | 'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
> >>         | 'sink.partition-commit.trigger'='partition-time',
> >>         | 'sink.partition-commit.delay'='1 min',
> >>         | 'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         |)
> >>       """.stripMargin)
> >>     // switch back to the flink dialect
> >>     tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >>     // streaming write into the orc table
> >>     tableEnv.executeSql(
> >>       """
> >>         |INSERT INTO `default`.`hive_user_orc`
> >>         |SELECT
> >>         | id,name,
> >>         | DATE_FORMAT(dt,'yyyy-MM-dd'),
> >>         | DATE_FORMAT(dt,'HH'),
> >>         | DATE_FORMAT(dt,'mm')
> >>         |FROM
> >>         | stream_db.datagen_user
> >>       """.stripMargin)
> >>     // streaming write into the parquet table
> >>     tableEnv.executeSql(
> >>       """
> >>         |INSERT INTO `default`.`hive_user_parquet`
> >>         |SELECT
> >>         | id,name,
> >>         | DATE_FORMAT(dt,'yyyy-MM-dd'),
> >>         | DATE_FORMAT(dt,'HH'),
> >>         | DATE_FORMAT(dt,'mm')
> >>         |FROM
> >>         | stream_db.datagen_user
> >>       """.stripMargin)
> >>
> >>   }
> >>
> >> }
> >>
> >>
> >> At 2020-08-13 10:08:55, "flink小猪" <18579099...@163.com> wrote:
> >>
> >> Awkward, I definitely uploaded the attachment but it's gone - -, so I've uploaded it again.
> >>
> >> 1. The orc write and the parquet write are in the same job and no error is reported, but the data cannot be queried in hive. The files are present in the hdfs directory, but they are not generated at the checkpoint interval, and no _success file is generated.
> >> 2. I did not set table.exec.hive.fallback-mapred-writer.
> >>
> >> Here are my questions:
> >> 1. Why does hive streaming need the flink-orc_2.11 jar to generate orc files, while parquet does not?
> >> 2. Where should I set the checkpoint interval for the sql client?
> >>
> >> Below are screenshots of the hdfs directory.
> >>
> >> These are the files generated for orc
> >>
> >> These are the files generated for parquet
> >>
> >>
> >> At 2020-08-12 17:33:30, "Rui Li" <lirui.fu...@gmail.com> wrote:
> >> >Hi,
> >> >
> >> >Does the job writing the orc table report any error? Or does it run successfully while hive cannot see the data?
> >> >I can't see the code you posted. Did you set table.exec.hive.fallback-mapred-writer?
> >> >
> >> >On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com> wrote:
> >> >
> >> >> I use datagen as a stream table and write into two hive tables with the same schema but different storage formats (one orc, one parquet). The parquet table is written normally, its _SUCCESS file is generated, and hive can query it,
> >> >> but the orc table gets no _SUCCESS file and cannot be queried in hive. I run the job directly in my local IDE; the hive version is 1.2.1 and the flink version is 1.11.1. I also noticed that more files are generated in the orc table's partitions than for parquet,
> >> >> and they are not generated according to the checkpoint interval (parquet follows the checkpoint interval). In addition, I have to add the flink-orc_2.11 jar (not needed for parquet), otherwise I get the error: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/orc/TypeDescription. Parquet also outputs parquet-related logs every checkpoint interval, while orc produces no logs at all. What is the reason? I have attached the code.
> >> >> ------------------------------
> >> >> 18579099...@163.com
> >> >
> >> >--
> >> >Best regards!
> >> >Rui Li
> >
> >--
> >Best regards!
> >Rui Li

--
Best,
Jingsong Lee