Re: flink sql cli 读取 hbase表报错

2020-06-28 文章 Leonard Xu
Hello,
这应该是一个已知bug[1],原因是Configuration是不可序列化的,HbaseRowInputFormat中没有正确处理,导致用户DDL中的zk配置无法传递。
在flink1.11和1.12上已经修复。如果是1.10.x版本中,可以将HBase 的配置文件(hbase-default.xml、 
hbase-site.xml) 添加到 classpath下,也可以把
HBase 
的配置文件添加到HADOOP_CLASSPATH(flnk启动脚本会检查HADOOP_CLASSPATH环境变量并加载),两种方式Flink集群和SQL 
Client都能加载到Hbase的配置文件,从而加载到配置文件中正确的zk信息。


祝好,
Leonard
[1]  https://issues.apache.org/jira/browse/FLINK-17968


> 在 2020年6月29日,11:45,王良  写道:
> 
> 您好:
> 
> 我使用的是flink 1.10 ,通过sql-client 创建了hbase 表
> 
> CREATE TABLE dim_term (
>term_id string,
>info ROW(
>term_name string,
>term_name_combine string,
>term_notice string,
>term_remarks string,
>season string,
>term_sequence string,
>term_start_time string,
>term_end_time string,
>term_description string,
>term_status int,
>is_mvp_term int,
>ctime string,
>utime string
>)
> 
> ) WITH (
> 'connector.type' = 'hbase',
> 'connector.version' = '1.4.3',
> 'connector.table-name' = 'dim_term',
> 'connector.zookeeper.quorum' = 
> 'emr-header-1.cluster-109533:2181,emr-worker-1.cluster-109533:2181,emr-header-2.cluster-109533:2181',
> 'connector.zookeeper.znode.parent' = '/hbase'
> )
> 
> 遇到的问题是,当我在sql-client ,执行select * from dim_term 的时候报错
> 
> 2020-06-29 11:26:51,718 INFO  
> org.apache.flink.addons.hbase.HBaseRowInputFormat 
> org.apache.flink.addons.hbase.HBaseRowInputFormat.configure(HBaseRowInputFormat.java:65)
>  - Initializing HBase configuration.
> 2020-06-29 11:26:51,831 INFO  
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.(RecoverableZooKeeper.java:120)
>  - Process identifier=hconnection-0x57b9485d connecting to ZooKeeper 
> ensemble=localhost:2181
> 
> 
> org.apache.flink.addons.hbase.HBaseRowInputFormat,这个类里面没有读取到zookeeper的配置



Re: 无法生成rowtime导致在window失败

2020-06-28 文章 Leonard Xu
Hi,

