Re: Flink SQL Count Distinct performance optimization

2020-01-07 post
hi sunfulin,
you can try the blink planner (available since 1.9), which optimizes distinct
aggregation. you can also try enabling
*table.optimizer.distinct-agg.split.enabled* if the data is skewed.
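
for example, a minimal sketch (assuming Flink 1.9+ with the
flink-table-planner-blink dependency on the classpath; untested):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// splits a skewed COUNT(DISTINCT ...) into a two-level aggregation, much like
// the manual MOD(hashCode(deviceId), 1024) bucketing in the query below:
tEnv.getConfig().getConfiguration()
        .setBoolean("table.optimizer.distinct-agg.split.enabled", true);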

best,
godfreyhe

sunfulin wrote on Wed, Jan 8, 2020 at 3:39 PM:

> Hi, community,
> I'm using Apache Flink SQL to build some of my realtime streaming apps.
> In one scenario I'm trying to count(distinct deviceID) over about 100GB of
> data in realtime and sink the aggregated results to an ElasticSearch index.
> I'm hitting a severe performance issue when running my flink job and would
> like some help from the community.
>
>
> Flink version: 1.8.2
> Running on YARN with 4 slots per task manager. My flink task parallelism
> is set to 10, equal to the number of my kafka source partitions.
> After running the job, I can observe high backpressure in the flink
> dashboard. Any suggestions or help would be highly appreciated.
>
>
> the SQL I'm running is the following:
>
>
> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt
> from (
>   SELECT
>     aggId,
>     pageId,
>     statkey,
>     COUNT(DISTINCT deviceId) as cnt
>   FROM (
>     SELECT
>       'ZL_005' as aggId,
>       'ZL_UV_PER_MINUTE' as pageId,
>       deviceId,
>       ts2Date(recvTime) as statkey
>     from kafka_zl_etrack_event_stream
>   )
>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
> ) as t1
> group by aggId, pageId, statkey
>
> Best


Re: Cannot change the catalog when registering a table

2020-01-07 post
hi,

Tables registered via
streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
are always temporary tables.

To register a table in a specific catalog, you can use:

catalog = new InMemoryExternalCatalog(catalogName);
streamTableEnvironment.registerCatalog(catalogName, catalog);
catalog.createTable(...)

or

streamTableEnvironment.getCatalog(catalogName).get().createTable(...)
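
As a slightly fuller sketch using the 1.9 Catalog API (GenericInMemoryCatalog
is the built-in in-memory implementation; the catalog/database/table names and
the tableSource variable below are placeholders, not from the original mail):

import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;

GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("myCatalog", "myDb");
streamTableEnvironment.registerCatalog("myCatalog", catalog);
streamTableEnvironment.useCatalog("myCatalog");

// create a table inside that catalog; tableSource is an existing TableSource,
// and createTable throws checked exceptions that must be handled or declared
catalog.createTable(
        new ObjectPath("myDb", "myTable"),
        ConnectorCatalogTable.source(tableSource, false),
        false);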


xiyu...@163.com wrote on Tue, Jan 7, 2020 at 3:20 PM:

> hi all:
>
> During development, when I register a table in the following way, the
> default catalog used is EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> I tried to solve it as shown below, but the table is still registered in
> EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> streamTableEnvironment.registerCatalog(catalogName, new
> InMemoryExternalCatalog(catalogName));
> streamTableEnvironment.useCatalog(catalogName);
> How can I register a table in a specified catalog?
>
>
> xiyu...@163.com
>


Re: Question: when starting a session, error: Could not write the Yarn connection information.

2019-11-22 post (晓令)
hi 李军,
Java URIs do not allow the character _ ; see https://bugs.openjdk.java.net/browse/JDK-8221675
Please replace the _ and try again.
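
As a minimal illustration of the JDK behavior (the hostname name_node below is
hypothetical):

import java.net.URI;

public class UriUnderscoreDemo {
    public static void main(String[] args) throws Exception {
        // '_' is not a valid hostname character for java.net.URI (JDK-8221675),
        // so the host component fails to parse
        URI uri = new URI("hdfs://name_node:8020/flink/recovery");
        System.out.println(uri.getHost());      // prints: null
        System.out.println(uri.getAuthority()); // prints: name_node:8020
    }
}

Any machine with such a hostname (e.g. a YARN node) can therefore break code
that relies on URI.getHost().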

thanks,
godfrey


--
From: 李军
Sent: Friday, November 22, 2019 15:15
To: 贺小令(晓令)
Subject: Re: Question: when starting a session, error: Could not write the Yarn connection information.

  
flink: 1.9.1
hadoop: 2.8.3

李军
hold_li...@163.com

On November 22, 2019 at 15:14, 贺小令(晓令) wrote:
Which flink version are you using?

thanks,
godfrey

--
From: 李军
Sent: Friday, November 22, 2019 11:10
To: user-zh
Subject: Question: when starting a session, error: Could not write the Yarn connection information.

The error screenshot and details are here:
https://blog.csdn.net/qq_37518574/article/details/103197224
Also, what else needs to be started before this? YARN and HDFS are already running.
I'm new to this and grateful for any advice. Thanks.



Re: Question: when starting a session, error: Could not write the Yarn connection information.

2019-11-21 post (晓令)
Which flink version are you using?

thanks,
godfrey


--
From: 李军
Sent: Friday, November 22, 2019 11:10
To: user-zh
Subject: Question: when starting a session, error: Could not write the Yarn connection information.

The error screenshot and details are here:
https://blog.csdn.net/qq_37518574/article/details/103197224
Also, what else needs to be started before this? YARN and HDFS are already running.
I'm new to this and grateful for any advice. Thanks.



Re: Deduplication DML, error during translate

2019-11-21 post (晓令)
hi 叶贤勋:

Your SQL first does ORDER BY proc desc and then takes rownum = 1, which is
equivalent to last-row logic. That produces retractions, but the result table
(user_dist) has no primary key defined, which is not supported, hence the
error you see.

If you change ORDER BY proc desc to ORDER BY proc asc, still with rownum = 1,
it is equivalent to first-row logic, which produces no retractions; in that
case the result table (user_dist) meets the requirements.

However, the blink planner currently has a problem handling PROCTIME(): the
SQL optimization drops the PROCTIME() attribute and treats it as a plain
timestamp type, so the query is not translated into first-row logic. I have
created an issue to fix this problem.
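
Once that issue is fixed, the first-row variant would look roughly like this
(a sketch based on SQL3 quoted below, with only the ORDER BY direction
changed; untested):

tableEnv.sqlUpdate(
    "INSERT INTO user_dist " +
    "SELECT dt, user_id, behavior FROM ( " +
    "  SELECT dt, user_id, behavior, " +
    "    ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior " +
    "                       ORDER BY proc asc) AS rownum " +  // asc = keep first row, no retraction
    "  FROM (SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') AS dt, " +
    "          user_id, behavior, PROCTIME() AS proc FROM user_log) " +
    ") WHERE rownum = 1");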

thanks,
godfrey


--
From: JingsongLee
Sent: Thursday, November 21, 2019 18:44
To: user-zh ; Jark Wu ; godfrey he (JIRA)
Subject: Re: Deduplication DML, error during translate

Hi 叶贤勋:

Deduplication does support the INSERT INTO ... SELECT syntax now.
The question is why your SQL does not produce a UniqueKey;
there may be a blink-planner bug here.
CC: @Jark Wu @godfrey he (JIRA)

Best,
Jingsong Lee


--
From: 叶贤勋
Send Time: Thursday, November 21, 2019 16:20
To: user-zh@flink.apache.org
Subject: Deduplication DML, error during translate

Hi everyone:
Flink version: 1.9.0.
SQL1:
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
SQL2:

CREATE TABLE user_dist (
dt VARCHAR,
user_id VARCHAR,
behavior VARCHAR
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'user_behavior_dup',
'connector.username' = 'root',
'connector.password' = '**',
'connector.write.flush.max-rows' = '1'
);
SQL3:

INSERT INTO user_dist
SELECT
  dt,
  user_id,
  behavior
FROM (
  SELECT
    dt,
    user_id,
    behavior,
    ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc desc) AS rownum
  FROM (
    SELECT
      DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') as dt,
      user_id,
      behavior,
      PROCTIME() as proc
    FROM user_log
  )
)
WHERE rownum = 1;


When executing tableEnv.sqlUpdate(SQL3), I get this error:
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)


Is the INSERT INTO ... SELECT syntax not supported for deduplication at the moment?


叶贤勋
yxx_c...@163.com