flink connect kafka

2020-09-17 post by wch...@163.com
Flink version 1.10.0, checkpointing not enabled
Kafka version: 0.10.2.1
The data source is Kafka.
Code:
val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER, HqKafkaTopic.KAFKA_TOPIC_HK_INDEX)
val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] =
  new FlinkKafkaConsumer(topicHkList, new CustomKafkaDeserializationSchema(), properties)

// configure where the Kafka consumer starts consuming
kafkaHkConsumer.setStartFromLatest()
val sourseHk = env
  .addSource(kafkaHkConsumer).name("hk kafka source")
  .map(new HkKafkaDecodeMap)
  .map(new HkKafkaObj2HqMap)
  .map(new HkMsgPushDecodeMap)
  .filter(new HkMsgPushFilter)

When consuming, no data comes out. The debug log prints:
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-tradeTicker-topic-new-0, hq.hk-index-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)

It looks offset-related. What could be causing this? Do I need to set anything in my code?
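
For reference (this is not a diagnosis from the thread), a minimal Java sketch of one configuration worth trying: enable checkpointing so the consumer's offsets are committed back to Kafka on each checkpoint, and start from the committed group offsets instead of always jumping to the latest offset. The topic names are taken from the log above; the group id, checkpoint interval and SimpleStringSchema (standing in for CustomKafkaDeserializationSchema) are hypothetical placeholders.

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class HkKafkaStartPositionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Offsets are only committed back to Kafka as part of checkpoints,
        // so enable checkpointing (the interval is an arbitrary example).
        env.enableCheckpointing(60_000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.91.85:9092");
        properties.setProperty("group.id", "hk-hq-consumer"); // hypothetical group id

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                Arrays.asList("hq.hk-tradeTicker-topic-new", "hq.hk-index-topic-new"),
                new SimpleStringSchema(),   // stand-in for CustomKafkaDeserializationSchema
                properties);

        // Resume from the committed group offsets instead of always jumping to latest.
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).name("hk kafka source").print();
        env.execute("hk kafka start-position sketch");
    }
}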




wch...@163.com


Re: Aggregated results contain intermediate retraction data

2020-09-17 post by xushanshan
hi, Jark
Enabling minibatch processes the intermediate data in batches, but if an intermediate retraction and its later update
end up in two different minibatches, the downstream system can still query and see the intermediate result.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Aggregated results contain intermediate retraction data

2020-09-17 post by Jark Wu
Enabling minibatch largely solves the intermediate-result problem:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
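
For reference, a minimal Java sketch (not from the thread) of how the minibatch options described on the linked page can be set on a blink-planner TableEnvironment; the keys are the documented table.exec.mini-batch.* options, while the latency and size values are arbitrary examples.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Buffer input records and emit aggregation results in small batches,
        // which suppresses most intermediate retract/update pairs.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // example value
        conf.setString("table.exec.mini-batch.size", "5000");         // example value
    }
}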

Best,
Jark

On Fri, 18 Sep 2020 at 11:57, xushanshan <1337220...@qq.com> wrote:

> The question has been revised and completed with more detail.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Aggregated results contain intermediate retraction data

2020-09-17 post by xushanshan
The question has been revised and completed with more detail.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


In what order does Flink SQL consume Kafka? The second run of the SQL gives a different result than the first

2020-09-17 post by anonnius
hi: [help wanted] I am consuming Kafka data with Flink SQL and computing PV/UV over windows. The first and the second run produce different results, and I don't understand why.
0> local environment on a Mac
1> flink 1.11.1
2> kafka 0.10.2.2, topic message-json, 3 partitions, replication factor 1
3> using the sql-client.sh environment
4> first created the iservVisit table in sql-cli
create table iservVisit (
type string comment 'event type',
uuid string comment 'user id',
clientTime string comment '10-digit unix timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit timestamp to TIMESTAMP
WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- watermark on the computed column
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)
Then ran this SQL in sql-cli
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
5> Then sent the following JSON messages to the Kafka producer, in order
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"} 
6> First result (the SQL kept running in sql-cli)

wStart            wEnd              pv  uv
2020-09-18T09:14  2020-09-18T09:16  5   3
2020-09-18T09:16  2020-09-18T09:18  8   3
2020-09-18T09:18  2020-09-18T09:20  8   3
2020-09-18T09:20  2020-09-18T09:22  2   2

7> Second result (quit the SQL in sql-cli, then ran it again)

wStart            wEnd              pv  uv
2020-09-18T09:14  2020-09-18T09:16  2   2
2020-09-18T09:16  2020-09-18T09:18  2   2
2020-09-18T09:18  2020-09-18T09:20  8   3
2020-09-18T09:20  2020-09-18T09:22  2   2

8> The detailed steps are in the attached file





1. Create the table in sql-client.sh
create table iservVisit (
type string comment 'event type',
uuid string comment 'user id',
clientTime string comment '10-digit unix timestamp',
rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: 10-digit timestamp to TIMESTAMP
WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- watermark on the computed column
) with (
'connector' = 'kafka-0.10',
'topic' = 'message-json',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'consumer-rt',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset'
)

2. Run in sql-client.sh
select  
tumble_start(rowtime, interval '2' MINUTE) as wStart,
tumble_end(rowtime, interval '2' MINUTE) as wEnd,
count(1) as pv,
count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

3. The Kafka producer writes the messages in order


Kafka record                                                      clientTime (event time)   generated watermark    notes
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}   2020-09-18 09:14:44       2020-09-18 09:13:44


Does setting a TTL on ListState delete the earlier entries in the list?

2020-09-17 post by wangl...@geekplus.com
Consider the following scenario:

KeyBy by userId, and keep all records for that userId in a ListState:
private transient ListState list;

@Override
public void processElement(Tuple2 value, Context ctx, Collector out)
        throws Exception {
list.add(value.f1);
}
The TTL is set to 7 days.
If this userId has no messages at all for 7 days, the ListState for that userId will be removed.
But if the userId keeps receiving messages continuously, will the entries that were added to the list more than 7 days ago be removed?
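
For reference, a minimal Java sketch (not from the thread; the state name, types and function are hypothetical) of attaching a TTL to the ListStateDescriptor. Note that for list state the TTL is tracked per list element, so expiry of old entries depends on the configured update type and visibility rather than on the key as a whole.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical keyed function: keeps the last 7 days of values per userId.
public class UserHistoryFunction extends KeyedProcessFunction<String, Tuple2<String, String>, String> {

    private transient ListState<String> list;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(7))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ListStateDescriptor<String> desc = new ListStateDescriptor<>("user-history", Types.STRING);
        // The TTL is tracked per list element, not per key.
        desc.enableTimeToLive(ttl);
        list = getRuntimeContext().getListState(desc);
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
        list.add(value.f1);
    }
}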

Thanks,
Wang Lei


wangl...@geekplus.com


Re: Using the Blink TableEnvironment from multiple threads

2020-09-17 post by jun su
hi godfrey,

Our usage is similar to Zeppelin: the project is notebook-style. The env is created the first time a note is run,
and later runs of the notebook create new threads to build and run jobs, so I followed Zeppelin's approach to fix the problem for now.

godfrey he wrote on Thu, Sep 17, 2020 at 10:07 PM:

> TableEnvironment is not thread-safe.
>
> btw, could you describe how you are using TableEnvironment from multiple threads?
>
> Jeff Zhang wrote on Mon, Sep 14, 2020 at 12:10 PM:
>
> > Follow Zeppelin's approach and call this in every thread:
> >
> >
> >
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
> >
> >
> > jun su wrote on Mon, Sep 14, 2020 at 11:54 AM:
> >
> > > hi all,
> > >
> > > When executing SQL from multiple threads, non-aggregate SQL fails with the following error:
> > >
> > > Caused by: java.lang.NullPointerException
> > >   at java.util.Objects.requireNonNull(Objects.java:203)
> > >   at
> > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > >   at
> > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > >
> > >
> > >
> > >
> >
> > > Already worked around with
> RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > >
> > >
> > > But aggregate SQL still fails with the error below. Is there a temporary fix?
> > >
> > > Caused by: java.lang.NullPointerException
> > > at scala.Predef$.Double2double(Predef.scala:365)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > > at
> > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > > at
> > >
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > > Source)
> > > at
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > > Source)
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


-- 
Best,
Jun Su


Re: Flink 1.11 JDBC query against PG fails

2020-09-17 post by Jark Wu
Could you paste your DDL and query?

You can try backticks:  select `F1`, `F2` from xxx;

Best,
Jark

On Thu, 17 Sep 2020 at 23:28, godfrey he  wrote:

> As far as I know, Flink is currently case-insensitive while PG is case-sensitive; there is no workaround for this at the moment.
>
> wdmcode wrote on Thu, Sep 10, 2020 at 9:44 AM:
>
> > Hi Jimmy
> >
> > Try putting the field names in double quotes:
> > Select “F1”,”F2” from xxx.xxx;
> >
> >
> > From: Jimmy Zhang
> > Sent: Thursday, September 10, 2020 9:41 AM
> > To: user-zh@flink.apache.org
> > Subject: Flink 1.11 JDBC query against PG fails
> >
> > When querying a PG table with JDBC in Flink 1.11, the PG table's columns are upper-case and Flink lower-cases the field names, so the query fails. Does anyone know about this issue?
> >
> > Best,
> > Jimmy Signature is customized by Netease Mail Master
> >
> >
>


Re: Re: Re: Re: Re: Re: StreamingFileWriter load-test performance

2020-09-17 post by kandy.wang






@Jingsong Li
I tested it: 1.11.2 behaves the same as before, and table.exec.hive.fallback-mapred-writer=false is still what makes the clear difference.
Our Flink environment is a jar we build ourselves from the flink 1.11 branch sources; I assume your StreamingFileWriter
changes have all been merged to the 1.11 branch.
One more question: will 1.12 bring any improvements for small-file compaction?


On 2020-09-17 14:19:42, "Jingsong Li" wrote:
>Yes, worth testing; in theory the mr writer should not show a large performance gap.
>
>> why force-roll the file
>
>Because of the exactly-once guarantee: formats like ORC and Parquet cannot write a single file in multiple attempts.
>
>On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:
>
>>
>>
>>
>> ok. 就是用hadoop mr writer vs  flink 自实现的native
>> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
>> 改成false是可以满足我们的写hive需求了
>> 还有一个问题,之前问过你,你还没回复:
>> HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么?
>> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、
>> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
>> 在 2020-09-17 13:43:04,"Jingsong Li"  写道:
>> >可以再尝试下最新的1.11.2吗?
>> >
>> >https://flink.apache.org/downloads.html
>> >
>> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
>> >
>> >> 是master分支代码
>> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
>> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
>> >> if (userMrWriter) {
>> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
>> assigner,
>> >> rollingPolicy, outputFileConfig);
>> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
>> >> } else {
>> >>Optional> bulkFactory =
>> >> createBulkWriterFactory(partitionColumns, sd);
>> >>if (bulkFactory.isPresent()) {
>> >>   builder = StreamingFileSink.forBulkFormat(
>> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
>> >> new
>> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
>> partComputer))
>> >> .withBucketAssigner(assigner)
>> >> .withRollingPolicy(rollingPolicy)
>> >> .withOutputFileConfig(outputFileConfig);
>> >> LOG.info("Hive streaming sink: Use native parquet writer.");
>> >> } else {
>> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
>> >> assigner, rollingPolicy, outputFileConfig);
>> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
>> >> BulkWriter Factory not available.");
>> >> }
>> >> }
>> >> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
>> >> >是最新的代码吗?
>> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
>> >> >它是影响性能的,1.11.2已经投票通过,即将发布
>> >> >
>> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
>> >> >
>> >> >> @Jingsong Li
>> >> >>
>> >> >> public TableSink createTableSink(TableSinkFactory.Context context) {
>> >> >>CatalogTable table = checkNotNull(context.getTable());
>> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>> >> >>
>> >> >>boolean isGeneric =
>> >> >>
>> >>
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>> >> >>
>> >> >>if (!isGeneric) {
>> >> >> return new HiveTableSink(
>> >> >> context.getConfiguration().get(
>> >> >>
>>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> >> >> context.isBounded(),
>> >> >> new JobConf(hiveConf),
>> >> >> context.getObjectIdentifier(),
>> >> >> table);
>> >> >> } else {
>> >> >> return TableFactoryUtil.findAndCreateTableSink(context);
>> >> >> }
>> >> >> }
>> >> >>
>> >> >>
>> >>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>> >> >>
>> >> >> If it is false, using flink native writer to write parquet and orc
>> >> files;
>> >> >>
>> >> >> If it is true, using hadoop mapred record writer to write parquet and
>> >> orc
>> >> >> files
>> >> >>
>> >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
>> >> >>
>> >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>> >> >> 一些相关的参数 ?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
>> >> >> >Sink并行度
>> >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
>> >> >> >
>> >> >> >HDFS性能
>> >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>> >> >> >
>> >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 
>> wrote:
>> >> >> >
>> >> >> >> 场景很简单,就是kafka2hive
>> >> >> >> --5min入仓Hive
>> >> >> >>
>> >> >> >> INSERT INTO  hive.temp_.hive_5min
>> >> >> >>
>> >> >> >> SELECT
>> >> >> >>
>> >> >> >>  arg_service,
>> >> >> >>
>> >> >> >> time_local
>> >> >> >>
>> >> >> >> .
>> >> >> >>
>> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>> >> >> >>
>> >> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('
>> >> properties.group.id
>> >> >> '='kafka_hive_test',
>> >> >> >> 'scan.startup.mode'='earliest-offset') */;
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> --kafka source表定义
>> >> >> >>
>> >> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>> >> >> >>
>> >> >> >> arg_service string COMMENT 'arg_service',
>> >> >> >>
>> >> >> >> 
>> >> >> >>
>> >> >> >> )WITH 

Re: Flink 1.11 JDBC query against PG fails

2020-09-17 post by godfrey he
As far as I know, Flink is currently case-insensitive while PG is case-sensitive; there is no workaround for this at the moment.

wdmcode wrote on Thu, Sep 10, 2020 at 9:44 AM:

> Hi Jimmy
>
> Try putting the field names in double quotes:
> Select “F1”,”F2” from xxx.xxx;
>
>
> From: Jimmy Zhang
> Sent: Thursday, September 10, 2020 9:41 AM
> To: user-zh@flink.apache.org
> Subject: Flink 1.11 JDBC query against PG fails
>
> When querying a PG table with JDBC in Flink 1.11, the PG table's columns are upper-case and Flink lower-cases the field names, so the query fails. Does anyone know about this issue?
>
> Best,
> Jimmy Signature is customized by Netease Mail Master
>
>


Re: Flink SQL execution error - No operators defined in streaming topology. Cannot execute.

2020-09-17 post by godfrey he
Could you provide a complete demo?

me wrote on Fri, Sep 11, 2020 at 6:54 PM:

> 1. The Flink version is 1.11.1
> streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamBlinkSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> streamTableEnv = StreamTableEnvironment.create(streamEnv,
> streamBlinkSettings)
>
> 2. After executing the SQL I need to convert the result to a DataStream, so in the end I call dataStreamEnv.execute("SqlPlatformRealTime")
> the SQL result Table is converted to a DataStream and then written to Kafka with addSink.
>
>
>  Original message
> From: silence
> To: user-zh
> Sent: Friday, September 11, 2020, 18:49
> Subject: Re: Flink SQL execution error - No operators defined in streaming
> topology. Cannot execute.
>
>
> Without an insert statement there is no sink, so nothing triggers execution. -- Sent from:
> http://apache-flink.147419.n8.nabble.com/


Re: Using the Blink TableEnvironment from multiple threads

2020-09-17 post by godfrey he
TableEnvironment is not thread-safe.

btw, could you describe how you are using TableEnvironment from multiple threads?

Jeff Zhang wrote on Mon, Sep 14, 2020 at 12:10 PM:

> Follow Zeppelin's approach and call this in every thread:
>
>
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
>
>
> jun su wrote on Mon, Sep 14, 2020 at 11:54 AM:
>
> > hi all,
> >
> > When executing SQL from multiple threads, non-aggregate SQL fails with the following error:
> >
> > Caused by: java.lang.NullPointerException
> >   at java.util.Objects.requireNonNull(Objects.java:203)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> >
> >
> >
> >
> > Already worked around with
> RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> >
> >
> > But aggregate SQL still fails with the error below. Is there a temporary fix?
> >
> > Caused by: java.lang.NullPointerException
> > at scala.Predef$.Double2double(Predef.scala:365)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > at
> >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > at
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > Source)
> > at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > Source)
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Flink Hive batch job throws FileNotFoundException

2020-09-17 post by godfrey he
cc @Rui Li 

李佳宸 wrote on Mon, Sep 14, 2020 at 5:11 PM:

> Hi all, when my batch Table job writes into Hive, it fails with a FileNotFoundException: the .staging file cannot be found.
> The version is 1.11.1
> Caused by: java.io.FileNotFoundException: File
>
> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
> does not exist.
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
> at
>
> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> This does not happen in standalone mode; in per-job mode on YARN some jobs hit this problem.
>


Re: Several puzzling issues encountered while testing flink cdc

2020-09-17 post by godfrey he
The sql client's default parallelism is 1. If parallelism is not set explicitly in sql-client-defaults.yaml, the default in the code is 1,
so you need to set parallelism explicitly in sql-client-defaults.yaml.

Jark Wu wrote on Tue, Sep 15, 2020 at 11:43 AM:

> Hi,
>
> A couple of questions:
> 1. Do you have the full stack trace? How did you restore from the checkpoint, and with what command?
> 2. Yes, because the source can only run with parallelism 1. Writing to Kafka first and then syncing from Kafka works.
>
> Best,
> Jark
>
> On Fri, 11 Sep 2020 at 17:56, 引领  wrote:
>
> >
> >
> > 1. After a checkpoint, restoring from the checkpoint fails with:
> > org.apache.kafka.connect.errors.ConnectException:
> >
> com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
> > Failed to deserialize data ofEventHeaderV4{timestamp=1599815908000,
> > eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19,
> dataLength=25879,
> > nextPosition=721073164, flags=0}
> > 2. After flink cdc reads the data and performs a join (loading a dimension table), it writes into MySQL. The parallelism cannot be raised; it stays at 1.
> > I have already made the corresponding settings in the config files, including for sql-client:
> > taskmanager.numberOfTaskSlots: 5 # The parallelism used for
> > programs that did not specify and other parallelism.
> >  parallelism.default: 5
> >
> >
> > My SQL is:
> >
> >
> > Insert into orders Select * from order o join sku s FOR SYSTEM_TIME
> as
> > of o.proc_time s  on o.sku_id = s.id
> >
> >
> > Thanks in advance for any replies.
> >
> >
> >
> >
> >
> >
>


Re: Flink SQL create view issue

2020-09-17 post by godfrey he
Known issue, already fixed: https://issues.apache.org/jira/browse/FLINK-18750

guaishushu1...@163.com wrote on Wed, Sep 16, 2020 at 2:32 PM:

> When create view and LATERAL TABLE are used together, a 'column not found' exception occurs.
>
> Statement:
> CREATE TABLE billing_data_test (
> message  STRING
>
>
> create view v1 as
> select T.*
> from billing_data_test,
> LATERAL TABLE(SplitUdtf(message)) as T(scate1,  scate2,  belong_local1,
> ssrc2,  gift,  coupon,  local_type);
>
> Exception:
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'message' not found in any table
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 14:32:04,857] INFO ---  at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> (com.dataplatform.flink.util.FlinkDebugThread)
>
>
>
>
>
> guaishushu1...@163.com
>


Re: Flink SQL TableSource reuse: the same source aggregated into multiple metrics creates multiple identical sources

2020-09-17 post by godfrey he
Blink decides whether an operator can be reused based on each operator's digest (reuse only happens when the digests are exactly identical).
For a table source node, for example, the digest includes the table name, the selected fields, and the information left after push-downs.
You can print the plan with explain and check whether the source digests are the same.
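
For reference, a minimal Java sketch (not from the thread) of printing the optimized plan with explain so the source digests can be compared; the table and query are hypothetical placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainPlanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Hypothetical source table; replace with the real Kafka DDL.
        tEnv.executeSql(
                "CREATE TABLE src (id BIGINT, v STRING) WITH ('connector' = 'datagen')");

        // Print the plan; identical source digests are a precondition for source reuse.
        System.out.println(tEnv.explainSql(
                "SELECT count(*) FROM src UNION ALL SELECT count(distinct v) FROM src"));
    }
}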

Jingsong Li wrote on Thu, Sep 17, 2020 at 2:45 PM:

> Look carefully at whether the two sources differ in any way.
> If there is any difference at all, Blink cannot reuse them.
>
> On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:
>
> > Scenario:
> > Two Kafka sources are created with Flink SQL, deduplicated, combined with UNION ALL, and registered as a temporary view.
> > Then Flink SQL reads the temporary view, computes aggregated metrics, and writes the results to Redis.
> > Problem:
> > The Flink SQL planner creates the same two sources again for every aggregation.
> >
> > According to the descriptions of the Blink planner options below, the engine should optimize and reuse identical sources:
> > - table.optimizer.reuse-source-enabled
> > - table.optimizer.reuse-sub-plan-enabled
> >
> > Has anyone run into a similar problem?
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
> Best, Jingsong Lee
>


Re: Question about retract streams in Flink 1.9

2020-09-17 post by godfrey he
You can use the deduplication syntax provided by Flink [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D

Shengkai Fang wrote on Tue, Sep 15, 2020 at 4:02 PM:

> hi, I am confused about how using upsert
>
> kafka can save state. The implementation Jinzhu shared only drops the delete messages; your downstream tables still have to deduplicate manually to get accurate results. If every downstream table deduplicates manually, does this really save state?
>
> star <3149768...@qq.com> wrote on Mon, Jun 8, 2020 at 9:38 AM:
>
> > Many thanks, this is exactly what I wanted. Thanks also to Jinzhu for sharing!
> >
> >
> >
> >
> > -- Original message --
> > From: "Sun.Zhu" <17626017...@163.com>
> > Sent: Sunday, June 7, 2020, 0:02
> > To: "user-zh@flink.apache.org"; Cc: "user-zh@flink.apache.org"; Subject: Re: Question about retract streams in Flink 1.9
> >
> >
> >
> > Hi,star
> > Jinzhu published an article that rewrites the KafkaConnector implementation to support upsert mode; see [1].
> >
> >
> > [1]https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA
> > Sun.Zhu
> > 17626017...@163.com
> >
> >
> > On June 3, 2020 at 14:47, star <3149768...@qq.com> wrote:
> > Hi everyone,
> >
> >
> >
> >
> For real-time analytics I turned the base aggregation layer into a retract stream (toRetractStream) and wrote it out to Kafka (many downstream real-time reports depend on this stream, so I exported it).
> > The question is: can the retract stream in Kafka still be converted back into a Flink RetractStream? After converting it I want to register it as a table and run further aggregations on top of it.
> >
> >
> >
> >
> > Thanks
>


Re: JDBC connector options have no effect when using Flink SQL

2020-09-17 post by Jark Wu
>  sink.buffer-flush.max-rows = '0' causes every received record to be inserted into the database

This looks like a bug; I filed an issue: https://issues.apache.org/jira/browse/FLINK-19280

Best,
Jark

On Thu, 17 Sep 2020 at 18:15, chenxuying  wrote:

> Environment: Flink 1.11.2 + IDEA
> SQL:
> CREATE TABLE sourceTable (
> platform STRING
> ,game_id bigint
> ) WITH (
> ...
> );
> CREATE TABLE sinktable (
> platform STRING
> ,game_id bigint
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = '',
> 'table-name' = '',
> 'driver' = 'com.mysql.jdbc.Driver',
> 'username' = '',
> 'password' = '',
> 'sink.buffer-flush.max-rows' = '2',
> 'sink.buffer-flush.interval' = '30s'
> );
> insert into sinktable select platform,game_id from sourceTable;
>
>
> The official docs [1] say sink.buffer-flush.max-rows and sink.buffer-flush.interval
> can both be set to '0' to disable them, but when I tried it that does not work.
> With
> sink.buffer-flush.max-rows = '0'
> 'sink.buffer-flush.interval' = '60s'
> every received record is inserted into the database immediately.
> With
> sink.buffer-flush.max-rows = '10'
> 'sink.buffer-flush.interval' = '0'
> nothing gets inserted into the database at all.
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
>
>


Re: Native K8s deployment fails

2020-09-17 post by Yang Wang
From the stack trace you posted, the TM connects using an IP address. Normally in a non-HA setup it should connect through the service,
because in non-HA mode the JM's RPC address is bound to the service.

Have you modified Flink's code, or changed things like the ConfigMap after starting in native mode?

Best,
Yang

yanzhibo wrote on Thu, Sep 17, 2020 at 3:55 PM:

> It is non-HA; none of the TMs can register, but from inside the TM pod the jobmanager can be pinged via the service.
>
>
> > On Sep 17, 2020, at 11:10 AM, Yang Wang wrote:
> >
> > This error looks like the TM timed out while registering with the JM. Is the deployment HA or non-HA?
> >
> > With HA, TMs talk to the JM directly via the JM pod IP; in that case log into the pod and check whether the network is reachable.
> > With non-HA, TMs register with the JM through the service; check whether the K8s kube-proxy is working properly.
> >
> > Also, do all TMs fail to register, or only some of them? That also helps rule out network issues.
> >
> >
> > Best,
> > Yang
> >
> > yanzhibo wrote on Wed, Sep 16, 2020 at 5:25 PM:
> >
> >> After submitting a job from a jobmanager pod, requesting taskmanagers fails.
> >>
> >>
> >> The taskmanager exception:
> >>
> >> Fatal error occurred in TaskExecutor akka.tcp://
> >> flink@179.10.251.70:6122/user/rpc/taskmanager_0.
> >>
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> >> Could not register at the ResourceManager within the specified maximum
> >> registration duration 30 ms. This indicates a problem with this
> >> instance. Terminating now.
> >>at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
> >> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
> >> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> >> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> >> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> >> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.actor.Actor.aroundReceive(Actor.scala:517)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >> 2020-09-16 09:14:39,077 ERROR
> >> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
> >> error occurred while executing the TaskManager. Shutting it down...
> >>
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> >> Could not register at the ResourceManager within the specified maximum
> >> registration duration 30 ms. This indicates a problem with this
> >> instance. Terminating now.
> >>at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
> >> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
> >> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>at
> >>
> 

JDBC connector options have no effect when using Flink SQL

2020-09-17 post by chenxuying
Environment: Flink 1.11.2 + IDEA
SQL:
CREATE TABLE sourceTable (
platform STRING
,game_id bigint
) WITH (
...
);
CREATE TABLE sinktable (
platform STRING
,game_id bigint
) WITH (
'connector' = 'jdbc',
'url' = '',
'table-name' = '',
'driver' = 'com.mysql.jdbc.Driver',
'username' = '',
'password' = '',
'sink.buffer-flush.max-rows' = '2',
'sink.buffer-flush.interval' = '30s'
);
insert into sinktable select platform,game_id from sourceTable;


The official docs [1] say sink.buffer-flush.max-rows and sink.buffer-flush.interval can both be set to
'0' to disable them, but when I tried it that does not work.
With
   sink.buffer-flush.max-rows = '0'
   'sink.buffer-flush.interval' = '60s'
every received record is inserted into the database immediately.
With
   sink.buffer-flush.max-rows = '10'
   'sink.buffer-flush.interval' = '0'
nothing gets inserted into the database at all.


[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options



Re: About an official K8s operator

2020-09-17 post by Harold.Miao
Thanks

Yang Wang wrote on Thu, Sep 17, 2020 at 11:20 AM:

> The Flink project itself has not developed a K8s operator. The two most widely used are the ones from lyft [1] and google [2];
> both are used in production, and both support standalone job/application mode on K8s, without the native [3] integration yet.
>
> If you want to implement your own K8s operator that supports native mode, I did a POC earlier that you can use as a reference [4].
>
>
> [1]. https://github.com/lyft/flinkk8soperator
> [2]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [3].
>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
> [4]. https://github.com/wangyang0918/flink-native-k8s-operator
>
>
> Best,
> Yang
>
> Harold.Miao wrote on Thu, Sep 17, 2020 at 10:14 AM:
>
> > hi flink
> >
> > Are there any plans on the official side to support K8s-operator-based deployment?
> >
> > Thanks
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


About the CheckpointedFunction interface

2020-09-17 post by smq
Hi, I have a question. I use custom state to store a value; that state is used inside a KeyedProcessFunction implementation. In my sink I implement the CheckpointedFunction interface and its snapshotState method, and in that method I only flush to Kudu. In that case, will the state I defined earlier still be saved to the state backend?
In other words, when a checkpoint is taken, is the action in my snapshotState method executed in addition to the normal checkpointing, or does only my snapshotState method run and the other state is no longer snapshotted?
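
For reference, a minimal Java sketch (not from the thread; the sink and its flush are hypothetical) of a sink implementing CheckpointedFunction. snapshotState is called as part of every checkpoint in addition to, not instead of, the snapshotting of state registered elsewhere in the job (such as state in a KeyedProcessFunction).

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical Kudu-like sink that buffers rows and flushes them on every checkpoint.
public class BufferingKuduSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private transient StringBuilder buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = new StringBuilder();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.append(value).append('\n');
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Called on every checkpoint, alongside the snapshot of all other
        // registered state (e.g. ValueState/ListState in a KeyedProcessFunction).
        flushToExternalSystem(buffer.toString()); // stand-in for a Kudu flush
        buffer.setLength(0);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No operator state registered here; the buffer is rebuilt after restore,
        // which is only safe if the external flush is idempotent.
    }

    private void flushToExternalSystem(String rows) {
        // placeholder for the real client call
    }
}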

Re: Does the Flink JDBC connector support Greenplum?

2020-09-17 post by Jark Wu
Not supported out of the box; you need to develop your own JdbcDialect plugin.

On Thu, 17 Sep 2020 at 13:59, xuzh  wrote:

> Does the Flink JDBC connector support Greenplum, or do I have to write my own plugin?


Per-job mode: the job fails but the JM does not exit

2020-09-17 post by Qishang
Flink 1.11.1
CDH 5.15.2
Submit command: /opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm
2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive
/tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar

Restart strategy in flink-conf.yaml:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 10 s

While testing the failure-restart strategy I found that after the job exhausts its retries, the tasks stop. The web UI shows the job under Completed
Jobs, the JM does not exit, and on YARN the application is still in RUNNING state, now occupying only the JM's resources.

1. In per-job mode, shouldn't the JM exit after the job has used up its retries, or have I set some option incorrectly?
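
For reference, a minimal Java sketch (not from the thread) of the equivalent fixed-delay restart strategy configured programmatically for a single job, mirroring the flink-conf.yaml values above; it does not by itself change how the JM behaves after the retries are exhausted.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 5 attempts, 10 seconds between attempts, matching the flink-conf.yaml settings.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(5, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements(1, 2, 3).print();
        env.execute("restart-strategy sketch");
    }
}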

The exception is one I throw manually inside the flatMap; the error:
2020-09-17 15:48:47
org.apache.flink.runtime.JobException: Recovery is suppressed by
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5,
backoffTimeMS=1)
at org.apache.flink.runtime.executiongraph.failover.flip1.
ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.
ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler
.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase
.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster
.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
.java:107)
Caused by: java.lang.Exception: Test failed
at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap(
TidbBinlogSyncHive.java:231)
at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap(
TidbBinlogSyncHive.java:178)
at org.apache.flink.streaming.api.operators.StreamFlatMap
.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:161)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:178)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:153)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:345)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:558)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)


Re: Native K8s deployment fails

2020-09-17 post by yanzhibo
It is non-HA; none of the TMs can register, but from inside the TM pod the jobmanager can be pinged via the service.


> On Sep 17, 2020, at 11:10 AM, Yang Wang wrote:
> 
> This error looks like the TM timed out while registering with the JM. Is the deployment HA or non-HA?
> 
> With HA, TMs talk to the JM directly via the JM pod IP; in that case log into the pod and check whether the network is reachable.
> With non-HA, TMs register with the JM through the service; check whether the K8s kube-proxy is working properly.
> 
> Also, do all TMs fail to register, or only some of them? That also helps rule out network issues.
> 
> 
> Best,
> Yang
> 
> yanzhibo wrote on Wed, Sep 16, 2020 at 5:25 PM:
> 
>> After submitting a job from a jobmanager pod, requesting taskmanagers fails.
>> 
>> 
>> The taskmanager exception:
>> 
>> Fatal error occurred in TaskExecutor akka.tcp://
>> flink@179.10.251.70:6122/user/rpc/taskmanager_0.
>> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
>> Could not register at the ResourceManager within the specified maximum
>> registration duration 30 ms. This indicates a problem with this
>> instance. Terminating now.
>>at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.actor.Actor.aroundReceive(Actor.scala:517)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [flink-dist_2.12-1.11.1.jar:1.11.1]
>> 2020-09-16 09:14:39,077 ERROR
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
>> error occurred while executing the TaskManager. Shutting it down...
>> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
>> Could not register at the ResourceManager within the specified maximum
>> registration duration 30 ms. This indicates a problem with this
>> instance. Terminating now.
>>at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>> ~
>> 
>> 
>> The jobmanager exception:
>> 
>> 0d5f8478a2ab4e17d816810752f669eb) switched from SCHEDULED to FAILED on not
>> deployed.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate the required slot within slot request timeout. Please
>> make sure that the cluster has enough 

Re: Python UDF fails when submitted to the local cluster

2020-09-17 post by myfjdthink
Thanks, that solved it. Based on the docs you pointed to, I changed the command to

flink run -py src/etl/hello_world.py -pyexec
/usr/local/opt/python@3.7/bin/python3

i.e. specifying the Python executable did the trick.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Python UDF fails when submitted to the local cluster

2020-09-17 post by Xingbo Huang
Hi,

You can refer to the docs [1]: the API
set_python_executable(python_exec) sets the Python environment to use, and you need to make sure that this Python environment has pyflink installed.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html#python-dependency

Best,
Xingbo

myfjdthink wrote on Thu, Sep 17, 2020 at 3:13 PM:

> Hi, my local cluster is a single node, started directly with the
> bin/start-cluster.sh
> command from the docs.
>
> I went through the docs but could not find how to configure pyflink for the cluster. Could you point me to the relevant documentation?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Python UDF fails when submitted to the local cluster

2020-09-17 post by myfjdthink
Hi, my local cluster is a single node, started directly with the
bin/start-cluster.sh
command from the docs.

I went through the docs but could not find how to configure pyflink for the cluster. Could you point me to the relevant documentation?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Python UDF fails when submitted to the local cluster

2020-09-17 post by Xingbo Huang
Hi,

You can see this line in the error message:
ImportError: No module named pyflink

It looks like the Python environment used by your cluster does not have pyflink installed.

Best,
Xingbo

myfjdthink wrote on Thu, Sep 17, 2020 at 2:50 PM:

> Operating system
>
> Mac OS
>
> flink --version
>
> Version: 1.11.1, Commit ID: 7eb514a
>
>
> 代码
>
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
> DataTypes
> from pyflink.table.udf import udf
>
> # 1. create a TableEnvironment
> env_settings =
>
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env =
> StreamTableEnvironment.create(environment_settings=env_settings)
>
> # 2. create source Table
> table_env.execute_sql("""
> CREATE TABLE datagen (
> id BIGINT,
> data STRING
> ) WITH (
> 'connector' = 'datagen',
> 'fields.id.kind' = 'sequence',
> 'fields.id.start' = '1',
> 'fields.id.end' = '20'
> )
> """)
>
> # 3. create sink Table
> table_env.execute_sql("""
> CREATE TABLE print (
> id BIGINT,
> data STRING
> ) WITH (
> 'connector' = 'print'
> )
> """)
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
> result_type=DataTypes.BIGINT(), udf_type="pandas")
> def add(i, j):
> return i + j
>
>
>
> table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '80m')
> table_env.register_function("add", add)
> table_env.execute_sql("""INSERT INTO print
> SELECT add(id, 1), data FROM datagen
> """).get_job_client().get_job_execution_result().result()
>
>
> Running the py file directly works fine.
>
> Submitting it to Flink with the following command fails:
>
> flink run -py src/etl/hello_world.py
>
> Error message:
>
>
> flink run -py src/etl/hello_world.py
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/Users/nick/flink-1.11.1/lib/flink-dist_2.12-1.11.1.jar) to field
> java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Job has been submitted with JobID f38ed31397d5bd6af813bd3048d49048
> Traceback (most recent call last):
>   File "src/etl/hello_world.py", line 41, in 
> """).get_job_client().get_job_execution_result().result()
>   File
>
> "/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/common/completable_future.py",
> line 78, in result
>   File
>
> "/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>   File
>
> "/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>   File
>
> "/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o71.get.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: f38ed31397d5bd6af813bd3048d49048)
> at
>
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at
>
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
>
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at
>
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
>
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
>
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
>
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
>
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: f38ed31397d5bd6af813bd3048d49048)
> at
>
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
> at
>
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
>
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
>
> 

Re: pyflink connector support question

2020-09-17 post by Xingbo Huang
Hi,

Flink currently does not provide an official IBM
MQ connector. For the connector types that are already supported, see the docs [1]. If you need another connector, you have to provide a custom connector implementation in Java and then add the connector jar to your Python job via the API or command-line options; see the docs [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/faq.html#adding-jar-files

Best,
Xingbo

whh_960101 wrote on Thu, Sep 17, 2020 at 2:46 PM:

> Hello, does pyflink currently support an IBM MQ connector? We need to use one. Thanks for any answer!


Python UDF fails when submitted to the local cluster

2020-09-17 post by myfjdthink
Operating system

Mac OS

flink --version 

Version: 1.11.1, Commit ID: 7eb514a


Code

from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import udf

# 1. create a TableEnvironment
env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# 2. create source Table
table_env.execute_sql("""
CREATE TABLE datagen (
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '20'
)
""")

# 3. create sink Table
table_env.execute_sql("""
CREATE TABLE print (
id BIGINT,
data STRING
) WITH (
'connector' = 'print'
)
""")

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
return i + j


table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')
table_env.register_function("add", add)
table_env.execute_sql("""INSERT INTO print
SELECT add(id, 1), data FROM datagen
""").get_job_client().get_job_execution_result().result()


Running the py file directly works fine.

Submitting it to Flink with the following command fails:

flink run -py src/etl/hello_world.py

Error message:


flink run -py src/etl/hello_world.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.apache.flink.api.java.ClosureCleaner
(file:/Users/nick/flink-1.11.1/lib/flink-dist_2.12-1.11.1.jar) to field
java.lang.String.value
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID f38ed31397d5bd6af813bd3048d49048
Traceback (most recent call last):
  File "src/etl/hello_world.py", line 41, in 
""").get_job_client().get_job_execution_result().result()
  File
"/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/common/completable_future.py",
line 78, in result
  File
"/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o71.get.
: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: f38ed31397d5bd6af813bd3048d49048)
at
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: f38ed31397d5bd6af813bd3048d49048)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at

pyflink connector support question

2020-09-17 post by whh_960101
Hello, does pyflink currently support an IBM MQ connector? We need to use one. Thanks for any answer!

Re: Flink SQL TableSource reuse: the same source aggregated into multiple metrics creates multiple identical sources

2020-09-17 post by Jingsong Li
Look carefully at whether the two sources differ in any way.
If there is any difference at all, Blink cannot reuse them.

On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:

> Scenario:
> Two Kafka sources are created with Flink SQL, deduplicated, combined with UNION ALL, and registered as a temporary view.
> Then Flink SQL reads the temporary view, computes aggregated metrics, and writes the results to Redis.
> Problem:
> The Flink SQL planner creates the same two sources again for every aggregation.
>
> According to the descriptions of the Blink planner options below, the engine should optimize and reuse identical sources:
> - table.optimizer.reuse-source-enabled
> - table.optimizer.reuse-sub-plan-enabled
>
> Has anyone run into a similar problem?
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best, Jingsong Lee


Re: Re: Re: Re: Re: StreamingFileWriter load-test performance

2020-09-17 post by Jingsong Li
Yes, worth testing; in theory the mr writer should not show a large performance gap.

> why force-roll the file

Because of the exactly-once guarantee: formats like ORC and Parquet cannot write a single file in multiple attempts.
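
For reference, a minimal Java sketch (not from the thread) of the same idea with the plain StreamingFileSink API: for bulk formats such as Parquet/ORC the sink rolls the in-progress file on every checkpoint (OnCheckpointRollingPolicy), which is what allows exactly-once for formats that cannot be appended to across attempts. The record type and path are hypothetical placeholders.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class BulkFormatRollingSketch {

    // Hypothetical record type written as Parquet via Avro reflection.
    public static class PageView {
        public String service;
        public long ts;
    }

    public static StreamingFileSink<PageView> buildSink() {
        return StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs:///tmp/pageview"),                    // hypothetical path
                        ParquetAvroWriters.forReflectRecord(PageView.class))
                // Bulk formats cannot resume a half-written file, so the sink
                // must roll (close the part file) on every checkpoint.
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}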

On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:

>
>
>
> OK, so it comes down to a performance comparison between the hadoop mr writer and Flink's own native
> writer. For now, setting table.exec.hive.fallback-mapred-writer
> to false satisfies our Hive-writing needs.
> One more question that I asked before and you have not answered yet:
> why does HiveRollingPolicy return true from shouldRollOnCheckpoint, i.e. force-roll the file? Could this be made a config option?
> With forced rolling, the sink.rolling-policy.rollover-interval
> option basically stops working: with a 5-minute partition and a checkpoint every 2 minutes, files are rolled before they reach even a few tens of MB, so the configured values become meaningless.
> On 2020-09-17 13:43:04, "Jingsong Li" wrote:
> >Could you try the latest 1.11.2?
> >
> >https://flink.apache.org/downloads.html
> >
> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
> >
> >> 是master分支代码
> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
> >> if (userMrWriter) {
> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> assigner,
> >> rollingPolicy, outputFileConfig);
> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> >> } else {
> >>Optional> bulkFactory =
> >> createBulkWriterFactory(partitionColumns, sd);
> >>if (bulkFactory.isPresent()) {
> >>   builder = StreamingFileSink.forBulkFormat(
> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
> >> new
> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
> partComputer))
> >> .withBucketAssigner(assigner)
> >> .withRollingPolicy(rollingPolicy)
> >> .withOutputFileConfig(outputFileConfig);
> >> LOG.info("Hive streaming sink: Use native parquet writer.");
> >> } else {
> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> >> assigner, rollingPolicy, outputFileConfig);
> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
> >> BulkWriter Factory not available.");
> >> }
> >> }
> >> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >> >是最新的代码吗?
> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >> >它是影响性能的,1.11.2已经投票通过,即将发布
> >> >
> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
> >> >
> >> >> @Jingsong Li
> >> >>
> >> >> public TableSink createTableSink(TableSinkFactory.Context context) {
> >> >>CatalogTable table = checkNotNull(context.getTable());
> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >> >>
> >> >>boolean isGeneric =
> >> >>
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >> >>
> >> >>if (!isGeneric) {
> >> >> return new HiveTableSink(
> >> >> context.getConfiguration().get(
> >> >>
>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> >> context.isBounded(),
> >> >> new JobConf(hiveConf),
> >> >> context.getObjectIdentifier(),
> >> >> table);
> >> >> } else {
> >> >> return TableFactoryUtil.findAndCreateTableSink(context);
> >> >> }
> >> >> }
> >> >>
> >> >>
> >>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
> >> >>
> >> >> If it is false, using flink native writer to write parquet and orc
> >> files;
> >> >>
> >> >> If it is true, using hadoop mapred record writer to write parquet and
> >> orc
> >> >> files
> >> >>
> >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
> >> >>
> >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> >> >> 一些相关的参数 ?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >> >> >Sink并行度
> >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >> >> >
> >> >> >HDFS性能
> >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >> >> >
> >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 
> wrote:
> >> >> >
> >> >> >> 场景很简单,就是kafka2hive
> >> >> >> --5min入仓Hive
> >> >> >>
> >> >> >> INSERT INTO  hive.temp_.hive_5min
> >> >> >>
> >> >> >> SELECT
> >> >> >>
> >> >> >>  arg_service,
> >> >> >>
> >> >> >> time_local
> >> >> >>
> >> >> >> .
> >> >> >>
> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
> >> >> >>
> >> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('
> >> properties.group.id
> >> >> '='kafka_hive_test',
> >> >> >> 'scan.startup.mode'='earliest-offset') */;
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --kafka source表定义
> >> >> >>
> >> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >> >> >>
> >> >> >> arg_service string COMMENT 'arg_service',
> >> >> >>
> >> >> >> 
> >> >> >>
> >> >> >> )WITH (
> >> >> >>
> >> >> >>   'connector' = 'kafka',
> >> >> >>
> >> >> >>   'topic' = '...',
> >> >> >>
> >> >> >>   'properties.bootstrap.servers' = '...',
> >> >> >>
> >> >> >>   'properties.group.id' = 'flink_etl_kafka_hive',
> >> >> >>
> >> >> >>   'scan.startup.mode' = 'group-offsets',
> >> >> >>
> >> >> >>   'format' = 'json',
> >> >> >>
> >> >> >>   'json.fail-on-missing-field' = 'false',
> >> >> >>
> 

Re:Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-17 文章 kandy.wang



ok. 就是用hadoop mr writer vs  flink 自实现的native 
writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer 
改成false是可以满足我们的写hive需求了
还有一个问题,之前问过你,你还没回复:
HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么? 
如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、 
sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
On 2020-09-17 13:43:04, "Jingsong Li" wrote:
>Could you try the latest 1.11.2?
>
>https://flink.apache.org/downloads.html
>
>On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
>
>> It is the master-branch code.
>> The situation you describe is exactly what happens when table.exec.hive.fallback-mapred-writer is at its default of true;
>> after changing it to false the else branch is taken and the problem is gone for now.
>> if (userMrWriter) {
>>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner,
>> rollingPolicy, outputFileConfig);
>> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
>> } else {
>>Optional> bulkFactory =
>> createBulkWriterFactory(partitionColumns, sd);
>>if (bulkFactory.isPresent()) {
>>   builder = StreamingFileSink.forBulkFormat(
>> new org.apache.flink.core.fs.Path(sd.getLocation()),
>> new
>> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
>> .withBucketAssigner(assigner)
>> .withRollingPolicy(rollingPolicy)
>> .withOutputFileConfig(outputFileConfig);
>> LOG.info("Hive streaming sink: Use native parquet writer.");
>> } else {
>>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
>> assigner, rollingPolicy, outputFileConfig);
>> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
>> BulkWriter Factory not available.");
>> }
>> }
>> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
>> >是最新的代码吗?
>> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
>> >它是影响性能的,1.11.2已经投票通过,即将发布
>> >
>> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
>> >
>> >> @Jingsong Li
>> >>
>> >> public TableSink createTableSink(TableSinkFactory.Context context) {
>> >>CatalogTable table = checkNotNull(context.getTable());
>> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>> >>
>> >>boolean isGeneric =
>> >>
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>> >>
>> >>if (!isGeneric) {
>> >> return new HiveTableSink(
>> >> context.getConfiguration().get(
>> >>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> >> context.isBounded(),
>> >> new JobConf(hiveConf),
>> >> context.getObjectIdentifier(),
>> >> table);
>> >> } else {
>> >> return TableFactoryUtil.findAndCreateTableSink(context);
>> >> }
>> >> }
>> >>
>> >>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>> >>
>> >> If it is false, using flink native writer to write parquet and orc
>> files;
>> >>
>> >> If it is true, using hadoop mapred record writer to write parquet and
>> orc
>> >> files
>> >>
>> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
>> >>
>> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>> >> 一些相关的参数 ?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
>> >> >Sink并行度
>> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
>> >> >
>> >> >HDFS性能
>> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>> >> >
>> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
>> >> >
>> >> >> 场景很简单,就是kafka2hive
>> >> >> --5min入仓Hive
>> >> >>
>> >> >> INSERT INTO  hive.temp_.hive_5min
>> >> >>
>> >> >> SELECT
>> >> >>
>> >> >>  arg_service,
>> >> >>
>> >> >> time_local
>> >> >>
>> >> >> .
>> >> >>
>> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>> >> >>
>> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('
>> properties.group.id
>> >> '='kafka_hive_test',
>> >> >> 'scan.startup.mode'='earliest-offset') */;
>> >> >>
>> >> >>
>> >> >>
>> >> >> --kafka source表定义
>> >> >>
>> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>> >> >>
>> >> >> arg_service string COMMENT 'arg_service',
>> >> >>
>> >> >> 
>> >> >>
>> >> >> )WITH (
>> >> >>
>> >> >>   'connector' = 'kafka',
>> >> >>
>> >> >>   'topic' = '...',
>> >> >>
>> >> >>   'properties.bootstrap.servers' = '...',
>> >> >>
>> >> >>   'properties.group.id' = 'flink_etl_kafka_hive',
>> >> >>
>> >> >>   'scan.startup.mode' = 'group-offsets',
>> >> >>
>> >> >>   'format' = 'json',
>> >> >>
>> >> >>   'json.fail-on-missing-field' = 'false',
>> >> >>
>> >> >>   'json.ignore-parse-errors' = 'true'
>> >> >>
>> >> >> );
>> >> >> --sink hive表定义
>> >> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>> >> >> 
>> >> >> )
>> >> >> PARTITIONED BY (dt string , hm string) stored as orc location
>> >> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>> >> >>   'sink.partition-commit.trigger'='process-time',
>> >> >>   'sink.partition-commit.delay'='0 min',
>> >> >>