Hi sunfulin, you can try the Blink planner (available since Flink 1.9+), which optimizes distinct aggregations. You can also try enabling *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
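For example, a minimal sketch against the Flink 1.9 Java Table API (it assumes your kafka_zl_etrack_event_stream table and the ts2Date UDF are already registered in the environment, as in your job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SplitDistinctAggExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the Blink planner (available since Flink 1.9), which supports
        // the automatic split-distinct-aggregation optimization.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Let the planner split COUNT(DISTINCT ...) into a two-level
        // aggregation to handle skew; this replaces the manual
        // MOD(hashCode(deviceId), 1024) bucketing in your query.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.distinct-agg.split.enabled", "true");
        // Optional: bucket number used by the split (1024 is the default).
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

        // The straightforward query, without the manual two-level split;
        // assumes the source table and ts2Date UDF are registered.
        Table uv = tEnv.sqlQuery(
                "SELECT aggId, pageId, statkey, COUNT(DISTINCT deviceId) AS cnt " +
                "FROM ( " +
                "  SELECT 'ZL_005' AS aggId, 'ZL_UV_PER_MINUTE' AS pageId, " +
                "         deviceId, ts2Date(recvTime) AS statkey " +
                "  FROM kafka_zl_etrack_event_stream " +
                ") " +
                "GROUP BY aggId, pageId, statkey");
    }
}

With the split optimization enabled, you can drop the inner GROUP BY on MOD(hashCode(deviceId), 1024) and let the planner generate the equivalent two-phase plan itself.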
Best,
Godfrey He

On Wed, Jan 8, 2020 at 3:39 PM sunfulin <sunfulin0...@163.com> wrote:

> Hi, community,
> I'm using Apache Flink SQL to build some of my realtime streaming apps.
> In one scenario I'm trying to count(distinct deviceID) over about a 100GB
> data set in realtime, and sink the aggregated results to an Elasticsearch
> index. I've hit a severe performance issue when running my Flink job, and
> I'd like to get some help from the community.
>
> Flink version: 1.8.2
> Running on YARN with 4 slots per task manager. My task parallelism is set
> to 10, which equals the number of my Kafka source partitions. After
> starting the job, I can observe high backpressure on the Flink dashboard.
> Any suggestions and any kind of help are highly appreciated.
>
> The running SQL is the following:
>
> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
> SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
> FROM (
>   SELECT
>     aggId,
>     pageId,
>     statkey,
>     COUNT(DISTINCT deviceId) AS cnt
>   FROM (
>     SELECT
>       'ZL_005' AS aggId,
>       'ZL_UV_PER_MINUTE' AS pageId,
>       deviceId,
>       ts2Date(recvTime) AS statkey
>     FROM kafka_zl_etrack_event_stream
>   )
>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
> ) AS t1
> GROUP BY aggId, pageId, statkey
>
> Best
