This is a bug. It has already been fixed and the fix is pending release.

On Fri, Aug 14, 2020 at 6:05 PM flink小猪 <18579099...@163.com> wrote:

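> Based on my tests in the IDE: when writing to the Parquet table, the program keeps running even without the code you sent, and Parquet-related logs are printed once per checkpoint interval.
> When writing to the ORC table, however, no logs are output at all, although the program keeps running. I also submitted the same jobs through the SQL client:
> the Parquet table job still has no problems, whereas the ORC table job restarts endlessly and reports the following error.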
>
> java.io.FileNotFoundException: File does not exist: hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
>     at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at StreamExecCalc$4.processElement(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
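> This file does exist, but it cannot be closed. A new file is then started, which cannot be closed either, and this repeats indefinitely.
> Also, when using the SQL client, the parallelism seems to depend only on the number of files that need to be read. I have a partitioned table, and querying it requires a parallelism of 58, but my cluster only has 10, so the query returns no data.
> How can I solve this problem?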
>
> On 2020-08-13 15:40:54, "Rui Li" <lirui.fu...@gmail.com> wrote:
> >If you run it in the IDE, tableEnv.executeSql returns immediately and the program then exits. You can wait for the job to finish with something like this:
> >
> >val tableResult = tEnv.executeSql(insert)
> >// wait to finish
> >tableResult.getJobClient.get
> >  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
> >  .get
> >
> >> Why does Hive streaming need the flink-orc_2.11 jar to generate ORC files, while Parquet does not?
> >
>
> >This is actually a missing ORC dependency. In principle it should only happen when table.exec.hive.fallback-mapred-writer is set to false. I will fix it later.
> >
> >> In the SQL client, where should I set the checkpoint interval?
> >
> >You can set execution.checkpointing.interval in flink-conf.yaml.
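> >
> >As a minimal sketch, the entries in flink-conf.yaml could look like the following. The 20s interval and the exactly-once mode are assumptions that simply mirror the values set in your program, not recommendations:
> >
> >    # checkpoint every 20 seconds (assumed value, taken from the program)
> >    execution.checkpointing.interval: 20s
> >    # optional; assumed to match CheckpointingMode.EXACTLY_ONCE in the program
> >    execution.checkpointing.mode: EXACTLY_ONCE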
> >
> >
> >On Thu, Aug 13, 2020 at 10:23 AM flink小猪 <18579099...@163.com> wrote:
> >
> >> I can't add attachments, so I'll paste the code directly.
> >>
> >> import java.time.Duration
> >>
> >>
> >> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> >> import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
> >> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> >> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
> >> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> >> import org.apache.flink.table.catalog.hive.HiveCatalog
> >>
> >>
> >>
> >>
> >> /**
> >>   * author dinghh
> >>   * time 2020-08-11 17:03
> >>   */
> >> object WriteHiveStreaming {
> >>     def main(args: Array[String]): Unit = {
> >>
> >>
> >>         val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >>         streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>         streamEnv.setParallelism(3)
> >>
> >>
> >>         val tableEnvSettings = EnvironmentSettings.newInstance()
> >>                 .useBlinkPlanner()
> >>                 .inStreamingMode()
> >>                 .build()
> >>         val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
> >>
> >>         tableEnv.getConfig.getConfiguration.set(
> >>             ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
> >>         tableEnv.getConfig.getConfiguration.set(
> >>             ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
> >>
> >>
> >>
> >>
> >>
> >>
> >>         val catalogName = "my_catalog"
> >>         val catalog = new HiveCatalog(
> >>             catalogName,              // catalog name
> >>             "default",                // default database
> >>             "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources", // Hive config (hive-site.xml) directory
> >>             "1.1.0"                   // Hive version
> >>         )
> >>         tableEnv.registerCatalog(catalogName, catalog)
> >>         tableEnv.useCatalog(catalogName)
> >>
> >>
> >>
> >>
> >>         // Drop the stream table
> >>         tableEnv.executeSql(
> >>             """
> >>               |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
> >>             """.stripMargin)
> >>
> >>
> >>         // Create the stream table
> >>         tableEnv.executeSql(
> >>             """
> >>               |CREATE TABLE `stream_db`.`datagen_user` (
> >>               | id INT,
> >>               | name STRING,
> >>               | dt AS localtimestamp,
> >>               | WATERMARK FOR dt AS dt
> >>               |) WITH (
> >>               | 'connector' = 'datagen',
> >>               | 'rows-per-second'='10',
> >>               | 'fields.id.kind'='random',
> >>               | 'fields.id.min'='1',
> >>               | 'fields.id.max'='1000',
> >>               | 'fields.name.length'='5'
> >>               |)
> >>             """.stripMargin)
> >>
> >>
> >>         // Switch to the Hive dialect
> >>         tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >>
> >>
> >>         // Drop the Hive ORC table
> >>         tableEnv.executeSql(
> >>             """
> >>               |DROP TABLE IF EXISTS `default`.`hive_user_orc`
> >>               |
> >>             """.stripMargin)
> >>
> >>
> >>         // Create the Hive ORC table
> >>         tableEnv.executeSql(
> >>             """
> >>               |CREATE TABLE `default`.`hive_user_orc` (
> >>               |  id INT,
> >>               |  name STRING
> >>               |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING) STORED AS ORC TBLPROPERTIES (
> >>               |  'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
> >>               |  'sink.partition-commit.trigger'='partition-time',
> >>               |  'sink.partition-commit.delay'='1 min',
> >>               |  'sink.partition-commit.policy.kind'='metastore,success-file'
> >>               |)
> >>             """.stripMargin)
> >>
> >>
> >>         // Drop the Hive Parquet table
> >>         tableEnv.executeSql(
> >>             """
> >>               |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
> >>             """.stripMargin)
> >>         // Create the Hive Parquet table
> >>         tableEnv.executeSql(
> >>             """
> >>               |CREATE TABLE `default`.`hive_user_parquet` (
> >>               |  id INT,
> >>               |  name STRING
> >>               |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING) STORED AS PARQUET TBLPROPERTIES (
> >>               |  'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
> >>               |  'sink.partition-commit.trigger'='partition-time',
> >>               |  'sink.partition-commit.delay'='1 min',
> >>               |  'sink.partition-commit.policy.kind'='metastore,success-file'
> >>               |)
> >>             """.stripMargin)
> >>         // Switch back to the Flink (default) dialect
> >>         tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >>         // Stream into the ORC table
> >>         tableEnv.executeSql(
> >>             """
> >>               |INSERT INTO `default`.`hive_user_orc`
> >>               |SELECT
> >>               |    id,name,
> >>               |    DATE_FORMAT(dt,'yyyy-MM-dd'),
> >>               |    DATE_FORMAT(dt,'HH'),
> >>               |    DATE_FORMAT(dt,'mm')
> >>               |FROM
> >>               |    stream_db.datagen_user
> >>             """.stripMargin)
> >>         // Stream into the Parquet table
> >>         tableEnv.executeSql(
> >>             """
> >>               |INSERT INTO `default`.`hive_user_parquet`
> >>               |SELECT
> >>               |    id,name,
> >>               |    DATE_FORMAT(dt,'yyyy-MM-dd'),
> >>               |    DATE_FORMAT(dt,'HH'),
> >>               |    DATE_FORMAT(dt,'mm')
> >>               |FROM
> >>               |    stream_db.datagen_user
> >>             """.stripMargin)
> >>
> >>
> >>     }
> >>
> >>
> >> }
> >>
> >> On 2020-08-13 10:08:55, "flink小猪" <18579099...@163.com> wrote:
> >>
> >>
> >>
> >>
> >> Embarrassing: I am sure I uploaded the attachments, but they are gone - -, so I have uploaded them again.
> >>
> >>
> >>
> >>
> >>
> >>
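> >> 1. The ORC write and the Parquet write are in the same job, and no error is reported, but Hive finds no data. Files do exist in the HDFS directory, but they are not generated at the checkpoint interval, and no _SUCCESS file is generated.
> >> 2. table.exec.hive.fallback-mapred-writer is not set.
> >> Here are my questions:
> >> 1. Why does Hive streaming need the flink-orc_2.11 jar to generate ORC files, while Parquet does not?
> >> 2. In the SQL client, where should I set the checkpoint interval?
> >> Below are screenshots of the HDFS directories.
> >>
> >> These are the files generated for ORC:
> >>
> >> These are the files generated for Parquet: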
> >>
> >>
> >>
> >>
> >>
> >> On 2020-08-12 17:33:30, "Rui Li" <lirui.fu...@gmail.com> wrote:
> >> >Hi,
> >> >
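> >> >Does the job that writes to the ORC table report any errors, or does it run successfully while Hive finds no data?
> >> >I can't see the code you pasted. Did you set table.exec.hive.fallback-mapred-writer?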
> >> >
> >> >On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com> wrote:
> >> >
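> >> >>
> >> >> I use datagen as a stream table and write to two Hive tables with the same schema but different storage formats (one ORC, one Parquet). The Parquet table is written normally, its _SUCCESS file is generated, and Hive can query it.
> >> >> The ORC table, however, gets no _SUCCESS file and cannot be queried in Hive. I run the job directly in my local IDE; the Hive version is 1.2.1 and the Flink version is 1.11.1.
> >> >> I also found that the ORC table's partitions contain more files than the Parquet table's, and that the files are not generated at the checkpoint interval (Parquet does follow the checkpoint interval).
> >> >> In addition, the flink-orc_2.11 jar has to be imported (Parquet does not need it), otherwise this error is thrown:
> >> >> Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/orc/TypeDescription
> >> >> Finally, Parquet-related logs are output once per checkpoint interval, whereas ORC produces no logs at all. What is the cause? I have pasted the code.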
> >> >> ------------------------------
> >> >> 18579099...@163.com
> >> >>
> >> >
> >> >
> >> >--
> >> >Best regards!
> >> >Rui Li
> >>
> >
> >
> >
> >--
> >Best regards!
> >Rui Li
>


-- 
Best, Jingsong Lee