> field("logictime","TIMESTAMP(3)”)
 报错的原因这个字段在你原始的表中不存在的,理解你的需求是你想用 field evitime(Long型)生成一个新的 field 
logictime(TIMESTAMP(3)),这个可以用计算列解决,Table API上还不支持计算列,1.12 已经在开发中了。你可以用 DDL 
加计算列完成满足你的需求,参考[1]

create table test (
 acct STRING,
 evitime BIGINT,
 logictime as TO_TIMESTAMP(FROM_UNIXTIME(evitime)),
 WATERMARK FOR logictime AS logictime - INTERVAL ‘5’ SECOND,
) with(
...
)
 
   
祝好
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html
 


Re: flink sql row类型group by

2020-06-28 文章 Leonard Xu

> 在 2020年6月29日,12:05,sunfulin  写道:
> 
> 这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈?

Hi,
在1.10.x 版本中,upsertSink 中推导 pk 是通过query 来推导,这个比较好的解决是等1.11发布后,通过在建表的DDL声明主键( 
PRIMARY KEY NOT ENFORCED), 如果要在1.10.x里解决,一般是改写下query,使得推导的pk能符合预期。这个写入es的sink要求 
pk 是简单类型,而你的query又需要ROW(c, d) 复合类型, 不太好改写。想到hack一点的方式就是把c,d 
拼接成一个字段c${delimeter}d,ROW(c, d)  用UDF构造,感觉这种也比较绕。如果业务上不是强需求ROW(c, 
d),又等不及1.11的话,可以在ES里多加一列就好了。

祝好,
Leonard Xu

Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin






hi, Leonard
这个写法应该是OK,不过我的场景下是下面这种
select a, b, row(commentId, commentContent) from T
group by a, b, commentId, commentContent
这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈?











在 2020-06-29 10:19:31,"Leonard Xu"  写道:
>Hi,
>异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 
>ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 
>UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 
>字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys 
>,复杂类型不支持,复杂类型toString()的结果可能不是我们想要的。
>
>你可以试下下面的query,query keys  对应es中的 id  就是 
>commentId${keyDelimiter}commentContent, 这也应该是你需要的结果 
>Select ROW(commentId, commentContent) from T
>group by commentId, commentContent
>
>祝好,
>Leonard Xu
>
>> 在 2020年6月28日,22:33,sunfulin  写道:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> hi, 
>> 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner
>> 
>> 
>> org.apache.flink.table.api.ValidationException: Only simple types that can 
>> be safely converted into a string representation can be used as keys. But 
>> was: Row(commentId: String, commentContent: String)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> at 
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
>> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-06-28 10:15:34,"Benchao Li"  写道:
>>> Hi,
>>> 我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>>> 能附上异常栈就更好啦。
>>> 
>>> sunfulin  于2020年6月25日周四 下午4:35写道:
>>> 
 Hi,
 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>>> 
>>> 
>>> 
>>> -- 
>>> 
>>> Best,
>>> Benchao Li


flink sql cli 读取 hbase表报错

2020-06-28 文章 王良
您好:

 我使用的是flink 1.10 ,通过sql-client 创建了hbase 表

CREATE TABLE dim_term (
term_id string,
info ROW(
term_name string,
term_name_combine string,
term_notice string,
term_remarks string,
season string,
term_sequence string,
term_start_time string,
term_end_time string,
term_description string,
term_status int,
is_mvp_term int,
ctime string,
utime string
)

) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = 'dim_term',
'connector.zookeeper.quorum' = 
'emr-header-1.cluster-109533:2181,emr-worker-1.cluster-109533:2181,emr-header-2.cluster-109533:2181',
'connector.zookeeper.znode.parent' = '/hbase'
)

遇到的问题是,当我在sql-client ,执行select * from dim_term 的时候报错

2020-06-29 11:26:51,718 INFO  org.apache.flink.addons.hbase.HBaseRowInputFormat 

org.apache.flink.addons.hbase.HBaseRowInputFormat.configure(HBaseRowInputFormat.java:65)
 - Initializing HBase configuration.
2020-06-29 11:26:51,831 INFO  
org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper
org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.(RecoverableZooKeeper.java:120)
 - Process identifier=hconnection-0x57b9485d connecting to ZooKeeper 
ensemble=localhost:2181


org.apache.flink.addons.hbase.HBaseRowInputFormat,这个类里面没有读取到zookeeper的配置
 

flink读取kafka超时问题

2020-06-28 文章 阿华田
Caused by: java.lang.Exception: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dercd_seeme-3 could be determined  
大佬们flink读取kafka遇到过这个错误没?现在情况是 
每次重启任务都会出现这个错,但是奇怪的是多试几次任务才能运行起来。这个任务的特点读取得topic较多(6个),数据量比较大。难道是读取得数据量太大给kafka集群的broker造成了很大的负载导致请求超时?


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



无法生成rowtime导致在window失败

2020-06-28 文章 naturalfree
大家好
   在使用窗口的过程中遇到一个问题,麻烦大家帮忙看下!
   
简单描述下情况:我们是从kafka获取数据,在flink做一些相关处理后sink到elasticsearch中。没有使用window的时候没有问题,可以成功完成流程。使用窗口后报错:Exception
 in thread "main" org.apache.flink.table.api.ValidationException: A group 
window expects a time attribute for grouping in a stream environment.
 
   下边是我的详细流程的相关片段

1. 我们使用的jar包是flink-xx_2.12:1.10.0 / kafka版本为0.11
2. kafka的数据格式为{"acct":"acct1234", "evtime":1593396391819}
3. 使用descriptor的方式连接kafka,代码为:
StreamExecutionEnvironment fsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);


fsTableEnv.connect(new Kafka()
  .version("universal")
  .topic("jes_topic_evtime")
  .property("zookeeper.connect", 
"172.xx.xx.xxx:2181")
  .property("bootstrap.servers", 
"172.xx.xx.xxx:9092")
  .property("group.id", "grp1")
  .startFromEarliest()
).withFormat(new Json()
  
.failOnMissingField(false).deriveSchema())
   .withSchema(new 
Schema().field("acct", "STRING").field("evtime", 
"LONG").field("logictime","TIMESTAMP(3)").rowTime(new 
Rowtime().timestampsFromField("evtime").watermarksPeriodicBounded(5000)))
 
