Re: 滑动窗口数据存储多份问题

2020-07-05 文章 Congxian Qiu
Hi
现在的实现是这样的,每条数据会在每个窗口中存一份

Best,
Congxian


张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:

> Hi,all!
> 由于第一次咨询,我不确定上一份邮件大家是否收到。
> 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份?
>
>
> | |
> 张浩
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制


?????? ??????FLINKSQL1.10????????????UV

2020-07-05 文章 x
sorry,group agg.
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??.


----
??:"Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <35907...@qq.com ??2020??7??6?? 11:15??

 ??1.10.1??sinkwindow??count
 distinct??window??count
 
distinct??windowgroupnbsp;DATE_FORMAT(rowtm,
 '-MM-dd') sql??
 val rt_totaluv_view : Table = tabEnv.sqlQuery(
 """
 SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd 
HH:mm:00'))
 time_str,COUNT(DISTINCT userkey) uv
 FROM source
 GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
 """)
 tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)

 val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
 .filter( line =gt; line._1 == true ).map( line =gt; 
line._2 )

 val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )

 tabEnv.sqlUpdate(
 s"""
 INSERT INTO mysql_totaluv
 SELECT _1,MAX(_2)
 FROM $totaluvTabTmp
 GROUP BY _1
 """)
 --nbsp;nbsp;--
 ??:nbsp;"Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942

 x <35907...@qq.comgt; ??2020??7??3?? 4:34??

 gt; 
