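OK. So it comes down to a performance comparison between the Hadoop MR writer and Flink's own native
writer. From what I have seen so far, setting table.exec.hive.fallback-mapred-writer
to false is enough to meet our Hive write requirements.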
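There is one more question I asked you earlier that you haven't answered yet:
Why does HiveRollingPolicy return true from shouldRollOnCheckpoint, i.e. why does it force a file roll on every checkpoint? Could this be extracted into a configuration option?
If rolling is forced, sink.rolling-policy.rollover-interval and
sink.rolling-policy.file-size basically stop working. With one partition every 5 min and a checkpoint every 2 min, files are rolled before they even reach a few tens of MB, so the configured parameters become meaningless.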
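To make the effect concrete, here is a minimal, hypothetical simulation in plain Java. It is not Flink's HiveRollingPolicy and uses no Flink API. It assumes one sink subtask writing a steady, made-up 100 KB/s into a single 5-minute partition, with a checkpoint every 2 minutes, and it uses the thread's rollover-interval=10min and file-size=128MB. It also simplifies two things: it checks the thresholds every simulated second, and it closes the in-progress file when the partition's data ends.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of part-file rolling for one sink subtask; not Flink code. */
public class RollingSimulation {

    /**
     * Returns the sizes in bytes of the part files written into one partition.
     * Size and age are checked every simulated second, a simplification of
     * sink.rolling-policy.check-interval.
     */
    static List<Long> partFileSizes(long bytesPerSecond, int partitionSeconds,
                                    int checkpointSeconds, int rolloverSeconds,
                                    long maxFileBytes, boolean rollOnCheckpoint) {
        List<Long> sizes = new ArrayList<>();
        long currentBytes = 0;
        int currentAgeSeconds = 0;
        for (int t = 1; t <= partitionSeconds; t++) {
            currentBytes += bytesPerSecond;
            currentAgeSeconds++;
            boolean checkpointNow = t % checkpointSeconds == 0;
            boolean roll = currentBytes >= maxFileBytes           // file-size threshold
                    || currentAgeSeconds >= rolloverSeconds       // rollover-interval threshold
                    || (rollOnCheckpoint && checkpointNow);       // forced roll on checkpoint
            if (roll) {
                sizes.add(currentBytes);
                currentBytes = 0;
                currentAgeSeconds = 0;
            }
        }
        if (currentBytes > 0) {
            sizes.add(currentBytes); // simplification: close the last file when the partition ends
        }
        return sizes;
    }

    public static void main(String[] args) {
        long bytesPerSecond = 100_000L;          // assumed per-subtask write rate (hypothetical)
        long maxFileBytes = 128L * 1024 * 1024;  // 'sink.rolling-policy.file-size'='128MB'
        int rolloverSeconds = 10 * 60;           // 'sink.rolling-policy.rollover-interval'='10min'
        int checkpointSeconds = 2 * 60;          // checkpoint every 2 min
        int partitionSeconds = 5 * 60;           // one partition every 5 min

        // prints [12000000, 12000000, 6000000]
        System.out.println("roll on checkpoint:    " + partFileSizes(bytesPerSecond,
                partitionSeconds, checkpointSeconds, rolloverSeconds, maxFileBytes, true));
        // prints [30000000]
        System.out.println("no roll on checkpoint: " + partFileSizes(bytesPerSecond,
                partitionSeconds, checkpointSeconds, rolloverSeconds, maxFileBytes, false));
    }
}
```

At the assumed rate, forced rolls leave the partition with three files of 12 MB, 12 MB and 6 MB. Without them, the partition gets a single 30 MB file. In the first case, neither the 128MB size threshold nor the 10min interval is ever what triggers a roll.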
On 2020-09-17 13:43:04, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>Could you try again with the latest 1.11.2?
>
>https://flink.apache.org/downloads.html
>
>On Thu, Sep 17, 2020 at 1:33 PM kandy.wang <kandy1...@163.com> wrote:
>
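>> It is the master branch code.
>> The situation you describe occurs exactly when table.exec.hive.fallback-mapred-writer is at its default of true.
>> After changing it to false, execution goes into the else branch below, and the problem is gone for now: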
>> // userMrWriter carries table.exec.hive.fallback-mapred-writer (default: true)
>> if (userMrWriter) {
>>     builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner,
>>             rollingPolicy, outputFileConfig);
>>     LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
>> } else {
>>     // Try Flink's native bulk writer (Parquet/ORC) for this storage descriptor
>>     Optional<BulkWriter.Factory<RowData>> bulkFactory =
>>             createBulkWriterFactory(partitionColumns, sd);
>>     if (bulkFactory.isPresent()) {
>>         builder = StreamingFileSink.forBulkFormat(
>>                 new org.apache.flink.core.fs.Path(sd.getLocation()),
>>                 new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
>>                 .withBucketAssigner(assigner)
>>                 .withRollingPolicy(rollingPolicy)
>>                 .withOutputFileConfig(outputFileConfig);
>>         LOG.info("Hive streaming sink: Use native parquet&orc writer.");
>>     } else {
>>         // No native writer for this format: fall back to the MapReduce RecordWriter
>>         builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
>>                 assigner, rollingPolicy, outputFileConfig);
>>         LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
>>     }
>> }
>> On 2020-09-17 13:21:40, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>> >Is this the latest code?
>> >1.11.2 fixes a bug: https://issues.apache.org/jira/browse/FLINK-19121
>> >which affects performance. The 1.11.2 release vote has passed, and it will be released soon.
>> >
>> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang <kandy1...@163.com> wrote:
>> >
>> >> @Jingsong Li
>> >>
>> >> public TableSink createTableSink(TableSinkFactory.Context context) {
>> >>     CatalogTable table = checkNotNull(context.getTable());
>> >>     Preconditions.checkArgument(table instanceof CatalogTableImpl);
>> >>
>> >>     boolean isGeneric =
>> >>             Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>> >>
>> >>     if (!isGeneric) {
>> >>         return new HiveTableSink(
>> >>                 context.getConfiguration().get(
>> >>                         HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> >>                 context.isBounded(),
>> >>                 new JobConf(hiveConf),
>> >>                 context.getObjectIdentifier(),
>> >>                 table);
>> >>     } else {
>> >>         return TableFactoryUtil.findAndCreateTableSink(context);
>> >>     }
>> >> }
>> >>
>> >>
>> >> HiveTableFactory has an option, table.exec.hive.fallback-mapred-writer (default true), that controls whether
>> >> ORC/Parquet files are written with Hadoop's built-in MR writer or with Flink's native writer.
>> >>
>> >> If it is false, using flink native writer to write parquet and orc files;
>> >>
>> >> If it is true, using hadoop mapred record writer to write parquet and orc files
>> >> After setting this option to false, with the same resource configuration, TPS reached 300K.
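For scale, the two totals reported in this thread can be converted to per-subtask rates. The default MR writer reached about 70K TPS and the native writer 300K TPS, both at parallelism 40. This trivial Java sketch of the arithmetic assumes the load is spread evenly across the 40 subtasks.

```java
public class ThroughputPerSubtask {
    /** Records per second handled by each writer subtask, assuming an even spread. */
    static double perSubtask(double totalTps, int parallelism) {
        return totalTps / parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 40;            // Kafka partitions = operator parallelism = 40
        double mrWriterTps = 70_000;     // fallback-mapred-writer = true (default)
        double nativeWriterTps = 300_000; // fallback-mapred-writer = false
        System.out.printf("MR writer:     %.0f records/s per subtask%n", perSubtask(mrWriterTps, parallelism));
        System.out.printf("native writer: %.0f records/s per subtask%n", perSubtask(nativeWriterTps, parallelism));
        System.out.printf("speedup:       %.2fx%n", nativeWriterTps / mrWriterTps);
    }
}
```

The result is about 1750 vs 7500 records/s per subtask, a speedup of roughly 4.3x under the same resources.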
>> >>
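>> >> Could the two ORC implementations simply differ in performance? Also, our storage format is ORC; are there any ORC parameters we could tune, such as async flush
>> >> or related settings?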
>> >>
>> >>
>> >>
>> >>
>> >>
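>> >> On 2020-09-17 11:21:43, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>> >> >Sink parallelism
>> >> >As I understand it, you want to configure the sink parallelism separately. This has been under discussion for a while, with no conclusion yet.
>> >> >
>> >> >HDFS performance
>> >> >Check what exactly the HDFS bottleneck is: network, request count, connection count, or disk I/O.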
>> >> >
>> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang <kandy1...@163.com> wrote:
>> >> >
>> >> >> The scenario is simple: Kafka to Hive (kafka2hive).
>> >> >> -- load into Hive every 5 minutes
>> >> >>
>> >> >> INSERT INTO hive.temp_.hive_5min
>> >> >> SELECT
>> >> >>     arg_service,
>> >> >>     time_local,
>> >> >>     .....
>> >> >>     FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'yyyyMMdd'),
>> >> >>     FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'HHmm')  -- one partition every 5 min
>> >> >> FROM hive.temp_.kafka_source_pageview /*+ OPTIONS(
>> >> >>     'properties.group.id'='kafka_hive_test',
>> >> >>     'scan.startup.mode'='earliest-offset') */;
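The partition expressions above take the current time, round it down to a 5-minute boundary, and format it as dt (yyyyMMdd) and hm (HHmm). Below is a plain-Java sketch of the same bucketing. It takes epoch seconds as input and uses `Math.floorDiv` so the rounding down is explicit, rather than relying on SQL division semantics. The time zone is passed in explicitly because the formatted values depend on it; which zone the job actually uses is an assumption here.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FiveMinutePartition {
    private static final long BUCKET_SECONDS = 300L; // 5 minutes
    private static final DateTimeFormatter DT = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HM = DateTimeFormatter.ofPattern("HHmm");

    /** Rounds an epoch-second timestamp down to the start of its 5-minute bucket. */
    static long floorToBucket(long epochSeconds) {
        return Math.floorDiv(epochSeconds, BUCKET_SECONDS) * BUCKET_SECONDS;
    }

    /** Returns {dt, hm} partition values for the bucket containing epochSeconds. */
    static String[] partitionValues(long epochSeconds, ZoneId zone) {
        Instant bucketStart = Instant.ofEpochSecond(floorToBucket(epochSeconds));
        return new String[] {
            DT.withZone(zone).format(bucketStart),
            HM.withZone(zone).format(bucketStart)
        };
    }

    public static void main(String[] args) {
        long ts = 1600321384L; // 2020-09-17T05:43:04Z
        String[] p = partitionValues(ts, ZoneId.of("UTC"));
        System.out.println("dt=" + p[0] + ", hm=" + p[1]); // prints dt=20200917, hm=0540
    }
}
```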
>> >> >>
>> >> >>
>> >> >>
>> >> >> -- Kafka source table definition
>> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>> >> >>     arg_service string COMMENT 'arg_service',
>> >> >>     ....
>> >> >> ) WITH (
>> >> >>     'connector' = 'kafka',
>> >> >>     'topic' = '...',
>> >> >>     'properties.bootstrap.servers' = '...',
>> >> >>     'properties.group.id' = 'flink_etl_kafka_hive',
>> >> >>     'scan.startup.mode' = 'group-offsets',
>> >> >>     'format' = 'json',
>> >> >>     'json.fail-on-missing-field' = 'false',
>> >> >>     'json.ignore-parse-errors' = 'true'
>> >> >> );
>> >> >> -- Hive sink table definition
>> >> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>> >> >>     ....
>> >> >> )
>> >> >> PARTITIONED BY (dt string, hm string) stored as orc location
>> >> >> 'hdfs://ssdcluster/....._5min' TBLPROPERTIES (
>> >> >>   'sink.partition-commit.trigger'='process-time',
>> >> >>   'sink.partition-commit.delay'='0 min',
>> >> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>> >> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>> >> >>   'sink.rolling-policy.check-interval'='30s',
>> >> >>   'sink.rolling-policy.rollover-interval'='10min',
>> >> >>   'sink.rolling-policy.file-size'='128MB'
>> >> >> );
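>> >> >> My initial impression is that the bottleneck is writing to HDFS, and our HDFS is already SSD-backed. With 40 Kafka partitions
>> >> >> and an operator parallelism of 40, TPS only reaches about 60-70K, and raising the parallelism brings no improvement.
>> >> >> Can Flink SQL change the parallelism of just one operator? I would like to set the parallelism of the StreamingFileWriter
>> >> >> operator on its own; is there a good way to do that? Also, are there any tuning parameters for StreamingFileWriter that could improve performance?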
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 2020-09-16 19:29:50, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>> >> >> >Hi,
>> >> >> >
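>> >> >> >Could you share the specific test scenario? Do you have a baseline to compare against, for example a hand-written DataStream job, to see how large the performance gap is?
>> >> >> >
>> >> >> >Also, could you capture a jstack during the stress test?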
>> >> >> >
>> >> >> >Best,
>> >> >> >Jingsong
>> >> >> >
>> >> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang <kandy1...@163.com> wrote:
>> >> >> >
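>> >> >> >> In our stress test of streaming writes into Hive (StreamingFileWriter), with 40 Kafka partitions and a source/writer
>> >> >> >> operator parallelism of 40, consuming Kafka from the earliest offset, TPS only reaches 70K.
>> >> >> >> Has streaming write to Hive been stress-tested on your side? What throughput can it reach?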
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >--
>> >> >> >Best, Jingsong Lee
>> >> >>
>> >> >
>> >> >
>> >> >--
>> >> >Best, Jingsong Lee
>> >>
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee
