Re: Using the Blink TableEnvironment from multiple threads

2020-09-18 Post by Jeff Zhang
Hi jun su,

If you are building your own platform, you could consider using Zeppelin's SDK to submit jobs:
https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh
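
For reference, the per-thread workaround quoted further down in this thread boils down to roughly the
following sketch (executor and tEnv are assumed to exist in your notebook runtime; the
THREAD_PROVIDERS call is the exact line quoted below, and the import paths are from Calcite and the
Blink planner, so adjust them to your version if needed):

    import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
    import org.apache.calcite.rel.metadata.RelMetadataQueryBase;
    import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider;

    executor.submit(() -> {
        // install Flink's metadata provider for this thread before using the shared TableEnvironment
        RelMetadataQueryBase.THREAD_PROVIDERS.set(
                JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
        tEnv.executeSql("SELECT 1");  // then build and run the job as usual
    });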





jun su wrote on Fri, Sep 18, 2020 at 10:59 AM:

> hi godfrey,
>
> Our usage is similar to Zeppelin: the project works like a notebook. The env is created the first
> time a note is run, and re-running the notebook spawns a new thread to build and run the job, so
> for now I have fixed this problem by following Zeppelin's approach.
>
> godfrey he wrote on Thu, Sep 17, 2020 at 10:07 PM:
>
> > TableEnvironment is not thread-safe.
> >
> > Btw, could you describe how you are using TableEnvironment in the multi-threaded case?
> >
> > Jeff Zhang wrote on Mon, Sep 14, 2020 at 12:10 PM:
> >
> > > Following Zeppelin's approach, call this in every thread:
> > >
> > >
> > >
> >
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
> > >
> > >
> > > jun su wrote on Mon, Sep 14, 2020 at 11:54 AM:
> > >
> > > > hi all,
> > > >
> > > > When executing SQL from multiple threads, a non-aggregate query fails with the error below:
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > >   at java.util.Objects.requireNonNull(Objects.java:203)
> > > >   at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > > >   at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > > >
> > > >
> > > >
> > > >
> > >
> >
> RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > > > already works around this one.
> > > >
> > > >
> > > > However, an aggregate query still fails with the error below. Is there a temporary fix for it?
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > > at scala.Predef$.Double2double(Predef.scala:365)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > > > at
> > > >
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > > > Source)
> > > > at
> > > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > > > Source)
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>
>
> --
> Best,
> Jun Su
>


-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Post by Weijie Guo 2
Great job! Many thanks to @ZhuZhu for driving this, and thanks to everyone who contributed
to the release!

best,
Weijie
Zhu Zhu-2 wrote
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
> series.
> 
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming
> applications.
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html
> 
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575
> 
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> 
> Thanks,
> Zhu





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Reply: flink RichFilterFunction filters the same record repeatedly

2020-09-18 Post by 范超
Hi 明启, I have run into a similar problem. Could it be caused by the parallelism?


-----Original Message-----
From: 明启 孙 [mailto:374060...@qq.com]
Sent: Tuesday, September 15, 2020 10:45
To: user-zh
Subject: flink RichFilterFunction filters the same record repeatedly

Scenario:

Flink consumes from Kafka, filters out records of a certain type, and logs a WARN message for each filtered record.

While testing, I wrote one record to Kafka that should be filtered out. Sometimes a single WARN is printed as expected, but more often the same record is logged over and over, as if it were being consumed repeatedly.

If I add a print statement after the WARN, filtering behaves normally: one record filtered, one WARN printed, no repetition. Because the repetition blocks my subsequent normal data from being consumed, I would like to understand what is going on.
Code:
@Override
public boolean filter(GenericRecord record) throws Exception {
    String op_type = record.get("op_type") != null ?
            record.get("op_type").toString() : "-";
    if ("D".equals(op_type)) {
        logger.warn(record.toString());
        // System.out.println("filtered out");
        return false;
    }
    return true;
}
smq



Re: FlinkKafkaConsumer on YARN: raising the parallelism does not increase Kafka consumption speed, but submitting two applications does

2020-09-18 Post by JasonLee
HI

My understanding is that your Kafka producer is writing data relatively slowly, so raising the
parallelism further will not help. Normally it is enough to set the source parallelism equal to the
number of Kafka partitions.
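
A minimal DataStream-style sketch of that advice (the topic name, partition count, and the env and
kafkaProps variables are placeholders for your own setup):

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "...");
    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my_topic", new SimpleStringSchema(), kafkaProps);
    // the topic has 12 partitions, so give the source exactly 12 parallel subtasks
    DataStream<String> stream = env.addSource(consumer).setParallelism(12);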



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: K8s native deployment failure

2020-09-18 Post by yanzhibo


A TM on the same node as the jobmanager can register with the JM; TMs on other nodes cannot.

The JM is deployed as a single instance (non-HA).

> On Sep 17, 2020, at 3:55 PM, yanzhibo wrote:
> 
> It is non-HA. None of the TMs can register, but from inside a TM pod the jobmanager can be pinged via the service.
> 
> 
>> On Sep 17, 2020, at 11:10 AM, Yang Wang wrote:
>> 
>> This error looks like the TM timed out while registering with the JM. Are you using an HA or non-HA deployment?
>> 
>> With HA, the TM talks to the JM directly via the JM pod IP; in that case you need to log into the pod and confirm the network is reachable.
>> Without HA, the TM registers with the JM through the service, so you should check whether the K8s kube-proxy is working properly.
>> 
>> Also, is it all TMs that fail to register, or only some? That also helps rule out network issues.
>> 
>> 
>> Best,
>> Yang
>> 
>> yanzhibo wrote on Wed, Sep 16, 2020 at 5:25 PM:
>> 
>>> After submitting a job to a jobmanager pod, requesting taskmanagers fails.
>>> 
>>> 
>>> Taskmanager exception:
>>> 
>>> Fatal error occurred in TaskExecutor akka.tcp://
>>> flink@179.10.251.70:6122/user/rpc/taskmanager_0.
>>> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
>>> Could not register at the ResourceManager within the specified maximum
>>> registration duration 30 ms. This indicates a problem with this
>>> instance. Terminating now.
>>>   at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>> 2020-09-16 09:14:39,077 ERROR
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
>>> error occurred while executing the TaskManager. Shutting it down...
>>> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
>>> Could not register at the ResourceManager within the specified maximum
>>> registration duration 30 ms. This indicates a problem with this
>>> instance. Terminating now.
>>>   at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>   at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>> ~
>>> 
>>> 
>>> Jobmanager exception:
>>> 
>>> 0d5f8478a2ab4e17d816810752f669eb) switched from SCHEDULED to FAILED on not
>>> deployed.
>>> 

Re: FlinkKafkaConsumer on YARN: raising the parallelism does not increase Kafka consumption speed, but submitting two applications does

2020-09-18 Post by Benchao Li
If you submit two jobs, they are completely independent and each consumes the full data set.

If one job cannot keep up, look at where the bottleneck actually is, for example:
1. Does the job have lag? If there is no lag, there is actually no problem.
2. If the job has lag and the lag keeps growing, the consumption capacity is insufficient; then look for the concrete bottleneck:
   - some operator may be backpressuring and throttling the whole job;
   - the job may simply not have enough CPU overall;
   - in the extreme case the parallelism is already large enough that each source subtask maps to one Kafka
     partition and it still cannot keep up, which means a single partition carries more data than one Flink source subtask can process.
3. If the job has lag but the lag is shrinking, the consumption capacity is sufficient and the data is just temporarily backlogged.

范超 wrote on Fri, Sep 18, 2020 at 4:07 PM:

> Hi all, I have run into a strange problem.
>
> I am using flink 1.10 and flink-connector-kafka_2.11.
>
> Running Flink on YARN, no matter how much I increase the parallelism, the network output rate of my (single-node) Kafka broker never goes up.
>
> But if I submit two identical applications, also on YARN, the Kafka node's network output rate doubles as expected.
>
> My goal is to raise the application's throughput by setting the parallelism, not by submitting one more app to the YARN cluster.
>
> Any guidance would be appreciated.
>


-- 

Best,
Benchao Li


Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Post by Zhilong Hong
Thank you, @ZhuZhu, for driving this release!


Best regards,

Zhilong


From: Zhu Zhu 
Sent: Thursday, September 17, 2020 13:29
To: dev ; user ; user-zh 
; Apache Announce List 
Subject: [ANNOUNCE] Apache Flink 1.11.2 released

The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/09/17/release-1.11.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


Re: Re: Re: Re: StreamingFileWriter stress-test performance

2020-09-18 Post by kandy.wang



hi wangenbao:
  I have not hit OOM on my side; I would expect that increasing the TM resources (memory, CPU) should help with that.
  The problem on my side is that throughput will not go up. That said, table.exec.hive.fallback-mapred-writer=false did bring a clear improvement.
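For anyone following along, a minimal sketch of setting that flag programmatically before submitting
the INSERT (tEnv is assumed to be the TableEnvironment used for the kafka2hive job; the same key can
also be set with SET in the SQL client):

    // switch the Hive sink from the Hadoop MR record writer to Flink's native Parquet/ORC writers
    tEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.hive.fallback-mapred-writer", false);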
On 2020-09-18 16:45:29, "wangenbao" <156827...@qq.com> wrote:
>I ran into this bug too. With table.exec.hive.fallback-mapred-writer unset, it defaults to the Hadoop MR
>writer;
>when my data is spread out (a Hive Parquet table with three partition levels: date, hour, hashtid), many writers are created and the TM OOMs after running for a while;
>with the same data volume and table.exec.hive.fallback-mapred-writer set to false, the Flink native writer has no problem;
>Jingsong Li wrote
>> Is this the latest code?
>> 1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
>> It affects performance; the 1.11.2 vote has passed and it will be released soon.
>> 
>> On Thu, Sep 17, 2020 at 12:46 PM kandy.wang 
>
>> kandy1203@
>
>>  wrote:
>> 
>>> @Jingsong Li
>>>
>>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>>CatalogTable table = checkNotNull(context.getTable());
>>> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>>>
>>>boolean isGeneric =
>>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>>>
>>>if (!isGeneric) {
>>> return new HiveTableSink(
>>> context.getConfiguration().get(
>>>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>>> context.isBounded(),
>>> new JobConf(hiveConf),
>>> context.getObjectIdentifier(),
>>> table);
>>> } else {
>>> return TableFactoryUtil.findAndCreateTableSink(context);
>>> }
>>> }
>>>
>>> In HiveTableFactory there is a config, table.exec.hive.fallback-mapred-writer, default true, which controls whether
>>> Hadoop's own MR writer or the Flink native writer is used to write ORC/Parquet.
>>>
>>> If it is false, using flink native writer to write parquet and orc files;
>>>
>>> If it is true, using hadoop mapred record writer to write parquet and orc
>>> files
>>>
>>> After changing this parameter to false, with the same resource configuration, TPS reaches 300K.
>>>
>>> I suppose the two ORC implementations may simply differ in performance? Also, our storage format is ORC; are there any ORC
>>> parameters worth tuning, e.g. async flush and related settings?
>>>
>>>
>>>
>>>
>>>
>>> On 2020-09-17 11:21:43, "Jingsong Li" 
>
>> jingsonglee0@
>
>>  wrote:
>>> >Sink parallelism:
>>> >I understand you mean configuring the sink parallelism separately; this has been under discussion for a while with no conclusion yet.
>>> >
>>> >HDFS performance:
>>> >Look at what exactly the HDFS bottleneck is: network, request rate, connection count, or disk IO.
>>> >
>>> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 
>
>> kandy1203@
>
>>  wrote:
>>> >
>>> >> The scenario is simple: kafka2hive.
>>> >> -- ingest into Hive every 5 minutes
>>> >>
>>> >> INSERT INTO  hive.temp_.hive_5min
>>> >>
>>> >> SELECT
>>> >>
>>> >>  arg_service,
>>> >>
>>> >> time_local
>>> >>
>>> >> .
>>> >>
>>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  -- a new partition every 5 min
>>> >>
>>> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
>>> '='kafka_hive_test',
>>> >> 'scan.startup.mode'='earliest-offset') */;
>>> >>
>>> >>
>>> >>
>>> >> -- kafka source table definition
>>> >>
>>> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>>> >>
>>> >> arg_service string COMMENT 'arg_service',
>>> >>
>>> >> 
>>> >>
>>> >> )WITH (
>>> >>
>>> >>   'connector' = 'kafka',
>>> >>
>>> >>   'topic' = '...',
>>> >>
>>> >>   'properties.bootstrap.servers' = '...',
>>> >>
>>> >>   'properties.group.id' = 'flink_etl_kafka_hive',
>>> >>
>>> >>   'scan.startup.mode' = 'group-offsets',
>>> >>
>>> >>   'format' = 'json',
>>> >>
>>> >>   'json.fail-on-missing-field' = 'false',
>>> >>
>>> >>   'json.ignore-parse-errors' = 'true'
>>> >>
>>> >> );
>>> >> -- sink hive table definition
>>> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>>> >> 
>>> >> )
>>> >> PARTITIONED BY (dt string , hm string) stored as orc location
>>> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>>> >>   'sink.partition-commit.trigger'='process-time',
>>> >>   'sink.partition-commit.delay'='0 min',
>>> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>>> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>>> >>   'sink.rolling-policy.check-interval' ='30s',
>>> >>   'sink.rolling-policy.rollover-interval'='10min',
>>> >>   'sink.rolling-policy.file-size'='128MB'
>>> >> );
>>> >> From a first look the bottleneck seems to be writing to HDFS, and HDFS here is already SSD-backed. With 40 Kafka partitions
>>> >> and operator parallelism = 40, TPS only reaches about 60-70K, and raising the parallelism brings no improvement.
>>> >> Can Flink SQL
>>> change the parallelism of an individual operator? I would like to change just the StreamingFileWriter operator's parallelism; is there a good way to do that? And for StreamingFileWriter
>>> >> itself, are there any tuning parameters that could improve performance?
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On 2020-09-16 19:29:50, "Jingsong Li" 
>
>> jingsonglee0@
>
>>  wrote:
>>> >> >Hi,
>>> >> >
>>> >> >Could you share the concrete test scenario? Do you have a comparison, e.g. a hand-written DataStream job, to measure the performance gap?
>>> >> >
>>> >> >Also, could you capture a jstack during the stress test?
>>> >> >
>>> >> >Best,
>>> >> >Jingsong
>>> >> >
>>> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang 
>
>> kandy1203@
>
>>  wrote:
>>> >> >
>>> >> >> In our stress test, streaming writes into Hive via StreamingFileWriter, with kafka partition=40
>>> >> , source
>>> >> >> / writer operator parallelism = 40, consuming Kafka from the beginning, reach only about 70K TPS.
>>> >> >> I would like to know whether this streaming Hive write path has been stress-tested, and what throughput it can reach.
>>> >> >
>>> >> >
>>> >> >
>>> >> >--
>>> >> >Best, Jingsong Lee
>>> >>
>>> >
>>> >
>>> >--
>>> >Best, Jingsong Lee
>>>
>> 
>> 
>> -- 
>> Best, Jingsong Lee
>
>

Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Post by Guowei Ma
Thanks Zhuzhu for driving the release!!!

Best,
Guowei


On Fri, Sep 18, 2020 at 5:10 PM Yun Gao  wrote:

> Great! Many thanks to @ZhuZhu for driving this and thanks to everyone who contributed
> to the release!
>
> Best,
>  Yun
>
> --Original Mail --
> *Sender:*Jingsong Li 
> *Send Date:*Thu Sep 17 13:31:41 2020
> *Recipients:*user-zh 
> *CC:*dev , user , Apache
> Announce List 
> *Subject:*Re: [ANNOUNCE] Apache Flink 1.11.2 released
>
>> Thanks ZhuZhu for driving the release.
>>
>> Best,
>> Jingsong
>>
>> On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache
>>> Flink 1.11.2, which is the second bugfix release for the Apache Flink
>>> 1.11
>>> series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data
>>> streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements
>>> for this bugfix release:
>>> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575
>>>
>>> We would like to thank all contributors of the Apache Flink community who
>>> made this release possible!
>>>
>>> Thanks,
>>> Zhu
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: Unsubscribe

2020-09-18 Post by Xingbo Huang
Hi,

To unsubscribe, please send an email to  user-zh-unsubscr...@flink.apache.org

See [1] for details.

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

Han Xiao(联通集团联通支付有限公司总部) wrote on Fri, Sep 18, 2020 at 5:35 PM:

> Unsubscribe
> If you have received this email in error please notify us immediately by
> e-mail. Please reply to hqs-s...@chinaunicom.cn and you can unsubscribe from
> this mail. We will immediately remove your information from our send
> catalogue.
>


Unsubscribe

2020-09-18 Post by 联通集团联通支付有限公司总部
Unsubscribe
If you have received this email in error please notify us immediately by e-mail. Please reply to
hqs-s...@chinaunicom.cn and you can unsubscribe from this mail. We will
immediately remove your information from our send catalogue.


Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Post by Yun Gao
Great! Many thanks to @ZhuZhu for driving this and thanks to everyone who contributed to
the release!

Best,
 Yun


 --Original Mail --
Sender:Jingsong Li 
Send Date:Thu Sep 17 13:31:41 2020
Recipients:user-zh 
CC:dev , user , Apache Announce 
List 
Subject:Re: [ANNOUNCE] Apache Flink 1.11.2 released

Thanks ZhuZhu for driving the release.

Best,
Jingsong
On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/09/17/release-1.11.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


-- 
Best, Jingsong Lee

Re: FlinkSQL 1.11.1 reading Kafka and writing Hive (Parquet): OOM problem

2020-09-18 Post by wangenbao
The key to this problem should be what you replied in
http://apache-flink.147419.n8.nabble.com/StreamingFileWriter-td7161.html
 
namely that Flink 1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
I have also set table.exec.hive.fallback-mapred-writer=false



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11.1 reading/writing Hive

2020-09-18 Post by Rui Li
Hi,

You can print an array with Arrays.toString. Is the stack trace from the failed Hive table query complete? It feels like there should be a root cause further down.
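
A minimal sketch of that (assuming tableEnv is the TableEnvironment registered with your HiveCatalog,
and some_db/some_table are placeholder names):

    System.out.println(Arrays.toString(tableEnv.listDatabases()));
    System.out.println(Arrays.toString(tableEnv.listTables()));
    // print the rows of a table directly
    tableEnv.executeSql("SELECT * FROM some_db.some_table").print();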

On Fri, Sep 18, 2020 at 3:32 PM nashcen <2415370...@qq.com> wrote:

> Hello
>
> After three days of stumbling I can now access Hive from IDEA using Flink, but it is still not clear to me how to list the databases, tables, and table data in Hive and print them out.
> Below are the tables in Hive,
> 
>
> The databases and tables listed from IDEA are incomplete
> 
> Querying the table data from IDEA reports an error
> 
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Re: Re: StreamingFileWriter stress-test performance

2020-09-18 Post by wangenbao
I ran into this bug too. With table.exec.hive.fallback-mapred-writer unset, it defaults to the Hadoop MR
writer;
when my data is spread out (a Hive Parquet table with three partition levels: date, hour, hashtid), many writers are created and the TM OOMs after running for a while;
with the same data volume and table.exec.hive.fallback-mapred-writer set to false, the Flink native writer has no problem;
Jingsong Li wrote
> Is this the latest code?
> 1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
> It affects performance; the 1.11.2 vote has passed and it will be released soon.
> 
> On Thu, Sep 17, 2020 at 12:46 PM kandy.wang 

> kandy1203@

>  wrote:
> 
>> @Jingsong Li
>>
>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>CatalogTable table = checkNotNull(context.getTable());
>> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>>
>>boolean isGeneric =
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>>
>>if (!isGeneric) {
>> return new HiveTableSink(
>> context.getConfiguration().get(
>>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> context.isBounded(),
>> new JobConf(hiveConf),
>> context.getObjectIdentifier(),
>> table);
>> } else {
>> return TableFactoryUtil.findAndCreateTableSink(context);
>> }
>> }
>>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>>
>> If it is false, using flink native writer to write parquet and orc files;
>>
>> If it is true, using hadoop mapred record writer to write parquet and orc
>> files
>>
>> 将此参数调整成false后,同样的资源配置下,tps达到30W
>>
>> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>> 一些相关的参数 ?
>>
>>
>>
>>
>>
>> 在 2020-09-17 11:21:43,"Jingsong Li" 

> jingsonglee0@

>  写道:
>> >Sink并行度
>> >我理解是配置Sink并行度,这个一直在讨论,还没结论
>> >
>> >HDFS性能
>> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>> >
>> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 

> kandy1203@

>  wrote:
>> >
>> >> 场景很简单,就是kafka2hive
>> >> --5min入仓Hive
>> >>
>> >> INSERT INTO  hive.temp_.hive_5min
>> >>
>> >> SELECT
>> >>
>> >>  arg_service,
>> >>
>> >> time_local
>> >>
>> >> .
>> >>
>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>> >>
>> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
>> '='kafka_hive_test',
>> >> 'scan.startup.mode'='earliest-offset') */;
>> >>
>> >>
>> >>
>> >> --kafka source表定义
>> >>
>> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>> >>
>> >> arg_service string COMMENT 'arg_service',
>> >>
>> >> 
>> >>
>> >> )WITH (
>> >>
>> >>   'connector' = 'kafka',
>> >>
>> >>   'topic' = '...',
>> >>
>> >>   'properties.bootstrap.servers' = '...',
>> >>
>> >>   'properties.group.id' = 'flink_etl_kafka_hive',
>> >>
>> >>   'scan.startup.mode' = 'group-offsets',
>> >>
>> >>   'format' = 'json',
>> >>
>> >>   'json.fail-on-missing-field' = 'false',
>> >>
>> >>   'json.ignore-parse-errors' = 'true'
>> >>
>> >> );
>> >> --sink hive表定义
>> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>> >> 
>> >> )
>> >> PARTITIONED BY (dt string , hm string) stored as orc location
>> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>> >>   'sink.partition-commit.trigger'='process-time',
>> >>   'sink.partition-commit.delay'='0 min',
>> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>> >>   'sink.rolling-policy.check-interval' ='30s',
>> >>   'sink.rolling-policy.rollover-interval'='10min',
>> >>   'sink.rolling-policy.file-size'='128MB'
>> >> );
>> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
>> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
>> >> 就是flink sql可以
>> >>
>> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
>> >> 这块,有没有什么可以提升性能相关的优化参数?
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-16 19:29:50,"Jingsong Li" 

> jingsonglee0@

>  写道:
>> >> >Hi,
>> >> >
>> >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
>> >> >
>> >> >另外,压测时是否可以看下jstack?
>> >> >
>> >> >Best,
>> >> >Jingsong
>> >> >
>> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang 

> kandy1203@

>  wrote:
>> >> >
>> >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
>> >> ,source
>> >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
>> >> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
>> >> >
>> >> >
>> >> >
>> >> >--
>> >> >Best, Jingsong Lee
>> >>
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
> 
> 
> -- 
> Best, Jingsong Lee



ListState with TTL and list elements

2020-09-18 Post by wulei
Hello,
The state documentation says that enabling TTL increases state storage consumption: the heap
state backend stores an additional Java object holding the user value and a timestamp, while the
RocksDB state backend adds 8 bytes per stored value, i.e. per element of a list and per entry of a map.

See:
https://github.com/apache/flink/blob/master/docs/dev/stream/state/state.zh.md
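
For context, a minimal sketch of enabling TTL on a ListState descriptor (standard DataStream state
API; the descriptor name and TTL value are arbitrary). TTL is tracked per list element, which is why
the RocksDB backend stores the extra 8 bytes for every element:

    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
    ListStateDescriptor<String> descriptor =
            new ListStateDescriptor<>("myListState", String.class);
    descriptor.enableTimeToLive(ttlConfig);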


by wulei





Re: FlinkKafkaConsumer on YARN: raising the parallelism does not increase Kafka consumption speed, but submitting two applications does

2020-09-18 Post by ????
hello??Source??topic??612??6







Re: flink hive batch job reports FileNotFoundException

2020-09-18 Post by Rui Li
Hello,

What does the job logic roughly look like? I'll give it a try.

On Thu, Sep 17, 2020 at 10:00 PM godfrey he  wrote:

> cc @Rui Li 
>
> 李佳宸 wrote on Mon, Sep 14, 2020 at 5:11 PM:
>
>> Hi all ~ when I run a batch table job that writes into Hive, it fails with a FileNotFoundException: the .staging file cannot be found.
>> The version is 1.11.1.
>> Caused by: java.io.FileNotFoundException: File
>>
>> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
>> does not exist.
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
>> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
>> at
>>
>> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
>> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
>> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
>> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>
>> There is no such problem in standalone mode; in per-job mode on YARN some jobs run into it.
>>
>

-- 
Best regards!
Rui Li


FlinkKafkaConsumer on YARN: raising the parallelism does not increase Kafka consumption speed, but submitting two applications does

2020-09-18 Post by 范超
Hi all, I have run into a strange problem.

I am using flink 1.10 and flink-connector-kafka_2.11.

Running Flink on YARN, no matter how much I increase the parallelism, the network output rate of my (single-node) Kafka broker never goes up.

But if I submit two identical applications, also on YARN, the Kafka node's network output rate doubles as expected.

My goal is to raise the application's throughput by setting the parallelism, not by submitting one more app to the YARN cluster.

Any guidance would be appreciated.


Re: Reply: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first

2020-09-18 Post by anonnius
hi: thanks for your attention and reply.
1> Here is my analysis.
1. The first time, the SQL was run in sql-client.sh first:
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)


At that point the records were being written one by one through the Kafka console producer (kafka-console-producer.sh),
and since the kafka-connector keeps consuming continuously, the data was read in the same order in which it was written by hand.


2. The second time, sql-client.sh was exited and the SQL was run again:
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
By now all the data had already been written to Kafka. When the kafka-connector consumes it, the topic has 3 partitions,
so the order of the consumed messages no longer matches the order in which they were written with kafka-console-producer.sh.
Records with a later rowtime may be consumed first, producing a relatively large watermark, so some of the messages consumed afterwards are dropped.


3. Making the watermark interval in the table definition larger restores the first result; I am still evaluating this approach (whether it is always effective):
create table iservVisit (
type string comment 'event type',
uuid string comment 'user uri',
clientTime string comment '10-digit timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, 
'0'), 1, 10) as bigint))), -- computed column: 10-digit timestamp to TIMESTAMP
WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- computed column used as watermark, 
changed from 1 minute to 5 minutes
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
4. Preliminary conclusion: how can I ensure, or what can I do so, that every partition is consumed at the same pace? (See the sketch below.)
5. The attachment can be viewed with the Sublime sql/hql plugin, which displays it more clearly.
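
One way to keep a fast partition from racing the watermark ahead of the others is to generate the
watermark per Kafka partition inside the source, in the DataStream API. A sketch only: Visit, schema
and props are placeholder names for a POJO with a clientTime field, its deserialization schema, and
the Kafka properties:

    FlinkKafkaConsumer<Visit> consumer =
            new FlinkKafkaConsumer<>("message-json", schema, props);
    // watermarks are generated per Kafka partition and then merged, so a partition
    // whose data lags behind holds the overall watermark back instead of having
    // its records dropped as late data
    consumer.assignTimestampsAndWatermarks(
            WatermarkStrategy.<Visit>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                    .withTimestampAssigner((visit, ts) -> visit.clientTime * 1000L));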



















On 2020-09-18 14:42:42, "chengyanan1...@foxmail.com"
wrote:
>Reserving a spot here first.
>Following the steps you posted, I executed the SQL below while the data was being sent, watching the result in real time:
>select  
>tumble_start(rowtime, interval '2' MINUTE) as wStart,
>tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>count(1) as pv,
>count(distinct uuid) as uv 
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>The result I got is this (different from yours):
>
>  wStart            wEnd              pv    uv
>  2020-09-18T09:14  2020-09-18T09:16  2     2
>  2020-09-18T09:16  2020-09-18T09:18  8     3
>  2020-09-18T09:18  2020-09-18T09:20  8     3
>  2020-09-18T09:20  2020-09-18T09:22  2     2
>
>After all the data had been sent, I quit sql-client and ran the same query again; the final result (the same as yours) was:
>  wStart            wEnd              pv    uv
>  2020-09-18T09:14  2020-09-18T09:16  2     2
>  2020-09-18T09:16  2020-09-18T09:18  2     2
>  2020-09-18T09:18  2020-09-18T09:20  8     3
>  2020-09-18T09:20  2020-09-18T09:22  2     2
>
>
>
> 
>From: anonnius
>Sent: 2020-09-18 11:24
>To: user-zh
>Subject: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first
>hi: [help needed] I am consuming Kafka data with flink-sql and computing pv/uv over windows. The first and second runs give different results, and I don't quite understand why.
>0> local environment on a Mac
>1> flink 1.11.1
>2> kafka 0.10.2.2, topic message-json, 3 partitions, 1 replica
>3> using the sql-client.sh environment
>4> first created the iservVisit table in sql-cli:
>create table iservVisit (
>type string comment 'event type',
>uuid string comment 'user uri',
>clientTime string comment '10-digit timestamp',
>rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, 
> '0'), 1, 10) as bigint))), -- computed column: 10-digit timestamp to TIMESTAMP
>WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column used as watermark
>) with (
>'connector' = 'kafka-0.10',
>'topic' = 'message-json',
>'properties.bootstrap.servers' = 'localhost:9092',
>'properties.group.id' = 'consumer-rt',
>'format' = 'json',
>'json.ignore-parse-errors' = 'true',
>'scan.startup.mode' = 'earliest-offset'
>)
>then ran this SQL in sql-cli:
>select  
>tumble_start(rowtime, interval '2' MINUTE) as wStart,
>tumble_end(rowtime, interval '2' MINUTE) as wEnd,
>count(1) as pv,
>count(distinct uuid) as uv 
>from iservVisit
>group by tumble(rowtime, interval '2' MINUTE)
>5> then sent the following JSON messages to the Kafka producer, one after another:
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"} 
>{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"} 
>{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"} 
>{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"} 

Reply: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first

2020-09-18 Post by chengyanan1...@foxmail.com
Hi, I have found the cause.
Your Kafka topic has 3 partitions. In the first run you were consuming while the data was still being sent, so the watermark was effectively generated in the order in which you sent the records, and the windows fired in that same order, which is why you got the result you expected.

In the second run no new data is being produced, so everything in Kafka is historical data. Since scan.startup.mode is set to earliest-offset, Flink reads that historical data, and because there are 3 partitions it first drains one partition and then the other two in turn. Kafka does not guarantee ordering across partitions, so the watermark is generated in the order Flink happens to consume the records, i.e. in the order below (and some records are then discarded as late data):

select * from iservVisit
  type        uuid    clientTime    rowtime
  iservVisit  a       1600391663    2020-09-18T09:14:23
  iservVisit  b       1600391748    2020-09-18T09:15:48
  iservVisit  b       1600391823    2020-09-18T09:17:03

                          --- window 2020-09-18 09:14 - 2020-09-18 09:16 fires here
  iservVisit  a       1600391857    2020-09-18T09:17:37
  iservVisit  c       1600391903    2020-09-18T09:18:23
  iservVisit  b       1600391938    2020-09-18T09:18:58
  iservVisit  b       1600391970    2020-09-18T09:19:30

                          --- window 2020-09-18 09:16 - 2020-09-18 09:18 fires here
  iservVisit  a       1600392057    2020-09-18T09:20:57
  iservVisit  c       1600391684    2020-09-18T09:14:44
  iservVisit  c       1600391709    2020-09-18T09:15:09
  iservVisit  b       1600391781    2020-09-18T09:16:21
  iservVisit  a       1600391815    2020-09-18T09:16:55
  iservVisit  b       1600391851    2020-09-18T09:17:31
  iservVisit  a       1600391945    2020-09-18T09:19:05
  iservVisit  c       1600391936    2020-09-18T09:18:56
  iservVisit  c       1600391993    2020-09-18T09:19:53
  iservVisit  a       1600391690    2020-09-18T09:14:50
  iservVisit  c       1600391782    2020-09-18T09:16:22
  iservVisit  b       1600391822    2020-09-18T09:17:02
  iservVisit  a       1600391870    2020-09-18T09:17:50
  iservVisit  a       1600391889    2020-09-18T09:18:09
  iservVisit  b       1600391951    2020-09-18T09:19:11
  iservVisit  c       1600392016    2020-09-18T09:20:16
  iservVisit  a       1800392057    2027-01-20T04:54:17

                          --- windows 2020-09-18 09:18 - 2020-09-18 09:20 and 2020-09-18 09:20 - 2020-09-18 09:22 fire here



PS: You can set your topic to a single partition; then the data is globally ordered and every run of the query returns the same, correct result.
If anything in this analysis is wrong, please correct me!



 
From: anonnius
Sent: 2020-09-18 11:24
To: user-zh
Subject: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first
hi: [help needed] I am consuming Kafka data with flink-sql and computing pv/uv over windows. The first and second runs give different results, and I don't quite understand why.
0> local environment on a Mac
1> flink 1.11.1
2> kafka 0.10.2.2, topic message-json, 3 partitions, 1 replica
3> using the sql-client.sh environment
4> first created the iservVisit table in sql-cli:
create table iservVisit (
type string comment 'event type',
uuid string comment 'user uri',
clientTime string comment '10-digit timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, 
'0'), 1, 10) as bigint))), -- computed column: 10-digit timestamp to TIMESTAMP
WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column used as watermark
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
then ran this SQL in sql-cli:
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
5> then sent the following JSON messages to the Kafka producer, one after another:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
{"type": "iservVisit", "uuid": "a", 

Flink 1.11.1 reading/writing Hive

2020-09-18 Post by nashcen
Hello
After three days of stumbling I can now access Hive from IDEA using Flink, but it is still not clear to me how to list the databases, tables, and table data in Hive and print them out.
Below are the tables in Hive,
 
The databases and tables listed from IDEA are incomplete
 
Querying the table data from IDEA reports an error
 




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink job submission question

2020-09-18 Post by guaishushu1...@163.com
When CliFrontend submits a job to YARN, can insufficient resources (or similar causes) leave the submission process stuck until resources are freed?



guaishushu1...@163.com


Unsubscribe

2020-09-18 Post by 联通集团联通支付有限公司总部

Unsubscribe

If you have received this email in error please notify us immediately by e-mail. Please reply to
hqs-s...@chinaunicom.cn and you can unsubscribe from this mail. We will
immediately remove your information from our send catalogue.


Reply: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first

2020-09-18 Post by chengyanan1...@foxmail.com
Reserving a spot here first.
Following the steps you posted, I executed the SQL below while the data was being sent, watching the result in real time:
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
The result I got is this (different from yours):

  wStart            wEnd              pv    uv
  2020-09-18T09:14  2020-09-18T09:16  2     2
  2020-09-18T09:16  2020-09-18T09:18  8     3
  2020-09-18T09:18  2020-09-18T09:20  8     3
  2020-09-18T09:20  2020-09-18T09:22  2     2

After all the data had been sent, I quit sql-client and ran the same query again; the final result (the same as yours) was:
  wStart            wEnd              pv    uv
  2020-09-18T09:14  2020-09-18T09:16  2     2
  2020-09-18T09:16  2020-09-18T09:18  2     2
  2020-09-18T09:18  2020-09-18T09:20  8     3
  2020-09-18T09:20  2020-09-18T09:22  2     2



 
From: anonnius
Sent: 2020-09-18 11:24
To: user-zh
Subject: In what order does Flink SQL consume from Kafka? The second run of the SQL gives a different result from the first
hi: [help needed] I am consuming Kafka data with flink-sql and computing pv/uv over windows. The first and second runs give different results, and I don't quite understand why.
0> local environment on a Mac
1> flink 1.11.1
2> kafka 0.10.2.2, topic message-json, 3 partitions, 1 replica
3> using the sql-client.sh environment
4> first created the iservVisit table in sql-cli:
create table iservVisit (
type string comment 'event type',
uuid string comment 'user uri',
clientTime string comment '10-digit timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, 
'0'), 1, 10) as bigint))), -- computed column: 10-digit timestamp to TIMESTAMP
WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column used as watermark
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
then ran this SQL in sql-cli:
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
5> then sent the following JSON messages to the Kafka producer, one after another:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"} 
6> First result (with the sql-cli query left running the whole time):
  wStart            wEnd              pv    uv
  2020-09-18T09:14  2020-09-18T09:16  5     3
  2020-09-18T09:16  2020-09-18T09:18  8     3
  2020-09-18T09:18  2020-09-18T09:20  8     3
  2020-09-18T09:20  2020-09-18T09:22  2     2
7> Second result (after quitting the sql-cli query and running it again):
  wStart            wEnd              pv    uv
  2020-09-18T09:14  2020-09-18T09:16  2     2
  2020-09-18T09:16  2020-09-18T09:18  2     2
  2020-09-18T09:18  2020-09-18T09:20  8

Re: Re: Re: Re: Re: Re: StreamingFileWriter stress-test performance

2020-09-18 Post by Jingsong Li
Hi,

Sorry for the trouble, but could you try
the Hive dependency built from the latest release-1.11 branch (it contains the flink-connector-hive changes)?

> By the way, will 1.12 bring improvements for small-file compaction?

That is a goal for 1.12; the JIRA and design doc should come out in the next couple of days. Roughly, an "auto-compaction" option will be added so the sink merges files automatically.

Best,
Jingsong


On Fri, Sep 18, 2020 at 10:18 AM kandy.wang  wrote:

>
>
>
>
>
>
> @Jingsong Li
>   I tested it: 1.11.2 behaves the same as before. Setting table.exec.hive.fallback-mapred-writer=false is still what makes the clear difference.
> Our Flink environment is a jar we built ourselves from the flink 1.11 branch source; I assume your StreamingFileWriter
> changes have all been committed to the flink 1.11 branch?
> By the way, will 1.12 bring improvements for small-file compaction?
>
>
> On 2020-09-17 14:19:42, "Jingsong Li" wrote:
> >Yes, please give it a try; in theory the MR writer should not have a large performance gap.
> >
> >> Why force the file to roll
> >
> >Because of exactly-once: formats like ORC and Parquet cannot write a single file in multiple separate passes.
> >
> >On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >> OK, so it comes down to a performance comparison between the Hadoop MR writer and Flink's own native
> >> writer. For now, at least, setting table.exec.hive.fallback-mapred-writer
> >> to false satisfies our Hive-writing needs.
> >> One more question I asked before but you have not answered yet:
> >> why does HiveRollingPolicy return true from shouldRollOnCheckpoint,
> i.e. why force the file to roll? Could this be exposed as a config parameter?
> >> With forced rolling, sink.rolling-policy.rollover-interval and
> >>
> the related rolling parameters effectively stop working: with 5-minute partitions and a checkpoint every 2 minutes, files roll before they even reach a few tens of MB, so the configured values become meaningless.
> >> On 2020-09-17 13:43:04, "Jingsong Li" wrote:
> >> >Could you try the latest 1.11.2?
> >> >
> >> >https://flink.apache.org/downloads.html
> >> >
> >> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
> >> >
> >> >> It is the master branch code.
> >> >> The situation you describe is exactly what happens when table.exec.hive.fallback-mapred-writer is left at its default of true;
> >> >> after changing it to false the else branch is taken and the problem is gone for now.
> >> >> if (userMrWriter) {
> >> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> assigner,
> >> >> rollingPolicy, outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> >> >> } else {
> >> >>Optional> bulkFactory =
> >> >> createBulkWriterFactory(partitionColumns, sd);
> >> >>if (bulkFactory.isPresent()) {
> >> >>   builder = StreamingFileSink.forBulkFormat(
> >> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
> >> >> new
> >> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
> >> partComputer))
> >> >> .withBucketAssigner(assigner)
> >> >> .withRollingPolicy(rollingPolicy)
> >> >> .withOutputFileConfig(outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use native parquet writer.");
> >> >> } else {
> >> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> >> assigner, rollingPolicy, outputFileConfig);
> >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer
> because
> >> >> BulkWriter Factory not available.");
> >> >> }
> >> >> }
> >> >> On 2020-09-17 13:21:40, "Jingsong Li" wrote:
> >> >> >Is this the latest code?
> >> >> >1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
> >> >> >It affects performance; the 1.11.2 vote has passed and it will be released soon.
> >> >> >
> >> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang 
> wrote:
> >> >> >
> >> >> >> @Jingsong Li
> >> >> >>
> >> >> >> public TableSink createTableSink(TableSinkFactory.Context
> context) {
> >> >> >>CatalogTable table = checkNotNull(context.getTable());
> >> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >> >> >>
> >> >> >>boolean isGeneric =
> >> >> >>
> >> >>
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >> >> >>
> >> >> >>if (!isGeneric) {
> >> >> >> return new HiveTableSink(
> >> >> >> context.getConfiguration().get(
> >> >> >>
> >>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> >> >> context.isBounded(),
> >> >> >> new JobConf(hiveConf),
> >> >> >> context.getObjectIdentifier(),
> >> >> >> table);
> >> >> >> } else {
> >> >> >> return TableFactoryUtil.findAndCreateTableSink(context);
> >> >> >> }
> >> >> >> }
> >> >> >>
> >> >> >>
> >> >>
> >>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> >> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
> >> >> >>
> >> >> >> If it is false, using flink native writer to write parquet and orc
> >> >> files;
> >> >> >>
> >> >> >> If it is true, using hadoop mapred record writer to write parquet
> and
> >> >> orc
> >> >> >> files
> >> >> >>
> >> >> >> After changing this parameter to false, with the same resource configuration, TPS reaches 300K.
> >> >> >>
> >> >> >> I suppose the two ORC implementations may simply differ in performance? Also, our storage format is ORC; are there any ORC parameters worth tuning, e.g. async
> flush
> >> >> >> and related settings?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On 2020-09-17 11:21:43, "Jingsong Li" wrote:
> >> >> >> >Sink parallelism:
> >> >> >> >I understand you mean configuring the sink parallelism separately; this has been under discussion for a while with no conclusion yet.
> >> >> >> >
> >> >> >> >HDFS performance:
> >> >> >> >Look at what exactly the HDFS bottleneck is: network, request rate, connection count, or disk IO.
> >> >> >> >
> >> >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 
> >> wrote:
> >> >> >> >
> >> >> >> >> The scenario is simple: kafka2hive.
> >> >> >> >> -- ingest into Hive every 5 minutes
> >> >> >> >>
> >> >> >> >> INSERT INTO  hive.temp_.hive_5min
> >> >> >> >>
> >> >> >> >> SELECT
> >> >> >> >>
> >> >> >> >>  arg_service,
> >> >> >> >>
> >> >> >> >> time_local
> >> >> >> >>
> >> >> >> >> .
> >> >> >> >>
> >> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300)