flink connect kafka
Flink version 1.10.0, no checkpointing enabled. Kafka version: 0.10.2.1. The data source is Kafka; the code is as follows:

val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER, HqKafkaTopic.KAFKA_TOPIC_HK_INDEX)
val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] = new FlinkKafkaConsumer(topicHkList, new CustomKafkaDeserializationSchema(), properties)
// configure the position the Kafka consumer starts consuming from
kafkaHkConsumer.setStartFromLatest()
val sourseHk = env
  .addSource(kafkaHkConsumer).name("hk kafka source")
  .map(new HkKafkaDecodeMap)
  .map(new HkKafkaObj2HqMap)
  .map(new HkMsgPushDecodeMap)
  .filter(new HkMsgPushFilter)

When consuming, no data ever comes out. The debug log shows:

consumer.internals.Fetcher - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-tradeTicker-topic-new-0, hq.hk-index-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071
consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)

It looks offset-related. What could cause this? Is there anything I need to set in my code?
wch...@163.com
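Not a diagnosis from the thread, just a hedged debugging sketch: with setStartFromLatest() the job only sees records produced after it starts, so one quick check is to rerun with an explicit earlier start position and, to rule out interference from any other consumer instance that might share the group, a throwaway group.id (the id below is made up). A minimal Java sketch reusing the names from the original code:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.91.85:9092");
props.setProperty("group.id", "hk-quote-debug");            // hypothetical throwaway group id

FlinkKafkaConsumer<Tuple3<String, String, String>> consumer =
        new FlinkKafkaConsumer<>(
                java.util.Arrays.asList("hq.hk-tradeTicker-topic-new", "hq.hk-index-topic-new"),
                new CustomKafkaDeserializationSchema(),     // from the original job
                props);
consumer.setStartFromEarliest();                            // instead of setStartFromLatest(), to verify data flows at all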
Re: Aggregate results containing intermediate retraction data
hi, Jark. Enabling minibatch processes intermediate data batch by batch, but if the intermediate retraction and the subsequent update land in two different minibatches, the downstream system can still see the intermediate result. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Aggregate results containing intermediate retraction data
Enabling minibatch basically solves the intermediate-result problem: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation Best, Jark On Fri, 18 Sep 2020 at 11:57, xushanshan <1337220...@qq.com> wrote: > The question has been edited and completed > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
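For reference, a minimal sketch (assuming the blink planner and a TableEnvironment named tEnv) of the settings described on the linked page:

Configuration conf = tEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");       // enable mini-batch aggregation
conf.setString("table.exec.mini-batch.allow-latency", "5 s");  // how long input records may be buffered
conf.setString("table.exec.mini-batch.size", "5000");          // max records buffered per aggregate task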
Re: Aggregate results containing intermediate retraction data
The question has been edited and completed -- Sent from: http://apache-flink.147419.n8.nabble.com/
In what order does Flink SQL consume Kafka? Running the SQL a second time gives different results from the first
hi: [Help needed] I am consuming Kafka data with flink-sql and computing pv/uv over windows. The first and second runs give different results, and I don't understand why.
0> local macOS environment
1> flink 1.11.1
2> kafka 0.10.2.2, topic message-json, 3 partitions, replication factor 1
3> using the sql-client.sh environment
4> first created the iservVisit table in sql-cli:

create table iservVisit (
  type string comment 'event type',
  uuid string comment 'user uri',
  clientTime string comment '10-digit timestamp',
  rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column, converts the 10-digit timestamp to a timestamp type
  WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- computed column, used as the watermark
) with (
  'connector' = 'kafka-0.10',
  'topic' = 'message-json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'consumer-rt',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'scan.startup.mode' = 'earliest-offset'
)

Then ran this SQL in sql-cli:

select tumble_start(rowtime, interval '2' MINUTE) as wStart,
  tumble_end(rowtime, interval '2' MINUTE) as wEnd,
  count(1) as pv,
  count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

5> sent the following JSON messages to the Kafka producer, in order:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"}
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"}
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"}
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"}

6> first result (the sql-cli query kept running the whole time):
wStart              wEnd                pv  uv
2020-09-18T09:14    2020-09-18T09:16    5   3
2020-09-18T09:16    2020-09-18T09:18    8   3
2020-09-18T09:18    2020-09-18T09:20    8   3
2020-09-18T09:20    2020-09-18T09:22    2   2

7> second result (quit the query in sql-cli and ran it again):
wStart              wEnd                pv  uv
2020-09-18T09:14    2020-09-18T09:16    2   2
2020-09-18T09:16    2020-09-18T09:18    2   2
2020-09-18T09:18    2020-09-18T09:20    8   3
2020-09-18T09:20    2020-09-18T09:22    2   2

8> the detailed steps are included in the attached file:
1. Create the iservVisit table in sql-client.sh (same DDL as in the message above).
2. Run the same tumbling-window query in sql-client.sh.
3. Write the messages to Kafka from the producer in order. For each record the attachment lists: the Kafka record, the clientTime as a wall-clock time, the watermark produced, and a note, e.g.:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}    2020-09-18 09:14:44    2020-09-18 09:13:44
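Not part of the original thread, and only a guess at the cause: with three partitions, the order in which records from different partitions are read can differ between runs, so with a 1-minute watermark delay a different subset of records may be dropped as late, changing pv/uv for the earlier windows. A sketch of two things one might try from a Table API program (tEnv is assumed) to check this:

// 1) widen the watermark delay so more out-of-order records are accepted (DDL otherwise as above)
tEnv.executeSql(
    "create table iservVisit ( "
    + "  type string, uuid string, clientTime string, "
    + "  rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), "
    + "  WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE "
    + ") with ( 'connector' = 'kafka-0.10', 'topic' = 'message-json', "
    + "  'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', "
    + "  'scan.startup.mode' = 'earliest-offset' )");

// 2) pin the default parallelism so the partition read pattern varies less between runs
tEnv.getConfig().getConfiguration()
    .setString("table.exec.resource.default-parallelism", "1");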
Will setting a TTL on ListState delete earlier records in the list?
Consider the following scenario: keyBy userId, and store all the records related to that userId in a ListState.

private transient ListState list;

@Override
public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {
    list.add(value.f1);
}

The TTL is set to 7 days. If a userId has no messages for more than 7 days, the ListState for that userId is removed. But if the userId keeps receiving messages continuously, will the records that were added to the list more than 7 days ago be deleted?

Thanks,
Wang Lei
wangl...@geekplus.com
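For reference (not from the thread): as far as I know, state TTL for collection types such as ListState is tracked per element, so with OnCreateAndWrite each element's timer is only refreshed when that element itself is written, and elements older than the TTL expire individually even while the key keeps receiving new messages. A minimal sketch of enabling it on a list state descriptor (the element type String is an assumption):

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttl = StateTtlConfig
        .newBuilder(Time.days(7))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)           // per-element timestamp, updated on add
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ListStateDescriptor<String> desc = new ListStateDescriptor<>("records", String.class);
desc.enableTimeToLive(ttl);
// list = getRuntimeContext().getListState(desc);   // in open()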
Re: Using Blink TableEnvironment from multiple threads
hi godfrey, 我们的用法类似zeppelin, 项目形式类似notebook, 在第一次运行笔记时创建env, 再次运行notebook时会创建新线程来构建job运行, 所以我参考了zepplin的做法暂时fix了这个问题 godfrey he 于2020年9月17日周四 下午10:07写道: > TableEnvironment 不是多线程安全的。 > > btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗? > > Jeff Zhang 于2020年9月14日周一 下午12:10写道: > > > 参考zeppelin的做法,每个线程里都调用这个 > > > > > > > https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111 > > > > > > jun su 于2020年9月14日周一 上午11:54写道: > > > > > hi all, > > > > > > 多线程模式下执行sql , 在非聚合sql时报了如下错误: > > > > > > Caused by: java.lang.NullPointerException > > > at java.util.Objects.requireNonNull(Objects.java:203) > > > at > > > > > > > > > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141) > > > at > > > > > > > > > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106) > > > > > > > > > > > > > > > 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE())) > > > 解决 > > > > > > > > > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix? > > > > > > Caused by: java.lang.NullPointerException > > > at scala.Predef$.Double2double(Predef.scala:365) > > > at > > > > > > > > > org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81) > > > at > > > > > > > > > org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174) > > > at > > > > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown > > > Source) > > > at > > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown > > > Source) > > > > > > -- > > > Best, > > > Jun Su > > > > > > > > > -- > > Best Regards > > > > Jeff Zhang > > > -- Best, Jun Su
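For anyone hitting the same NPE: a minimal sketch of the per-thread workaround already mentioned in this thread (setting the Calcite metadata provider before touching the TableEnvironment in a newly created thread). The import paths assume the blink planner; they may differ slightly by version:

import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQueryBase;
import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider;

Runnable job = () -> {
    // must run in every thread that uses the shared TableEnvironment
    RelMetadataQueryBase.THREAD_PROVIDERS.set(
            JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
    // ... build and execute the SQL job with the shared TableEnvironment ...
};
new Thread(job).start();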
Re: Flink 1.11 JDBC query against PG fails
Can you paste your DDL and query? You can try backticks: select `F1`, `F2` from xxx;

Best,
Jark

On Thu, 17 Sep 2020 at 23:28, godfrey he wrote:
> As far as I know, flink is currently case-insensitive while pg is case-sensitive. There is no way around this for now.
>
> wdmcode wrote on Thu, Sep 10, 2020 at 9:44 AM:
> > Hi Jimmy
> > Try putting the fields in double quotes:
> > Select "F1","F2" from xxx.xxx;
> >
> > From: Jimmy Zhang
> > Sent: Thursday, September 10, 2020 9:41 AM
> > To: user-zh@flink.apache.org
> > Subject: Flink 1.11 JDBC query against PG fails
> >
> > When querying a pg table with jdbc in flink 1.11, the pg table's columns are upper case; flink converts the field names to lower case, so the query fails. Does anyone know about this problem?
> >
> > Best,
> > Jimmy Signature is customized by Netease Mail Master
Re:Re: Re: Re: Re: Re: StreamingFileWriter load-test performance
@Jingsong Li 测了一下,1.11.2还是和以前一样呢。 还是table.exec.hive.fallback-mapred-writer=false效果明显。 我们flink 环境是基于 flink 1.11 分支源码自己 打的jar 来测的。你们那边针对 StreamingFileWriter 修改应该都提交到flink 1.11分支了吧。 顺便问一下,你们1.12版本,针对小文件合并,会有改进么 ? 在 2020-09-17 14:19:42,"Jingsong Li" 写道: >是的,可以测一下,理论上 mr writer不应该有较大性能差距。 > >> 为何要强制滚动文件 > >因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。 > >On Thu, Sep 17, 2020 at 2:05 PM kandy.wang wrote: > >> >> >> >> ok. 就是用hadoop mr writer vs flink 自实现的native >> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer >> 改成false是可以满足我们的写hive需求了 >> 还有一个问题,之前问过你,你还没回复: >> HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么? >> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、 >> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了 >> 在 2020-09-17 13:43:04,"Jingsong Li" 写道: >> >可以再尝试下最新的1.11.2吗? >> > >> >https://flink.apache.org/downloads.html >> > >> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang wrote: >> > >> >> 是master分支代码 >> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况 >> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了 >> >> if (userMrWriter) { >> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, >> assigner, >> >> rollingPolicy, outputFileConfig); >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); >> >> } else { >> >>Optional> bulkFactory = >> >> createBulkWriterFactory(partitionColumns, sd); >> >>if (bulkFactory.isPresent()) { >> >> builder = StreamingFileSink.forBulkFormat( >> >> new org.apache.flink.core.fs.Path(sd.getLocation()), >> >> new >> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), >> partComputer)) >> >> .withBucketAssigner(assigner) >> >> .withRollingPolicy(rollingPolicy) >> >> .withOutputFileConfig(outputFileConfig); >> >> LOG.info("Hive streaming sink: Use native parquet writer."); >> >> } else { >> >> builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, >> >> assigner, rollingPolicy, outputFileConfig); >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because >> >> BulkWriter Factory not available."); >> >> } >> >> } >> >> 在 2020-09-17 13:21:40,"Jingsong Li" 写道: >> >> >是最新的代码吗? 
>> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 >> >> >它是影响性能的,1.11.2已经投票通过,即将发布 >> >> > >> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang wrote: >> >> > >> >> >> @Jingsong Li >> >> >> >> >> >> public TableSink createTableSink(TableSinkFactory.Context context) { >> >> >>CatalogTable table = checkNotNull(context.getTable()); >> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl); >> >> >> >> >> >>boolean isGeneric = >> >> >> >> >> >> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); >> >> >> >> >> >>if (!isGeneric) { >> >> >> return new HiveTableSink( >> >> >> context.getConfiguration().get( >> >> >> >> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), >> >> >> context.isBounded(), >> >> >> new JobConf(hiveConf), >> >> >> context.getObjectIdentifier(), >> >> >> table); >> >> >> } else { >> >> >> return TableFactoryUtil.findAndCreateTableSink(context); >> >> >> } >> >> >> } >> >> >> >> >> >> >> >> >> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop >> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 >> >> >> >> >> >> If it is false, using flink native writer to write parquet and orc >> >> files; >> >> >> >> >> >> If it is true, using hadoop mapred record writer to write parquet and >> >> orc >> >> >> files >> >> >> >> >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W >> >> >> >> >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush >> >> >> 一些相关的参数 ? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-17 11:21:43,"Jingsong Li" 写道: >> >> >> >Sink并行度 >> >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论 >> >> >> > >> >> >> >HDFS性能 >> >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO >> >> >> > >> >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang >> wrote: >> >> >> > >> >> >> >> 场景很简单,就是kafka2hive >> >> >> >> --5min入仓Hive >> >> >> >> >> >> >> >> INSERT INTO hive.temp_.hive_5min >> >> >> >> >> >> >> >> SELECT >> >> >> >> >> >> >> >> arg_service, >> >> >> >> >> >> >> >> time_local >> >> >> >> >> >> >> >> . >> >> >> >> >> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), >> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 >> >> >> >> >> >> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS(' >> >> properties.group.id >> >> >> '='kafka_hive_test', >> >> >> >> 'scan.startup.mode'='earliest-offset') */; >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> --kafka source表定义 >> >> >> >> >> >> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( >> >> >> >> >> >> >> >> arg_service string COMMENT 'arg_service', >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> )WITH
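For readers following along: a minimal sketch of flipping the option discussed in this thread from a Table API program (in sql-client it would be a SET statement); whether the native writer is appropriate still depends on the format and version notes above:

// assumes a TableEnvironment named tEnv with the Hive catalog registered
tEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.hive.fallback-mapred-writer", false);  // use Flink's native orc/parquet writers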
Re: Flink 1.11 JDBC query against PG fails
As far as I know, flink is currently case-insensitive while pg is case-sensitive. There is no way around this for now.

wdmcode wrote on Thu, Sep 10, 2020 at 9:44 AM:
> Hi Jimmy
> Try putting the fields in double quotes:
> Select "F1","F2" from xxx.xxx;
>
> From: Jimmy Zhang
> Sent: Thursday, September 10, 2020 9:41 AM
> To: user-zh@flink.apache.org
> Subject: Flink 1.11 JDBC query against PG fails
>
> When querying a pg table with jdbc in flink 1.11, the pg table's columns are upper case; flink converts the field names to lower case, so the query fails. Does anyone know about this problem?
>
> Best,
> Jimmy Signature is customized by Netease Mail Master
Re: flink sql fails to execute the statement - No operators defined in streaming topology. Cannot execute.
Can you provide a complete demo?

me wrote on Fri, Sep 11, 2020 at 6:54 PM:
> 1. The flink version is 1.11.1
> streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamBlinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> streamTableEnv = StreamTableEnvironment.create(streamEnv, streamBlinkSettings)
>
> 2. After executing the SQL I need to convert to a DataStream, so in the end I call dataStreamEnv.execute("SqlPlatformRealTime"); the SQL result Table is converted to a DataStream and then written to Kafka via addSink.
>
> Original message
> From: silence
> To: user-zh
> Sent: Friday, Sep 11, 2020 18:49
> Subject: Re: flink sql fails to execute the statement - No operators defined in streaming topology. Cannot execute.
>
> Without an insert statement there is no sink, so nothing triggers the computation -- Sent from: http://apache-flink.147419.n8.nabble.com/
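Not a confirmed diagnosis, but a common cause of this error in 1.11: TableEnvironment.executeSql("INSERT ...") submits its own job immediately, so if the program afterwards calls StreamExecutionEnvironment.execute() without having added any DataStream operators, that call fails with "No operators defined in streaming topology". A minimal sketch of the Table-to-DataStream-to-Kafka pattern in which calling the stream environment's execute() is the right thing (the sink is a placeholder):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

Table result = tEnv.sqlQuery("SELECT ... FROM ...");           // a query, not an INSERT
DataStream<Tuple2<Boolean, Row>> retractStream =
        tEnv.toRetractStream(result, Row.class);

retractStream.addSink(/* e.g. a FlinkKafkaProducer, placeholder here */ null);
env.execute("SqlPlatformRealTime");                            // now there are DataStream operators to run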
Re: Using Blink TableEnvironment from multiple threads
TableEnvironment 不是多线程安全的。 btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗? Jeff Zhang 于2020年9月14日周一 下午12:10写道: > 参考zeppelin的做法,每个线程里都调用这个 > > > https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111 > > > jun su 于2020年9月14日周一 上午11:54写道: > > > hi all, > > > > 多线程模式下执行sql , 在非聚合sql时报了如下错误: > > > > Caused by: java.lang.NullPointerException > > at java.util.Objects.requireNonNull(Objects.java:203) > > at > > > > > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141) > > at > > > > > org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106) > > > > > > > > > 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE())) > > 解决 > > > > > > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix? > > > > Caused by: java.lang.NullPointerException > > at scala.Predef$.Double2double(Predef.scala:365) > > at > > > > > org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81) > > at > > > > > org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174) > > at > > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown > > Source) > > at > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown > > Source) > > > > -- > > Best, > > Jun Su > > > > > -- > Best Regards > > Jeff Zhang >
Re: flink hive batch job reports FileNotFoundException
cc @Rui Li 李佳宸 于2020年9月14日周一 下午5:11写道: > 大家好~我执行batch table的作业写入hive时,会出现FileNotFoundException的错误。找不到.staging文件 > 版本是1.11.1 > Caused by: java.io.FileNotFoundException: File > > hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144 > does not exist. > at > > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157) > ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0] > at > > org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > > 在standalone模式下没有这个问题,on yarn 的per job模式下部分job就会出现这个问题 >
Re: A few issues encountered while testing flink cdc - quite puzzled, asking for help
The sql client's default parallelism is 1. If parallelism is not explicitly set in sql-client-defaults.yaml, the default in the code is 1, so you need to set parallelism explicitly in sql-client-defaults.yaml.

Jark Wu wrote on Tue, Sep 15, 2020 at 11:43 AM:
> Hi,
> Two questions:
> 1. Do you have the full exception stack? How did you restore from the checkpoint, and with which command?
> 2. Yes, because the source can only run with parallelism 1. Writing to Kafka first and then syncing from Kafka works.
>
> Best,
> Jark
>
> On Fri, 11 Sep 2020 at 17:56, 引领 wrote:
> > 1. Restoring from a checkpoint fails with:
> > org.apache.kafka.connect.errors.ConnectException: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1599815908000, eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19, dataLength=25879, nextPosition=721073164, flags=0}
> > 2. After reading data with flink cdc, performing a join (loading a dimension table) and writing to mysql, the parallelism cannot be increased and stays at 1.
> > I have set the corresponding options in the configuration files, including for sql-client:
> > taskmanager.numberOfTaskSlots: 5 # The parallelism used for programs that did not specify and other parallelism.
> > parallelism.default: 5
> >
> > My sql is:
> > Insert into orders Select * from order o join sku s FOR SYSTEM_TIME as of o.proc_time s on o.sku_id = s.id
> >
> > Thanks in advance to everyone who replies
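As a side note (an alternative to editing sql-client-defaults.yaml, for jobs built with the Table API instead of sql-client), the blink planner's default operator parallelism can also be set programmatically; the CDC source itself still runs with parallelism 1, as noted above. A sketch assuming a TableEnvironment named tEnv:

tEnv.getConfig().getConfiguration()
        .setInteger("table.exec.resource.default-parallelism", 5);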
Re: Flink SQL create view issue
Known issue, already fixed: https://issues.apache.org/jira/browse/FLINK-18750

guaishushu1...@163.com wrote on Wed, Sep 16, 2020 at 2:32 PM:
> When create view and LATERAL TABLE are used together, a column-not-found exception occurs.
>
> SQL:
> CREATE TABLE billing_data_test (
>   message STRING
>
> create view v1 as
> select T.*
> from billing_data_test,
> LATERAL TABLE(SplitUdtf(message)) as T(scate1, scate2, belong_local1, ssrc2, gift, coupon, local_type);
>
> Exception:
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'message' not found in any table (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 14:32:04,857] INFO --- at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) (com.dataplatform.flink.util.FlinkDebugThread)
>
> guaishushu1...@163.com
Re: Flink SQL TableSource reuse: aggregating multiple metrics over the same source makes the engine create multiple identical sources
blink decides whether an operator can be reused based on its digest (reuse only happens when the digests are exactly identical). For a table source node the digest includes the table name, the selected fields, and the information left after push-down. You can print the plan with explain and check whether the source digests are the same.

Jingsong Li wrote on Thu, Sep 17, 2020 at 2:45 PM:
> Look carefully at whether the two sources differ in any way
> If there is any difference at all, Blink cannot reuse them
>
> On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:
> > Scenario:
> > Two Kafka sources are created with Flink SQL, deduplicated, then combined with UNION ALL and registered as a temporary view.
> > Flink SQL then reads the temporary view, computes aggregate metrics, and writes the results to Redis.
> > Problem:
> > The Flink SQL planner creates the same two sources for every aggregation.
> >
> > According to the Blink plan options below, the engine should optimize and reuse identical sources:
> > - table.optimizer.reuse-source-enabled
> > - table.optimizer.reuse-sub-plan-enabled
> >
> > Has anyone run into a similar problem?
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
> --
> Best, Jingsong Lee
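A minimal sketch of inspecting the digests as suggested above. Note also that source reuse can only happen when all the INSERTs are planned together as one job (e.g. in one StatementSet), not when each INSERT is submitted separately. Sink and view names below are hypothetical:

// assumes a blink-planner TableEnvironment named tEnv with the sources/views registered
StatementSet set = tEnv.createStatementSet();
set.addInsertSql("INSERT INTO redis_sink_1 SELECT ... FROM unified_view ...");
set.addInsertSql("INSERT INTO redis_sink_2 SELECT ... FROM unified_view ...");
System.out.println(set.explain());   // compare the source nodes' digests in the printed plan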
Re: flink 1.9 question about retract streams
You can use the deduplication syntax that flink provides [1]

[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D

Shengkai Fang wrote on Tue, Sep 15, 2020 at 4:02 PM:
> hi, I'm puzzled about how using upsert kafka saves state. The implementation Jinzhu shared only drops the delete messages; your downstream tables still have to deduplicate manually to get correct results. If every downstream table deduplicates manually, does that really save state?
>
> star <3149768...@qq.com> wrote on Mon, Jun 8, 2020 at 9:38 AM:
> > Thanks a lot, exactly what I wanted. Thanks also to Jinzhu for sharing!
> >
> > -- Original message --
> > From: "Sun.Zhu" <17626017...@163.com>
> > Sent: Sunday, Jun 7, 2020 00:02
> > To: "user-zh@flink.apache.org"
> > Cc: "user-zh@flink.apache.org"
> > Subject: Re: flink 1.9 question about retract streams
> >
> > Hi, star
> > Jinzhu published an article that rewrites the KafkaConnector implementation to support upsert mode; see [1]
> >
> > [1] https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA
> > | Sun.Zhu | 17626017...@163.com |
> > Signature customized by Netease Mail Master
> >
> > On Jun 3, 2020, 14:47, star <3149768...@qq.com> wrote:
> > Hi everyone,
> > For real-time statistics I made the base aggregation layer a retract stream (toRetractStream) and wrote it to Kafka (many downstream real-time reports depend on this stream, so I had to output it).
> > The question is: can the retract stream stored in Kafka still be turned back into a Flink RetractStream? After converting it I want to register it as a table and run further aggregations on top of it.
> >
> > Thanks
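A minimal sketch of the deduplication pattern from the linked page, applied to a table read back from the Kafka topic that carries the retract output. Table and column names are placeholders, and the table is assumed to declare a proctime attribute in its DDL:

tEnv.executeSql(
    "SELECT metric_key, metric_value "
  + "FROM ( "
  + "  SELECT *, ROW_NUMBER() OVER ( "
  + "    PARTITION BY metric_key ORDER BY proctime DESC) AS row_num "
  + "  FROM kafka_agg_result "
  + ") WHERE row_num = 1");   // keeps only the latest row per key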
Re: jdbc connector options do not take effect when using flink sql
> sink.buffer-flush.max-rows = '0' causes every received record to be inserted into the database immediately

This looks like a bug; I filed an issue: https://issues.apache.org/jira/browse/FLINK-19280

Best,
Jark

On Thu, 17 Sep 2020 at 18:15, chenxuying wrote:
> The environment is flink1.11.2 + idea
> sql:
> CREATE TABLE sourceTable (
>   platform STRING
>   ,game_id bigint
> ) WITH (
>   ...
> );
> CREATE TABLE sinktable (
>   platform STRING
>   ,game_id bigint
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = '',
>   'table-name' = '',
>   'driver' = 'com.mysql.jdbc.Driver',
>   'username' = '',
>   'password' = '',
>   'sink.buffer-flush.max-rows' = '2',
>   'sink.buffer-flush.interval' = '30s'
> );
> insert into sinktable select platform,game_id from sourceTable;
>
> The official documentation [1] says sink.buffer-flush.max-rows and sink.buffer-flush.interval can be set to '0' to disable them, but that did not work for me.
> With
>   sink.buffer-flush.max-rows = '0'
>   'sink.buffer-flush.interval' = '60s'
> every received record is inserted into the database immediately.
> With
>   sink.buffer-flush.max-rows = '10'
>   'sink.buffer-flush.interval' = '0'
> nothing is written to the database at all.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
Re: K8s native deployment failure
从你发的报错栈来看TM是用的ip地址去连的,正常如果是非HA的话,应该是通过service来连接的 因为JM在非HA情况下rpc地址是bind到service上的 你是否有对Flink的代码做修改呢,或者用native模式起来以后,修改过ConfigMap等 Best, Yang yanzhibo 于2020年9月17日周四 下午3:55写道: > 是非ha,所有tm都注册不上来,但是在tm的pod中 根据service 是可以ping 通 jobmanager的 > > > > 2020年9月17日 上午11:10,Yang Wang 写道: > > > > 你这个报错看着是TM向JM注册超时了,使用的HA还是非HA部署呢 > > > > 如果是HA的话,TM是直接使用JM的Pod ip进行通信的,这个时候需要登录pod确认一下网络是否是通的 > > 如果是非HA的话,TM是使用service来向JM注册,你需要检查一下K8s的kube proxy是否正常 > > > > 另外,是所有TM都注册不上来,还是只有个别的。这个也可以排除网络问题 > > > > > > Best, > > Yang > > > > yanzhibo 于2020年9月16日周三 下午5:25写道: > > > >> 一个job manager pod 提交job后,申请taskmanager失败 > >> > >> > >> Taskmanager 的异常 > >> > >> Fatal error occurred in TaskExecutor akka.tcp:// > >> flink@179.10.251.70:6122/user/rpc/taskmanager_0. > >> > org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: > >> Could not register at the ResourceManager within the specified maximum > >> registration duration 30 ms. This indicates a problem with this > >> instance. Terminating now. > >>at > >> > org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251) > >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> > org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237) > >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) > >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) > >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.japi.pf > .UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.actor.Actor.aroundReceive(Actor.scala:517) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.actor.Actor.aroundReceive$(Actor.scala:515) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.actor.ActorCell.invoke(ActorCell.scala:561) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.dispatch.Mailbox.run(Mailbox.scala:225) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > >> 
[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > >> [flink-dist_2.12-1.11.1.jar:1.11.1] > >> 2020-09-16 09:14:39,077 ERROR > >> org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal > >> error occurred while executing the TaskManager. Shutting it down... > >> > org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: > >> Could not register at the ResourceManager within the specified maximum > >> registration duration 30 ms. This indicates a problem with this > >> instance. Terminating now. > >>at > >> > org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251) > >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> > org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237) > >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] > >>at > >> >
jdbc connector options do not take effect when using flink sql
The environment is flink1.11.2 + IDEA.
sql:
CREATE TABLE sourceTable (
  platform STRING
  ,game_id bigint
) WITH (
  ...
);
CREATE TABLE sinktable (
  platform STRING
  ,game_id bigint
) WITH (
  'connector' = 'jdbc',
  'url' = '',
  'table-name' = '',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = '',
  'password' = '',
  'sink.buffer-flush.max-rows' = '2',
  'sink.buffer-flush.interval' = '30s'
);
insert into sinktable select platform,game_id from sourceTable;

The official documentation [1] says that sink.buffer-flush.max-rows and sink.buffer-flush.interval can be set to '0' to disable them, but that did not work for me.
With the following settings
  sink.buffer-flush.max-rows = '0'
  'sink.buffer-flush.interval' = '60s'
every received record is inserted into the database immediately.
With the following settings
  sink.buffer-flush.max-rows = '10'
  'sink.buffer-flush.interval' = '0'
nothing is written to the database at all.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
Re: About the official k8s operator
Thanks

Yang Wang wrote on Thu, Sep 17, 2020 at 11:20 AM:
> Flink itself has not actually developed a K8s Operator. The two most widely used are from lyft [1] and google [2].
> Both are used in production; they support standalone job/application mode on K8s and do not yet support the native [3] integration.
>
> If you want to implement a K8s Operator that supports native mode yourself, I did a POC before that you can use as a reference [4]
>
> [1]. https://github.com/lyft/flinkk8soperator
> [2]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [3]. https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
> [4]. https://github.com/wangyang0918/flink-native-k8s-operator
>
> Best,
> Yang
>
> Harold.Miao wrote on Thu, Sep 17, 2020 at 10:14 AM:
> > hi flink
> > May I ask whether there are any plans on the official side for k8s operator deployment support?
> > Thanks
> >
> > --
> > Best Regards,
> > Harold Miao

--
Best Regards,
Harold Miao
About the CheckpointedFunction interface
Hi all, I have a question. I use a custom state to hold a value; this state is used in a KeyedProcessFunction implementation. In the sink I implemented the CheckpointedFunction interface and its snapshotState method, and in that method I only call a flush-to-Kudu routine. In that case, will the state I defined earlier still be saved to the state backend? In other words, when a checkpoint is taken, is the action in my snapshotState executed in addition to the normal checkpointing, or does it replace it so the other state is no longer snapshotted?
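Not an authoritative answer, but as far as I know snapshotState is an additional hook invoked on that one operator during the checkpoint: keyed state registered in the KeyedProcessFunction is still snapshotted by the state backend as usual. A minimal sketch of a sink that flushes to Kudu in snapshotState; the class name, element type and flush method are made up for illustration:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// hypothetical sink: buffers rows and flushes to Kudu whenever a checkpoint is taken
public class KuduBufferingSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // runs as part of this operator's checkpoint; other operators' keyed state
        // is snapshotted by the state backend independently of this method
        flushToKudu(buffer);
        buffer.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // nothing to restore in this sketch
    }

    private void flushToKudu(List<String> rows) {
        // hypothetical Kudu flush
    }
}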
Re: Does the flink jdbc connector support Greenplum?
The official connector does not support it for now. You need to develop your own JdbcDialect plugin.

On Thu, 17 Sep 2020 at 13:59, xuzh wrote:
> Does the flink jdbc connector support Greenplum, or do we have to write our own plugin?
Per-job mode: the job fails but the JM does not exit
Flink 1.11.1 CDH 5.15.2 提交命令:/opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive /tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar flink-conf.yaml 重启策策略 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 5 restart-strategy.fixed-delay.delay: 10 s 我在测试失败重启策略,发现任务失败之后会在重试次数之后,Task停止。Web UI 显示在Completed Jobs里面,jm没有挂掉,看yarn上面任务在Runing状态。占用的资源是只有jm的资源了。 1. per-job 任务失败重试次数之后jm不会退出吗,还是我某些参数设置的不对? 是我在flapmap里面手动抛出的异常,报错: 2020-09-17 15:48:47 org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=1) at org.apache.flink.runtime.executiongraph.failover.flip1. ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1. ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler .java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler .handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler .maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler .updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase .updateTaskExecutionState(SchedulerBase.java:503) at org.apache.flink.runtime.jobmaster.JobMaster .updateTaskExecutionState(JobMaster.java:386) at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( AkkaRpcActor.java:284) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage( AkkaRpcActor.java:199) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor .handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage( AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool .java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread .java:107) Caused by: java.lang.Exception: Test failed at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap( TidbBinlogSyncHive.java:231) at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap( TidbBinlogSyncHive.java:178) at org.apache.flink.streaming.api.operators.StreamFlatMap 
.processElement(StreamFlatMap.java:50) at org.apache.flink.streaming.runtime.tasks. OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask .java:161) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput .processElement(StreamTaskNetworkInput.java:178) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput .emitNext(StreamTaskNetworkInput.java:153) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor .processInput(StreamOneInputProcessor.java:67) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput( StreamTask.java:345) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor .runMailboxStep(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor .runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop( StreamTask.java:558) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask .java:530) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748)
Re: K8s native deployment failure
是非ha,所有tm都注册不上来,但是在tm的pod中 根据service 是可以ping 通 jobmanager的 > 2020年9月17日 上午11:10,Yang Wang 写道: > > 你这个报错看着是TM向JM注册超时了,使用的HA还是非HA部署呢 > > 如果是HA的话,TM是直接使用JM的Pod ip进行通信的,这个时候需要登录pod确认一下网络是否是通的 > 如果是非HA的话,TM是使用service来向JM注册,你需要检查一下K8s的kube proxy是否正常 > > 另外,是所有TM都注册不上来,还是只有个别的。这个也可以排除网络问题 > > > Best, > Yang > > yanzhibo 于2020年9月16日周三 下午5:25写道: > >> 一个job manager pod 提交job后,申请taskmanager失败 >> >> >> Taskmanager 的异常 >> >> Fatal error occurred in TaskExecutor akka.tcp:// >> flink@179.10.251.70:6122/user/rpc/taskmanager_0. >> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: >> Could not register at the ResourceManager within the specified maximum >> registration duration 30 ms. This indicates a problem with this >> instance. Terminating now. >>at >> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251) >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237) >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.actor.Actor.aroundReceive(Actor.scala:517) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.actor.Actor.aroundReceive$(Actor.scala:515) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.actor.ActorCell.invoke(ActorCell.scala:561) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.dispatch.Mailbox.run(Mailbox.scala:225) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at akka.dispatch.Mailbox.exec(Mailbox.scala:235) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> [flink-dist_2.12-1.11.1.jar:1.11.1] >> 2020-09-16 09:14:39,077 ERROR >> org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal >> error occurred while executing the TaskManager. Shutting it down... >> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: >> Could not register at the ResourceManager within the specified maximum >> registration duration 30 ms. This indicates a problem with this >> instance. Terminating now. >>at >> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251) >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237) >> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) >> ~ >> >> >> Jobmanger 异常 >> >> 0d5f8478a2ab4e17d816810752f669eb) switched from SCHEDULED to FAILED on not >> deployed. >> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: >> Could not allocate the required slot within slot request timeout. Please >> make sure that the cluster has enough
Re: python udf fails when submitted to the local node
Thanks, that solved it. Based on the documentation you provided, I changed the command to
flink run -py src/etl/hello_world.py -pyexec /usr/local/opt/python@3.7/bin/python3
so that the Python executable is specified, and it works now.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: python udf fails when submitted to the local node
Hi,
You can refer to the documentation [1]; the API set_python_executable(python_exec) there sets your Python environment, and you need to make sure that this Python environment has pyflink installed.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html#python-dependency

Best,
Xingbo

myfjdthink wrote on Thu, Sep 17, 2020 at 3:13 PM:
> Hi, my local cluster is a single node, started directly with the bin/start-cluster.sh command from the documentation.
> I went through the docs and could not find how to configure pyflink for the cluster; could you tell me where the relevant documentation is?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Re: python udf fails when submitted to the local node
Hi, my local cluster is a single node, started directly with the
bin/start-cluster.sh
command from the documentation.

I went through the docs and could not find anything about configuring pyflink for the cluster; could you tell me where the relevant documentation is?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: python udf fails when submitted to the local node
Hi, 你可以看到报错信息里面有这么一条: ImportError: No module named pyflink 看起来是你的集群环境使用的python环境没有安装pyflink Best, Xingbo myfjdthink 于2020年9月17日周四 下午2:50写道: > 操作系统 > > Mac OS > > flink --version > > Version: 1.11.1, Commit ID: 7eb514a > > > 代码 > > from pyflink.table import StreamTableEnvironment, EnvironmentSettings, > DataTypes > from pyflink.table.udf import udf > > # 1. create a TableEnvironment > env_settings = > > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > table_env = > StreamTableEnvironment.create(environment_settings=env_settings) > > # 2. create source Table > table_env.execute_sql(""" > CREATE TABLE datagen ( > id BIGINT, > data STRING > ) WITH ( > 'connector' = 'datagen', > 'fields.id.kind' = 'sequence', > 'fields.id.start' = '1', > 'fields.id.end' = '20' > ) > """) > > # 3. create sink Table > table_env.execute_sql(""" > CREATE TABLE print ( > id BIGINT, > data STRING > ) WITH ( > 'connector' = 'print' > ) > """) > > @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], > result_type=DataTypes.BIGINT(), udf_type="pandas") > def add(i, j): > return i + j > > > > table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", > '80m') > table_env.register_function("add", add) > table_env.execute_sql("""INSERT INTO print > SELECT add(id, 1), data FROM datagen > """).get_job_client().get_job_execution_result().result() > > > 执行执行 py 文件是可以正常运行的 > > 用以下命令提交到 flink 上会报错 > > flink run -py src/etl/hello_world.py > > 报错信息 > > > flink run -py src/etl/hello_world.py > WARNING: An illegal reflective access operation has occurred > WARNING: Illegal reflective access by > org.apache.flink.api.java.ClosureCleaner > (file:/Users/nick/flink-1.11.1/lib/flink-dist_2.12-1.11.1.jar) to field > java.lang.String.value > WARNING: Please consider reporting this to the maintainers of > org.apache.flink.api.java.ClosureCleaner > WARNING: Use --illegal-access=warn to enable warnings of further illegal > reflective access operations > WARNING: All illegal access operations will be denied in a future release > Job has been submitted with JobID f38ed31397d5bd6af813bd3048d49048 > Traceback (most recent call last): > File "src/etl/hello_world.py", line 41, in > """).get_job_client().get_job_execution_result().result() > File > > "/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/common/completable_future.py", > line 78, in result > File > > "/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > File > > "/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", > line 147, in deco > File > > "/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o71.get. 
> : java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: f38ed31397d5bd6af813bd3048d49048) > at > > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > at > > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) > at > > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.base/java.lang.Thread.run(Thread.java:832) > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job > failed (JobID: f38ed31397d5bd6af813bd3048d49048) > at > > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116) > at > > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at > > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > >
Re: pyflink connector support question
Hi,
Flink does not provide an official IBM MQ connector. For the connector types that are already supported you can refer to the documentation [1]. If you need another connector, you have to provide a custom Java implementation of that connector and add its jar to your Python job via the API or a command-line parameter; see the documentation [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/faq.html#adding-jar-files

Best,
Xingbo

whh_960101 wrote on Thu, Sep 17, 2020 at 2:46 PM:
> Hello, do the connectors that pyflink currently supports include IBM MQ? We need to use it. Thanks for any answers!
python udf fails when submitted to the local node
操作系统 Mac OS flink --version Version: 1.11.1, Commit ID: 7eb514a 代码 from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import udf # 1. create a TableEnvironment env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() table_env = StreamTableEnvironment.create(environment_settings=env_settings) # 2. create source Table table_env.execute_sql(""" CREATE TABLE datagen ( id BIGINT, data STRING ) WITH ( 'connector' = 'datagen', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '20' ) """) # 3. create sink Table table_env.execute_sql(""" CREATE TABLE print ( id BIGINT, data STRING ) WITH ( 'connector' = 'print' ) """) @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas") def add(i, j): return i + j table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m') table_env.register_function("add", add) table_env.execute_sql("""INSERT INTO print SELECT add(id, 1), data FROM datagen """).get_job_client().get_job_execution_result().result() 执行执行 py 文件是可以正常运行的 用以下命令提交到 flink 上会报错 flink run -py src/etl/hello_world.py 报错信息 flink run -py src/etl/hello_world.py WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/nick/flink-1.11.1/lib/flink-dist_2.12-1.11.1.jar) to field java.lang.String.value WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release Job has been submitted with JobID f38ed31397d5bd6af813bd3048d49048 Traceback (most recent call last): File "src/etl/hello_world.py", line 41, in """).get_job_client().get_job_execution_result().result() File "/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/common/completable_future.py", line 78, in result File "/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ File "/Users/nick/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco File "/Users/nick/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o71.get. 
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f38ed31397d5bd6af813bd3048d49048) at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f38ed31397d5bd6af813bd3048d49048) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at
pyflink connector support question
Hello, do the connectors that pyflink currently supports include IBM MQ? We need to use it. Thanks for any answers!
Re: Flink SQL TableSource reuse: aggregating multiple metrics over the same source makes the engine create multiple identical sources
Look carefully at whether the two sources differ in any way.
If there is any difference at all, Blink cannot reuse them.

On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:
> Scenario:
> Two Kafka sources are created with Flink SQL, deduplicated, then combined with UNION ALL and registered as a temporary view.
> Flink SQL then reads the temporary view, computes aggregate metrics, and writes the results to Redis.
> Problem:
> The Flink SQL planner creates the same two sources for every aggregation.
>
> According to the Blink plan options below, the engine should optimize and reuse identical sources:
> - table.optimizer.reuse-source-enabled
> - table.optimizer.reuse-sub-plan-enabled
>
> Has anyone run into a similar problem?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

--
Best, Jingsong Lee
Re: Re: Re: Re: Re: StreamingFileWriter load-test performance
是的,可以测一下,理论上 mr writer不应该有较大性能差距。 > 为何要强制滚动文件 因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。 On Thu, Sep 17, 2020 at 2:05 PM kandy.wang wrote: > > > > ok. 就是用hadoop mr writer vs flink 自实现的native > writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer > 改成false是可以满足我们的写hive需求了 > 还有一个问题,之前问过你,你还没回复: > HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么? > 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、 > sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了 > 在 2020-09-17 13:43:04,"Jingsong Li" 写道: > >可以再尝试下最新的1.11.2吗? > > > >https://flink.apache.org/downloads.html > > > >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang wrote: > > > >> 是master分支代码 > >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况 > >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了 > >> if (userMrWriter) { > >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, > assigner, > >> rollingPolicy, outputFileConfig); > >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); > >> } else { > >>Optional> bulkFactory = > >> createBulkWriterFactory(partitionColumns, sd); > >>if (bulkFactory.isPresent()) { > >> builder = StreamingFileSink.forBulkFormat( > >> new org.apache.flink.core.fs.Path(sd.getLocation()), > >> new > >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), > partComputer)) > >> .withBucketAssigner(assigner) > >> .withRollingPolicy(rollingPolicy) > >> .withOutputFileConfig(outputFileConfig); > >> LOG.info("Hive streaming sink: Use native parquet writer."); > >> } else { > >> builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, > >> assigner, rollingPolicy, outputFileConfig); > >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because > >> BulkWriter Factory not available."); > >> } > >> } > >> 在 2020-09-17 13:21:40,"Jingsong Li" 写道: > >> >是最新的代码吗? > >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 > >> >它是影响性能的,1.11.2已经投票通过,即将发布 > >> > > >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang wrote: > >> > > >> >> @Jingsong Li > >> >> > >> >> public TableSink createTableSink(TableSinkFactory.Context context) { > >> >>CatalogTable table = checkNotNull(context.getTable()); > >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl); > >> >> > >> >>boolean isGeneric = > >> >> > >> > Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); > >> >> > >> >>if (!isGeneric) { > >> >> return new HiveTableSink( > >> >> context.getConfiguration().get( > >> >> > HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), > >> >> context.isBounded(), > >> >> new JobConf(hiveConf), > >> >> context.getObjectIdentifier(), > >> >> table); > >> >> } else { > >> >> return TableFactoryUtil.findAndCreateTableSink(context); > >> >> } > >> >> } > >> >> > >> >> > >> > HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop > >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 > >> >> > >> >> If it is false, using flink native writer to write parquet and orc > >> files; > >> >> > >> >> If it is true, using hadoop mapred record writer to write parquet and > >> orc > >> >> files > >> >> > >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W > >> >> > >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush > >> >> 一些相关的参数 ? 
> >> >> > >> >> > >> >> > >> >> > >> >> > >> >> 在 2020-09-17 11:21:43,"Jingsong Li" 写道: > >> >> >Sink并行度 > >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论 > >> >> > > >> >> >HDFS性能 > >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO > >> >> > > >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang > wrote: > >> >> > > >> >> >> 场景很简单,就是kafka2hive > >> >> >> --5min入仓Hive > >> >> >> > >> >> >> INSERT INTO hive.temp_.hive_5min > >> >> >> > >> >> >> SELECT > >> >> >> > >> >> >> arg_service, > >> >> >> > >> >> >> time_local > >> >> >> > >> >> >> . > >> >> >> > >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), > >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 > >> >> >> > >> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS(' > >> properties.group.id > >> >> '='kafka_hive_test', > >> >> >> 'scan.startup.mode'='earliest-offset') */; > >> >> >> > >> >> >> > >> >> >> > >> >> >> --kafka source表定义 > >> >> >> > >> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( > >> >> >> > >> >> >> arg_service string COMMENT 'arg_service', > >> >> >> > >> >> >> > >> >> >> > >> >> >> )WITH ( > >> >> >> > >> >> >> 'connector' = 'kafka', > >> >> >> > >> >> >> 'topic' = '...', > >> >> >> > >> >> >> 'properties.bootstrap.servers' = '...', > >> >> >> > >> >> >> 'properties.group.id' = 'flink_etl_kafka_hive', > >> >> >> > >> >> >> 'scan.startup.mode' = 'group-offsets', > >> >> >> > >> >> >> 'format' = 'json', > >> >> >> > >> >> >> 'json.fail-on-missing-field' = 'false', > >> >> >> >
Re:Re: Re: Re: Re: StreamingFileWriter load-test performance
ok. 就是用hadoop mr writer vs flink 自实现的native writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer 改成false是可以满足我们的写hive需求了 还有一个问题,之前问过你,你还没回复: HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么? 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、 sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了 在 2020-09-17 13:43:04,"Jingsong Li" 写道: >可以再尝试下最新的1.11.2吗? > >https://flink.apache.org/downloads.html > >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang wrote: > >> 是master分支代码 >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况 >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了 >> if (userMrWriter) { >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, >> rollingPolicy, outputFileConfig); >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); >> } else { >>Optional> bulkFactory = >> createBulkWriterFactory(partitionColumns, sd); >>if (bulkFactory.isPresent()) { >> builder = StreamingFileSink.forBulkFormat( >> new org.apache.flink.core.fs.Path(sd.getLocation()), >> new >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer)) >> .withBucketAssigner(assigner) >> .withRollingPolicy(rollingPolicy) >> .withOutputFileConfig(outputFileConfig); >> LOG.info("Hive streaming sink: Use native parquet writer."); >> } else { >> builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, >> assigner, rollingPolicy, outputFileConfig); >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because >> BulkWriter Factory not available."); >> } >> } >> 在 2020-09-17 13:21:40,"Jingsong Li" 写道: >> >是最新的代码吗? >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 >> >它是影响性能的,1.11.2已经投票通过,即将发布 >> > >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang wrote: >> > >> >> @Jingsong Li >> >> >> >> public TableSink createTableSink(TableSinkFactory.Context context) { >> >>CatalogTable table = checkNotNull(context.getTable()); >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl); >> >> >> >>boolean isGeneric = >> >> >> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); >> >> >> >>if (!isGeneric) { >> >> return new HiveTableSink( >> >> context.getConfiguration().get( >> >> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), >> >> context.isBounded(), >> >> new JobConf(hiveConf), >> >> context.getObjectIdentifier(), >> >> table); >> >> } else { >> >> return TableFactoryUtil.findAndCreateTableSink(context); >> >> } >> >> } >> >> >> >> >> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 >> >> >> >> If it is false, using flink native writer to write parquet and orc >> files; >> >> >> >> If it is true, using hadoop mapred record writer to write parquet and >> orc >> >> files >> >> >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W >> >> >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush >> >> 一些相关的参数 ? >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-17 11:21:43,"Jingsong Li" 写道: >> >> >Sink并行度 >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论 >> >> > >> >> >HDFS性能 >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO >> >> > >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang wrote: >> >> > >> >> >> 场景很简单,就是kafka2hive >> >> >> --5min入仓Hive >> >> >> >> >> >> INSERT INTO hive.temp_.hive_5min >> >> >> >> >> >> SELECT >> >> >> >> >> >> arg_service, >> >> >> >> >> >> time_local >> >> >> >> >> >> . 
>> >> >> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 >> >> >> >> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS(' >> properties.group.id >> >> '='kafka_hive_test', >> >> >> 'scan.startup.mode'='earliest-offset') */; >> >> >> >> >> >> >> >> >> >> >> >> --kafka source表定义 >> >> >> >> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( >> >> >> >> >> >> arg_service string COMMENT 'arg_service', >> >> >> >> >> >> >> >> >> >> >> >> )WITH ( >> >> >> >> >> >> 'connector' = 'kafka', >> >> >> >> >> >> 'topic' = '...', >> >> >> >> >> >> 'properties.bootstrap.servers' = '...', >> >> >> >> >> >> 'properties.group.id' = 'flink_etl_kafka_hive', >> >> >> >> >> >> 'scan.startup.mode' = 'group-offsets', >> >> >> >> >> >> 'format' = 'json', >> >> >> >> >> >> 'json.fail-on-missing-field' = 'false', >> >> >> >> >> >> 'json.ignore-parse-errors' = 'true' >> >> >> >> >> >> ); >> >> >> --sink hive表定义 >> >> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min ( >> >> >> >> >> >> ) >> >> >> PARTITIONED BY (dt string , hm string) stored as orc location >> >> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES( >> >> >> 'sink.partition-commit.trigger'='process-time', >> >> >> 'sink.partition-commit.delay'='0 min', >> >> >>