Re: Can Flink set the Hadoop configuration directory via code?

2020-01-08, posted by tison
I recommend configuring this through the HADOOP_HOME or HADOOP_CONF_DIR environment variables. Flink resolves the Hadoop configuration with the following fallback priority:

1. HADOOP_HOME
2. Configuration, i.e. fs.hdfs.hadoopconf
3. HADOOP_CONF_DIR

Note that the Configuration approach is already deprecated.
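
For example, on the machine that submits the job (the path below is only a placeholder for your cluster's Hadoop configuration directory):

# point Flink at the directory containing core-site.xml / hdfs-site.xml
export HADOOP_CONF_DIR=/path/to/hadoop/conf
./bin/flink run ...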

Best,
tison.


LJY wrote on Thu, Jan 9, 2020 at 3:52 PM:

> Hi all:
>
> Currently the Hadoop configuration files are specified via fs.hdfs.hadoopconf.
>
> Can a user skip fs.hdfs.hadoopconf in the configuration file and set the Hadoop config directory manually in code instead?


Can Flink set the Hadoop configuration directory via code?

2020-01-08, posted by LJY
Hi all:

Currently the Hadoop configuration files are specified via fs.hdfs.hadoopconf.

Can a user skip fs.hdfs.hadoopconf in the configuration file and set the Hadoop config directory manually in code instead?

Re:Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08, posted by sunfulin
hi,
Thanks for the reply. I am using the default FsStateBackend rather than RocksDB,
with checkpointing off. So I really cannot see any state info from the dashboard.
I will dig into more details and see whether any alternative can be optimized.


At 2020-01-08 19:07:08, "Benchao Li"  wrote:
>hi sunfulin,
>
>As Kurt pointed out, if you use the RocksDB state backend, slow disk IO may be
>bounding your job.
>You can check the WindowOperator's latency metric to see how long it takes to
>process an element.
>Hope this helps.
>
>sunfulin wrote on Wed, Jan 8, 2020 at 4:04 PM:
>
>> Ah, I have already checked resource usage and GC from the Flink dashboard. It seems
>> the reason is not a CPU or memory issue. Task heap memory usage is less than
>> 30%. Could you kindly tell me how I can see more metrics to help target
>> the bottleneck?
>> Really appreciate it.
>>
>>
>>
>>
>>
>> At 2020-01-08 15:59:17, "Kurt Young"  wrote:
>>
>> Hi,
>>
>> Could you try to find out what's the bottleneck of your current job? This
>> would lead to
>> different optimizations, such as whether it's CPU bound, or you have too
>> much local
>> state and are thus stuck on too many slow IOs.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:
>>
>>> hi sunfulin,
>>> you can try the blink planner (since 1.9+), which optimizes distinct
>>> aggregation. You can also try to enable
>>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>>>
>>> best,
>>> godfreyhe
>>>
>>> sunfulin wrote on Wed, Jan 8, 2020 at 3:39 PM:
>>>
 Hi, community,
 I'm using Apache Flink SQL to build some of my realtime streaming apps.
 In one scenario I'm trying to count(distinct deviceID) over a roughly 100GB
 data set in realtime, and sink the aggregated results to an ElasticSearch
 index. I met a severe performance issue when running my Flink job, and want
 to get some help from the community.


 Flink version: 1.8.2
 Running on YARN with 4 slots per task manager. My Flink task
 parallelism is set to 10, which is equal to the number of my Kafka source partitions.
 After running the job, I can observe high backpressure from the Flink
 dashboard. Any suggestions and any kind of help are highly appreciated.


 running sql is like the following:


 INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

 select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
 clkCnt  from

 (

 SELECT

  aggId,

  pageId,

  statkey,

  COUNT(DISTINCT deviceId) as cnt

  FROM

  (

  SELECT

  'ZL_005' as aggId,

  'ZL_UV_PER_MINUTE' as pageId,

  deviceId,

  ts2Date(recvTime) as statkey

  from

  kafka_zl_etrack_event_stream

  )

  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

 ) as t1

 group by aggId, pageId, statkey


 Best
>>>
>>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Viewing Flink operator state

2020-01-08, posted by Yun Tang
Hi

If checkpointing is not enabled but you want to know how much state is being stored, there is no good way to do it for the FsStateBackend. For the RocksDBStateBackend, however, you can enable the RocksDB native metrics [1] to observe the memtable and SST file sizes, which lets you roughly estimate the overall volume of state data (see the sketch below).


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics
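
For example, a rough flink-conf.yaml sketch; please double-check the exact option keys against [1] for your Flink version, as the names below are quoted from memory:

state.backend.rocksdb.metrics.size-all-mem-tables: true
state.backend.rocksdb.metrics.total-sst-files-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true

With these enabled, the corresponding gauges should show up under the task metrics in the dashboard or in your metrics reporter.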

Best wishes
Yun Tang

From: sunfulin 
Sent: Wednesday, January 8, 2020 17:43
To: user-zh@flink.apache.org 
Subject: Viewing Flink operator state

How can I view statistics such as the amount of state storage through the dashboard, if checkpointing is not enabled?


Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08, posted by Benchao Li
hi sunfulin,

As Kurt pointed out, if you use the RocksDB state backend, slow disk IO may be
bounding your job.
You can check the WindowOperator's latency metric to see how long it takes to
process an element.
Hope this helps.
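
Note that latency markers are disabled by default, so latency tracking may need to be enabled before that metric reports anything. A minimal sketch of turning it on (the 1000 ms interval is just an example value):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// emit latency markers every 1000 ms so that per-operator latency metrics
// (including the WindowOperator's) are reported
env.getConfig.setLatencyTrackingInterval(1000)

Setting metrics.latency.interval in flink-conf.yaml should have the same effect.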

sunfulin wrote on Wed, Jan 8, 2020 at 4:04 PM:

> Ah, I have already checked resource usage and GC from the Flink dashboard. It seems
> the reason is not a CPU or memory issue. Task heap memory usage is less than
> 30%. Could you kindly tell me how I can see more metrics to help target
> the bottleneck?
> Really appreciate it.
>
>
>
>
>
> At 2020-01-08 15:59:17, "Kurt Young"  wrote:
>
> Hi,
>
> Could you try to find out what's the bottleneck of your current job? This
> would lead to
> different optimizations, such as whether it's CPU bound, or you have too
> much local
> state and are thus stuck on too many slow IOs.
>
> Best,
> Kurt
>
>
> On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:
>
>> hi sunfulin,
>> you can try the blink planner (since 1.9+), which optimizes distinct
>> aggregation. You can also try to enable
>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>>
>> best,
>> godfreyhe
>>
>> sunfulin wrote on Wed, Jan 8, 2020 at 3:39 PM:
>>
>>> Hi, community,
>>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>>> In one scenario I'm trying to count(distinct deviceID) over a roughly 100GB
>>> data set in realtime, and sink the aggregated results to an ElasticSearch
>>> index. I met a severe performance issue when running my Flink job, and want
>>> to get some help from the community.
>>>
>>>
>>> Flink version: 1.8.2
>>> Running on YARN with 4 slots per task manager. My Flink task
>>> parallelism is set to 10, which is equal to the number of my Kafka source partitions.
>>> After running the job, I can observe high backpressure from the Flink
>>> dashboard. Any suggestions and any kind of help are highly appreciated.
>>>
>>>
>>> running sql is like the following:
>>>
>>>
>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>>
>>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>>> clkCnt  from
>>>
>>> (
>>>
>>> SELECT
>>>
>>>  aggId,
>>>
>>>  pageId,
>>>
>>>  statkey,
>>>
>>>  COUNT(DISTINCT deviceId) as cnt
>>>
>>>  FROM
>>>
>>>  (
>>>
>>>  SELECT
>>>
>>>  'ZL_005' as aggId,
>>>
>>>  'ZL_UV_PER_MINUTE' as pageId,
>>>
>>>  deviceId,
>>>
>>>  ts2Date(recvTime) as statkey
>>>
>>>  from
>>>
>>>  kafka_zl_etrack_event_stream
>>>
>>>  )
>>>
>>>  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>
>>> ) as t1
>>>
>>> group by aggId, pageId, statkey
>>>
>>>
>>> Best
>>
>>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Viewing Flink operator state

2020-01-08, posted by sunfulin
How can I view statistics such as the amount of state storage through the dashboard, if checkpointing is not enabled?

Re: Suspected ParquetTableSource Filter Pushdown bug

2020-01-08, posted by Kurt Young
If the optimizer gets stuck and never exits, that is almost certainly a bug. Please open an issue and upload this information; we will investigate what is causing it.

Best,
Kurt


On Wed, Jan 8, 2020 at 5:12 PM jun su  wrote:

> Adding the code as text:
>
> def main(args: Array[String]): Unit = {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tableEnv = StreamTableEnvironment.create(env)
>
> val schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> val parquetTableSource: ParquetTableSource = ParquetTableSource
> .builder
> .forParquetSchema(new 
> org.apache.parquet.avro.AvroSchemaConverter().convert(
> org.apache.avro.Schema.parse(schema, true)))
> .path("/Users/sujun/Documents/tmp/login_data")
> .build
>
> tableEnv.registerTableSource("source",parquetTableSource)
>
>
> val t1 = tableEnv.sqlQuery("select log_id,city from source where city = 
> '274' ")
> tableEnv.registerTable("t1",t1)
>
> val t4 = tableEnv.sqlQuery("select * from t1 where 
> log_id='5927070661978133'")
> t1.toAppendStream[Row].print()
>
> env.execute()
>
> }
>
>
> jun su wrote on Wed, Jan 8, 2020 at 4:59 PM:
>
>> Hi:
>> While using ParquetTableSource I ran into some problems that look like a bug in the ParquetTableSource
>> Filter Pushdown. The code and description are below:
>>
>> [image: 1578473593933.jpg]
>>
>> While debugging I found that
>> the code gets stuck in the org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method: the while(true)
>> loop never exits, until the whole program OOMs.
>>
>> [image: 1.jpg]
>>
>> After removing the ParquetTableSource filter pushdown code, the main program runs fine.
>> I suspect the cause is that the Calcite optimizer can never exit while iterating to find the lowest-cost plan.
>>
>


Re: Suspected ParquetTableSource Filter Pushdown bug

2020-01-08, posted by jun su
Adding the code as text:

// imports assumed for this snippet (the enclosing object is omitted, as in the original post)
import org.apache.flink.formats.parquet.ParquetTableSource
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tableEnv = StreamTableEnvironment.create(env)

  val schema =
"{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
  val parquetTableSource: ParquetTableSource = ParquetTableSource
    .builder
    .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
      org.apache.avro.Schema.parse(schema, true)))
    .path("/Users/sujun/Documents/tmp/login_data")
    .build

  tableEnv.registerTableSource("source", parquetTableSource)

  // the city predicate should be pushed down into the ParquetTableSource
  val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
  tableEnv.registerTable("t1", t1)

  // kept as in the original report: t4 is defined but only t1 is consumed below
  val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
  t1.toAppendStream[Row].print()

  env.execute()
}


jun su wrote on Wed, Jan 8, 2020 at 4:59 PM:

> Hi:
> While using ParquetTableSource I ran into some problems that look like a bug in the ParquetTableSource
> Filter Pushdown. The code and description are below:
>
> [image: 1578473593933.jpg]
>
> While debugging I found that
> the code gets stuck in the org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method: the while(true)
> loop never exits, until the whole program OOMs.
>
> [image: 1.jpg]
>
> After removing the ParquetTableSource filter pushdown code, the main program runs fine.
> I suspect the cause is that the Calcite optimizer can never exit while iterating to find the lowest-cost plan.
>


Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08, posted by sunfulin
Ah, I have already checked resource usage and GC from the Flink dashboard. It seems the
reason is not a CPU or memory issue. Task heap memory usage is less than 30%.
Could you kindly tell me how I can see more metrics to help target the
bottleneck?
Really appreciate it.

At 2020-01-08 15:59:17, "Kurt Young"  wrote:

Hi,


Could you try to find out what's the bottleneck of your current job? This would
lead to
different optimizations, such as whether it's CPU bound, or you have too much
local
state and are thus stuck on too many slow IOs.


Best,

Kurt





On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:

hi sunfulin,  
you can try the blink planner (since 1.9+), which optimizes distinct
aggregation. You can also try to enable
table.optimizer.distinct-agg.split.enabled if the data is skewed.


best,
godfreyhe


sunfulin wrote on Wed, Jan 8, 2020 at 3:39 PM:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In
one scenario I'm trying to count(distinct deviceID) over a roughly 100GB data set
in realtime, and sink the aggregated results to an ElasticSearch index. I met a
severe performance issue when running my Flink job, and want to get some help from
the community.


Flink version: 1.8.2
Running on YARN with 4 slots per task manager. My Flink task parallelism
is set to 10, which is equal to the number of my Kafka source partitions. After running
the job, I can observe high backpressure from the Flink dashboard. Any
suggestions and any kind of help are highly appreciated.


running sql is like the following:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt  
from

(

SELECT

 aggId,

 pageId,

 statkey,

 COUNT(DISTINCT deviceId) as cnt

 FROM

 (

 SELECT

 'ZL_005' as aggId,

 'ZL_UV_PER_MINUTE' as pageId,

 deviceId,

 ts2Date(recvTime) as statkey

 from

 kafka_zl_etrack_event_stream

 )

 GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

) as t1

group by aggId, pageId, statkey


Best

Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08, posted by sunfulin


hi, godfreyhe
As far as I can tell, I have already rewritten the running SQL from a single-level count distinct into a
two-level aggregation, which is what the table.optimizer.distinct-agg.split.enabled param
would do. Correct me if I am describing it the wrong way. But the rewritten SQL does not
improve the performance or throughput much.
For now I am not able to use the blink planner in my apps because the current prod
environment has not planned for, and is not ready for, an upgrade to Flink 1.9+.
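
For reference, if we do manage to move to 1.9+, enabling the blink planner together with that option should look roughly like the following (an untested sketch against the 1.9 Table API; variable names are arbitrary):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// blink planner in streaming mode (available since Flink 1.9)
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// let the planner split the skewed COUNT DISTINCT into a two-level aggregation
tableEnv.getConfig.getConfiguration.setBoolean("table.optimizer.distinct-agg.split.enabled", true)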

At 2020-01-08 15:52:28, "贺小令"  wrote:

hi sunfulin,  
you can try the blink planner (since 1.9+), which optimizes distinct
aggregation. You can also try to enable
table.optimizer.distinct-agg.split.enabled if the data is skewed.


best,
godfreyhe


sunfulin wrote on Wed, Jan 8, 2020 at 3:39 PM:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In
one scenario I'm trying to count(distinct deviceID) over a roughly 100GB data set
in realtime, and sink the aggregated results to an ElasticSearch index. I met a
severe performance issue when running my Flink job, and want to get some help from
the community.


Flink version: 1.8.2
Running on YARN with 4 slots per task manager. My Flink task parallelism
is set to 10, which is equal to the number of my Kafka source partitions. After running
the job, I can observe high backpressure from the Flink dashboard. Any
suggestions and any kind of help are highly appreciated.


running sql is like the following:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt  
from

(

SELECT

 aggId,

 pageId,

 statkey,

 COUNT(DISTINCT deviceId) as cnt

 FROM

 (

 SELECT

 'ZL_005' as aggId,

 'ZL_UV_PER_MINUTE' as pageId,

 deviceId,

 ts2Date(recvTime) as statkey

 from

 kafka_zl_etrack_event_stream

 )

 GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

) as t1

group by aggId, pageId, statkey

Best