Flink Hive lookup (dimension) table join fails with a Snappy compression error

2021-04-12 Thread kandy.wang
 java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)

at 
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)

at 
org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)

at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)

at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)

at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)

at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)

at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1886)

at 
org.apache.hadoop.mapred.SequenceFileRecordReader.<init>(SequenceFileRecordReader.java:49)

at 
org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64)

at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:113)

at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:162)

at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)

at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:128)

at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:103)

at LookupFunction$74.flatMap(Unknown Source)

at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)

at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)

at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)

at StreamExecCalc$71.processElement(Unknown Source)
How can this Snappy problem be resolved?

Flink small-file compaction, and partitions with no data cannot be committed

2021-03-31 Thread kandy.wang
1. Flink small-file compaction
In testing we found that querying the same partition at different points in time returns a row count that keeps growing, and it only stabilizes once all files under the partition have been compacted.
What causes this? It currently looks as if the partition commit does not wait for all files under the partition to finish compaction.
2. Partition commit cannot be triggered when a partition has no data
We implemented a custom partition-commit policy in order to notify downstream offline jobs. If a partition has no data and therefore cannot be committed, the downstream scheduling hangs. How can this be solved?
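For reference, a minimal sink DDL sketch with the compaction and partition-commit options being discussed; the option names assume the Flink 1.12 filesystem connector, and the path, format, column names and values are placeholders, not details from the actual job:

CREATE TABLE fs_sink (
  id BIGINT,
  name STRING,
  dt STRING,
  hm STRING
) PARTITIONED BY (dt, hm) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://cluster/tmp/fs_sink',            -- placeholder
  'format' = 'orc',
  -- merge the small files produced within each checkpoint
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB',
  -- partition commit settings
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '5 min',
  'sink.partition-commit.policy.kind' = 'success-file'
);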




 


Flink 1.12 branch: jar conflict when writing Hive decimal type

2021-03-04 Thread kandy.wang
Flink version: 1.12
Hive version: 2.3.4
Writing a Hive decimal type from the Flink 1.12 branch fails with:
java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J
at 
org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)
at 
org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99)
at 
org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159)
at 
org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)
at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)
at 
org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58)
at 
org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589)
at 
org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$154.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)

This method only exists in hive-exec 3.1.2.

Question: what compatibility issues exist between the Hive version and the ORC version, and how can this be resolved?




Flink SQL writing to ClickHouse: performance tuning

2021-02-22 Thread kandy.wang
Writing to ClickHouse with flink-jdbc on Flink 1.12 only reaches roughly 350K TPS. Does anyone have suggestions for performance tuning?
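A minimal sketch of the JDBC sink options that usually govern write throughput (batch size and flush interval); the ClickHouse URL, the availability of a ClickHouse JDBC driver on the classpath, and the table schema are illustrative assumptions, not details from the original job:

CREATE TABLE ck_sink (
  id BIGINT,
  cnt BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:clickhouse://host:8123/db',   -- placeholder
  'table-name' = 'ck_table',
  -- larger batches plus a flush interval usually raise JDBC sink throughput
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '1s',
  'sink.max-retries' = '3'
);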

Upgrading Flink to Hadoop 3

2021-02-06 Thread kandy.wang
How can Flink be upgraded to Hadoop 3?





 


Error when Flink writes a Hive decimal type

2021-01-20 Thread kandy.wang
java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J

at 
org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)

at 
org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99)

at 
org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159)

at 
org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)

at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)

at org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58)

at 
org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589)

at 
org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)

at 
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)

at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)

at StreamExecCalc$154.processElement(Unknown Source)
We are using flink-sql-connector-hive-2.3.6_2.11-1.12-SNAPSHOT.jar, and our company's Hive is also this version. What could be causing this?

Window agg early-fire does not take effect

2020-12-16 Thread kandy.wang
A 1-minute tumbling window:
table.exec.emit.early-fire.enabled=true;
table.exec.emit.early-fire.delay=10 s;
After configuring the window to fire periodically, the parameters do not take effect.
The execution plan shows:
 {
  "id": 6,
  "type": "GroupWindowAggregate(groupBy=[mid, code, floor_id], 
window=[TumblingGroupWindow('w$, log_ts, 6)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[mid, code, floor_id, COUNT(*) AS pv, start('w$) 
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime], emit=[early delay 1 millisecond])",
  "pact": "Operator",
  "contents": "GroupWindowAggregate(groupBy=[mid, code, floor_id], 
window=[TumblingGroupWindow('w$, log_ts, 6)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[mid, code, floor_id, COUNT(*) AS pv, start('w$) 
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime], emit=[early delay 1 millisecond])",
  "parallelism": 72,
  "chaining_strategy": "ALWAYS",
  "uid": "9tx/TSKD9GBbEnuTZOIRSA==",
  "predecessors": [
{
  "id": 4,
  "ship_strategy": "HASH",
  "side": "second"
}
  ]
},
So I can confirm the settings were applied, but I don't know why the window is not being fired periodically.
Right now it looks as if results only become visible after the configured 5-minute watermark lateness plus the 1-minute window, i.e. after 6 minutes.
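For context, a sketch of how these session-level options are set in the SQL client (the keys and values are the ones quoted above; the watermark comment reflects the interaction described in this post, and all names are illustrative):

SET table.exec.emit.early-fire.enabled=true;
SET table.exec.emit.early-fire.delay=10 s;

-- Note: early-fire emits partial results every 10 s, but the final result of a
-- 1-minute window is still only emitted once the watermark passes the window end,
-- so with a watermark defined as, for example,
--   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' MINUTE
-- the closed window is finalized roughly 6 minutes after event time.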

Re: Re: Re: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-14 Thread kandy.wang






@guoliubin85
Thanks for the reply. It was pretty much what you said; the problem has been solved.

On 2020-12-14 13:02:54, "guoliubi...@foxmail.com" wrote:
>Sorry, I didn't explain it clearly.
>The SQL I use on my side works like this; you can use it as a reference.
>CREATE TABLE `someTable` (
>  eventTime TIMESTAMP(3),
>  WATERMARK FOR eventTime AS eventTime
>)
>eventTime is a Java Long containing milliseconds; in SQL it can be converted directly to TIMESTAMP.
>select someFunc(field)
>from `someTable`
>group by TUMBLE(eventTime, INTERVAL '1' SECOND)
>
>
>
>guoliubi...@foxmail.com
> 
>From: kandy.wang
>Sent: 2020-12-14 11:23
>To: user-zh
>Subject: Re: Re: Window aggregate can only be defined over a time attribute column, 
>but TIMESTAMP(3) encountered.
>hi guoliubin85:
>Same error:
> 
>Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' 
>MINUTE) as log_minute,count(1) pv
> 
>> from lightart_expose
> 
>> where code is not null and floor_id is not null
> 
>> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
>> MINUTE);[ERROR] Could not execute SQL statement. Reason:
> 
>org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
>to arguments of type '$TUMBLE(<BIGINT>, <INTERVAL MINUTE>)'. Supported 
>form(s): '$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
> 
>'$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
> 
> 
> 
>> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
>> MINUTE);[ERROR] Could not execute SQL statement. Reason:
>org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
>to arguments of type '$TUMBLE(<BIGINT>, <INTERVAL MINUTE>)'. Supported 
>form(s): '$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>'$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
> 
>On 2020-12-14 10:41:12, "guoliubi...@foxmail.com" wrote:
>>The first argument of TUMBLE just needs a bigint; you can use time_local directly here without converting it to TIMESTAMP.
>>
>>
>>
>>guoliubi...@foxmail.com
>> 
>>From: kandy.wang
>>Sent: 2020-12-14 10:28
>>To: user-zh
>>Subject: Window aggregate can only be defined over a time attribute column, but 
>>TIMESTAMP(3) encountered.
>>[ERROR] Could not execute SQL statement. 
>>Reason:org.apache.flink.table.api.TableException: Window aggregate can only 
>>be defined over a time attribute column, but TIMESTAMP(3) encountered.
>> 
>> 
>>The SQL is as follows:
>>create temporary view expose as
>> 
>>select  
>> 
>>mid
>> 
>>,time_local
>> 
>>,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, 'yyyy-MM-dd HH:mm:ss')) as 
>>log_ts
>> 
>>,proctime
>> 
>>from hive.temp.kafka_table
>> 
>>;
>>time_local is a bigint
>> 
>> 
>> 
>>select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv
>> 
>>from expose
>> 
>>group by TUMBLE(log_ts, INTERVAL '1' MINUTE);
>> 
>> 
>>The window agg column raises this error; how can it be fixed?


Re: Re: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-13 Thread kandy.wang
Hi guoliubin85:
Same error:

Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' 
MINUTE) as log_minute,count(1) pv

> from lightart_expose

> where code is not null and floor_id is not null

> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
> MINUTE);[ERROR] Could not execute SQL statement. Reason:

org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
to arguments of type '$TUMBLE(<BIGINT>, <INTERVAL MINUTE>)'. Supported form(s): 
'$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'

'$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'



> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
> MINUTE);[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
to arguments of type '$TUMBLE(<BIGINT>, <INTERVAL MINUTE>)'. Supported form(s): 
'$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
'$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'

On 2020-12-14 10:41:12, "guoliubi...@foxmail.com" wrote:
>The first argument of TUMBLE just needs a bigint; you can use time_local directly without converting it to TIMESTAMP.
>
>
>
>guoliubi...@foxmail.com
> 
>From: kandy.wang
>Sent: 2020-12-14 10:28
>To: user-zh
>Subject: Window aggregate can only be defined over a time attribute column, but 
>TIMESTAMP(3) encountered.
>[ERROR] Could not execute SQL statement. 
>Reason:org.apache.flink.table.api.TableException: Window aggregate can only be 
>defined over a time attribute column, but TIMESTAMP(3) encountered.
> 
> 
>The SQL is as follows:
>create temporary view expose as
> 
>select  
> 
>mid
> 
>,time_local
> 
>,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, 'yyyy-MM-dd HH:mm:ss')) as 
>log_ts
> 
>,proctime
> 
>from hive.temp.kafka_table
> 
>;
>time_local is a bigint
> 
> 
> 
>select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv
> 
>from expose
> 
>group by TUMBLE(log_ts, INTERVAL '1' MINUTE);
> 
> 
>The window agg column raises this error; how can it be fixed?


Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-13 Thread kandy.wang
[ERROR] Could not execute SQL statement. 
Reason:org.apache.flink.table.api.TableException: Window aggregate can only be 
defined over a time attribute column, but TIMESTAMP(3) encountered.


The SQL is as follows:
create temporary view expose as

select  

mid

,time_local

,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, 'yyyy-MM-dd HH:mm:ss')) as log_ts

,proctime

from hive.temp.kafka_table

;
time_local is a bigint



select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv

from expose

group by TUMBLE(log_ts, INTERVAL '1' MINUTE);


The window agg column raises this error; how can it be fixed?
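A minimal sketch of the approach shown earlier in this thread: declare the event time as a TIMESTAMP(3) computed column with a WATERMARK in the source DDL, so that log_ts becomes a time attribute TUMBLE can accept. The table name, connector options and the 5-second delay are illustrative only:

CREATE TABLE kafka_table (
  mid STRING,
  time_local BIGINT,
  -- derive an event-time column from the epoch-millisecond field
  log_ts AS TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, 'yyyy-MM-dd HH:mm:ss')),
  -- declaring a watermark is what turns log_ts into a time attribute
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '...',                          -- placeholder
  'properties.bootstrap.servers' = '...',   -- placeholder
  'format' = 'json'
);

SELECT TUMBLE_START(log_ts, INTERVAL '1' MINUTE) AS log_minute, COUNT(1) AS pv
FROM kafka_table
GROUP BY TUMBLE(log_ts, INTERVAL '1' MINUTE);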

Re: Flink SQL 1.11: after a long period with no data written to MySQL, java.sql.SQLException: No operations allowed after statement closed.

2020-12-02 Thread kandy.wang



JdbcBatchingOutputFormat (flush retry loop):
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
    try {
        attemptFlush();
        batchCount = 0;
        break;
    } catch (SQLException e) {
        LOG.error("JDBC executeBatch error, retry times = {}", i, e);
        if (i >= executionOptions.getMaxRetries()) {
            throw new IOException(e);
        }
        try {
            if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                connection = connectionProvider.reestablishConnection();
                jdbcStatementExecutor.closeStatements();
                jdbcStatementExecutor.prepareStatements(connection);
            }
        } catch (Exception excpetion) {
            LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
            throw new IOException("Reestablish JDBC connection failed", excpetion);
        }
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IOException("unable to flush; interrupted while doing another attempt", e);
        }
    }
}

Yes, it looks that way.

The retry-limit check
    if (i >= executionOptions.getMaxRetries()) {
        throw new IOException(e);
    }
should come after the sleep; otherwise, once "Caused by: java.io.IOException: 
java.sql.SQLException: No operations allowed after statement closed." is thrown, there is no chance left to re-establish the connection.



On 2020-12-03 10:36:28, "yanzi" wrote:
>Using Flink SQL 1.11 to write computed metrics into MySQL, an error occurs after a long period with no data, and the job fails.
>https://issues.apache.org/jira/browse/FLINK-16681 is supposed to have fixed this, but we are running the latest 1.11 release and still see the problem after some time. How can it be resolved?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink SQL CDC sum result contains NULL

2020-11-30 Thread kandy.wang









@Jianzhi Zhang

Yes, that was the cause. Thanks for the reply. It was indeed the decimal precision issue.
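A sketch of the fix this points to: widen the DECIMAL precision so that SUM(leaving_num * price) cannot overflow into NULL. The DECIMAL(18, 5) size and the source table name are illustrative; the MySQL column would need to be widened to match:

CREATE TABLE mysql_realtime_leaving_price_spu_index_agg (
  spu_id BIGINT,
  leaving_price DECIMAL(18, 5),
  PRIMARY KEY (spu_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://...',   -- placeholder
  'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
  'username' = '...',
  'password' = '...'
);

INSERT INTO mysql_realtime_leaving_price_spu_index_agg
SELECT v_spu_id AS spu_id,
       CAST(SUM(leaving_num * price) AS DECIMAL(18, 5)) AS leaving_price
FROM cdc_source_table           -- placeholder for hive.database.table
GROUP BY v_spu_id;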




On 2020-12-01 13:24:23, "Jianzhi Zhang" wrote:
>Could it be that your decimal column is too narrow, so the computed result exceeds the precision range and produces the NULLs?
>
>> 2020年11月19日 下午10:41,kandy.wang  写道:
>> 
>> -- MySQL table
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>   `id` INT UNSIGNED AUTO_INCREMENT,
>>   `spu_id` BIGINT NOT NULL,
>>   `leaving_price`  DECIMAL(10, 5)
>>   PRIMARY KEY ( `id` ),
>>   unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> 
>> 
>> -- Flink table
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>   `spu_id` BIGINT ,
>>   `leaving_price`  DECIMAL(10, 5),
>>PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>  'connector' = 'jdbc',
>>   'url' = 'jdbc:mysql://...',
>>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>   'username' = '...',
>>   'password' = '..'
>> );
>> 
>> 
>> -- binlog to MySQL
>> 
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> 
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> 
>> FROM hive.database.table
>> 
>> group by v_spu_id;
>> 
>> 
>> hive.database.table points to our company's own Kafka, with a format that implements the same four Canal-CDC-style message types.
>> 
>> 
>> The problem is that the MySQL result table has many NULL values in leaving_price, yet looking at the data, the leaving_num and price
>> fields can never be NULL. I don't understand why the result table can end up NULL.
>> Is there a good way to troubleshoot this?
>> 
>> 
>> 
>> 
>> 


How to guarantee correct group agg results with Flink CDC

2020-11-30 Thread kandy.wang
insert into kudu.default_database.index_agg
SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as 
leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss')
FROM  XX.XX.XX
group by v_spu_id;


XX.XX.XX consumes our company's CDC data source through a custom cdc format. The CDC data sits in Kafka, keeps only 7 days of data, and is consumed incrementally. How can the result be guaranteed to be accurate?
How should initialization be done, and does that mean initializing the data into state? Reconciling the numbers now, we find the counts do not match.


How can Flink recognize the HyperLogLog object in a custom AggregateFunction?

2020-11-24 Thread kandy.wang
A custom AggregateFunction implements approximate UV counting with HLL. The problem is that HyperLogLog comes from a third-party package; how do I get Flink to recognize it?
I don't know how the TypeInformation for it should be written.


The code is as follows:
import io.airlift.slice.Slices;
import io.airlift.stats.cardinality.HyperLogLog;
import org.apache.flink.table.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.Iterator;




public class FlinkUDAFCardinalityEstimationFunction extends 
AggregateFunction<Long, HyperLogLog> {


private static final Logger LOG = 
LoggerFactory.getLogger(JsonArrayParseUDTF.class);


private static final int NUMBER_OF_BUCKETS = 4096;


@Override
public HyperLogLog createAccumulator() {
return HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
}


@Override
public Long getValue(HyperLogLog acc) {
if(acc == null){
return 0L;
}
return acc.cardinality();
}


public void accumulate(HyperLogLog acc, String element) {
if(element == null){
return;
}
acc.add(Slices.utf8Slice(element));
}


public void retract(HyperLogLog acc, byte[] element) {
// do nothing
LOG.info("-- retract:" + new String(element));
}


public void merge(HyperLogLog acc, Iterable<HyperLogLog> it) {
Iterator<HyperLogLog> iter = it.iterator();
while (iter.hasNext()) {
HyperLogLog a = iter.next();
if(a != null) {
acc.mergeWith(a);
}
}
}


public void resetAccumulator(HyperLogLog acc) {
acc = HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
}
}

Re: Re: Re: Re: Flink SQL CDC sum result contains NULL

2020-11-19 Thread kandy.wang
1. "Without the initial full data there may be problems": how should I understand this? By default we consume from the Kafka group offsets, so how can full data be guaranteed?
Our binlog sync is all incremental; we never do an initial full sync.
2. The before message is sent first, then the after message.


3. The data in Kafka is hashed by the MySQL primary key id, so it is ordered, but all data for a group key is not guaranteed to be in the same
partition, since the hashing is by the primary key id.








On 2020-11-20 13:25:53, "Jark Wu" wrote:
>1. Without the initial full data there may be problems.
>
>3. When your format parses an update, does it emit the before or the after message first?
>4. Is your data ordered in Kafka? That is, is all data for the same key in a single partition?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
>> 2. 没有开启
>>
>>
>>
>>
>> 在 2020-11-20 11:49:44,"Jark Wu"  写道:
>> >实现上应该没什么问题。
>> >
>> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>> >2. 是否开启 mini-batch了?
>> >
>> >Best,
>> >Jark
>> >
>> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>> >
>> >> hi Jark:
>> >>
>> >>
>> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>> >>
>> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
>> update_before
>> >> update_after,format逻辑是应该这么写的吧。
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >> >值的,以验证你的自定义 format 没有问题。
>> >> >
>> >> >Best,
>> >> >Jark
>> >> >
>> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >> >
>> >> >> --mysql表
>> >> >> CREATE TABLE IF NOT EXISTS
>> `mysql_realtime_leaving_price_spu_index_agg`(
>> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >> >>`spu_id` BIGINT NOT NULL,
>> >> >>`leaving_price`  DECIMAL(10, 5)
>> >> >>PRIMARY KEY ( `id` ),
>> >> >>unique key idx_spu_id (spu_id)
>> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >> >>
>> >> >>
>> >> >> --flink表
>> >> >> CREATE TABLE
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> (
>> >> >>`spu_id` BIGINT ,
>> >> >>`leaving_price`  DECIMAL(10, 5),
>> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> >> ) WITH (
>> >> >>   'connector' = 'jdbc',
>> >> >>'url' = 'jdbc:mysql://...',
>> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >> >>'username' = '...',
>> >> >>'password' = '..'
>> >> >> );
>> >> >>
>> >> >>
>> >> >> --binlog 2mysql
>> >> >>
>> >> >> insert into
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> >>
>> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >> >>
>> >> >> FROM hive.database.table
>> >> >>
>> >> >> group by v_spu_id;
>> >> >>
>> >> >>
>> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >> >>
>> >> >>
>> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> >> 有什么好的排查思路么?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>




Re:Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 Thread kandy.wang






1. Yes. When this job starts it is stateless, and then it gradually accumulates state.

2. Not enabled.




On 2020-11-20 11:49:44, "Jark Wu" wrote:
>The implementation should be fine.
>
>1. Does your CDC data not include the full snapshot, i.e. you started reading directly from some day's increments?
>2. Is mini-batch enabled?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>>
>> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
>> update_after,format逻辑是应该这么写的吧。
>>
>>
>>
>>
>> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >值的,以验证你的自定义 format 没有问题。
>> >
>> >Best,
>> >Jark
>> >
>> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >
>> >> --mysql表
>> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >>`spu_id` BIGINT NOT NULL,
>> >>`leaving_price`  DECIMAL(10, 5)
>> >>PRIMARY KEY ( `id` ),
>> >>unique key idx_spu_id (spu_id)
>> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >>
>> >>
>> >> --flink表
>> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> (
>> >>`spu_id` BIGINT ,
>> >>`leaving_price`  DECIMAL(10, 5),
>> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> ) WITH (
>> >>   'connector' = 'jdbc',
>> >>'url' = 'jdbc:mysql://...',
>> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >>'username' = '...',
>> >>'password' = '..'
>> >> );
>> >>
>> >>
>> >> --binlog 2mysql
>> >>
>> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >>
>> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >>
>> >> FROM hive.database.table
>> >>
>> >> group by v_spu_id;
>> >>
>> >>
>> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >>
>> >>
>> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> 有什么好的排查思路么?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>>




Re: Re: Flink SQL CDC sum result contains NULL

2020-11-19 Thread kandy.wang
Hi Jark:


I added some logging and checked the fields involved in the aggregation (v_spu_id, leaving_num, price);
none of them is ever NULL, and checking in the sql-client also shows no NULLs.

The custom format logic is similar to Canal's: insert, update, delete, with an update emitting two messages, update_before
and update_after. That is how the format logic should be written, right?




On 2020-11-19 23:13:19, "Jark Wu" wrote:
>You can first run select * from hive.database.table; directly and check whether every field value is correct and whether there are any
>null values, to verify that your custom format is fine.
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
>>`spu_id` BIGINT NOT NULL,
>>`leaving_price`  DECIMAL(10, 5)
>>PRIMARY KEY ( `id` ),
>>unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>`spu_id` BIGINT ,
>>`leaving_price`  DECIMAL(10, 5),
>> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://...',
>>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>'username' = '...',
>>'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>




Flink SQL CDC sum result contains NULL

2020-11-19 Thread kandy.wang
-- MySQL table
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
   `id` INT UNSIGNED AUTO_INCREMENT,
   `spu_id` BIGINT NOT NULL,
   `leaving_price`  DECIMAL(10, 5)
   PRIMARY KEY ( `id` ),
   unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8


-- Flink table
CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
   `spu_id` BIGINT ,
   `leaving_price`  DECIMAL(10, 5),
PRIMARY KEY ( `spu_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
   'url' = 'jdbc:mysql://...',
   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
   'username' = '...',
   'password' = '..'
);


-- binlog to MySQL

insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg

SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price

FROM hive.database.table

group by v_spu_id;


hive.database.table points to our company's own Kafka, with a format that implements the same four Canal-CDC-style message types.


The problem is that the MySQL result table has many NULL values in leaving_price, yet looking at the data, the leaving_num and price
fields can never be NULL. I don't understand why the result table can end up NULL.
Is there a good way to troubleshoot this?







Re: A question about the filesystem connector

2020-11-11 Thread kandy.wang
Hi:
As I understand it, with partition-time commits, the partition commit is triggered when current watermark > partition time + commit
delay, so it depends on how long your sink.partition-commit.delay
is set to; data arriving after that point should be dropped by default, I believe.


https://cloud.tencent.com/developer/article/1707182

You can take a look at this link.
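For reference, a sketch of the partition-time commit configuration described above, using the Flink 1.11 filesystem connector option names; the path, format and delay values are illustrative:

CREATE TABLE filesystem_sink (
  a STRING,
  `day` STRING,
  `hour` STRING
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://xx',                                        -- placeholder
  'format' = 'parquet',
  -- commit a partition once: watermark > partition time + this delay
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  -- how the partition columns map back to a timestamp
  'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00',
  'sink.partition-commit.policy.kind' = 'success-file'
);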







On 2020-11-12 11:58:22, "admin" <17626017...@163.com> wrote:
>Hi all,
>With Flink 1.11's filesystem connector, the partition trigger [1] settings are all defaults, so a partition can be committed multiple times.
>Here is the scenario:
>We consume Kafka data and write it to HDFS; the partition fields are day + hour, extracted from the event time. If data is late, say it is 19:00 now and a record for 17:00 arrives,
>will that record still be written correctly into the 17:00 partition, or into the 19:00 partition, or will it be dropped?
>Does anyone know? Has anyone actually verified this?
>Thanks
>
>The simplified SQL is attached:
>CREATE TABLE kafka (
>a STRING,
>b STRING,
>c BIGINT,
>process_time BIGINT,
>e STRING,
>f STRING,
>g STRING,
>h INT,
>i STRING
>) WITH (
>'connector' = 'kafka',
>'topic' = 'topic',
>'properties.bootstrap.servers' = 'x',
>'properties.group.id' = 'test-1',
>'scan.startup.mode' = 'latest-offset',
>'format' = 'json',
>'properties.flink.partition-discovery.interval-millis' = '30'
>);
>
>CREATE TABLE filesystem (
>`day` STRING,
>`hour` STRING,
>a STRING,
>b STRING,
>c BIGINT,
>d BIGINT,
>e STRING,
>f STRING,
>g STRING,
>h INT,
>i STRING
>) PARTITIONED BY (`day`, `hour`) WITH (
>'connector' = 'filesystem',
>'format' = 'parquet',
>'path' = 'hdfs://xx',
>'parquet.compression'='SNAPPY',
>'sink.partition-commit.policy.kind' = 'success-file'
>);
>
>insert into filesystem
>select
>from_unixtime(process_time,'yyyy-MM-dd') as `day`,
>from_unixtime(process_time,'HH') as `hour`,
>a,
>b,
>c,
>d,
>e,
>f,
>g,
>h,
>i
>from kafka;
>
>
>
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger


Flink SQL HBase lookup (dimension) table join throughput will not go up

2020-11-11 Thread kandy.wang
Looking at the code, the HBase lookup join is mainly implemented by org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction.
In our tests the TPS is only about 30-40K, and adding a local cache did not improve it. From the Flink UI, the LookupJoin operator is chained with the Kafka
source operator, so the parallelism of the whole chain is limited by the number of Kafka partitions.
1. Have the developers of this HBase connector done any performance testing of the connector?
2. Are there any other ways to improve HBase lookup-join performance? As it stands, the performance does not seem good enough for production use.
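For context, a minimal sketch of the lookup-join shape under discussion, i.e. a processing-time temporal join against the HBase table; all table names, columns and connector options here are illustrative assumptions, not taken from the actual job:

CREATE TABLE orders (
  user_id STRING,
  amount DOUBLE,
  proctime AS PROCTIME()                      -- lookup joins need a processing-time attribute
) WITH (
  'connector' = 'kafka',
  'topic' = '...',                            -- placeholder
  'properties.bootstrap.servers' = '...',     -- placeholder
  'format' = 'json'
);

CREATE TABLE dim_user (
  rowkey STRING,
  f ROW<name STRING, city STRING>
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'dim_user',
  'zookeeper.quorum' = '...'                  -- placeholder
);

SELECT o.user_id, o.amount, d.f.name
FROM orders AS o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.user_id = d.rowkey;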



flink sql state queryable ?

2020-10-15 Thread kandy.wang
I would like to understand what is stored in Flink SQL state. Can it be queried with the DataStream queryable state API? How would that query work,
and do you need to know the key in order to query?
What we really want is to know exactly what is stored in the state.

Group agg: state TTL does not take effect after enabling mini-batch

2020-09-30 Thread kandy.wang
After enabling mini-batch for a group aggregation, the state TTL does not take effect:


We have found that mini-batch really is needed when computing order metrics and writing to HBase: it greatly reduces the volume of records reaching the sink
operator, lowers the HBase write TPS and relieves pressure on HBase. Otherwise, processing every single incoming record, one job alone can push HBase to well over 100K TPS.


The corresponding parameters in sql-client-defaults.yaml should be these two:
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
What is the current status of this issue, and when does the community plan to support it?
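For context, a sketch of the mini-batch settings involved, using the documented Blink planner option keys; the values are illustrative:

SET table.exec.mini-batch.enabled=true;
SET table.exec.mini-batch.allow-latency=5 s;
SET table.exec.mini-batch.size=5000;
-- idle state retention corresponds to the yaml keys above
-- (min-idle-state-retention / max-idle-state-retention, in milliseconds)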





Re: Re: Flink SQL retract stream question

2020-09-27 Thread kandy.wang






Hi,
When you create the MySQL table you should specify a primary key, and when creating the Flink table you should also declare the primary key:

PRIMARY KEY (id) NOT ENFORCED, and then it will upsert based on the primary key.
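A minimal sketch of the upsert JDBC sink this advice describes. The table and column names follow the scenario in the quoted thread (dt as the unique key, cnt as the count) and are otherwise illustrative:

CREATE TABLE mysql_sink (
  dt STRING,
  cnt BIGINT,
  -- with a declared (NOT ENFORCED) primary key the JDBC sink writes in upsert mode
  PRIMARY KEY (dt) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://...',    -- placeholder
  'table-name' = 'sink',
  'username' = '...',
  'password' = '...'
);

INSERT INTO mysql_sink
SELECT dt, COUNT(DISTINCT id) AS cnt
FROM source
GROUP BY dt;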








On 2020-09-27 13:36:25, "xiao cai" wrote:
>If it is delete-then-insert, why does the primary key id always stay 1? If the insert after the delete specifies the id value, auto_increment would not change.
>That is what confuses me.
>
>
> 原始邮件 
>发件人: lec ssmi
>收件人: flink-user-cn
>发送时间: 2020年9月27日(周日) 13:06
>主题: Re: Flink SQL撤回流问题
>
>
>是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai 
> 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > 
>sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > 
>select dt,count(distinct id) from source group by dt; > > > 
>这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show 
>create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 
>发件人: Michael Ran > 收件人: user-zh 
>> 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 
>2020-09-27 11:48:36,"xiao cai"  写道: >Hi: > >使用Flink 
>SQL撤回流写入MySQL,表的auto_increment > 
>越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。


Re: Re: Querying the HBase sink result table sometimes returns data and sometimes does not

2020-09-25 Thread kandy.wang
Hi Leonard:

Adding logging in HBaseSinkFunction and testing, what we actually see is that only UPDATE_AFTER RowData records are written to HBase; there are none of the retract
messages you mentioned. With retraction, an UPDATE_BEFORE message should be sent first, followed by an UPDATE_AFTER. In practice
everything is UPDATE_AFTER, converted into an HBase Put, as if every write were an upsert.





On 2020-09-25 10:03:34, "Leonard Xu" wrote:
>Hi
>> When querying HBase via SQL, sometimes the data is found and sometimes it is not. Could it be that the group operation makes a downstream operator send retract messages, so that a
>> given rowkey is deleted in HBase and the client then cannot find the data?
>> My understanding is that the HBase sink should upsert data. Could a delete-then-put sequence cause this behavior?
>
>Yes, the group by operator sends retract messages to the downstream HBase sink, and the HBase
>sink handles a retract message by deleting first and then inserting, so when querying HBase you can hit the case you describe where the data is sometimes missing.
>
>Best
>Leonard


Querying the HBase sink result table sometimes returns data and sometimes does not

2020-09-23 Thread kandy.wang
insert into hive.temp_dw.day_order_index select rowkey, ROW(orderN,)
from
(
select order_date as rowkey,
count(distinct parent_sn) as orderN,

group by order_date
)


When querying HBase via SQL, sometimes the data is found and sometimes it is not. Could it be that the group operation makes a downstream operator send retract messages, so that a
given rowkey is deleted in HBase and the client then cannot find the data?
My understanding is that the HBase sink should upsert data. Could a delete-then-put sequence cause this behavior?

Flink SQL: IS NOT NULL does not take effect under grouping sets

2020-09-23 Thread kandy.wang
The SQL is as follows:
select
   (case when act_name is not null then  act_name else 'default_value' 
end) as act_name,
   (case when fst_plat is not null then  fst_plat  else 'default_value' 
end) as fst_plat,
sum(amount) as saleN
from  hive.temp_dw.view_trad_order_goods_source_act_last_value
group by  grouping sets((act_name),(act_name,fst_plat)


hive.temp_dw.view_trad_order_goods_source_act_last_value is a view, with the following schema:
Flink SQL> desc hive.temp_dw.view_trad_order_goods_source_act_last_value
 |-- act_name: STRING
 |-- fst_plat: STRING NOT NULL
  ..
The data type of fst_plat is STRING NOT NULL. In the (act_name) branch of the grouping sets, fst_plat does not take part in the grouping,
but in testing the expression (case when fst_plat is not null then fst_plat else 'default_value'
end) as fst_plat never produces the else branch's default_value.
It feels as if fst_plat is not null has no effect, and that this is related to the column being declared STRING NOT NULL:
with that constraint in place, the NOT NULL semantics under grouping sets get broken.

Flink protobuf-to-JSON conversion performance

2020-09-23 Thread kandy.wang
Since Flink does not currently support a protobuf format, we call protobuf-java-util's
com.google.protobuf.util.JsonFormat.printer().preservingProtoFieldNames().print(message)
to first convert the PB message to JSON and then run it through JsonRowDataDeserializationSchema.
The resulting throughput only reaches about 200K TPS, whereas processing JSON-formatted data directly reaches 500-600K TPS.
Questions: 1. Is there a better way for Flink to handle protobuf-formatted data? 2. Will the community
support a protobuf format?
3. Is there a more performant library for converting protobuf to JSON?

Re: Re: Re: Re: StreamingFileWriter load-test performance

2020-09-18 Thread kandy.wang



Hi wangenbao:
  We have not run into OOM on our side; I believe increasing the TM resources (memory, CPU) should cover that.
  The problem we hit is that throughput will not go up, although table.exec.hive.fallback-mapred-writer=false did bring a clear improvement.
On 2020-09-18 16:45:29, "wangenbao" <156827...@qq.com> wrote:
>We also hit this bug. With table.exec.hive.fallback-mapred-writer unset, it defaults to the Hadoop MR
>writer;
>when the data is spread out (a Hive Parquet table with three partition levels: date, hour, hashtid), multiple writers are created and the TM OOMs after running for a while;
>with the same data volume and table.exec.hive.fallback-mapred-writer set to false, the Flink native writer has no problem;
>Jingsong Li wrote
>> Is this the latest code?
>> 1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
>> It affects performance; 1.11.2 has passed the vote and will be released soon
>> 
>> On Thu, Sep 17, 2020 at 12:46 PM kandy.wang 
>
>> kandy1203@
>
>>  wrote:
>> 
>>> @Jingsong Li
>>>
>>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>>CatalogTable table = checkNotNull(context.getTable());
>>> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>>>
>>>boolean isGeneric =
>>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>>>
>>>if (!isGeneric) {
>>> return new HiveTableSink(
>>> context.getConfiguration().get(
>>>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>>> context.isBounded(),
>>> new JobConf(hiveConf),
>>> context.getObjectIdentifier(),
>>> table);
>>> } else {
>>> return TableFactoryUtil.findAndCreateTableSink(context);
>>> }
>>> }
>>>
>>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>>> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>>>
>>> If it is false, using flink native writer to write parquet and orc files;
>>>
>>> If it is true, using hadoop mapred record writer to write parquet and orc
>>> files
>>>
>>> 将此参数调整成false后,同样的资源配置下,tps达到30W
>>>
>>> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>>> 一些相关的参数 ?
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-09-17 11:21:43,"Jingsong Li" 
>
>> jingsonglee0@
>
>>  写道:
>>> >Sink并行度
>>> >我理解是配置Sink并行度,这个一直在讨论,还没结论
>>> >
>>> >HDFS性能
>>> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>>> >
>>> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang 
>
>> kandy1203@
>
>>  wrote:
>>> >
>>> >> 场景很简单,就是kafka2hive
>>> >> --5min入仓Hive
>>> >>
>>> >> INSERT INTO  hive.temp_.hive_5min
>>> >>
>>> >> SELECT
>>> >>
>>> >>  arg_service,
>>> >>
>>> >> time_local
>>> >>
>>> >> .
>>> >>
>>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>>> >>
>>> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
>>> '='kafka_hive_test',
>>> >> 'scan.startup.mode'='earliest-offset') */;
>>> >>
>>> >>
>>> >>
>>> >> --kafka source表定义
>>> >>
>>> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>>> >>
>>> >> arg_service string COMMENT 'arg_service',
>>> >>
>>> >> 
>>> >>
>>> >> )WITH (
>>> >>
>>> >>   'connector' = 'kafka',
>>> >>
>>> >>   'topic' = '...',
>>> >>
>>> >>   'properties.bootstrap.servers' = '...',
>>> >>
>>> >>   'properties.group.id' = 'flink_etl_kafka_hive',
>>> >>
>>> >>   'scan.startup.mode' = 'group-offsets',
>>> >>
>>> >>   'format' = 'json',
>>> >>
>>> >>   'json.fail-on-missing-field' = 'false',
>>> >>
>>> >>   'json.ignore-parse-errors' = 'true'
>>> >>
>>> >> );
>>> >> --sink hive表定义
>>> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>>> >> 
>>> >> )
>>> >> PARTITIONED BY (dt string , hm string) stored as orc location
>>> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>>> >>   'sink.partition-commit.trigger'='process-time',
>>> >>   'sink.partition-commit.delay'='0 min',
>>> >>   '

Re: Re: Re: Re: Re: Re: StreamingFileWriter load-test performance

2020-09-17 Thread kandy.wang






@Jingsong Li
  I tested it: 1.11.2 behaves the same as before; table.exec.hive.fallback-mapred-writer=false is still what makes the clear difference.
Our Flink environment is a jar we built ourselves from the flink 1.11 branch sources, so I assume your StreamingFileWriter
changes have all been committed to the flink 1.11 branch.
By the way, will the 1.12 release bring improvements for small-file compaction?


On 2020-09-17 14:19:42, "Jingsong Li" wrote:
>Yes, please do test it; in theory the MR writer should not show a large performance gap.
>
>> Why force the file to roll
>
>Because of the exactly-once guarantee: formats like ORC and Parquet cannot write a single file across multiple attempts.
>
>On Thu, Sep 17, 2020 at 2:05 PM kandy.wang  wrote:
>
>>
>>
>>
>> ok. 就是用hadoop mr writer vs  flink 自实现的native
>> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
>> 改成false是可以满足我们的写hive需求了
>> 还有一个问题,之前问过你,你还没回复:
>> HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么?
>> 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、
>> sink.rolling-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
>> 在 2020-09-17 13:43:04,"Jingsong Li"  写道:
>> >可以再尝试下最新的1.11.2吗?
>> >
>> >https://flink.apache.org/downloads.html
>> >
>> >On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
>> >
>> >> 是master分支代码
>> >> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
>> >> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
>> >> if (userMrWriter) {
>> >>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
>> assigner,
>> >> rollingPolicy, outputFileConfig);
>> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
>> >> } else {
>> >>Optional> bulkFactory =
>> >> createBulkWriterFactory(partitionColumns, sd);
>> >>if (bulkFactory.isPresent()) {
>> >>   builder = StreamingFileSink.forBulkFormat(
>> >> new org.apache.flink.core.fs.Path(sd.getLocation()),
>> >> new
>> >> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(),
>> partComputer))
>> >> .withBucketAssigner(assigner)
>> >> .withRollingPolicy(rollingPolicy)
>> >> .withOutputFileConfig(outputFileConfig);
>> >> LOG.info("Hive streaming sink: Use native parquet writer.");
>> >> } else {
>> >>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
>> >> assigner, rollingPolicy, outputFileConfig);
>> >> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
>> >> BulkWriter Factory not available.");
>> >> }
>> >> }
>> >> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
>> >> >是最新的代码吗?
>> >> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
>> >> >它是影响性能的,1.11.2已经投票通过,即将发布
>> >> >
>> >> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
>> >> >
>> >> >> @Jingsong Li
>> >> >>
>> >> >> public TableSink createTableSink(TableSinkFactory.Context context) {
>> >> >>CatalogTable table = checkNotNull(context.getTable());
>> >> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>> >> >>
>> >> >>boolean isGeneric =
>> >> >>
>> >>
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>> >> >>
>> >> >>if (!isGeneric) {
>> >> >> return new HiveTableSink(
>> >> >> context.getConfiguration().get(
>> >> >>
>>  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> >> >> context.isBounded(),
>> >> >> new JobConf(hiveConf),
>> >> >> context.getObjectIdentifier(),
>> >> >> table);
>> >> >> } else {
>> >> >> return TableFactoryUtil.findAndCreateTableSink(context);
>> >> >> }
>> >> >> }
>> >> >>
>> >> >>
>> >>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> >> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>> >> >>
>> >> >> If it is false, using flink native writer to write parquet and orc
>> >> files;
>> >> >>
>> >> >> If it is true, using hadoop mapred record writer to write parquet and
>> >> orc
>> >> >> files
>> >> >>
>> >&

Re: Re: Re: Re: Re: StreamingFileWriter load-test performance

2020-09-17 Thread kandy.wang



OK, so it comes down to a performance comparison between the Hadoop MR writer and Flink's own native
writer. For now, setting table.exec.hive.fallback-mapred-writer
to false satisfies our Hive-writing needs.
One more question that I asked before and you have not answered yet:
Why does HiveRollingPolicy return true for shouldRollOnCheckpoint, i.e. why force the file to roll? Could this be extracted into a configuration parameter?
With forced rolling, the sink.rolling-policy.rollover-interval and
sink.rolling-policy.file-size parameters basically stop working: with a 5-minute partition and a checkpoint every 2 minutes, files roll before they even reach a few tens of MB, so the configured parameters become meaningless.
On 2020-09-17 13:43:04, "Jingsong Li" wrote:
>Could you try the latest 1.11.2?
>
>https://flink.apache.org/downloads.html
>
>On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:
>
>> 是master分支代码
>> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
>> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
>> if (userMrWriter) {
>>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner,
>> rollingPolicy, outputFileConfig);
>> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
>> } else {
>>Optional> bulkFactory =
>> createBulkWriterFactory(partitionColumns, sd);
>>if (bulkFactory.isPresent()) {
>>   builder = StreamingFileSink.forBulkFormat(
>> new org.apache.flink.core.fs.Path(sd.getLocation()),
>> new
>> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
>> .withBucketAssigner(assigner)
>> .withRollingPolicy(rollingPolicy)
>> .withOutputFileConfig(outputFileConfig);
>> LOG.info("Hive streaming sink: Use native parquet writer.");
>> } else {
>>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
>> assigner, rollingPolicy, outputFileConfig);
>> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
>> BulkWriter Factory not available.");
>> }
>> }
>> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
>> >是最新的代码吗?
>> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
>> >它是影响性能的,1.11.2已经投票通过,即将发布
>> >
>> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
>> >
>> >> @Jingsong Li
>> >>
>> >> public TableSink createTableSink(TableSinkFactory.Context context) {
>> >>CatalogTable table = checkNotNull(context.getTable());
>> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>> >>
>> >>boolean isGeneric =
>> >>
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>> >>
>> >>if (!isGeneric) {
>> >> return new HiveTableSink(
>> >> context.getConfiguration().get(
>> >>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> >> context.isBounded(),
>> >> new JobConf(hiveConf),
>> >> context.getObjectIdentifier(),
>> >> table);
>> >> } else {
>> >> return TableFactoryUtil.findAndCreateTableSink(context);
>> >> }
>> >> }
>> >>
>> >>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>> >>
>> >> If it is false, using flink native writer to write parquet and orc
>> files;
>> >>
>> >> If it is true, using hadoop mapred record writer to write parquet and
>> orc
>> >> files
>> >>
>> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
>> >>
>> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>> >> 一些相关的参数 ?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
>> >> >Sink并行度
>> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
>> >> >
>> >> >HDFS性能
>> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>> >> >
>> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
>> >> >
>> >> >> 场景很简单,就是kafka2hive
>> >> >> --5min入仓Hive
>> >> >>
>> >> >> INSERT INTO  hive.temp_.hive_5min
>> >> >>
>> >> >> SELECT
>> >> >>
>> >> >>  arg_service,
>> >> >>
>> >> >> time_local
>> >> >>
>> >> >> .
>> >> >>
>> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>> >

Re: Re: Re: Re: StreamingFileWriter load-test performance

2020-09-16 Thread kandy.wang
It is the master branch code.
The situation you describe is exactly what happens when table.exec.hive.fallback-mapred-writer is left at its default of true; after changing it to false
the code takes the else branch and the problem has gone away for now.
if (userMrWriter) {
    builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
    LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
    Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
    if (bulkFactory.isPresent()) {
        builder = StreamingFileSink.forBulkFormat(
                new org.apache.flink.core.fs.Path(sd.getLocation()),
                new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
            .withBucketAssigner(assigner)
            .withRollingPolicy(rollingPolicy)
            .withOutputFileConfig(outputFileConfig);
        LOG.info("Hive streaming sink: Use native parquet writer.");
    } else {
        builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
        LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
    }
}
On 2020-09-17 13:21:40, "Jingsong Li" wrote:
>Is this the latest code?
>1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
>It affects performance; 1.11.2 has passed the vote and will be released soon
>
>On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
>
>> @Jingsong Li
>>
>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>CatalogTable table = checkNotNull(context.getTable());
>> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>>
>>boolean isGeneric =
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>>
>>if (!isGeneric) {
>> return new HiveTableSink(
>> context.getConfiguration().get(
>>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> context.isBounded(),
>> new JobConf(hiveConf),
>> context.getObjectIdentifier(),
>> table);
>> } else {
>> return TableFactoryUtil.findAndCreateTableSink(context);
>> }
>> }
>>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>>
>> If it is false, using flink native writer to write parquet and orc files;
>>
>> If it is true, using hadoop mapred record writer to write parquet and orc
>> files
>>
>> 将此参数调整成false后,同样的资源配置下,tps达到30W
>>
>> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>> 一些相关的参数 ?
>>
>>
>>
>>
>>
>> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
>> >Sink并行度
>> >我理解是配置Sink并行度,这个一直在讨论,还没结论
>> >
>> >HDFS性能
>> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>> >
>> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
>> >
>> >> 场景很简单,就是kafka2hive
>> >> --5min入仓Hive
>> >>
>> >> INSERT INTO  hive.temp_.hive_5min
>> >>
>> >> SELECT
>> >>
>> >>  arg_service,
>> >>
>> >> time_local
>> >>
>> >> .
>> >>
>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>> >>
>> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
>> '='kafka_hive_test',
>> >> 'scan.startup.mode'='earliest-offset') */;
>> >>
>> >>
>> >>
>> >> --kafka source表定义
>> >>
>> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>> >>
>> >> arg_service string COMMENT 'arg_service',
>> >>
>> >> 
>> >>
>> >> )WITH (
>> >>
>> >>   'connector' = 'kafka',
>> >>
>> >>   'topic' = '...',
>> >>
>> >>   'properties.bootstrap.servers' = '...',
>> >>
>> >>   'properties.group.id' = 'flink_etl_kafka_hive',
>> >>
>> >>   'scan.startup.mode' = 'group-offsets',
>> >>
>> >>   'format' = 'json',
>> >>
>> >>   'json.fail-on-missing-field' = 'false',
>> >>
>> >>   'json.ignore-parse-errors' = 'true'
>> >>
>> >> );
>> >> --sink hive表定义
>> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>> >> 
>> >> )
>> >> PARTITIONED BY (dt string , hm string) stored as orc location
>> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>> >>   'sink.partition-commit.trigger'='proces

Re: Re: Re: StreamingFileWriter load-test performance

2020-09-16 Thread kandy.wang
@Jingsong Li

public TableSink createTableSink(TableSinkFactory.Context context) {
    CatalogTable table = checkNotNull(context.getTable());
    Preconditions.checkArgument(table instanceof CatalogTableImpl);

    boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

    if (!isGeneric) {
        return new HiveTableSink(
                context.getConfiguration().get(
                        HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
                context.isBounded(),
                new JobConf(hiveConf),
                context.getObjectIdentifier(),
                table);
    } else {
        return TableFactoryUtil.findAndCreateTableSink(context);
    }
}

In HiveTableFactory there is an option, table.exec.hive.fallback-mapred-writer, default true, which controls whether
the Hadoop mapred record writer or the Flink native writer is used to write ORC/Parquet files.

If it is false, using flink native writer to write parquet and orc files;

If it is true, using hadoop mapred record writer to write parquet and orc files

After setting this option to false, with the same resources the TPS reaches 300K.

The two ORC implementations probably just differ in performance, right? Also, our storage format is ORC; are there any ORC tuning options, e.g. async flush or related parameters?
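
For reference, a minimal sketch of how this option could be set programmatically (a fragment, assuming a TableEnvironment named tEnv has already been created; the option key is the one discussed above):

// Disable the Hadoop mapred writer so the Flink native ORC/Parquet writer is used.
tEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-writer", false);
// Equivalently, via the typed option constant referenced in the factory code above:
// tEnv.getConfig().getConfiguration().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);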





On 2020-09-17 11:21:43, "Jingsong Li" wrote:
>Sink parallelism
>I understand you mean configuring the sink parallelism; this has been under discussion for a while without a conclusion yet.
>
>HDFS performance
>Check where exactly the HDFS bottleneck is: network, request rate, connection count, or disk IO.
>
>On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
>
>> 场景很简单,就是kafka2hive
>> --5min入仓Hive
>>
>> INSERT INTO  hive.temp_.hive_5min
>>
>> SELECT
>>
>>  arg_service,
>>
>> time_local
>>
>> .
>>
>> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>>
>> FROM hive.temp_.kafka_source_pageview/*+ 
>> OPTIONS('properties.group.id'='kafka_hive_test',
>> 'scan.startup.mode'='earliest-offset') */;
>>
>>
>>
>> --kafka source表定义
>>
>> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>>
>> arg_service string COMMENT 'arg_service',
>>
>> 
>>
>> )WITH (
>>
>>   'connector' = 'kafka',
>>
>>   'topic' = '...',
>>
>>   'properties.bootstrap.servers' = '...',
>>
>>   'properties.group.id' = 'flink_etl_kafka_hive',
>>
>>   'scan.startup.mode' = 'group-offsets',
>>
>>   'format' = 'json',
>>
>>   'json.fail-on-missing-field' = 'false',
>>
>>   'json.ignore-parse-errors' = 'true'
>>
>> );
>> --sink hive表定义
>> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>> 
>> )
>> PARTITIONED BY (dt string , hm string) stored as orc location
>> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>>   'sink.partition-commit.trigger'='process-time',
>>   'sink.partition-commit.delay'='0 min',
>>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>>   'sink.rolling-policy.check-interval' ='30s',
>>   'sink.rolling-policy.rollover-interval'='10min',
>>   'sink.rolling-policy.file-size'='128MB'
>> );
>> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
>> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
>> 就是flink sql可以
>> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
>> 这块,有没有什么可以提升性能相关的优化参数?
>>
>>
>>
>>
>> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
>> >Hi,
>> >
>> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
>> >
>> >另外,压测时是否可以看下jstack?
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
>> >
>> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
>> ,source
>> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
>> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
>> >
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


Re:Re: StreamingFileWriter load-test performance

2020-09-16 Thread kandy.wang
The scenario is simple: Kafka to Hive.
-- ingest into Hive with 5-minute partitions

INSERT INTO  hive.temp_.hive_5min

SELECT

 arg_service,

time_local

.

FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  -- a new partition every 5 minutes

FROM hive.temp_.kafka_source_pageview/*+ 
OPTIONS('properties.group.id'='kafka_hive_test', 
'scan.startup.mode'='earliest-offset') */;



-- Kafka source table definition

CREATE TABLE hive.temp_vipflink.kafka_source_pageview (

arg_service string COMMENT 'arg_service',



)WITH (

  'connector' = 'kafka',

  'topic' = '...',

  'properties.bootstrap.servers' = '...',

  'properties.group.id' = 'flink_etl_kafka_hive',

  'scan.startup.mode' = 'group-offsets',

  'format' = 'json',

  'json.fail-on-missing-field' = 'false',

  'json.ignore-parse-errors' = 'true'

);
-- Hive sink table definition
CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (

)
PARTITIONED BY (dt string , hm string) stored as orc location 
'hdfs://ssdcluster/._5min' TBLPROPERTIES(
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 min',
  'sink.partition-commit.policy.class'='...CustomCommitPolicy',
  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
  'sink.rolling-policy.check-interval' ='30s',
  'sink.rolling-policy.rollover-interval'='10min',
  'sink.rolling-policy.file-size'='128MB'
);
From a first look, the bottleneck seems to be writing to HDFS, and the HDFS cluster is already SSD-backed. With 40 Kafka
partitions and operator parallelism of 40, the TPS only reaches about 60-70K, and raising the parallelism brings no improvement.
Is there a way in Flink SQL to change the parallelism of a single operator, i.e. to change only the parallelism of the
StreamingFileWriter operator? And are there any tuning options that can improve the performance of StreamingFileWriter itself?




On 2020-09-16 19:29:50, "Jingsong Li" wrote:
>Hi,
>
>Could you share the concrete test scenario? Do you have a comparison, e.g. a hand-written DataStream job against the same data, to see the performance gap?
>
>Also, could you take a look at jstack during the load test?
>
>Best,
>Jingsong
>
>On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
>
>> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 ,source
>> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
>> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
>
>
>
>-- 
>Best, Jingsong Lee
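
For the comparison suggested above (a hand-written DataStream job against the same topic), a minimal sketch is shown below. It is an illustration only: broker, group id, topic and path are placeholders, and it writes plain JSON strings as a row format rather than ORC, so it only gives a rough baseline for the Kafka-to-HDFS write path.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Kafka2HdfsBaseline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30_000); // match the 30s checkpoint interval of the SQL job

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "...");  // placeholder
        props.setProperty("group.id", "baseline_test"); // placeholder

        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("...", new SimpleStringSchema(), props)); // placeholder topic

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs://ssdcluster/tmp/baseline"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(128L * 1024 * 1024)   // 128MB, mirrors sink.rolling-policy.file-size
                        .withRolloverInterval(10 * 60 * 1000L) // 10min, mirrors sink.rolling-policy.rollover-interval
                        .build())
                .build();

        source.addSink(sink);
        env.execute("kafka2hdfs-baseline");
    }
}

If this baseline reaches a much higher TPS at the same parallelism, the gap is in the Hive/ORC writer path; if it does not, the bottleneck is more likely Kafka or HDFS itself.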


Re: Streaming File Sink does not generate the _SUCCESS marker file

2020-09-16 Thread kandy.wang
Add the option 'sink.partition-commit.policy.kind'='metastore,success-file'.

That should work.
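
For context, this option goes into the sink table's TBLPROPERTIES, assuming the data is written through the SQL filesystem/Hive sink rather than a raw StreamingFileSink. A minimal sketch in the DDL style used elsewhere in this archive (table name and columns are placeholders):

CREATE TABLE my_sink_5min (
  ...
) PARTITIONED BY (dt string, hm string) stored as orc TBLPROPERTIES (
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);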


On 2020-09-16 15:01:35, "highfei2011" wrote:
>Hi everyone!
>  I'm running into a problem: after consuming Kafka data with Flink 1.11.0 and sinking it to HDFS with the
>Streaming File Sink using a BucketAssigner bucketing strategy, no _SUCCESS marker file is generated by default.
>  I added the following to the configuration:
>val hadoopConf = new Configuration()
>hadoopConf.set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true")
>
>
>
>But there is still no _SUCCESS file in the output directory. Any suggestions would be much appreciated. Thanks again!
>
>
>Best,
>Yang


StreamingFileWriter load-test performance

2020-09-16 Thread kandy.wang
In our load test of streaming writes to Hive via StreamingFileWriter, with 40 Kafka partitions and source/writer
operator parallelism of 40, consuming Kafka from the beginning, the TPS only reaches about 70K.
Has the streaming write path to Hive been load-tested on your side? What throughput can it reach?

Question about HiveRollingPolicy, the rolling policy used by HiveTableSink for streaming writes

2020-09-15 Thread kandy.wang
private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {

    private final long rollingFileSize;
    private final long rollingTimeInterval;

    private HiveRollingPolicy(long rollingFileSize, long rollingTimeInterval) {
        Preconditions.checkArgument(rollingFileSize > 0L);
        Preconditions.checkArgument(rollingTimeInterval > 0L);
        this.rollingFileSize = rollingFileSize;
        this.rollingTimeInterval = rollingTimeInterval;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
        return false;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
        try {
            return currentTime - partFileState.getCreationTime() >= rollingTimeInterval ||
                    partFileState.getSize() > rollingFileSize;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
I don't quite understand why files must be forced to roll on every checkpoint; that effectively disables the configured rolling options:
  'sink.rolling-policy.check-interval' ='30s',
  'sink.rolling-policy.rollover-interval'='10min',
  'sink.rolling-policy.file-size'='128MB'
The goal is to have files roll by a fixed size, record count, or time. How can this be done?
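
One likely reason, not stated explicitly in this thread: for bulk formats such as ORC/Parquet the in-progress file cannot be resumed after a failure, so the writer has to close (roll) part files on every checkpoint to stay exactly-once; the size/time options then only add extra roll points between checkpoints. If rolling purely by size or age were acceptable (e.g. for a row format), a custom policy could be sketched as below. This is an illustration of the generic RollingPolicy interface only, not a switch the SQL Hive sink exposes:

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

// Sketch: roll only by size or age, never on checkpoint. Only safe for formats that can be
// recovered mid-file; bulk formats like ORC/Parquet still need to roll on checkpoint.
public class SizeOrTimeRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;

    public SizeOrTimeRollingPolicy(long maxPartSizeBytes, long rolloverIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        return false; // keep the part file open across checkpoints
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return currentTime - partFileState.getCreationTime() >= rolloverIntervalMs;
    }
}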

sql-client submission fails with: UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-09-09 Thread kandy.wang


A self-implemented Kudu connector reports the following error:


2020-09-09 18:34:59,442 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.

org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL 
statement.

at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_262]

at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink 
requires that Table has a full primary keys if it is updated.

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
 ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
 ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:327)
 ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:284)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 

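For illustration only: this exception is thrown when the planner cannot derive a unique key from an updating query that feeds a legacy UpsertStreamTableSink. The usual way the key becomes derivable is through the GROUP BY columns; a hedged sketch with made-up table names:

-- The grouped column (user_id) forms the unique key the upsert sink needs, so an
-- updating aggregate like this can be written to an upsert sink keyed on user_id.
INSERT INTO kudu_sink
SELECT user_id, SUM(amount) AS total_amount
FROM orders
GROUP BY user_id;
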
Re:Re: Re: Re: Re:Re: Flink 1.11 StreamingFileWriter and sql-client issues

2020-08-14 Thread kandy.wang
@Jingsong  It's ORC format. I checked all of that, and the partition is still not committed. I think you could test this scenario yourselves.

On 2020-08-12 16:04:13, "Jingsong Li" wrote:
>One more question: what format is it, CSV or Parquet?
>Did you wait until 10 minutes (the rollover-interval) had passed and the next checkpoint had completed before checking again?
>
>On Wed, Aug 12, 2020 at 2:45 PM kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>> 有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉,
>> 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。
>> 就是感觉停止之前正在写的那个分区,没有触发commit
>>
>>
>>
>>
>> 在 2020-08-12 14:26:53,"Jingsong Li"  写道:
>> >那你之前的分区除了in-progress文件,有已完成的文件吗?
>> >
>> >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
>> >
>> >>
>> >>
>> >>
>> >> source就是kafka
>> >>
>> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>> >>
>> >>
>> >>
>> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>> >>
>> >> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
>> >> >你的source是exactly-once的source吗?
>> >> >
>> >> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>> >> >
>> >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
>> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> >@ Jingsong
>> >> >>
>> >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
>> >> >> 用presto查询查不了
>> >> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
>> >> >>  'sink.partition-commit.trigger'='process-time',
>> >> >>   'sink.partition-commit.delay'='0 min',
>> >> >>
>>  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>> >> >>   'sink.rolling-policy.check-interval'='30s',
>> >> >>   'sink.rolling-policy.rollover-interval'='10min',
>> >> >>   'sink.rolling-policy.file-size'='128MB'
>> >> >>如果是12:39分 05秒左右做一次savepoint,然后
>> >> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
>> >> >> partition,就导致有数据,但是确查不 了。
>> >> >>
>> >>
>> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
>> >> >> partition 也能查了。
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
>> >> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
>> >> >> >>
>> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>> >> >> >>
>> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>> >> >> >>>
>> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang 
>> wrote:
>> >> >> >>>
>> >> >> >>> > 1.StreamingFileWriter
>> >> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> >> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> >> >> >>> > =2100分区的数据还存在很多的in-progress文件。
>> >> >> >>> >
>> 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>> >> >> >>> >
>> >> >> >>> >
>> >> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>> >> >> >>> >
>> >> >> >>> >
>> >> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>> >> >> >>>
>> >> >> >>
>> >> >> >>
>> >> >> >>--
>> >> >> >>Best, Jingsong Lee
>> >> >>
>> >> >
>> >> >
>> >> >--
>> >> >Best, Jingsong Lee
>> >>
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


Re:Re: Re: Re:Re: Flink 1.11 StreamingFileWriter and sql-client issues

2020-08-12 Thread kandy.wang






Yes, there are. The job was in the middle of writing, a checkpoint was taken, and then the job was cancelled with a savepoint.
On restart it recovered from the latest savepoint, but by then the writes already belonged to a new partition.
It feels like the partition that was being written before the stop never triggered a commit.




On 2020-08-12 14:26:53, "Jingsong Li" wrote:
>Then, apart from the in-progress files, does the earlier partition contain any completed files?
>
>On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
>
>>
>>
>>
>> source就是kafka
>> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
>>
>>
>>
>>
>>
>>
>> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>>
>>
>>
>> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>>
>> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
>> >你的source是exactly-once的source吗?
>> >
>> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>> >
>> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
>> >
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> >@ Jingsong
>> >>
>> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
>> >> 用presto查询查不了
>> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
>> >>  'sink.partition-commit.trigger'='process-time',
>> >>   'sink.partition-commit.delay'='0 min',
>> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>> >>   'sink.rolling-policy.check-interval'='30s',
>> >>   'sink.rolling-policy.rollover-interval'='10min',
>> >>   'sink.rolling-policy.file-size'='128MB'
>> >>如果是12:39分 05秒左右做一次savepoint,然后
>> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
>> >> partition,就导致有数据,但是确查不 了。
>> >>
>> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
>> >> partition 也能查了。
>> >> >
>> >> >
>> >> >
>> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
>> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
>> >> >>
>> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>> >> >>
>> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>> >> >>>
>> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>> >> >>>
>> >> >>> > 1.StreamingFileWriter
>> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> >> >>> > =2100分区的数据还存在很多的in-progress文件。
>> >> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>> >> >>> >
>> >> >>> >
>> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>> >> >>> >
>> >> >>> >
>> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>> >> >>>
>> >> >>
>> >> >>
>> >> >>--
>> >> >>Best, Jingsong Lee
>> >>
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


Re:Re: Re:Re: Flink 1.11 StreamingFileWriter and sql-client issues

2020-08-11 Thread kandy.wang



The source is Kafka with JSON format, and it is exactly-once. Processing by process-time, the data had already been written.
When the job comes back up, the process-time already falls into a new partition, which is expected. But the state of the old partition was never committed.






The in-progress files are still there, which proves the data of that partition has not been fully written. In theory the source should rewind and re-consume, so why doesn't the job write to that partition again after the restart?

On 2020-08-12 13:28:01, "Jingsong Li" wrote:
>Is your source an exactly-once source?
>
>The in-progress files are still there, which proves the data of that partition has not been fully written. In theory the source should rewind and re-consume, so why doesn't the job write to that partition again after the restart?
>
>On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> >@ Jingsong
>>
>> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
>> 用presto查询查不了
>> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
>>  'sink.partition-commit.trigger'='process-time',
>>   'sink.partition-commit.delay'='0 min',
>>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>>   'sink.rolling-policy.check-interval'='30s',
>>   'sink.rolling-policy.rollover-interval'='10min',
>>   'sink.rolling-policy.file-size'='128MB'
>>如果是12:39分 05秒左右做一次savepoint,然后
>> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
>> partition,就导致有数据,但是确查不 了。
>> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
>> partition 也能查了。
>> >
>> >
>> >
>> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
>> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
>> >>
>> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>> >>
>> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>> >>>
>> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>> >>>
>> >>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> >>> > =2100分区的数据还存在很多的in-progress文件。
>> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>> >>> >
>> >>> >
>> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>> >>> >
>> >>> >
>> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>> >>>
>> >>
>> >>
>> >>--
>> >>Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


Re:Re:Re: Flink 1.11 StreamingFileWriter and sql-client issues

2020-08-11 Thread kandy.wang


















@ Jingsong
The impact is that the partition being written before the stop is never committed. After the job comes back up, it writes
to a different partition than before, and the old one has no _SUCCESS marker, so it cannot be queried with Presto.
For example, the 12:35 partition should hold the data between 12:35:00 and 12:39:59, with:
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 min',
  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
  'sink.rolling-policy.check-interval'='30s',
  'sink.rolling-policy.rollover-interval'='10min',
  'sink.rolling-policy.file-size'='128MB'
If a savepoint is taken at around 12:39:05 and the job is restarted at 12:41, the old 12:35 partition is no longer
written to. Its in-progress files are still there, but the partition is never committed and never added to Hive, so the
data exists yet cannot be queried.
As you said, the in-progress files themselves do no harm, but the partition commit is affected: nothing ever triggers the
commit logic for the earlier 12:35 partition, so effectively a partition is lost. I tried adding the partition manually
and the data then became queryable.
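
For reference, the manual workaround mentioned above is plain Hive DDL; a sketch with placeholder partition values:

-- Manually register the partition that was left uncommitted (partition values are placeholders).
ALTER TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min
  ADD IF NOT EXISTS PARTITION (dt='...', hm='...');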
>
>
>
>On 2020-08-12 12:11:53, "Jingsong Li" wrote:
>>Do the in-progress files cause any concrete problem? They are just leftover files and do not affect the pipeline.
>>
>>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>>
>>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>>>
>>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>>>
>>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>>> > =2100分区的数据还存在很多的in-progress文件。
>>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>>> >
>>> >
>>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>>> >
>>> >
>>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>>>
>>
>>
>>-- 
>>Best, Jingsong Lee


Re:Re: Flink 1.11 StreamingFileWriter and sql-client issues

2020-08-11 Thread kandy.wang






@ Jingsong
The impact is that the partition being written before the stop is never committed. After the job comes back up, it writes to a different partition than before, and the old one has no _SUCCESS marker, so it cannot be queried with Presto.




On 2020-08-12 12:11:53, "Jingsong Li" wrote:
>Do the in-progress files cause any concrete problem? They are just leftover files and do not affect the pipeline.
>
>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>
>> As far as I know, (2) & (3) will hopefully be supported in 1.12.
>>
>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>>
>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> > =2100分区的数据还存在很多的in-progress文件。
>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>> >
>> >
>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>> >
>> >
>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>>
>
>
>-- 
>Best, Jingsong Lee


Flink 1.11 StreamingFileWriter and sql-client issues

2020-08-11 Thread kandy.wang
1. StreamingFileWriter: our tests show that jobs submitted via SQL cannot be recovered from a checkpoint or savepoint.
   Example: one partition every 5 minutes, data landed by process_time. For the hm=2100 partition, a checkpoint or
savepoint is taken around 21:04; when the job is restarted, the hm=2100 partition still contains many in-progress files.
Also, we do not see any pending files under the HDFS directory. How do the file states transition? The implementation seems different from the old BucketingSink.


2. sql-client does not support restoring from a checkpoint/savepoint. When will this be supported?


3. sql-client submission does not support StatementSet batch submission. When will this be supported?

Flink SQL CSV format field delimiter issue

2020-08-07 Thread kandy.wang
With 'csv.field-delimiter'='\t' configured, querying the data fails with:
org.apache.flink.table.api.ValidationException: Option
'csv.field-delimiter' must be a string with single character, but was: \t
How should this be handled?
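
Not answered in this thread, but one workaround worth trying (an assumption, support depends on the Flink version): pass a literal tab character instead of the two-character escape sequence, e.g. via a SQL Unicode string literal:

-- Sketch: U&'\0009' is the SQL Unicode escape for the TAB character, so the option
-- value is a single character (connector and path are placeholders).
CREATE TABLE csv_source (
  ...
) WITH (
  'connector' = 'filesystem',
  'path' = '...',
  'format' = 'csv',
  'csv.field-delimiter' = U&'\0009'
);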

Re:Re: Re: Data reuse for FLINK SQL views

2020-08-05 Thread kandy.wang






@ godfrey
The StatementSet submission you describe is not supported when submitting jobs through sql-client, right? Could support for it be added?
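
For reference, a minimal sketch of the StatementSet submission godfrey refers to, using the Table API instead of sql-client (it assumes a TableEnvironment tEnv in which the order_source view and the sink table from this thread are already registered; the queries are simplified from the ones quoted below):

import org.apache.flink.table.api.StatementSet;

StatementSet statementSet = tEnv.createStatementSet();
statementSet.addInsertSql(
        "INSERT INTO hive.temp_dw.day_order_index " +
        "SELECT order_date AS rowkey, SUM(amount) AS saleN FROM order_source GROUP BY order_date");
statementSet.addInsertSql(
        "INSERT INTO hive.temp_dw.day_order_index " +
        "SELECT order_hour AS rowkey, SUM(amount) AS saleN FROM order_source GROUP BY order_hour");
System.out.println(statementSet.explain()); // check whether the source + Deduplicate part is shared
statementSet.execute();                     // both INSERTs are submitted as a single job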











On 2020-08-04 19:36:56, "godfrey he" wrote:
>Call StatementSet#explain() and print the result to see whether the reuse fails because the digests of the Deduplicate nodes differ.
>
>kandy.wang  于2020年8月4日周二 下午6:21写道:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> @ godfrey
>> thanks。刚试了一下,source -> Deduplicate  ->
>> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
>> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
>>
>>
>> 在 2020-08-04 17:26:02,"godfrey he"  写道:
>> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
>> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
>> >
>> >kandy.wang  于2020年8月4日周二 下午5:20写道:
>> >
>> >> FLINK SQL view相关问题:
>> >> create view order_source
>> >>
>> >> as
>> >>
>> >> select order_id, order_goods_id, user_id,...
>> >>
>> >> from (
>> >>
>> >> ..  proctime,row_number() over(partition by order_id,
>> >> order_goods_id order by proctime desc) as rownum
>> >>
>> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
>> properties.group.id'='flink_etl_kafka_hbase',
>> >> 'scan.startup.mode'='latest-offset') */
>> >>
>> >> ) where  rownum = 1 and  price > 0;
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT),)
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_date as rowkey,
>> >>
>> >> sum(amount) as saleN,
>> >>
>> >> from order_source
>> >>
>> >> group by order_date
>> >>
>> >> );
>> >>
>> >>
>> >>
>> >>
>> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
>> as
>> >> BIGINT))
>> >>
>> >> from
>> >>
>> >> (
>> >>
>> >> select order_hour as rowkey,sum(amount) as saleN,
>> >>
>> >>
>> >>
>> >> from order_source
>> >>
>> >> group by order_hour
>> >>
>> >> );
>> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
>> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
>> >> 2
>> >>
>> >>
>> >> 本意是想通过view  order_source
>> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>> >>
>> >>
>>


Re:Re: Data reuse for FLINK SQL views

2020-08-04 Thread kandy.wang















@ godfrey
Thanks. I just tried it: for source -> Deduplicate -> GlobalGroupAggregate, the source is indeed reused, but the
Deduplicate part is not. In theory both the source and the Deduplicate are part of the view's logic, so both should be
reused. It feels like not enough is being reused.
 

On 2020-08-04 17:26:02, "godfrey he" wrote:
>The blink planner can optimize a multi-sink query so that the duplicated computation is reused as much as possible.
>In 1.11 submit the job with StatementSet; before 1.11 use sqlUpdate/insertInto + execute.
>
>kandy.wang  于2020年8月4日周二 下午5:20写道:
>
>> FLINK SQL view相关问题:
>> create view order_source
>>
>> as
>>
>> select order_id, order_goods_id, user_id,...
>>
>> from (
>>
>> ..  proctime,row_number() over(partition by order_id,
>> order_goods_id order by proctime desc) as rownum
>>
>> from hive.temp_dw.dm_trd_order_goods/*+ 
>> OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
>> 'scan.startup.mode'='latest-offset') */
>>
>> ) where  rownum = 1 and  price > 0;
>>
>>
>>
>>
>> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
>> BIGINT),)
>>
>> from
>>
>> (
>>
>> select order_date as rowkey,
>>
>> sum(amount) as saleN,
>>
>> from order_source
>>
>> group by order_date
>>
>> );
>>
>>
>>
>>
>> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
>> BIGINT))
>>
>> from
>>
>> (
>>
>> select order_hour as rowkey,sum(amount) as saleN,
>>
>>
>>
>> from order_source
>>
>> group by order_hour
>>
>> );
>> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
>> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
>> 2
>>
>>
>> 本意是想通过view  order_source
>> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>>
>>


Data reuse for FLINK SQL views

2020-08-04 Thread kandy.wang
A question about FLINK SQL views:
create view order_source

as

select order_id, order_goods_id, user_id,...

from (

..  proctime,row_number() over(partition by order_id, order_goods_id 
order by proctime desc) as rownum

from hive.temp_dw.dm_trd_order_goods/*+ 
OPTIONS('properties.group.id'='flink_etl_kafka_hbase', 
'scan.startup.mode'='latest-offset') */

) where  rownum = 1 and  price > 0; 




insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as 
BIGINT),)

from

(

select order_date as rowkey,

sum(amount) as saleN,

from order_source

group by order_date

);




insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as 
BIGINT))

from

(

select order_hour as rowkey,sum(amount) as saleN,



from order_source

group by order_hour

);
Problem: the same view, with the same consumer group but different sinks, produces 2 jobs, which means the 2 jobs share one consumer group.
The resulting jobs are: a. order_source -> sink 1   b. order_source -> sink 2


The intent is to use the view order_source (which deduplicates the order data) to reuse one full copy of the source data,
so that the underlying state can also be shared. How can this be achieved?



Issues with Flink 1.11 streaming writes into a 5-minute-partitioned Hive table

2020-07-29 Thread kandy.wang
Observed behavior:
CREATE TABLE test.xxx_5min (

..

) PARTITIONED BY (dt string , hm string) stored as orc  TBLPROPERTIES(

  'sink.partition-commit.trigger'='process-time',

  'sink.partition-commit.delay'='5 min',

  'sink.partition-commit.policy.kind'='metastore,success-file',

  'sink.rolling-policy.file-size'='128MB',

  'sink.rolling-policy.check-interval' ='30s',

  'sink.rolling-policy.rollover-interval'='5min'

); 
dt = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd')
hm = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')
A new partition every 5 minutes; checkpoint interval: 30s.
Questions:
1. With Flink 1.11 streaming writes, why is a new file produced roughly every minute, with sizes well below 128MB? If
'sink.rolling-policy.rollover-interval'='5min' (roll every 5 minutes) and the 128MB rolling size were in effect, such
small files should not appear. Why isn't the file size controlled as expected?
2. How should the small-file problem be solved? Any good approaches?
3. Why is the _SUCCESS marker file reported late? For a 12:30 partition with 5-minute partitions, the partition should in theory be committed around 12:35, right?
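
Not answered here, but regarding question 2: later Flink versions (1.12+) add built-in compaction options to the filesystem/Hive streaming sink; a hedged sketch of the relevant table properties (availability depends on the version in use):

  'auto-compaction'='true',
  'compaction.file-size'='128MB'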

Parsing complex (nested) JSON with Flink SQL, and type mapping when writing to Hive

2020-07-28 Thread kandy.wang
For the JSON format, how should the schema be defined when the payload is a JSON array, possibly with further JSON arrays nested inside it?

Sample data:
{"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586676835655,"id":"10001","jsonArray":[{"name222":"xxx","user_id222":"0022"},{"name333":"name","user_id222":"user"},{"cc":"xxx333","user_id444":"user","name444":"name"}]}


Following https://www.cnblogs.com/Springmoon-venn/p/12664547.html, the schema needs to be defined like this:
user_info is defined as: ROW<...>
jsonArray is defined as: ARRAY<ROW<...>>


The questions are:
If there is another array inside the JSON array, do we keep nesting the definitions the same way? This data needs to be
written to Hive: how should ARRAY<ROW<...>> be mapped to a Hive type, for example to array<string>, and how should such
JSON be handled? Is there a way to map the JSON array directly to array<string>? I tried and it did not work. How should
such complex types be handled?
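
A hedged sketch of how the nested schema could be declared for the sample record above (field names are taken from the sample, the types are assumptions, and only part of the array element's fields are listed):

CREATE TABLE json_source (
  id STRING,
  `timestam` BIGINT,
  user_info ROW<user_id STRING, name STRING>,
  jsonArray ARRAY<ROW<name222 STRING, user_id222 STRING>>
  -- an array nested inside an element would repeat the pattern: ARRAY<ROW<..., inner ARRAY<ROW<...>>>>
) WITH (
  'connector' = 'kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = '...',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);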

Mail from kandy.wang

2020-07-27 Thread kandy.wang