checkpoint??
 gt;
 gt;
 
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key
 gt;
 gt;
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:amp;nbsp;"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 gt; amp;gt; amp;amp;gt; [2]
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt; x 
<35907...@qq.comamp;amp;amp;gt;
 ??2020??6??17?? 11:14??
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; 
??0??UV??UV??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; CREATE VIEW 
uv_per_10min AS
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
SELECTamp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;nbsp;
 gt; MAX(DATE_FORMAT(proctimeamp;amp;amp;amp;nbsp;,
 gt; amp;gt; '-MM-dd
 gt; amp;gt; amp;amp;gt; 
HH:mm:00'))amp;amp;amp;amp;nbsp;OVER w
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; AS
 time_str,amp;amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
amp;amp;amp;amp;nbsp;
 COUNT(DISTINCT user_id) OVER
 gt; w AS uv
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; FROM 
user_behavior
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; WINDOW w AS 
(ORDER BY proctime
 ROWS BETWEEN
 gt; UNBOUNDED
 gt; amp;gt; PRECEDING AND
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; CURRENT ROW);
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; PARTITION BY
 DATE_FORMAT(rowtm, '-MM-dd')
 gt; amp;gt; ??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 
PS??1.10??DDL??CREATE
 VIEW??
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; 



 --

 Best,
 Benchao Li



-- 

Best,
Benchao Li

flink cep result DataStream no data print

2020-07-05 文章 Zhou Zach
code:


val inpurtDS = 
streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)inpurtDS.print()val 
pattern = Pattern.begin[BehaviorInfo]("start")
  .where(_.clickCount  7)val patternStream = CEP.pattern(inpurtDS, pattern)
val result: DataStream[BehaviorInfo] = patternStream.process(
  new PatternProcessFunction[BehaviorInfo, BehaviorInfo]() {
override def processMatch(
   matchPattern: util.Map[String, 
util.List[BehaviorInfo]],
   ctx: PatternProcessFunction.Context,
   out: Collector[BehaviorInfo]): Unit = {
  try {
println(
  s"""
 |matchPattern: $matchPattern
 |util.List[BehaviorInfo]: ${matchPattern.get("start")}
 |""".stripMargin)
out.collect(matchPattern.get("start").get(0))
  } catch {
case exception: Exception =
  println(exception)
  }
}
  })
result.print()



??inpurtDS.print()pattern??result.print()PatternProcessFunctionprocessMatch??


Thanks a lot!

Re: 求助:FLINKSQL1.10实时统计累计UV

2020-07-05 文章 Benchao Li
我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <35907...@qq.com> 于2020年7月6日周一 上午11:15写道:

> 版本是1.10.1,最后sink的时候确实是一个window里面做count
> distinct操作。请问是只要计算过程中含有一个window里面做count
> distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupDATE_FORMAT(rowtm,
> '-MM-dd') 这个sql对应的状态很大。代码如下:
> val rt_totaluv_view : Table = tabEnv.sqlQuery(
>   """
> SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00'))
> time_str,COUNT(DISTINCT userkey) uv
> FROM source
> GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
> """)
> tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
>
> val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
>   .filter( line = line._1 == true ).map( line = line._2 )
>
> val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
>
> tabEnv.sqlUpdate(
>   s"""
> INSERT INTO mysql_totaluv
> SELECT _1,MAX(_2)
> FROM $totaluvTabTmp
> GROUP BY _1
> """)
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年7月3日(星期五) 晚上9:47
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
> 这个已经在1.11中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17942
>
> x <35907...@qq.com 于2020年7月3日周五 下午4:34写道:
>
>  您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
> 
> 
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Jark Wu"  发送时间:nbsp;2020年6月18日(星期四) 中午12:16
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> 
> 
> 
>  是的,我觉得这样子是能绕过的。
> 
>  On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.comgt; wrote:
> 
>  gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
>  gt; val resTmpTab: Table = tabEnv.sqlQuery(
>  gt;nbsp;nbsp; """
>  gt;nbsp;nbsp;nbsp;nbsp; SELECT
> MAX(DATE_FORMAT(ts, '-MM-dd
>  HH:mm:00'))
>  gt; time_str,COUNT(DISTINCT userkey) uv
>  gt;nbsp;nbsp;nbsp;nbsp; FROM
> user_behaviornbsp;nbsp;nbsp; GROUP BY
>  DATE_FORMAT(ts, '-MM-dd')nbsp;nbsp;nbsp; """)
>  gt;
>  gt; val
> resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
>  gt;nbsp;nbsp;
>  .filter(line=amp;gt;line._1==true).map(line=amp;gt;line._2)
>  gt;
>  gt; val res= tabEnv.fromDataStream(resTmpStream)
>  gt; tabEnv.sqlUpdate(
>  gt;nbsp;nbsp; s"""
>  gt;nbsp;nbsp;nbsp;nbsp; INSERT INTO
> rt_totaluv
>  gt;nbsp;nbsp;nbsp;nbsp; SELECT _1,MAX(_2)
>  gt;nbsp;nbsp;nbsp;nbsp; FROM $res
>  gt;nbsp;nbsp;nbsp;nbsp; GROUP BY _1
>  gt;nbsp;nbsp;nbsp;nbsp; """)
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:amp;nbsp;"Jark Wu"  gt; 发送时间:amp;nbsp;2020年6月17日(星期三) 中午1:55
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt;
>  gt;
>  gt;
>  gt; 在 Flink 1.11 中,你可以尝试这样:
>  gt;
>  gt; CREATE TABLE mysql (
>  gt; amp;nbsp;amp;nbsp; time_str STRING,
>  gt; amp;nbsp;amp;nbsp; uv BIGINT,
>  gt; amp;nbsp;amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED
>  gt; ) WITH (
>  gt; amp;nbsp;amp;nbsp; 'connector' = 'jdbc',
>  gt; amp;nbsp;amp;nbsp; 'url' =
> 'jdbc:mysql://localhost:3306/mydatabase',
>  gt; amp;nbsp;amp;nbsp; 'table-name' = 'myuv'
>  gt; );
>  gt;
>  gt; INSERT INTO mysql
>  gt; SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')),
>  COUNT(DISTINCTamp;nbsp;
>  gt; user_id)
>  gt; FROM user_behavior;
>  gt;
>  gt; On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.comamp;gt;
> wrote:
>  gt;
>  gt; amp;gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
>  gt; amp;gt; sink表这个样式
>  gt; amp;gt; tm uv
>  gt; amp;gt; 2020/06/17 13:46:00 1
>  gt; amp;gt; 2020/06/17 13:47:00 2
>  gt; amp;gt; 2020/06/17 13:48:00 3
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; group by 日期的话,分钟如何获取
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
> 
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
>  gt; amp;gt; 发件人:amp;amp;nbsp;"Benchao Li"<
> libenc...@apache.org
>  amp;amp;gt;;
>  gt; amp;gt; 发送时间:amp;amp;nbsp;2020年6月17日(星期三) 中午11:46
>  gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"<
> user-zh@flink.apache.org
>  amp;amp;gt;;
>  gt; amp;gt;
>  gt; amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; Hi,
>  gt; amp;gt; 我感觉这种场景可以有两种方式,
>  gt; amp;gt; 1. 可以直接用group by + mini batch
>  gt; amp;gt; 2. window聚合 + fast emit
>  gt; amp;gt;
>  gt; amp;gt; 对于#1,group
> by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
>  '-MM-dd')。
>  gt; amp;gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1]
> 。同时,mini
>  batch的开启也需要
>  gt; amp;gt; 用参数[2] 来打开。
>  gt; amp;gt;
>  gt; amp;gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
>  gt; amp;gt; fast
>  emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
>  gt; amp;gt; table.exec.emit.early-fire.enabled = true
>  gt; amp;gt; table.exec.emit.early-fire.delay = 60 s
>  gt; amp;gt;

