flink hive维表关联报错snappy压缩问题
java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method) at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63) at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195) at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181) at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037) at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923) at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872) at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1886) at org.apache.hadoop.mapred.SequenceFileRecordReader.(SequenceFileRecordReader.java:49) at org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64) at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:113) at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:162) at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:128) at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:103) at LookupFunction$74.flatMap(Unknown Source) at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$71.processElement(Unknown Source) 请问snappy的问题怎么解决?
flink 小文件合并及分区没数据无法提交问题
1.flink 小文件合并 测试下来发现,同一分区在不同时期去查询数据量不断增长,直到分区下所有的文件都compact完成,才定下来 ? 这个是什么原因。目前看起来像是分区提交没有等到分区下所有文件compact完成 2. 某些分区没数据时无法触发分区提交问题 我们实现了自定分区提交策略,为了通知离线下游。这样如果分区没数据,不能提交的话,就会把下游调度hang住。这种问题,怎么解决
flink 小文件合并及分区没数据无法提交问题
1.flink 小文件合并 测试下来发现,同一分区在不同时期去查询数据量不断增长,直到分区下所有的文件都compact完成,才定下来 ? 这个是什么原因。目前看起来像是分区提交没有等到分区下所有文件compact完成 2. 某些分区没数据时无法触发分区提交问题 我们实现了自定分区提交策略,为了通知离线下游。这样如果分区没数据,不能提交的话,就会把下游调度hang住。这种问题,怎么解决
flink 1.12分支写入hive decimal类型jar冲突
flink版本:1.12 hive版本:2.3.4 flink 1.12分支写入hive decimal类型报错: java.lang.NoSuchMethodError: org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J at org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010) at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99) at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159) at org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56) at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557) at org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58) at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589) at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585) at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$154.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215) 这个方法是在hive-exec版本3.1.2版本才有 请问:hive版本 orc版本之间有什么兼容性问题?如何解
flink sql 写入clickhouse性能优化
flink-jdbc写入clickhouse 在flink 1.12版本 tps只能到35W左右的tps,各位,有什么可以性能调优?
flink升级hadoop3
flink 如何升级hadoop3 ?
flink升级hadoop3
flink 如何升级hadoop3 ?
flink 写hive decimal类型报错
java.lang.NoSuchMethodError: org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J at org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010) at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99) at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159) at org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56) at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557) at org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58) at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589) at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585) at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$154.processElement(Unknown Source) 用的是flink-sql-connector-hive-2.3.6_2.11-1.12-SNAPSHOT.jar,公司的Hive也是这个版本,可能是什么原因导致?
window agg early-fire 不生效
1min的滚动窗口: table.exec.emit.early-fire.enabled=true; table.exec.emit.early-fire.delay=10 s; 设置窗口定期trigger之后,参数不生效 查看执行计划: { "id": 6, "type": "GroupWindowAggregate(groupBy=[mid, code, floor_id], window=[TumblingGroupWindow('w$, log_ts, 6)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[mid, code, floor_id, COUNT(*) AS pv, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], emit=[early delay 1 millisecond])", "pact": "Operator", "contents": "GroupWindowAggregate(groupBy=[mid, code, floor_id], window=[TumblingGroupWindow('w$, log_ts, 6)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[mid, code, floor_id, COUNT(*) AS pv, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], emit=[early delay 1 millisecond])", "parallelism": 72, "chaining_strategy": "ALWAYS", "uid": "9tx/TSKD9GBbEnuTZOIRSA==", "predecessors": [ { "id": 4, "ship_strategy": "HASH", "side": "second" } ] }, 可以确认已经设置成功,就不知道为啥没有定期trigger。 现在看起来像是watermark允许数据延迟设置的5min + 窗口 1min =6min之后才能看到结果
Re:回复: Re: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
@ guoliubin85 感谢回复,和你说的差不多,问题已经搞定。 在 2020-12-14 13:02:54,"guoliubi...@foxmail.com" 写道: >不好意思我没说清楚。 >我这边用的是这样的SQL可以运作,你可以参考下。 >CREATE TABLE `someTable` ( > eventTime TIMESTAMP(3), > WATERMARK FOR eventTime AS eventTime >) >eventTime是java的Long类型,包含毫秒,SQL里可以直接转成TIMESTAMP >select someFunc(field) >from `someTable` >group by TUMBLE(eventTime, INTERVAL '1' SECOND) > > > >guoliubi...@foxmail.com > >发件人: kandy.wang >发送时间: 2020-12-14 11:23 >收件人: user-zh >主题: Re:回复: Window aggregate can only be defined over a time attribute column, >but TIMESTAMP(3) encountered. >hi guoliubin85: >一样的报错: > >Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' >MINUTE) as log_minute,count(1) pv > >> from lightart_expose > >> where code is not null and floor_id is not null > >> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' >> MINUTE);[ERROR] Could not execute SQL statement. Reason: > >org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' >to arguments of type '$TUMBLE(, )'. Supported >form(s): '$TUMBLE(, )' > >'$TUMBLE(, , )' > > > >> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' >> MINUTE);[ERROR] Could not execute SQL statement. Reason: >org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' >to arguments of type '$TUMBLE(, )'. Supported >form(s): '$TUMBLE(, )' >'$TUMBLE(, , )' > >在 2020-12-14 10:41:12,"guoliubi...@foxmail.com" 写道: >>TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP >> >> >> >>guoliubi...@foxmail.com >> >>发件人: kandy.wang >>发送时间: 2020-12-14 10:28 >>收件人: user-zh >>主题: Window aggregate can only be defined over a time attribute column, but >>TIMESTAMP(3) encountered. >>[ERROR] Could not execute SQL statement. >>Reason:org.apache.flink.table.api.TableException: Window aggregate can only >>be defined over a time attribute column, but TIMESTAMP(3) encountered. >> >> >>SQL 如下: >>create temporary view expose as >> >>select >> >>mid >> >>,time_local >> >>,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as >>log_ts >> >>,proctime >> >>from hive.temp.kafka_table >> >>; >>time_local 是bigint >> >> >> >>select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv >> >>from expose >> >>group by TUMBLE(log_ts, INTERVAL '1' MINUTE); >> >> >>window agg的字段报错,如何解决。
Re:回复: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
hi guoliubin85: 一样的报错: Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' MINUTE) as log_minute,count(1) pv > from lightart_expose > where code is not null and floor_id is not null > group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' > MINUTE);[ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(, )'. Supported form(s): '$TUMBLE(, )' '$TUMBLE(, , )' > group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' > MINUTE);[ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(, )'. Supported form(s): '$TUMBLE(, )' '$TUMBLE(, , )' 在 2020-12-14 10:41:12,"guoliubi...@foxmail.com" 写道: >TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP > > > >guoliubi...@foxmail.com > >发件人: kandy.wang >发送时间: 2020-12-14 10:28 >收件人: user-zh >主题: Window aggregate can only be defined over a time attribute column, but >TIMESTAMP(3) encountered. >[ERROR] Could not execute SQL statement. >Reason:org.apache.flink.table.api.TableException: Window aggregate can only be >defined over a time attribute column, but TIMESTAMP(3) encountered. > > >SQL 如下: >create temporary view expose as > >select > >mid > >,time_local > >,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as >log_ts > >,proctime > >from hive.temp.kafka_table > >; >time_local 是bigint > > > >select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv > >from expose > >group by TUMBLE(log_ts, INTERVAL '1' MINUTE); > > >window agg的字段报错,如何解决。
Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
[ERROR] Could not execute SQL statement. Reason:org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. SQL 如下: create temporary view expose as select mid ,time_local ,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as log_ts ,proctime from hive.temp.kafka_table ; time_local 是bigint select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv from expose group by TUMBLE(log_ts, INTERVAL '1' MINUTE); window agg的字段报错,如何解决。
Re:flinksql1.11长时间没有数据写入mysql,会报ava.sql.SQLException: No operations allowed after statement closed.
JdbcBatchingOutputFormat: for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { attemptFlush(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= executionOptions.getMaxRetries()) { throw new IOException(e); } try { if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) { connection = connectionProvider.reestablishConnection(); jdbcStatementExecutor.closeStatements(); jdbcStatementExecutor.prepareStatements(connection); } } catch (Exception excpetion) { LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion); throw new IOException("Reestablish JDBC connection failed", excpetion); } try { Thread.sleep(1000 * i); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("unable to flush; interrupted while doing another attempt", e); } } } 嗯,看起来是这样的。 if (i >= executionOptions.getMaxRetries()) { throw new IOException(e); }这个判断重试的代码应该放在sleep 后面。不然,Caused by: java.io.IOException: java.sql.SQLException: No operations allowed after statement closed. 就没机会重建连接了。 在 2020-12-03 10:36:28,"yanzi" 写道: >使用flinksql 1.11版本,运行计算好的指标落入mysql,长时间没有数据会报错,导致任务会失败。 >针对:https://issues.apache.org/jira/browse/FLINK-16681,已经修复此问题,但是我们使用1.11最新版本,运行一段时间后,发现还是会有此问题,如何解决 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Re: flink sql cdc sum 结果出现NULL
@Jianzhi Zhang 嗯,是这个原因,感谢 回复。 就是decimal的精度问题 在 2020-12-01 13:24:23,"Jianzhi Zhang" 写道: >是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现 > >> 2020年11月19日 下午10:41,kandy.wang 写道: >> >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> `id` INT UNSIGNED AUTO_INCREMENT, >> `spu_id` BIGINT NOT NULL, >> `leaving_price` DECIMAL(10, 5) >> PRIMARY KEY ( `id` ), >> unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >> `spu_id` BIGINT , >> `leaving_price` DECIMAL(10, 5), >>PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://...', >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> 'username' = '...', >> 'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >>
flink cdc 如何保证group agg结果正确性
insert into kudu.default_database.index_agg SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss') FROM XX.XX.XX group by v_spu_id; XX.XX.XX 是通过自定义cdc format消费公司的cdc数据源,cdc数据源在kafka,数据只保留7天数据,都是增量消费,如何保证结果准确。 怎么做初始化,这个初始化,是把数据初始化到state里么? 现在通过对数发现,数据量对不上。
flink cdc 如何保证group agg结果正确性
insert into kudu.default_database.index_agg SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss') FROM XX.XX.XX group by v_spu_id; XX.XX.XX 是通过自定义cdc format消费公司的cdc数据源,cdc数据源在kafka,数据只保留7天数据,都是增量消费,如何保证结果准确。 怎么做初始化,这个初始化,是把数据初始化到state里么? 现在通过对数发现,数据量对不上。
flink 自定义AggregateFunction 如何识别HyperLogLog对象?
自定义AggregateFunction 实现了UV的 HLL 近似计算,问题是 HyperLogLog 是第三方包,这个如何让flink 识别 ? 就不知道这个TypeInformation该如何写。 代码如下: import io.airlift.slice.Slices; import io.airlift.stats.cardinality.HyperLogLog; import org.apache.flink.table.functions.AggregateFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; public class FlinkUDAFCardinalityEstimationFunction extends AggregateFunction { private static final Logger LOG = LoggerFactory.getLogger(JsonArrayParseUDTF.class); private static final int NUMBER_OF_BUCKETS = 4096; @Override public HyperLogLog createAccumulator() { return HyperLogLog.newInstance(NUMBER_OF_BUCKETS); } @Override public Long getValue(HyperLogLog acc) { if(acc == null){ return 0L; } return acc.cardinality(); } public void accumulate(HyperLogLog acc, String element) { if(element == null){ return; } acc.add(Slices.utf8Slice(element)); } public void retract(HyperLogLog acc, byte[] element) { // do nothing LOG.info("-- retract:" + new String(element)); } public void merge(HyperLogLog acc, Iterable it) { Iterator iter = it.iterator(); while (iter.hasNext()) { HyperLogLog a = iter.next(); if(a != null) { acc.mergeWith(a); } } } public void resetAccumulator(HyperLogLog acc) { acc = HyperLogLog.newInstance(NUMBER_OF_BUCKETS); } }
Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: flink sql cdc sum 结果出现NULL
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >>`id` INT UNSIGNED AUTO_INCREMENT, >>`spu_id` BIGINT NOT NULL, >>`leaving_price` DECIMAL(10, 5) >>PRIMARY KEY ( `id` ), >>unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >>`spu_id` BIGINT , >>`leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >>'url' = 'jdbc:mysql://...', >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >>'username' = '...', >>'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >>
Re:Re: flink sql cdc sum 结果出现NULL
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >>`id` INT UNSIGNED AUTO_INCREMENT, >>`spu_id` BIGINT NOT NULL, >>`leaving_price` DECIMAL(10, 5) >>PRIMARY KEY ( `id` ), >>unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >>`spu_id` BIGINT , >>`leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >>'url' = 'jdbc:mysql://...', >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >>'username' = '...', >>'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >>
flink sql cdc sum 结果出现NULL
--mysql表 CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( `id` INT UNSIGNED AUTO_INCREMENT, `spu_id` BIGINT NOT NULL, `leaving_price` DECIMAL(10, 5) PRIMARY KEY ( `id` ), unique key idx_spu_id (spu_id) )ENGINE=InnoDB DEFAULT CHARSET=utf8 --flink表 CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( `spu_id` BIGINT , `leaving_price` DECIMAL(10, 5), PRIMARY KEY ( `spu_id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://...', 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', 'username' = '...', 'password' = '..' ); --binlog 2mysql insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price FROM hive.database.table group by v_spu_id; hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 有什么好的排查思路么?
Re:关于filesystem connector的一点疑问
hi: 按照我的理解,partition time提交分区,是会在current watermark > partition time + commit delay 时机触发分区提交,得看你的sink.partition-commit.delay 设置的多久,如果超过之后,应当默认是会丢弃的吧。 https://cloud.tencent.com/developer/article/1707182 这个连接可以看一下 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道: >Hi,all >Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交 >现在有这样的场景: >消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据, >这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃? >有大佬知道吗,有实际验证过吗 >感谢 > >附上简单sql: >CREATE TABLE kafka ( >a STRING, >b STRING, >c BIGINT, >process_time BIGINT, >e STRING, >f STRING, >g STRING, >h INT, >i STRING >) WITH ( >'connector' = 'kafka', >'topic' = 'topic', >'properties.bootstrap.servers' = 'x', >'properties.group.id' = 'test-1', >'scan.startup.mode' = 'latest-offset', >'format' = 'json', >'properties.flink.partition-discovery.interval-millis' = '30' >); > >CREATE TABLE filesystem ( >`day` STRING, >`hour` STRING, >a STRING, >b STRING, >c BIGINT, >d BIGINT, >e STRING, >f STRING, >g STRING, >h INT, >i STRING >) PARTITIONED BY (`day`, `hour`) WITH ( >'connector' = 'filesystem', >'format' = 'parquet', >'path' = 'hdfs://xx', >'parquet.compression'='SNAPPY', >'sink.partition-commit.policy.kind' = 'success-file' >); > >insert into filesystem >select >from_unixtime(process_time,'-MM-dd') as `day`, >from_unixtime(process_time,'HH') as `hour`, >a, >b, >c, >d, >e, >f, >g, >h, >i >from kafka; > > > >[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
flink sql hbase维表关联性能上不去
看了一下hbase的维表关联主要是通过org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction 实现的,测试了一下性能tps只有大概3-4w, 经加本地cache之后性能仍然没有提升。 分析了一下flink ui LookupJoin 是与kafka source的算子 chain 在一起了,这样整个算子的并行度就受限于kafka分区的并行度。 1.想问一下这块的 hbase connector开发,是否有做过connector的性能测试。 2.想问一下,hbase维表关联还有没有性能提升手段。感觉这样的性能都达不到上生产应用的要求。
flink sql state queryable ?
想了解一下flink sql state里的东西,是否可以用datastream里的queryable api 查询 ? 怎么查询呢,是需要知道key 才可以查询么。 诉求就是想知道state里到底存的啥
group agg 开启了mini batch之后,state ttl不生效的问题
group agg 开启了mini batch之后,state ttl不生效的问题: 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。 sql-client-defaults.yaml对应的参数应该是这2个吧: # minimum idle state retention in ms min-idle-state-retention: 0 # maximum idle state retention in ms max-idle-state-retention: 0 这个现在进展如何了,这个社区打算什么时候支持
Re:Re: Flink SQL撤回流问题
hi 你建mysql要指定主键,另外创建flink表时也要指定一下主键 PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了 在 2020-09-27 13:36:25,"xiao cai" 写道: >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 >这是我很困惑的地方。 > > > 原始邮件 >发件人: lec ssmi >收件人: flink-user-cn >发送时间: 2020年9月27日(周日) 13:06 >主题: Re: Flink SQL撤回流问题 > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai > 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > >sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > >select dt,count(distinct id) from source group by dt; > > > >这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show >create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > >发件人: Michael Ran > 收件人: user-zh >> 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 >2020-09-27 11:48:36,"xiao cai" 写道: >Hi: > >使用Flink >SQL撤回流写入MySQL,表的auto_increment > >越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。
Re:Re: 查询hbase sink结果表,有时查到数据,有时查不到
hi Leonard: 实际在HBaseSinkFunction中打log测试下来发现,都是UPDATE_AFTER类型的RowData数据写Hbase,没有你说的那种retract消息呢。如果是retract 应该是 会先发一条UPDATE_BEFORE 消息,再发一条UPDATE_AFTER消息吧。实际测下来 都是UPDATE_AFTER,转成了hbase的Put操作,就好比每次都是upsert一样。 在 2020-09-25 10:03:34,"Leonard Xu" 写道: >Hi >> 通过sql查hbase时,有时查到数据,有时候查不到数据。是不是group操作,会有下游算子 发送撤回消息,导致在delete >> hbase的某条rowkey数据,导致客户端查不到数据? >> 我理解 hbase sink 应该是upsert数据吧。会不会先delete 再put 导致这样的现象 ? > >是的,group by 算子会像下游 hbase sink发retract消息,hbase >sink处理retract消息的实现就是先delete再insert,所以去查hbase的时候就会碰到你说的有时查不到的情况。 > >祝好 >Leonard
查询hbase sink结果表,有时查到数据,有时查不到
insert into hive.temp_dw.day_order_index select rowkey, ROW(orderN,) from ( select order_date as rowkey, count(distinct parent_sn) as orderN, group by order_date ) 通过sql查hbase时,有时查到数据,有时候查不到数据。是不是group操作,会有下游算子 发送撤回消息,导致在delete hbase的某条rowkey数据,导致客户端查不到数据? 我理解 hbase sink 应该是upsert数据吧。会不会先delete 再put 导致这样的现象 ?
flink sql grouping sets语义中NOT NULL不生效
sql如下: select (case when act_name is not null then act_name else 'default_value' end) as act_name, (case when fst_plat is not null then fst_plat else 'default_value' end) as fst_plat, sum(amount) as saleN from hive.temp_dw.view_trad_order_goods_source_act_last_value group by grouping sets((act_name),(act_name,fst_plat) 而hive.temp_dw.view_trad_order_goods_source_act_last_value 是一个 view,view的结构如下: Flink SQL> desc hive.temp_dw.view_trad_order_goods_source_act_last_value |-- act_name: STRING |-- fst_plat: STRING NOT NULL .. 其中fst_plat 数据类型为:STRING NOT NULL ,在grouping sets的(act_name)条件分支时,fst_plat不参与group, 实际测下来发现 (case when fst_plat is not null then fst_plat else 'default_value' end) as fst_plat 输出的结果都没走到 else的default_value 感觉 fst_plat is not null 不生效。 感觉是和fst_plat: STRING NOT NULL 这个条件有关系。有这个条件限制时,就破坏了NOT NULL的语义。
flink pb转json性能问题
因flink目前不支持pb format,调用了,protobuf-java-util com.google.protobuf.utilJsonFormat.printer().preservingProtoFieldNames().print(message) 先再pb 转成json 再套用 JsonRowDataDeserializationSchema处理json, 发现处理的性能就只能达到20w左右的tps,而如果是处理json格式的数据,tps是可以达到50-60w的tps. 想问一下,1、flink要是处理pb格式的数据,有什么好的办法? 2 、社区对pb format 会支持么? 3、pb转json 有什么性能比较好的工具包
Re:Re: Re: Re: StreamingFileWriter 压测性能
hi wangenbao : 我这边还没出现过OOM的情况,我理解调大TM 的资源内存 CPU这些参数应当是可以的。 我这边遇到的问题是性能上不去。不过table.exec.hive.fallback-mapred-writer=false 确实有较大改观。 在 2020-09-18 16:45:29,"wangenbao" <156827...@qq.com> 写道: >我这边也遇到了这个bug,table.exec.hive.fallback-mapred-writer没配置的情况下,默认走到hadoop mr >write; >当我的数据比较分散,Hive三分区Parquet表(年月日,小时,hashtid),会产生多个writer,运行一会就TM就OOM; >相同数据量table.exec.hive.fallback-mapred-writer设置为false,用flink native write没有问题; >Jingsong Li wrote >> 是最新的代码吗? >> 1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 >> 它是影响性能的,1.11.2已经投票通过,即将发布 >> >> On Thu, Sep 17, 2020 at 12:46 PM kandy.wang > >> kandy1203@ > >> wrote: >> >>> @Jingsong Li >>> >>> public TableSink createTableSink(TableSinkFactory.Context context) { >>>CatalogTable table = checkNotNull(context.getTable()); >>> Preconditions.checkArgument(table instanceof CatalogTableImpl); >>> >>>boolean isGeneric = >>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); >>> >>>if (!isGeneric) { >>> return new HiveTableSink( >>> context.getConfiguration().get( >>> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), >>> context.isBounded(), >>> new JobConf(hiveConf), >>> context.getObjectIdentifier(), >>> table); >>> } else { >>> return TableFactoryUtil.findAndCreateTableSink(context); >>> } >>> } >>> >>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop >>> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 >>> >>> If it is false, using flink native writer to write parquet and orc files; >>> >>> If it is true, using hadoop mapred record writer to write parquet and orc >>> files >>> >>> 将此参数调整成false后,同样的资源配置下,tps达到30W >>> >>> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush >>> 一些相关的参数 ? >>> >>> >>> >>> >>> >>> 在 2020-09-17 11:21:43,"Jingsong Li" > >> jingsonglee0@ > >> 写道: >>> >Sink并行度 >>> >我理解是配置Sink并行度,这个一直在讨论,还没结论 >>> > >>> >HDFS性能 >>> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO >>> > >>> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang > >> kandy1203@ > >> wrote: >>> > >>> >> 场景很简单,就是kafka2hive >>> >> --5min入仓Hive >>> >> >>> >> INSERT INTO hive.temp_.hive_5min >>> >> >>> >> SELECT >>> >> >>> >> arg_service, >>> >> >>> >> time_local >>> >> >>> >> . >>> >> >>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), >>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 >>> >> >>> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id >>> '='kafka_hive_test', >>> >> 'scan.startup.mode'='earliest-offset') */; >>> >> >>> >> >>> >> >>> >> --kafka source表定义 >>> >> >>> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( >>> >> >>> >> arg_service string COMMENT 'arg_service', >>> >> >>> >> >>> >> >>> >> )WITH ( >>> >> >>> >> 'connector' = 'kafka', >>> >> >>> >> 'topic' = '...', >>> >> >>> >> 'properties.bootstrap.servers' = '...', >>> >> >>> >> 'properties.group.id' = 'flink_etl_kafka_hive', >>> >> >>> >> 'scan.startup.mode' = 'group-offsets', >>> >> >>> >> 'format' = 'json', >>> >> >>> >> 'json.fail-on-missing-field' = 'false', >>> >> >>> >> 'json.ignore-parse-errors' = 'true' >>> >> >>> >> ); >>> >> --sink hive表定义 >>> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min ( >>> >> >>> >> ) >>> >> PARTITIONED BY (dt string , hm string) stored as orc location >>> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES( >>> >> 'sink.partition-commit.trigger'='process-time', >>> >> 'sink.partition-commit.delay'='0 min', >>> >> '
Re:Re: Re: Re: Re: Re: StreamingFileWriter 压测性能
@Jingsong Li 测了一下,1.11.2还是和以前一样呢。 还是table.exec.hive.fallback-mapred-writer=false效果明显。 我们flink 环境是基于 flink 1.11 分支源码自己 打的jar 来测的。你们那边针对 StreamingFileWriter 修改应该都提交到flink 1.11分支了吧。 顺便问一下,你们1.12版本,针对小文件合并,会有改进么 ? 在 2020-09-17 14:19:42,"Jingsong Li" 写道: >是的,可以测一下,理论上 mr writer不应该有较大性能差距。 > >> 为何要强制滚动文件 > >因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。 > >On Thu, Sep 17, 2020 at 2:05 PM kandy.wang wrote: > >> >> >> >> ok. 就是用hadoop mr writer vs flink 自实现的native >> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer >> 改成false是可以满足我们的写hive需求了 >> 还有一个问题,之前问过你,你还没回复: >> HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么? >> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、 >> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了 >> 在 2020-09-17 13:43:04,"Jingsong Li" 写道: >> >可以再尝试下最新的1.11.2吗? >> > >> >https://flink.apache.org/downloads.html >> > >> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang wrote: >> > >> >> 是master分支代码 >> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况 >> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了 >> >> if (userMrWriter) { >> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, >> assigner, >> >> rollingPolicy, outputFileConfig); >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); >> >> } else { >> >>Optional> bulkFactory = >> >> createBulkWriterFactory(partitionColumns, sd); >> >>if (bulkFactory.isPresent()) { >> >> builder = StreamingFileSink.forBulkFormat( >> >> new org.apache.flink.core.fs.Path(sd.getLocation()), >> >> new >> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), >> partComputer)) >> >> .withBucketAssigner(assigner) >> >> .withRollingPolicy(rollingPolicy) >> >> .withOutputFileConfig(outputFileConfig); >> >> LOG.info("Hive streaming sink: Use native parquet writer."); >> >> } else { >> >> builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, >> >> assigner, rollingPolicy, outputFileConfig); >> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because >> >> BulkWriter Factory not available."); >> >> } >> >> } >> >> 在 2020-09-17 13:21:40,"Jingsong Li" 写道: >> >> >是最新的代码吗? >> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 >> >> >它是影响性能的,1.11.2已经投票通过,即将发布 >> >> > >> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang wrote: >> >> > >> >> >> @Jingsong Li >> >> >> >> >> >> public TableSink createTableSink(TableSinkFactory.Context context) { >> >> >>CatalogTable table = checkNotNull(context.getTable()); >> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl); >> >> >> >> >> >>boolean isGeneric = >> >> >> >> >> >> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); >> >> >> >> >> >>if (!isGeneric) { >> >> >> return new HiveTableSink( >> >> >> context.getConfiguration().get( >> >> >> >> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), >> >> >> context.isBounded(), >> >> >> new JobConf(hiveConf), >> >> >> context.getObjectIdentifier(), >> >> >> table); >> >> >> } else { >> >> >> return TableFactoryUtil.findAndCreateTableSink(context); >> >> >> } >> >> >> } >> >> >> >> >> >> >> >> >> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop >> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 >> >> >> >> >> >> If it is false, using flink native writer to write parquet and orc >> >> files; >> >> >> >> >> >> If it is true, using hadoop mapred record writer to write parquet and >> >> orc >> >> >> files >> >> >> >> >&
Re:Re: Re: Re: Re: StreamingFileWriter 压测性能
ok. 就是用hadoop mr writer vs flink 自实现的native writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer 改成false是可以满足我们的写hive需求了 还有一个问题,之前问过你,你还没回复: HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么? 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、 sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了 在 2020-09-17 13:43:04,"Jingsong Li" 写道: >可以再尝试下最新的1.11.2吗? > >https://flink.apache.org/downloads.html > >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang wrote: > >> 是master分支代码 >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况 >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了 >> if (userMrWriter) { >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, >> rollingPolicy, outputFileConfig); >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); >> } else { >>Optional> bulkFactory = >> createBulkWriterFactory(partitionColumns, sd); >>if (bulkFactory.isPresent()) { >> builder = StreamingFileSink.forBulkFormat( >> new org.apache.flink.core.fs.Path(sd.getLocation()), >> new >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer)) >> .withBucketAssigner(assigner) >> .withRollingPolicy(rollingPolicy) >> .withOutputFileConfig(outputFileConfig); >> LOG.info("Hive streaming sink: Use native parquet writer."); >> } else { >> builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, >> assigner, rollingPolicy, outputFileConfig); >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because >> BulkWriter Factory not available."); >> } >> } >> 在 2020-09-17 13:21:40,"Jingsong Li" 写道: >> >是最新的代码吗? >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 >> >它是影响性能的,1.11.2已经投票通过,即将发布 >> > >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang wrote: >> > >> >> @Jingsong Li >> >> >> >> public TableSink createTableSink(TableSinkFactory.Context context) { >> >>CatalogTable table = checkNotNull(context.getTable()); >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl); >> >> >> >>boolean isGeneric = >> >> >> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); >> >> >> >>if (!isGeneric) { >> >> return new HiveTableSink( >> >> context.getConfiguration().get( >> >> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), >> >> context.isBounded(), >> >> new JobConf(hiveConf), >> >> context.getObjectIdentifier(), >> >> table); >> >> } else { >> >> return TableFactoryUtil.findAndCreateTableSink(context); >> >> } >> >> } >> >> >> >> >> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 >> >> >> >> If it is false, using flink native writer to write parquet and orc >> files; >> >> >> >> If it is true, using hadoop mapred record writer to write parquet and >> orc >> >> files >> >> >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W >> >> >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush >> >> 一些相关的参数 ? >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-17 11:21:43,"Jingsong Li" 写道: >> >> >Sink并行度 >> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论 >> >> > >> >> >HDFS性能 >> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO >> >> > >> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang wrote: >> >> > >> >> >> 场景很简单,就是kafka2hive >> >> >> --5min入仓Hive >> >> >> >> >> >> INSERT INTO hive.temp_.hive_5min >> >> >> >> >> >> SELECT >> >> >> >> >> >> arg_service, >> >> >> >> >> >> time_local >> >> >> >> >> >> . >> >> >> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 >> >
Re:Re: Re: Re: StreamingFileWriter 压测性能
是master分支代码 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况 出现的,现在改成false 就走到else 部分 就暂时没这个问题了 if (userMrWriter) { builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig); LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); } else { Optional> bulkFactory = createBulkWriterFactory(partitionColumns, sd); if (bulkFactory.isPresent()) { builder = StreamingFileSink.forBulkFormat( new org.apache.flink.core.fs.Path(sd.getLocation()), new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer)) .withBucketAssigner(assigner) .withRollingPolicy(rollingPolicy) .withOutputFileConfig(outputFileConfig); LOG.info("Hive streaming sink: Use native parquet writer."); } else { builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig); LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available."); } } 在 2020-09-17 13:21:40,"Jingsong Li" 写道: >是最新的代码吗? >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121 >它是影响性能的,1.11.2已经投票通过,即将发布 > >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang wrote: > >> @Jingsong Li >> >> public TableSink createTableSink(TableSinkFactory.Context context) { >>CatalogTable table = checkNotNull(context.getTable()); >> Preconditions.checkArgument(table instanceof CatalogTableImpl); >> >>boolean isGeneric = >> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); >> >>if (!isGeneric) { >> return new HiveTableSink( >> context.getConfiguration().get( >> HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), >> context.isBounded(), >> new JobConf(hiveConf), >> context.getObjectIdentifier(), >> table); >> } else { >> return TableFactoryUtil.findAndCreateTableSink(context); >> } >> } >> >> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 >> >> If it is false, using flink native writer to write parquet and orc files; >> >> If it is true, using hadoop mapred record writer to write parquet and orc >> files >> >> 将此参数调整成false后,同样的资源配置下,tps达到30W >> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush >> 一些相关的参数 ? >> >> >> >> >> >> 在 2020-09-17 11:21:43,"Jingsong Li" 写道: >> >Sink并行度 >> >我理解是配置Sink并行度,这个一直在讨论,还没结论 >> > >> >HDFS性能 >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO >> > >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang wrote: >> > >> >> 场景很简单,就是kafka2hive >> >> --5min入仓Hive >> >> >> >> INSERT INTO hive.temp_.hive_5min >> >> >> >> SELECT >> >> >> >> arg_service, >> >> >> >> time_local >> >> >> >> . >> >> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 >> >> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id >> '='kafka_hive_test', >> >> 'scan.startup.mode'='earliest-offset') */; >> >> >> >> >> >> >> >> --kafka source表定义 >> >> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( >> >> >> >> arg_service string COMMENT 'arg_service', >> >> >> >> >> >> >> >> )WITH ( >> >> >> >> 'connector' = 'kafka', >> >> >> >> 'topic' = '...', >> >> >> >> 'properties.bootstrap.servers' = '...', >> >> >> >> 'properties.group.id' = 'flink_etl_kafka_hive', >> >> >> >> 'scan.startup.mode' = 'group-offsets', >> >> >> >> 'format' = 'json', >> >> >> >> 'json.fail-on-missing-field' = 'false', >> >> >> >> 'json.ignore-parse-errors' = 'true' >> >> >> >> ); >> >> --sink hive表定义 >> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min ( >> >> >> >> ) >> >> PARTITIONED BY (dt string , hm string) stored as orc location >> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES( >> >> 'sink.partition-commit.trigger'='proces
Re:Re: Re: StreamingFileWriter 压测性能
@Jingsong Li public TableSink createTableSink(TableSinkFactory.Context context) { CatalogTable table = checkNotNull(context.getTable()); Preconditions.checkArgument(table instanceof CatalogTableImpl); boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); if (!isGeneric) { return new HiveTableSink( context.getConfiguration().get( HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER), context.isBounded(), new JobConf(hiveConf), context.getObjectIdentifier(), table); } else { return TableFactoryUtil.findAndCreateTableSink(context); } } HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。 If it is false, using flink native writer to write parquet and orc files; If it is true, using hadoop mapred record writer to write parquet and orc files 将此参数调整成false后,同样的资源配置下,tps达到30W 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async flush 一些相关的参数 ? 在 2020-09-17 11:21:43,"Jingsong Li" 写道: >Sink并行度 >我理解是配置Sink并行度,这个一直在讨论,还没结论 > >HDFS性能 >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO > >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang wrote: > >> 场景很简单,就是kafka2hive >> --5min入仓Hive >> >> INSERT INTO hive.temp_.hive_5min >> >> SELECT >> >> arg_service, >> >> time_local >> >> . >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 >> >> FROM hive.temp_.kafka_source_pageview/*+ >> OPTIONS('properties.group.id'='kafka_hive_test', >> 'scan.startup.mode'='earliest-offset') */; >> >> >> >> --kafka source表定义 >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( >> >> arg_service string COMMENT 'arg_service', >> >> >> >> )WITH ( >> >> 'connector' = 'kafka', >> >> 'topic' = '...', >> >> 'properties.bootstrap.servers' = '...', >> >> 'properties.group.id' = 'flink_etl_kafka_hive', >> >> 'scan.startup.mode' = 'group-offsets', >> >> 'format' = 'json', >> >> 'json.fail-on-missing-field' = 'false', >> >> 'json.ignore-parse-errors' = 'true' >> >> ); >> --sink hive表定义 >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min ( >> >> ) >> PARTITIONED BY (dt string , hm string) stored as orc location >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES( >> 'sink.partition-commit.trigger'='process-time', >> 'sink.partition-commit.delay'='0 min', >> 'sink.partition-commit.policy.class'='...CustomCommitPolicy', >> 'sink.partition-commit.policy.kind'='metastore,success-file,custom', >> 'sink.rolling-policy.check-interval' ='30s', >> 'sink.rolling-policy.rollover-interval'='10min', >> 'sink.rolling-policy.file-size'='128MB' >> ); >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40 >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。 >> 就是flink sql可以 >> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter >> 这块,有没有什么可以提升性能相关的优化参数? >> >> >> >> >> 在 2020-09-16 19:29:50,"Jingsong Li" 写道: >> >Hi, >> > >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距? >> > >> >另外,压测时是否可以看下jstack? >> > >> >Best, >> >Jingsong >> > >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang wrote: >> > >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 >> ,source >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w >> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少 >> > >> > >> > >> >-- >> >Best, Jingsong Lee >> > > >-- >Best, Jingsong Lee
Re:Re: StreamingFileWriter 压测性能
场景很简单,就是kafka2hive --5min入仓Hive INSERT INTO hive.temp_.hive_5min SELECT arg_service, time_local . FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id'='kafka_hive_test', 'scan.startup.mode'='earliest-offset') */; --kafka source表定义 CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( arg_service string COMMENT 'arg_service', )WITH ( 'connector' = 'kafka', 'topic' = '...', 'properties.bootstrap.servers' = '...', 'properties.group.id' = 'flink_etl_kafka_hive', 'scan.startup.mode' = 'group-offsets', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); --sink hive表定义 CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min ( ) PARTITIONED BY (dt string , hm string) stored as orc location 'hdfs://ssdcluster/._5min' TBLPROPERTIES( 'sink.partition-commit.trigger'='process-time', 'sink.partition-commit.delay'='0 min', 'sink.partition-commit.policy.class'='...CustomCommitPolicy', 'sink.partition-commit.policy.kind'='metastore,success-file,custom', 'sink.rolling-policy.check-interval' ='30s', 'sink.rolling-policy.rollover-interval'='10min', 'sink.rolling-policy.file-size'='128MB' ); 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40 ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。 就是flink sql可以 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter 这块,有没有什么可以提升性能相关的优化参数? 在 2020-09-16 19:29:50,"Jingsong Li" 写道: >Hi, > >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距? > >另外,压测时是否可以看下jstack? > >Best, >Jingsong > >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang wrote: > >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 ,source >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少 > > > >-- >Best, Jingsong Lee
Re:Streaming File Sink 不能生成 _SUCCESS 标记文件
加上这个参数'sink.partition-commit.policy.kind'='metastore,success-file' 这个应该是可以work的 在 2020-09-16 15:01:35,"highfei2011" 写道: >Hi,各位好! > 目前遇到一个问题,在使用 FLink -1.11.0 消费 Kafka 数据后,使用 Streaming File Sink 的 > BucketAssigner 的分桶策略 sink 到 hdfs ,默认没有生成 _SUCCESS 标记文件。 > 我在配置中新增了 >val hadoopConf = new Configuration() >hadoopConf.set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true”) > > > >但是输出目录里,还是没有 _SUCCESS 文件,麻烦帮出出主意呢,再次谢谢各位! > > >Best, >Yang
StreamingFileWriter 压测性能
压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 ,source writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
HiveTableSink中关于streaming方式文件回滚策略HiveRollingPolicy疑问
private static class HiveRollingPolicy extends CheckpointRollingPolicy { private final long rollingFileSize; private final long rollingTimeInterval; private HiveRollingPolicy( long rollingFileSize, long rollingTimeInterval) { Preconditions.checkArgument(rollingFileSize > 0L); Preconditions.checkArgument(rollingTimeInterval > 0L); this.rollingFileSize = rollingFileSize; this.rollingTimeInterval = rollingTimeInterval; } @Override public boolean shouldRollOnCheckpoint(PartFileInfo partFileState) { return true; } @Override public boolean shouldRollOnEvent(PartFileInfo partFileState, RowData element) { return false; } @Override public boolean shouldRollOnProcessingTime( PartFileInfo partFileState, long currentTime) { try { return currentTime - partFileState.getCreationTime() >= rollingTimeInterval || partFileState.getSize() > rollingFileSize; } catch (IOException e) { throw new UncheckedIOException(e); } } } 没太理解,为啥一定要控制checkpoint强制滚动文件,这样的话配置的文件滚动参数就失效了: 'sink.rolling-policy.check-interval' ='30s', 'sink.rolling-policy.rollover-interval'='10min', 'sink.rolling-policy.file-size'='128MB' 目的是想让文件按照 固定的size 或记录数 或时间 滚动,这样该如何做?
sql-client提交报错UpsertStreamTableSink requires that Table has a full primary keys if it is updated
自实现了kudu connector报错: 2020-09-09 18:34:59,442 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement. at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_262] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:327) ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:284) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at
Re:Re: Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题
@Jingsong orc格式,都看过了,还是没有commit。感觉你们可以测一下这个场景 在 2020-08-12 16:04:13,"Jingsong Li" 写道: >另外问一下,是什么格式?csv还是parquet。 >有等到10分钟(rollover-interval)过后和下一次checkpoint后再看吗? > >On Wed, Aug 12, 2020 at 2:45 PM kandy.wang wrote: > >> >> >> >> >> >> >> 有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉, >> 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。 >> 就是感觉停止之前正在写的那个分区,没有触发commit >> >> >> >> >> 在 2020-08-12 14:26:53,"Jingsong Li" 写道: >> >那你之前的分区除了in-progress文件,有已完成的文件吗? >> > >> >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang wrote: >> > >> >> >> >> >> >> >> >> source就是kafka >> >> >> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。 >> >> >> >> >> >> >> >> >> >> >> >> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? >> >> >> >> >> >> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? >> >> >> >> 在 2020-08-12 13:28:01,"Jingsong Li" 写道: >> >> >你的source是exactly-once的source吗? >> >> > >> >> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? >> >> > >> >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang wrote: >> >> > >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >@ Jingsong >> >> >> >> >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。 >> >> >> 用presto查询查不了 >> >> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据, >> >> >> 'sink.partition-commit.trigger'='process-time', >> >> >> 'sink.partition-commit.delay'='0 min', >> >> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file,custom', >> >> >> 'sink.rolling-policy.check-interval'='30s', >> >> >> 'sink.rolling-policy.rollover-interval'='10min', >> >> >> 'sink.rolling-policy.file-size'='128MB' >> >> >>如果是12:39分 05秒左右做一次savepoint,然后 >> >> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add >> >> >> partition,就导致有数据,但是确查不 了。 >> >> >> >> >> >> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add >> >> >> partition 也能查了。 >> >> >> > >> >> >> > >> >> >> > >> >> >> >在 2020-08-12 12:11:53,"Jingsong Li" 写道: >> >> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响 >> >> >> >> >> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu wrote: >> >> >> >> >> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。 >> >> >> >>> >> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang >> wrote: >> >> >> >>> >> >> >> >>> > 1.StreamingFileWriter >> >> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。 >> >> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 >> >> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm >> >> >> >>> > =2100分区的数据还存在很多的in-progress文件。 >> >> >> >>> > >> 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。 >> >> >> >>> > >> >> >> >>> > >> >> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持 >> >> >> >>> > >> >> >> >>> > >> >> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持 >> >> >> >>> >> >> >> >> >> >> >> >> >> >> >> >>-- >> >> >> >>Best, Jingsong Lee >> >> >> >> >> > >> >> > >> >> >-- >> >> >Best, Jingsong Lee >> >> >> > >> > >> >-- >> >Best, Jingsong Lee >> > > >-- >Best, Jingsong Lee
Re:Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题
有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉, 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。 就是感觉停止之前正在写的那个分区,没有触发commit 在 2020-08-12 14:26:53,"Jingsong Li" 写道: >那你之前的分区除了in-progress文件,有已完成的文件吗? > >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang wrote: > >> >> >> >> source就是kafka >> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。 >> >> >> >> >> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? >> >> >> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? >> >> 在 2020-08-12 13:28:01,"Jingsong Li" 写道: >> >你的source是exactly-once的source吗? >> > >> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? >> > >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang wrote: >> > >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >@ Jingsong >> >> >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。 >> >> 用presto查询查不了 >> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据, >> >> 'sink.partition-commit.trigger'='process-time', >> >> 'sink.partition-commit.delay'='0 min', >> >> 'sink.partition-commit.policy.kind'='metastore,success-file,custom', >> >> 'sink.rolling-policy.check-interval'='30s', >> >> 'sink.rolling-policy.rollover-interval'='10min', >> >> 'sink.rolling-policy.file-size'='128MB' >> >>如果是12:39分 05秒左右做一次savepoint,然后 >> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add >> >> partition,就导致有数据,但是确查不 了。 >> >> >> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add >> >> partition 也能查了。 >> >> > >> >> > >> >> > >> >> >在 2020-08-12 12:11:53,"Jingsong Li" 写道: >> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响 >> >> >> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu wrote: >> >> >> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。 >> >> >>> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang wrote: >> >> >>> >> >> >>> > 1.StreamingFileWriter >> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。 >> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 >> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm >> >> >>> > =2100分区的数据还存在很多的in-progress文件。 >> >> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。 >> >> >>> > >> >> >>> > >> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持 >> >> >>> > >> >> >>> > >> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持 >> >> >>> >> >> >> >> >> >> >> >> >>-- >> >> >>Best, Jingsong Lee >> >> >> > >> > >> >-- >> >Best, Jingsong Lee >> > > >-- >Best, Jingsong Lee
Re:Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题
source就是kafka json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。 in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? 在 2020-08-12 13:28:01,"Jingsong Li" 写道: >你的source是exactly-once的source吗? > >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? > >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang wrote: > >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >@ Jingsong >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。 >> 用presto查询查不了 >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据, >> 'sink.partition-commit.trigger'='process-time', >> 'sink.partition-commit.delay'='0 min', >> 'sink.partition-commit.policy.kind'='metastore,success-file,custom', >> 'sink.rolling-policy.check-interval'='30s', >> 'sink.rolling-policy.rollover-interval'='10min', >> 'sink.rolling-policy.file-size'='128MB' >>如果是12:39分 05秒左右做一次savepoint,然后 >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add >> partition,就导致有数据,但是确查不 了。 >> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add >> partition 也能查了。 >> > >> > >> > >> >在 2020-08-12 12:11:53,"Jingsong Li" 写道: >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响 >> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu wrote: >> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。 >> >>> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang wrote: >> >>> >> >>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。 >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm >> >>> > =2100分区的数据还存在很多的in-progress文件。 >> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。 >> >>> > >> >>> > >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持 >> >>> > >> >>> > >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持 >> >>> >> >> >> >> >> >>-- >> >>Best, Jingsong Lee >> > > >-- >Best, Jingsong Lee
Re:Re:Re: flink 1.11 StreamingFileWriter及sql-client问题
>@ Jingsong >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。 用presto查询查不了 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据, 'sink.partition-commit.trigger'='process-time', 'sink.partition-commit.delay'='0 min', 'sink.partition-commit.policy.kind'='metastore,success-file,custom', 'sink.rolling-policy.check-interval'='30s', 'sink.rolling-policy.rollover-interval'='10min', 'sink.rolling-policy.file-size'='128MB' 如果是12:39分 05秒左右做一次savepoint,然后 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add partition,就导致有数据,但是确查不 了。 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add partition 也能查了。 > > > >在 2020-08-12 12:11:53,"Jingsong Li" 写道: >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响 >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu wrote: >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。 >>> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang wrote: >>> >>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。 >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm >>> > =2100分区的数据还存在很多的in-progress文件。 >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。 >>> > >>> > >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持 >>> > >>> > >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持 >>> >> >> >>-- >>Best, Jingsong Lee
Re:Re: flink 1.11 StreamingFileWriter及sql-client问题
@ Jingsong 导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。 用presto查询查不了 在 2020-08-12 12:11:53,"Jingsong Li" 写道: >in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响 > >On Wed, Aug 12, 2020 at 11:05 AM Jark Wu wrote: > >> 与我所知,(2) & (3) 有希望能在 1.12 中支持。 >> >> On Tue, 11 Aug 2020 at 21:15, kandy.wang wrote: >> >> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。 >> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 >> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm >> > =2100分区的数据还存在很多的in-progress文件。 >> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。 >> > >> > >> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持 >> > >> > >> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持 >> > > >-- >Best, Jingsong Lee
flink 1.11 StreamingFileWriter及sql-client问题
1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。 举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm =2100分区的数据还存在很多的in-progress文件。 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
flink sql csv格式字段分隔符问题
设置 'csv.field-delimiter'='\t' ,查询数据的时候,报错:org.apache.flink.table.api.ValidationException: Option 'csv.field-delimiter' must be a string with single character, but was: \t 请问,该怎么搞?
Re:Re: Re: FLINK SQL view的数据复用问题
@ godfrey 你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。 在 2020-08-04 19:36:56,"godfrey he" 写道: >调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用 > >kandy.wang 于2020年8月4日周二 下午6:21写道: > >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> @ godfrey >> thanks。刚试了一下,source -> Deduplicate -> >> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source + >> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。 >> >> >> 在 2020-08-04 17:26:02,"godfrey he" 写道: >> >blink planner支持将多sink的query优化成尽量复用重复计算部分。 >> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务 >> > >> >kandy.wang 于2020年8月4日周二 下午5:20写道: >> > >> >> FLINK SQL view相关问题: >> >> create view order_source >> >> >> >> as >> >> >> >> select order_id, order_goods_id, user_id,... >> >> >> >> from ( >> >> >> >> .. proctime,row_number() over(partition by order_id, >> >> order_goods_id order by proctime desc) as rownum >> >> >> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS(' >> properties.group.id'='flink_etl_kafka_hbase', >> >> 'scan.startup.mode'='latest-offset') */ >> >> >> >> ) where rownum = 1 and price > 0; >> >> >> >> >> >> >> >> >> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN >> as >> >> BIGINT),) >> >> >> >> from >> >> >> >> ( >> >> >> >> select order_date as rowkey, >> >> >> >> sum(amount) as saleN, >> >> >> >> from order_source >> >> >> >> group by order_date >> >> >> >> ); >> >> >> >> >> >> >> >> >> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN >> as >> >> BIGINT)) >> >> >> >> from >> >> >> >> ( >> >> >> >> select order_hour as rowkey,sum(amount) as saleN, >> >> >> >> >> >> >> >> from order_source >> >> >> >> group by order_hour >> >> >> >> ); >> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。 >> >> 最后生成的job是 : a. order_source -> sink 1 b. order_source -> sink >> >> 2 >> >> >> >> >> >> 本意是想通过view order_source >> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ? >> >> >> >> >>
Re:Re: FLINK SQL view的数据复用问题
@ godfrey thanks。刚试了一下,source -> Deduplicate -> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source + Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。 在 2020-08-04 17:26:02,"godfrey he" 写道: >blink planner支持将多sink的query优化成尽量复用重复计算部分。 >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务 > >kandy.wang 于2020年8月4日周二 下午5:20写道: > >> FLINK SQL view相关问题: >> create view order_source >> >> as >> >> select order_id, order_goods_id, user_id,... >> >> from ( >> >> .. proctime,row_number() over(partition by order_id, >> order_goods_id order by proctime desc) as rownum >> >> from hive.temp_dw.dm_trd_order_goods/*+ >> OPTIONS('properties.group.id'='flink_etl_kafka_hbase', >> 'scan.startup.mode'='latest-offset') */ >> >> ) where rownum = 1 and price > 0; >> >> >> >> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as >> BIGINT),) >> >> from >> >> ( >> >> select order_date as rowkey, >> >> sum(amount) as saleN, >> >> from order_source >> >> group by order_date >> >> ); >> >> >> >> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as >> BIGINT)) >> >> from >> >> ( >> >> select order_hour as rowkey,sum(amount) as saleN, >> >> >> >> from order_source >> >> group by order_hour >> >> ); >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。 >> 最后生成的job是 : a. order_source -> sink 1 b. order_source -> sink >> 2 >> >> >> 本意是想通过view order_source >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ? >> >>
FLINK SQL view的数据复用问题
FLINK SQL view相关问题: create view order_source as select order_id, order_goods_id, user_id,... from ( .. proctime,row_number() over(partition by order_id, order_goods_id order by proctime desc) as rownum from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('properties.group.id'='flink_etl_kafka_hbase', 'scan.startup.mode'='latest-offset') */ ) where rownum = 1 and price > 0; insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as BIGINT),) from ( select order_date as rowkey, sum(amount) as saleN, from order_source group by order_date ); insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as BIGINT)) from ( select order_hour as rowkey,sum(amount) as saleN, from order_source group by order_hour ); 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。 最后生成的job是 : a. order_source -> sink 1 b. order_source -> sink 2 本意是想通过view order_source (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
flink 1.11 streaming 写入hive 5min表相关问题
现象: CREATE TABLE test.xxx_5min ( .. ) PARTITIONED BY (dt string , hm string) stored as orc TBLPROPERTIES( 'sink.partition-commit.trigger'='process-time', 'sink.partition-commit.delay'='5 min', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.rolling-policy.file-size'='128MB', 'sink.rolling-policy.check-interval' ='30s', 'sink.rolling-policy.rollover-interval'='5min' ); dt = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd') hm = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区, ,checkpoint频率:30s 问题: 1.flink 1.11 steaming写入为什么是1min产生一个文件,而且文件大小没有到128M,如果参数sink.rolling-policy.rollover-interval'='5min 文件滚动时间 5min 滚动大小128M生效的话,就不应该产生这么小的问题,文件大小没有按照预期控制,为啥? 2.小文件问题该如何解决?有什么好的思路 3. 标记文件_Success文件为啥上报延迟? 如果是 12:30的分区,5min的分区,理论上应该12:35左右的时候就应该提交partition?
Flink SQL 解析复杂(嵌套)JSON的问题 以及写入到hive类型映射问题
json格式,如果是一个json array 该如何定义 schema,array里还可能存在嵌套json array的情况。 如数据: {"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586676835655,"id":"10001","jsonArray":[{"name222":"xxx","user_id222":"0022"},{"name333":"name","user_id222":"user"},{"cc":"xxx333","user_id444":"user","name444":"name"}]} 参照:https://www.cnblogs.com/Springmoon-venn/p/12664547.html 需要schema这样定义: user_info 定义成:ROW jsonArray 定义成 : ARRAY> 问题是: 如果json array 里还有一个array 也是继续嵌套定义吗? 这个数据是要写入到hive,该怎么映射,array ,怎么映射成Hive类型,比如映射成array,这种情况的json该如何处理? 有没有什么办法直接把json array,直接映射成array,试了一下发现不行,该如何处理这种复杂类型。