可以再尝试下最新的1.11.2吗? https://flink.apache.org/downloads.html
On Thu, Sep 17, 2020 at 1:33 PM kandy.wang <kandy1...@163.com> wrote: > 是master分支代码 > 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况 > 出现的,现在改成false 就走到else 部分 就暂时没这个问题了 > if (userMrWriter) { > builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, > rollingPolicy, outputFileConfig); > LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); > } else { > Optional<BulkWriter.Factory<RowData>> bulkFactory = > createBulkWriterFactory(partitionColumns, sd); > if (bulkFactory.isPresent()) { > builder = StreamingFileSink.forBulkFormat( > new org.apache.flink.core.fs.Path(sd.getLocation()), > new > FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer)) > .withBucketAssigner(assigner) > .withRollingPolicy(rollingPolicy) > .withOutputFileConfig(outputFileConfig); > LOG.info("Hive streaming sink: Use native parquet&orc writer."); > } else { > builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, > assigner, rollingPolicy, outputFileConfig); > LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because > BulkWriter Factory not available."); > } > } > 在 2020-09-17 13:21:40,"Jingsong Li" <jingsongl...@gmail.com> 写道: > >是最新的代码吗? > >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 > >它是影响性能的,1.11.2已经投票通过,即将发布 > > > >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang <kandy1...@163.com> wrote: > > > >> @Jingsong Li > >> > >> public TableSink createTableSink(TableSinkFactory.Context context) { > >> CatalogTable table = checkNotNull(context.getTable()); > >> Preconditions.checkArgument(table instanceof CatalogTableImpl); > >> > >> boolean isGeneric = > >> > Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); > >> > >> if (!isGeneric) { > >> return new HiveTableSink( > >> context.getConfiguration().get( > >> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), > >> context.isBounded(), > >> new JobConf(hiveConf), > >> context.getObjectIdentifier(), > >> table); > >> } else { > >> return TableFactoryUtil.findAndCreateTableSink(context); > >> } > >> } > >> > >> > HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop > >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 > >> > >> If it is false, using flink native writer to write parquet and orc > files; > >> > >> If it is true, using hadoop mapred record writer to write parquet and > orc > >> files > >> > >> 将此参数调整成false后,同样的资源配置下,tps达到30W > >> > >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush > >> 一些相关的参数 ? > >> > >> > >> > >> > >> > >> 在 2020-09-17 11:21:43,"Jingsong Li" <jingsongl...@gmail.com> 写道: > >> >Sink并行度 > >> >我理解是配置Sink并行度,这个一直在讨论,还没结论 > >> > > >> >HDFS性能 > >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO > >> > > >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang <kandy1...@163.com> wrote: > >> > > >> >> 场景很简单,就是kafka2hive > >> >> --5min入仓Hive > >> >> > >> >> INSERT INTO hive.temp_.hive_5min > >> >> > >> >> SELECT > >> >> > >> >> arg_service, > >> >> > >> >> time_local > >> >> > >> >> ..... > >> >> > >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'yyyyMMdd'), > >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 > >> >> > >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS(' > properties.group.id > >> '='kafka_hive_test', > >> >> 'scan.startup.mode'='earliest-offset') */; > >> >> > >> >> > >> >> > >> >> --kafka source表定义 > >> >> > >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( > >> >> > >> >> arg_service string COMMENT 'arg_service', > >> >> > >> >> .... > >> >> > >> >> )WITH ( > >> >> > >> >> 'connector' = 'kafka', > >> >> > >> >> 'topic' = '...', > >> >> > >> >> 'properties.bootstrap.servers' = '...', > >> >> > >> >> 'properties.group.id' = 'flink_etl_kafka_hive', > >> >> > >> >> 'scan.startup.mode' = 'group-offsets', > >> >> > >> >> 'format' = 'json', > >> >> > >> >> 'json.fail-on-missing-field' = 'false', > >> >> > >> >> 'json.ignore-parse-errors' = 'true' > >> >> > >> >> ); > >> >> --sink hive表定义 > >> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min ( > >> >> .... > >> >> ) > >> >> PARTITIONED BY (dt string , hm string) stored as orc location > >> >> 'hdfs://ssdcluster/....._5min' TBLPROPERTIES( > >> >> 'sink.partition-commit.trigger'='process-time', > >> >> 'sink.partition-commit.delay'='0 min', > >> >> 'sink.partition-commit.policy.class'='...CustomCommitPolicy', > >> >> > 'sink.partition-commit.policy.kind'='metastore,success-file,custom', > >> >> 'sink.rolling-policy.check-interval' ='30s', > >> >> 'sink.rolling-policy.rollover-interval'='10min', > >> >> 'sink.rolling-policy.file-size'='128MB' > >> >> ); > >> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40 > >> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。 > >> >> 就是flink sql可以 > >> >> > >> > 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter > >> >> 这块,有没有什么可以提升性能相关的优化参数? > >> >> > >> >> > >> >> > >> >> > >> >> 在 2020-09-16 19:29:50,"Jingsong Li" <jingsongl...@gmail.com> 写道: > >> >> >Hi, > >> >> > > >> >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距? > >> >> > > >> >> >另外,压测时是否可以看下jstack? > >> >> > > >> >> >Best, > >> >> >Jingsong > >> >> > > >> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang <kandy1...@163.com> > wrote: > >> >> > > >> >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 > >> >> ,source > >> >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w > >> >> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少 > >> >> > > >> >> > > >> >> > > >> >> >-- > >> >> >Best, Jingsong Lee > >> >> > >> > > >> > > >> >-- > >> >Best, Jingsong Lee > >> > > > > > >-- > >Best, Jingsong Lee > -- Best, Jingsong Lee