滑动窗口数据存储多份问题

2020-07-05 文章 张浩
Hi,all!
由于第一次咨询,我不确定上一份邮件大家是否收到。
想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
份?


| |
张浩
|
|
13669299...@163.com
|
签名由网易邮箱大师定制

回复:【Flink的shuffle mode】

2020-07-05 文章 夏帅
你好,可以参考下ExecutionConfigOptions,OptimizerConfigOptions和GlobalConfiguration,里面有比较清楚地介绍




--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月6日(星期一) 12:16
收件人:user-zh 
主 题:回复:【Flink的shuffle mode】

那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?



发自我的iPhone


-- 原始邮件 --
发件人: Jingsong Li 

Re: 【Flink的shuffle mode】

2020-07-05 文章 Jingsong Li
pipeline:直接走网络传输,不buffer所有数据
batch:buffer所有数据,结束后一起发送

流一定是pipeline
批可以是pipeline(更好的性能),也可以是batch(更好的容错和更简单的资源申请)

Best,
Jingsong

On Mon, Jul 6, 2020 at 12:16 PM 忝忝向仧 <153488...@qq.com> wrote:

>
> 那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?
>
>
>
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: Jingsong Li  发送时间: 2020年7月6日 11:03
> 收件人: user-zh  主题: 回复:【Flink的shuffle mode】
>
>
>
> Hi,
>
> 现在就两种:pipeline和batch
>
> batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。
>
> 理论上可以per transformation的来设置,see PartitionTransformation.
>
> Best,
> Jingsong
>
> On Sun, Jul 5, 2020 at 10:48 PM 忝忝向仧 <153488...@qq.com wrote:
>
>  Hi,all:
> 
> 
>  看Flink源码时候,在应用中使用keyBy后,源码的transformations会有shuffle mode方法,这个shuffle
>  mode看是UNDEFINED的。
>  那么,shuffle mode有哪些方式?在应用里面可以设置么?
> 
> 
>  谢谢.
>
>
>
> --
> Best, Jingsong Lee



-- 
Best, Jingsong Lee


回复:【Flink的shuffle mode】

2020-07-05 文章 忝忝向仧
那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?



发自我的iPhone


-- 原始邮件 --
发件人: Jingsong Li 

flink sql??????????????????

2020-07-05 文章 op
??sql?? select day,
 count(id),
 sum(v1) from
(
select
 day ,
 id ,
 sum(v1) v1 from source
   group by day,
 id
)t


group by day



tConfig.setIdleStateRetentionTime(Time.minutes(1440),Time.minutes(1450))


??id??14??checkpoint
??1.10.0

flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空

2020-07-05 文章 Jim Chen
Hi,
可以通过以下步骤还原车祸现场:
kafka topic: test_action
kafka message:
  {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }

代码Problem2.java:
package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * 现在思路:就是在定义表的时候,把ARRYA看成STRING,
 * 现在的问题,就是查询出来,都是空
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" +
"action STRING\n" +
") WITH (\n" +
"'connector.type' = 'kafka',\n" +
"'connector.version' = '0.11',\n" +
"'connector.topic' = 'test_action',\n" +
"'connector.startup-mode' = 'earliest-offset',\n" +
"'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
"'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
"'update-mode' = 'append',\n" +
"'format.type' = 'json',\n" +
"'format.derive-schema' = 'false',\n" +
"'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3");
//Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// 输出都是空