.inAppendMode().createTemporaryTable("testTableName");


   Table testTab = fsTableEnv.sqlQuery("SELECT acct, evtime, logictime 
FROM testTableName")
   
.window(Tumble.over("5.seconds").on("logictime").as("w1"))
   .groupBy("w1, acct")
   .select("w1.rowtime, acctno");




测试发现在descriptor连接kafka时定义schema时,定义的rowtime字段和使用from的方式重命名字段好像都无法成功。测试时使用from方式重命名字段返回的值是null

Re: flink sql row类型group by

2020-06-28 文章 Leonard Xu
Hi,
异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 
ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 
UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 
字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys 
,复杂类型不支持,复杂类型toString()的结果可能不是我们想要的。

你可以试下下面的query,query keys  对应es中的 id  就是 commentId${keyDelimiter}commentContent, 
这也应该是你需要的结果 
Select ROW(commentId, commentContent) from T
group by commentId, commentContent

祝好,
Leonard Xu

> 在 2020年6月28日,22:33,sunfulin  写道:
> 
> 
> 
> 
> 
> 
> 
> hi, 
> 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner
> 
> 
> org.apache.flink.table.api.ValidationException: Only simple types that can be 
> safely converted into a string representation can be used as keys. But was: 
> Row(commentId: String, commentContent: String)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at 
> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
> at 
> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
> at 
> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-06-28 10:15:34,"Benchao Li"  写道:
>> Hi,
>> 我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>> 能附上异常栈就更好啦。
>> 
>> sunfulin  于2020年6月25日周四 下午4:35写道:
>> 
>>> Hi,
>>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
>>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>> 
>> 
>> 
>> -- 
>> 
>> Best,
>> Benchao Li



Re: flink sql 中值为null时结果都为 false

2020-06-28 文章 Leonard Xu
Hello
更新下,社区这个 issue(FLINK-18164 
)和 Benchao 
讨论后关闭了,因为当前Flink在处理 null 的行为是正确的,所以建议处理 null 时,都用 IS NULL , IS NOT NULL 
先做下过滤再做逻辑判断,一般SQL里面也是这样处理的。

祝好,
Leonard Xu

> 在 2020年6月7日,17:22,Leonard Xu  写道:
> 
> Hi, 
> Flink 用Calcite做sql解析和优化, 这是个 bool 的二值逻辑和三值逻辑处理问题,calcite默认在 where clause[2] 
> 处理时 是用UNKNOWN_AS_FALSE mode, 这个结果是符合预期的, 类似的还有"x IS TRUE","JOIN ... ON x", 
> "HAVING x。
> 
> [1] 
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java#L1016
>  
> 
> [2] 
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java#L3462
>  
> 
> 
> Best,
> Leonard Xu
> 
>> 在 2020年6月6日,11:45,Benchao Li > > 写道:
>> 
>> 哇,非常赞!
>> 我也查了一下,在SQL标准里,bool表达式是有三种value的[1],分别是true、false、unknown。
>> 而且null正常来讲是跟任何value都不相等的,包括另外一个null [2]。
>> 
>> 所以如果执行`SELECT null <>
>> null`,返回结果应该unknown,在flink里,这个应该就是null,而不是true,或者false。
>> 而如果在WHERE条件中出现这种情况的时候,比较的结果应该也是unknown[3],但是默认处理是按照false来处理的。
>> 
>> 而`IS [NOT] DISTINCT FROM`就是专门用来处理对null值的比较的场景的。因为它可以处理null,所以它的返回值
>> 只会是true或者false,而不会是unknown。对于你这个场景来说,应该是最合适的。
>> 
>> PS:回复邮件的时候,记得“回复全部”,这样我们的讨论社区里的小伙伴们都可以看到并且受益~
>> 
>> [1] https://modern-sql.com/concept/three-valued-logic 
>> 
>> [2] https://modern-sql.com/feature/is-distinct-from 
>> 
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html#comparison-functions
>>  
>> 
>> 
>> whirly  于2020年6月6日周六 上午10:42写道:
>> 
>>> Hi.
>>> 我刚刚找到了解决方法了,flink sql builtin functions 中其实提供了另外的逻辑运算符 IS DISTINCT FROM
>>> 可以解决这个问题,
>>> IS DISTINCT FROM 也是不等于,相对于 <> ,Null IS DISTINCT FROM someValue 的结果是 True。
>>> 
>>> best,
>>> whirly
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-06-06 00:59:12,"Benchao Li"  写道:
>>> 
>>> Hi,
>>> 
>>> 我又想了一下这个问题,我认为这个行为的确是有点不太合理,我建了一个issue[1] 来跟踪这个事情。
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-18164
>>> 
>>> whirly  于2020年6月5日周五 下午11:20写道:
>>> 
 好的,可能只是我的邮件客户端显示的问题,感谢回复。
 
 关于多加一个 is not null的判断的问题,po和我都觉得有些多此一举的感觉,而且有时候条件里字段很多,每个字段之前都需要加一个 is
 not null,难度也很大,且容易出错。
 
 如果能有一个配置项控制 null <> 'someValue' 结果为true就好了
 
 
 
 
 whirly
 邮箱:whir...@163.com
 
 
 
 签名由 网易邮箱大师  定制
 
 在2020年06月05日 23:08,Benchao Li  写道:
 Hi,
 
 我这边只收到一封你第一次发的邮件,看起来应该是没有问题。(不知道是不是你邮箱客户端本地显示的问题)
 关于你的问题,现在的确是这样子处理的。我想问一下,多加一个IS NOT NULL有什么问题么?
 
 whirly  于2020年6月5日周五 下午9:54写道:
 
> 不好意思,刚刚发现163邮箱自动重发了好几次这个提问,不知道怎么回事,可能是邮箱bug? 实在打扰了,而且现在不知道是不是还会自动重发
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-06-05 14:25:10,"whirly"  写道:
>> 大家好:
>>   在 flink sql 中,如 SELECT * from order where  product <>
 'rubber',如果数据中的
> product 字段值是 null, 是无法匹配 product <> 'rubber' 这个条件的,虽然 null 确实不等于
 'rubber'
>>   只有将条件改为  where product is Null or product <> 'rubber' 才能匹配。
>>   但是我想要 null <> 'rubber' 的结果也为 True,且不想在条件之前加上 product is Null
> 的判断,可以怎么办呢?
>> 
>> 
>> 感谢
> 
 
 
 --
 
 Best,
 Benchao Li
 
 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>>> 
>>> 
>> 
>> -- 
>> 
>> Best,
>> Benchao Li
> 



Re: 【Flink SQL对于NULL在不等时候的处理】

2020-06-28 文章 Leonard Xu

> 社区之前有个issue[1]在跟进这个问题, 在此之前建议处理 null 时,都用 IS NULL , IS NOT NULL 
> 先做下过滤再做逻辑判断,一般SQL里面也是这样处理的。
> 
> 祝好,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-18164 
> 
> 

更新下,社区这个 issue(FLINK-18164 
)和 Benchao 
讨论后关闭了,因为当前Flink在处理 null 的行为是正确的,所以建议处理 null 时,都用 IS NULL , IS NOT NULL 
先做下过滤再做逻辑判断,一般SQL里面也是这样处理的。

祝好,
Leonard Xu

?????? ??Flink Sql ????????????????????????????

2020-06-28 文章 ????????
Hi,


??c1,c2,c3??c4
alter tablec4c1,c4,c2,c3??
??.






----
??:"Jark Wu"

Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin






hi, 
谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner


org.apache.flink.table.api.ValidationException: Only simple types that can be 
safely converted into a string representation can be used as keys. But was: 
Row(commentId: String, commentContent: String)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)















在 2020-06-28 10:15:34,"Benchao Li"  写道:
>Hi,
>我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>能附上异常栈就更好啦。
>
>sunfulin  于2020年6月25日周四 下午4:35写道:
>
>> Hi,
>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>
>
>
>-- 
>
>Best,
>Benchao Li


?????? ??????????

2020-06-28 文章 cs

??yarnyarn??
yarn??standalone??standalone??flink??,
yarnspark??MR??





----
??:"LakeShen"

Re: 高可用集群

2020-06-28 文章 LakeShen
Hi 李军,

目前我们在 Yarn 上面的话,用的是 Flink On Yarn Per Job 模式,在 K8s 上面的话,就是 Standalone per
Job 模式。

Best,
LakeShen

刘佳炜  于2020年6月28日周日 下午5:14写道:

> 如果你公司用hadoop的话就是YARN StandAlone一般都是单机测试练习的
>
>
>
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: 李军  发送时间: 2020年6月28日 17:11
> 收件人: user-zh  主题: 回复:高可用集群
>
>
>
> 
> 请教下,各位大佬们生产环境使用的是哪种集群配置
>  1.
> Standalone 集群
>  2. Yarn
> 集群
> 
>
>  理由是什么,不知道怎么选择
>
>
> 2020-6-28
> | |
> 李军
> |
> |
> hold_li...@163.com
> |
> 签名由网易邮箱大师定制