bsEnv.execute("ARRAY tableFunction Problem");
}
}


?????? ??????FLINKSQL1.10????????????UV

2020-07-05 文章 x
??1.10.1??sinkwindow??count 
distinct??window??count 
distinct??windowgroupDATE_FORMAT(rowtm,
 '-MM-dd') sql??
val rt_totaluv_view : Table = tabEnv.sqlQuery(
  """
SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) 
time_str,COUNT(DISTINCT userkey) uv
FROM source
GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
""")
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)

val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
  .filter( line = line._1 == true ).map( line = line._2 )

val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )

tabEnv.sqlUpdate(
  s"""
INSERT INTO mysql_totaluv
SELECT _1,MAX(_2)
FROM $totaluvTabTmp
GROUP BY _1
""")
----
??:"Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942

x <35907...@qq.com ??2020??7??3?? 4:34??

 
checkpoint??

 
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key




 --nbsp;nbsp;--
 ??:nbsp;"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 gt; amp;gt; [2]
 gt; amp;gt;
 gt; amp;gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
 gt; amp;gt;
 gt; amp;gt; x <35907...@qq.comamp;amp;gt; 
??2020??6??17?? 11:14??
 gt; amp;gt;
 gt; amp;gt; amp;amp;gt;
 
??0??UV??UV??
 gt; amp;gt; amp;amp;gt; CREATE VIEW uv_per_10min AS
 gt; amp;gt; amp;amp;gt; SELECTamp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;nbsp;
 MAX(DATE_FORMAT(proctimeamp;amp;amp;nbsp;,
 gt; '-MM-dd
 gt; amp;gt; HH:mm:00'))amp;amp;amp;nbsp;OVER w
 gt; amp;gt; amp;amp;gt; AS time_str,amp;amp;amp;nbsp;
 gt; amp;gt; amp;amp;gt; amp;amp;amp;nbsp; 
COUNT(DISTINCT user_id) OVER
 w AS uv
 gt; amp;gt; amp;amp;gt; FROM user_behavior
 gt; amp;gt; amp;amp;gt; WINDOW w AS (ORDER BY proctime ROWS 
BETWEEN
 UNBOUNDED
 gt; PRECEDING AND
 gt; amp;gt; amp;amp;gt; CURRENT ROW);
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt;
 gt; amp;gt; amp;amp;gt; ??
 gt; amp;gt; amp;amp;gt; PARTITION BY DATE_FORMAT(rowtm, 
'-MM-dd')
 gt; ??
 gt; amp;gt; amp;amp;gt; 
PS??1.10??DDL??CREATE VIEW??
 gt; amp;gt; amp;amp;gt; 



-- 

Best,
Benchao Li

Re: 【Flink Join内存问题】

2020-07-05 文章 admin
regular join确实是这样,所以量大的话可以用interval join 、temporal join

> 2020年7月5日 下午3:50,忝忝向仧 <153488...@qq.com> 写道:
> 
> Hi,all:
> 
> 我看源码里写到JoinedStreams:
> 也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom
> 那么有什么预防措施呢?
> 将key值多的一边进行打散?
> 
> 
> Right now, the join is being evaluated in memory so you need to ensure that 
> the number
> * of elements per key does not get too high. Otherwise the JVM might crash.



Re: 【Flink的shuffle mode】

2020-07-05 文章 Jingsong Li
Hi,

现在就两种:pipeline和batch

batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。

理论上可以per transformation的来设置,see PartitionTransformation.

Best,
Jingsong

On Sun, Jul 5, 2020 at 10:48 PM 忝忝向仧 <153488...@qq.com> wrote:

> Hi,all:
>
>
> 看Flink源码时候,在应用中使用keyBy后,源码的transformations会有shuffle mode方法,这个shuffle
> mode看是UNDEFINED的。
> 那么,shuffle mode有哪些方式?在应用里面可以设置么?
>
>
> 谢谢.



-- 
Best, Jingsong Lee


flink 1.11 作业执行异常

2020-07-05 文章 sunfulin
Hi,
我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
org.apache.flink.table.api.TableExecution: Failed to execute sql


caused by : java.lang.IlleagalStateException: No ExecutorFactory found to 
execute the application.
  at 
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)


想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。

Re: flink1.10在通过TableFunction实现行转列时,Row一直是空

2020-07-05 文章 Jim Chen
Hi,
我现在转换思路,就是在定义表的时候,把ARRYA看成STRING,
那么,现在的问题,就是查询出来,都是空。

基于上面的代码环境,新写了一个类
Problem2.java

package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * 现在思路:就是在定义表的时候,把ARRYA看成STRING,
 * 现在的问题,就是查询出来,都是空
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" +
"action STRING\n" +
") WITH (\n" +
"'connector.type' = 'kafka',\n" +
"'connector.version' = '0.11',\n" +
"'connector.topic' = 'test_action',\n" +
"'connector.startup-mode' = 'earliest-offset',\n" +
"'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
"'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
"'update-mode' = 'append',\n" +
"'format.type' = 'json',\n" +
"'format.derive-schema' = 'false',\n" +
"'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3");
//Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// 输出都是空

bsEnv.execute("ARRAY tableFunction Problem");
}
}

Jark Wu  于2020年7月6日周一 上午10:36写道:

> Hi,
>
> 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
> https://issues.apache.org/jira/browse/FLINK-17855
>
>
> Best,
> Jark
>
> On Mon, 6 Jul 2020 at 10:19, Jim Chen  wrote:
>
> > 大家好:
> > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> > 那么在eval方法接收到的就是Row[],
> > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> >
> > 通过下面的步骤和代码可还原车祸场景:
> > kafka topic: test_action
> > kafka message:
> > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> >
> > 代码1:Problem.java
> > package com.flink;
> >
> > import
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > /**
> >  *
> >  * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> >  * 那么在eval方法接收到的就是Row[],
> >  * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> >  *
> >  * kafka topic: test_action
> >  *
> >  * kafka message:
> >  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> >  */
> > public class Problem {
> >
> > public static void main(String[] args) throws Exception {
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings envSettings =
> EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build();
> > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> > envSettings);
> > bsEnv.registerFunction("explode2", new ExplodeFunction());
> >
> > String ddlSource = "CREATE TABLE actionTable (\n" +
> > "action ARRAY<\n" +
> > "   ROW<" +
> > "   actionID STRING,\n" +
> > "   actionName STRING\n" +
> > "   >\n" +
> > "   >\n" +
> > ") WITH (\n" +
> > "'connector.type' = 'kafka',\n" +
> > "'connector.version' = '0.11',\n" +
> > "'connector.topic' = 'test_action',\n" +
> > "'connector.startup-mode' = 'earliest-offset',\n" 

Re: flink1.10在通过TableFunction实现行转列时,Row一直是空

2020-07-05 文章 Jark Wu
Hi,

当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
https://issues.apache.org/jira/browse/FLINK-17855


Best,
Jark

On Mon, 6 Jul 2020 at 10:19, Jim Chen  wrote:

> 大家好:
> 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> 那么在eval方法接收到的就是Row[],
> 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
>
> 通过下面的步骤和代码可还原车祸场景:
> kafka topic: test_action
> kafka message:
> {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>
> 代码1:Problem.java
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
>  *
>  * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
>  * 那么在eval方法接收到的就是Row[],
>  * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
>  *
>  * kafka topic: test_action
>  *
>  * kafka message:
>  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>  */
> public class Problem {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
> bsEnv.registerFunction("explode2", new ExplodeFunction());
>
> String ddlSource = "CREATE TABLE actionTable (\n" +
> "action ARRAY<\n" +
> "   ROW<" +
> "   actionID STRING,\n" +
> "   actionName STRING\n" +
> "   >\n" +
> "   >\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',\n" +
> "'connector.version' = '0.11',\n" +
> "'connector.topic' = 'test_action',\n" +
> "'connector.startup-mode' = 'earliest-offset',\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> "'update-mode' = 'append',\n" +
> "'format.type' = 'json'\n" +
> ")";
> bsEnv.sqlUpdate(ddlSource);
>
> //Table table = bsEnv.sqlQuery("select `action` from actionTable");
> Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
> TABLE(explode2(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print("==tb==");
>
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>
> 代码2:ExplodeFunction.java
> package com.flink;
>
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
> import java.util.ArrayList;
> import java.util.Arrays;
>
> public class ExplodeFunction extends TableFunction {
>
> public void eval(Row[] values) {
> System.out.println(values.length);
> if (values.length > 0) {
> for (Row row : values) {
> if (row != null) {// 这里debug出来的row总是空
> ArrayList list = new ArrayList<>();
> for (int i = 0; i < row.getArity(); i++) {
> Object field = row.getField(i);
> list.add(field);
> }
>
> collector.collect(Row.of(Arrays.toString(list.toArray(;
> }
> }
> }
> }
> }
>
> 最后贴个debug的图
> [image: image.png]
>


flink1.10在通过TableFunction实现行转列时,Row一直是空

2020-07-05 文章 Jim Chen
大家好:
我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
那么在eval方法接收到的就是Row[],
问题出在,Row[]中的数据获取不到,里面的元素都是NULL

通过下面的步骤和代码可还原车祸场景:
kafka topic: test_action
kafka message:
{"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }

代码1:Problem.java
package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode2", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable (\n" +
"action ARRAY<\n" +
"   ROW<" +
"   actionID STRING,\n" +
"   actionName STRING\n" +
"   >\n" +
"   >\n" +
") WITH (\n" +
"'connector.type' = 'kafka',\n" +
"'connector.version' = '0.11',\n" +
"'connector.topic' = 'test_action',\n" +
"'connector.startup-mode' = 'earliest-offset',\n" +
"'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
"'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
"'update-mode' = 'append',\n" +
"'format.type' = 'json'\n" +
")";
bsEnv.sqlUpdate(ddlSource);

//Table table = bsEnv.sqlQuery("select `action` from actionTable");
Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
TABLE(explode2(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print("==tb==");


bsEnv.execute("ARRAY tableFunction Problem");
}
}

代码2:ExplodeFunction.java
package com.flink;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;

public class ExplodeFunction extends TableFunction {

public void eval(Row[] values) {
System.out.println(values.length);
if (values.length > 0) {
for (Row row : values) {
if (row != null) {// 这里debug出来的row总是空
ArrayList list = new ArrayList<>();
for (int i = 0; i < row.getArity(); i++) {
Object field = row.getField(i);
list.add(field);
}

collector.collect(Row.of(Arrays.toString(list.toArray(;
}
}
}
}
}

最后贴个debug的图
[image: image.png]


Re: 在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-05 文章 john
Mark一下,我现在也有这样的需求

> 2020年7月4日 下午12:35,zhisheng  写道:
> 
> 我猜你是想要将 table name 作为一个标签方便后期分组查询过滤?
> 
> wangl...@geekplus.com.cn  于2020年7月3日周五 上午10:24写道:
> 
>> public void invoke(ObjectNode node, Context context) throws Exception {
>> 
>>String tableName = node.get("metadata").get("topic").asText();
>>Meter meter = getRuntimeContext().getMetricGroup().meter(tableName,
>> new MeterView(10));
>>meter.markEvent();
>>log.info("### counter: " + meter.toString() + "\t" +
>> meter.getCount());
>> 
>> 如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics.
>> 但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。
>> 
>> 谢谢,
>> 王磊
>> 
>> 
>> wangl...@geekplus.com.cn
>> 
>> 
>> Sender: kcz
>> Send Time: 2020-07-03 09:13
>> Receiver: wanglei2
>> Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>> 按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧
>> 
>> 
>> 
>> -- 原始邮件 --
>> 发件人: 王磊2 
>> 发送时间: 2020年7月2日 21:46
>> 收件人: user-zh , 17610775726 <17610775...@163.com>
>> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>> 
>> 
>> 没有明白你说的实现方式。
>> 
>> 我最终要得到类似的 Metrics:  myCounter_table1, myCounter_table2, ...,
>> myCounter_tableX
>> 但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。
>> 
>> 谢谢,
>> 王磊
>> 
>> 
>> 
>> --
>> 发件人:JasonLee <17610775...@163.com>
>> 发送时间:2020年7月2日(星期四) 21:12
>> 收件人:user-zh 
>> 主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>> 
>> 你把tablename传到下面metric里不就行了吗
>> 
>> 
>> | |
>> JasonLee
>> |
>> |
>> 邮箱:17610775...@163.com
>> |
>> 
>> Signature is customized by Netease Mail Master
>> 
>> 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道:
>> 
>> 全都是同一种类型的 metrics.
>> 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的
>> metrics(但都是 meter 类型)
>> 
>> 谢谢,
>> 王磊
>> 
>> 
>> 
>> 
>> wangl...@geekplus.com.cn
>> 
>> 
>> 发件人: JasonLee
>> 发送时间: 2020-07-02 16:16
>> 收件人: user-zh
>> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>> 是要生成不同类型的metric吗 比如counter meter ?
>> 
>> 
>> | |
>> JasonLee
>> |
>> |
>> 邮箱:17610775...@163.com
>> |
>> 
>> Signature is customized by Netease Mail Master
>> 
>> 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:
>> 
>> 官网上的例子:
>> 
>> public class MyMapper extends RichMapFunction {
>> private transient Counter counter;
>> @Override
>> public void open(Configuration config) {
>>  this.counter = getRuntimeContext()
>>.getMetricGroup()
>>.counter("myCounter");
>> }
>> @Override
>> public String map(String value) throws Exception {
>>  this.counter.inc();
>>  return value;
>> }
>> }
>> 
>> 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?
>> 
>> 谢谢,
>> 王磊
>> 
>> 
>> 
>> wangl...@geekplus.com.cn
>> 
>> 
>> 



答复: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-05 文章 陈凯

Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
我之前提了个jira 描述了这个问题 
https://issues.apache.org/jira/browse/FLINK-18196

修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19



-邮件原件-
发件人: zhisheng  
发送时间: 2020年7月5日 15:01
收件人: user-zh 
主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian

Best!
zhisheng

Congxian Qiu  于2020年7月4日周六 下午3:21写道:

> @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
>
> Best,
> Congxian
>
>
> zhisheng  于2020年7月4日周六 下午12:27写道:
>
> > 我们也有遇到过这个异常,但是不是很常见
> >
> > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> >
> > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> > > Best,
> > > Congxian
> > >
> > >
> > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> > > > >
> > > > >
> > > > >| |
> > > > >JasonLee
> > > > >|
> > > > >|
> > > > >邮箱:17610775...@163.com
> > > > >|
> > > > >
> > > > >Signature is customized by Netease Mail Master
> > > > >
> > > > >在2020年07月01日 20:43,程龙 写道:
> > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> > > > >
> > > > >
> > > > >java.lang.Exception: Could not perform checkpoint 3201 for operator
> > > > Filter -> Map (2/8).
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> > > > >   at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> > > > >   at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> > > > >   at java.lang.Thread.run(Thread.java:745)
> > > > >Caused by: java.lang.NullPointerException
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> > > >
> > >
> >
>


??Flink??shuffle mode??

2020-07-05 文章 ????????
Hi,all:


??Flink??keyBy??transformationsshuffle 
mode??shuffle modeUNDEFINED
??shuffle mode


.

??Flink Join??????????

2020-07-05 文章 ????????
Hi,all:

??JoinedStreams:
join??stream??key??oom
?
??key


Right now, the join is being evaluated in memory so you need to ensure that the 
number
* of elements per key does not get too high. Otherwise the JVM might crash.

Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-05 文章 zhisheng
生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian

Best!
zhisheng

Congxian Qiu  于2020年7月4日周六 下午3:21写道:

> @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
>
> Best,
> Congxian
>
>
> zhisheng  于2020年7月4日周六 下午12:27写道:
>
> > 我们也有遇到过这个异常,但是不是很常见
> >
> > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> >
> > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> > > Best,
> > > Congxian
> > >
> > >
> > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> > > > >
> > > > >
> > > > >| |
> > > > >JasonLee
> > > > >|
> > > > >|
> > > > >邮箱:17610775...@163.com
> > > > >|
> > > > >
> > > > >Signature is customized by Netease Mail Master
> > > > >
> > > > >在2020年07月01日 20:43,程龙 写道:
> > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> > > > >
> > > > >
> > > > >java.lang.Exception: Could not perform checkpoint 3201 for operator
> > > > Filter -> Map (2/8).
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> > > > >   at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> > > > >   at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> > > > >   at java.lang.Thread.run(Thread.java:745)
> > > > >Caused by: java.lang.NullPointerException
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> > > >
> > >
> >
>