关于注册定时器的一些疑问

2020-06-28 文章 Jun Zhang
大家好:
 官网的解释中,注册定时器只能是keyed stream,我使用BroadcastConnectedStream
接一个KeyedBroadcastProcessFunction函数发现也能注册定时器,我测试了一下,只限于使用processtime的时候,如果使用的是eventtime就不好使了,请问这个是什么原因呢?谢谢。


回复:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

2020-06-28 文章 夏帅
你好,这个问题从异常来看是使用TupleTypeInfo导致的,可以试下使用GenericRecordAvroTypeInfo


--
发件人:yingbo yang 
发送时间:2020年6月28日(星期日) 17:38
收件人:user-zh 
主 题:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

Hi:
在使用 ParquetAvroWriters.forGenericRecord(Schema schema)
写parquet文件的时候 出现 类转化异常:
下面是我的代码:

// //transfor 2 dataStream // TupleTypeInfo tupleTypeInfo = new
TupleTypeInfo(GenericData.Record.class,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
TupleTypeInfo tupleTypeInfo = new
TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
 DataStream testDataStream = flinkTableEnv.toAppendStream(test, tupleTypeInfo);
 testDataStream.print().setParallelism(1);
ArrayList fields = new
ArrayList();
 fields.add(new org.apache.avro.Schema.Field("id",
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
"id", JsonProperties.NULL_VALUE));
 fields.add(new org.apache.avro.Schema.Field("time",
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
"time", JsonProperties.NULL_VALUE));
 org.apache.avro.Schema parquetSinkSchema =
org.apache.avro.Schema.createRecord("pi", "flinkParquetSink",
"flink.parquet", true, fields);
 String fileSinkPath = "./xxx.text/rs6/";
StreamingFileSink parquetSink = StreamingFileSink.
 forBulkFormat(new Path(fileSinkPath),
 ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .build();
 testDataStream.addSink(parquetSink).setParallelism(1);
 flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava");


下面是异常:

09:29:50,283 INFO  org.apache.flink.runtime.taskmanager.Task
  - Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)
switched from RUNNING to FAILED.09:29:50,283 INFO
org.apache.flink.runtime.taskmanager.Task - Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING
to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord

at org.apache.avro.generic.GenericData.getField(GenericData.java:697)

at 
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)

at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)

at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)

at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)

at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)09:29:50,284

INFO  org.apache.flink.runtime.taskmanager.Task -
Freeing task resources for Sink: Unnamed (1/1)
(79505cb6ab2df38886663fd99461315a).09:29:50,285 INFO
org.apache.flink.runtime.taskmanager.Task -
Ensuring all FileSystem streams are closed for task Sink: Unnamed
(1/1) (79505cb6ab2df38886663fd99461315a) [FAILED]09:29:50,289

INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor-
Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Unnamed (1/1)
79505cb6ab2df38886663fd99461315a.09:29:50,293 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)

switched from RUNNING to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord at

flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

2020-06-28 文章 yingbo yang
Hi:
在使用 ParquetAvroWriters.forGenericRecord(Schema schema)
写parquet文件的时候 出现 类转化异常:
下面是我的代码:

// //transfor 2 dataStream // TupleTypeInfo tupleTypeInfo = new
TupleTypeInfo(GenericData.Record.class,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
TupleTypeInfo tupleTypeInfo = new
TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
 DataStream testDataStream = flinkTableEnv.toAppendStream(test, tupleTypeInfo);
 testDataStream.print().setParallelism(1);
ArrayList fields = new
ArrayList();
 fields.add(new org.apache.avro.Schema.Field("id",
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
"id", JsonProperties.NULL_VALUE));
 fields.add(new org.apache.avro.Schema.Field("time",
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
"time", JsonProperties.NULL_VALUE));
 org.apache.avro.Schema parquetSinkSchema =
org.apache.avro.Schema.createRecord("pi", "flinkParquetSink",
"flink.parquet", true, fields);
 String fileSinkPath = "./xxx.text/rs6/";
StreamingFileSink parquetSink = StreamingFileSink.
 forBulkFormat(new Path(fileSinkPath),
 ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .build();
 testDataStream.addSink(parquetSink).setParallelism(1);
 flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava");


下面是异常:

09:29:50,283 INFO  org.apache.flink.runtime.taskmanager.Task
  - Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)
switched from RUNNING to FAILED.09:29:50,283 INFO
org.apache.flink.runtime.taskmanager.Task - Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING
to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord

at org.apache.avro.generic.GenericData.getField(GenericData.java:697)

at 
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)

at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)

at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)

at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)

at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)09:29:50,284

INFO  org.apache.flink.runtime.taskmanager.Task -
Freeing task resources for Sink: Unnamed (1/1)
(79505cb6ab2df38886663fd99461315a).09:29:50,285 INFO
org.apache.flink.runtime.taskmanager.Task -
Ensuring all FileSystem streams are closed for task Sink: Unnamed
(1/1) (79505cb6ab2df38886663fd99461315a) [FAILED]09:29:50,289

INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor-
Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Unnamed (1/1)
79505cb6ab2df38886663fd99461315a.09:29:50,293 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)

switched from RUNNING to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord at
org.apache.avro.generic.GenericData.getField(GenericData.java:697) at
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)


请问是否是使用方式不对?还是什么问题?


回复:高可用集群

2020-06-28 文章 刘佳炜
如果你公司用hadoop的话就是YARN StandAlone一般都是单机测试练习的



发自我的iPhone


-- 原始邮件 --
发件人: 李军 

高可用集群

2020-06-28 文章 李军
   请教下,各位大佬们生产环境使用的是哪种集群配置
1. Standalone 集群
2. Yarn 集群

理由是什么,不知道怎么选择


2020-6-28
| |
李军
|
|
hold_li...@163.com
|
签名由网易邮箱大师定制

Re: Flink JOB_MANAGER_LEADER_PATH Znode 疑似泄漏问题

2020-06-28 文章 Paul Lam
Hi,

其实 HA 元数据没有自动清理是老问题了,可能要等到 ZK HA 的逻辑重构之后才可以解决,具体可以参考以下两个 ticket [1][2]。

不过即使 Flink 实现了自动清理,也没有办法处理外部原因导致作业退出而留下的元数据,所以还是要用户自己实现检测和清理的机制。

1. https://issues.apache.org/jira/browse/FLINK-6522 

2. https://issues.apache.org/jira/browse/FLINK-10333 


Best,
Paul Lam

> 2020年6月28日 12:29,于汝国  写道:
> 
> 
> 
> 
> flink本身不提供cancel 
> job后清理zookeeper上残留znode的功能或机制,包括hdfs上的部分数据,如果想清除的话,可手动操作或者自实现。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-06-28 09:12:41,"林恬"  写道:
>> 各位好:
>>   目前我使用的是Flink 1.9.2, HA使用ZK, 使用过程中发现ZK上的/leader/${job_id} 
>> 节点即使作业被Cancel了也不会被清理,导致运行久了之后,/leader/下有大量job_id的空ZNode,请问这块清理时机是什么时候呢?或者说这个没被清理的行为是否是1.9.2的bug呢?
>> 
>> 
>> 



Re: Re: 【Flink在sink端的Exactly once语义】

2020-06-28 文章 Jingsong Li
Hi,

补充Benchao的观点:
- 除了kafka以外,还有StreamingFileSink也是exactly-once不多不少的。
- 对于Mysql、ES,这种支持主键更新的,在upsert语义下(比如一个count(*) from t group
by),数据是最终一致的。所以我理解数据也是不多不少的exactly once语义。

Best,
Jingsong Lee

On Mon, Jun 22, 2020 at 11:46 AM 程龙 <13162790...@163.com> wrote:

> 需要自己实现比如幂等操作 比如通过表示为操作
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-22 10:04:43,"Benchao Li"  写道:
> >看起来现在只有Kafka实现了TwoPhaseCommitSinkFunction,所以目前应该也只有Kafka支持exactly once。
> >
> >不过像Mysql、ES这种,可以根据主键来更新的,只要能做到at least once应该就可以了。
> >
> >忝忝向仧 <153488...@qq.com> 于2020年6月21日周日 下午11:27写道:
> >
> >> Hi,all:
> >>
> >>
> >> Flink连接器这块,如果是sink到mysql,ES等,有对应的实现exactly once语义么?
> >> 比如kafka的连接有sink的exactly once语义,sink时候指定即可.
> >> 那么,如果是mysql后者其他的有么?
> >> 谢谢.
> >> return new FlinkKafkaProducer011<(
> >> "topic",
> >> new KeyedSerializationSchemaWrapper<(new
> SimpleStringSchema()),
> >> producerProperties,
> >> FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
> >
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>


-- 
Best, Jingsong Lee


Re: flinksql

2020-06-28 文章 Jingsong Li
Hi,

在1.11之前,注意:flink sql-client只能创建flink的表而不是hive的表。
如果你用create table t (i int, j
int);的这个一个简短的语句,是不能创建出flink表来的。完整的Flink表需要with参数。[1]

在1.11中支持的hive dialect,才支持用create table t (i int, j int);这种简单的DDL创建Hive表。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html

Best,
Jingsong Lee

On Wed, Jun 24, 2020 at 8:44 PM Rui Li  wrote:

> 是要通过Flink SQL
>
> client创建一张hive表么?在1.10里可能需要给表的参数加上'is_generic'='false'才行,否则创建出来的表默认是非hive表,hive那边是读不了的
>
> On Mon, Jun 22, 2020 at 3:53 PM Leonard Xu  wrote:
>
> > Hi,
> > 这个报错通常是缺少了 connector 相关的jar包,或者 connector 的with参数填写错误。
> > > flink sql-client下建的表
> > 这是什么表,可以把建表 SQL 发出来看看吗?
> >
> > Best,
> > Leonard Xu
>
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee


Re: Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-28 文章 Tianwang Li
>
> 偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key 在这一两天会突然增多的情况。
>
我增加每个task处理窗口数据的时间在观察一下,
我这个是测试任务,没有sink输出。
source -> window -> window(统计上一个窗口的输出的记录数,pint 10记录左右)

LakeShen  于2020年6月28日周日 上午10:35写道:

> Hi Tianwang Li,
>
> 偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key 在这一两天会突然增多的情况。
>
> Best,
> LakeShen
>
> zhisheng  于2020年6月28日周日 上午10:27写道:
>
> > hi, Tianwang Li
> >
> > 看到有三个图片挂了,可以试着把图片上传到第三方的图床,然后贴个链接过来,另外:
> >
> > > 任务经常会出现反压(特别是在窗口输出的时候)
> >
> > 这个检查一下窗口下游算子的情况,比如是不是窗口输出的数据过多,而 sink 的并发还和之前的保持一致,导致处理速度跟不上,从而导致的反压。
> >
> >
> > > 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)
> >
> > 这种也可以看看是不是 HDFS 有时候压力比较大导致的出现毛刺现象
> >
> > 另外建议补一下 UI 上 chekcpoint 相关的截图和日志信息,这样才能更好的定位问题。
> >
> >
> > Best !
> > zhisheng
> >
> >
> > Tianwang Li  于2020年6月28日周日 上午10:17写道:
> >
> > > 关于Flink checkpoint偶尔会比较长时间的问题。
> > >
> > > *环境与背景:*
> > > 版本:flink1.10.0
> > > 数据量:每秒约10万左右的记录,数据源是kafka
> > > 计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。
> > > 是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。
> > >
> > > *问题:*
> > > 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。
> > > source的checkpoint消耗的时间比较长。Trigger checkpoint 到 Starting
> > checkpoint消耗时间比较长。
> > >
> > > checkpoint情况大致如下:
> > >
> > > [image: image.png]
> > > [image: image.png]
> > > [image: image.png]
> > >
> > > 2020-06-24 21:09:53,369 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger
> > > checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
> > >
> > > 2020-06-24 21:09:58,327 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> > >
> > > 2020-06-24 21:09:59,266 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> > > 111/114/424 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > > 2020-06-24 21:10:08,346 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> > >
> > > 2020-06-24 21:10:09,286 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> > > 111/114/424 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > >
> > > 省略
> > >
> > >
> > > 2020-06-24 21:55:39,875 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:55:39,875 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:55:39,876 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> > > COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > >