Re:Re: ​请问是否有计划合并blink gemini到flink的计划

2022-07-13 文章 蔡荣
谢谢回复

















在 2022-07-13 19:14:02,"Yuan Mei"  写道:
>目前Gemini只用在 ververica platform 中作为 aliyun 商业化版本的 default statebackend
>使用,短期还是会以提升 Gemini 更方面性能为主,后续会逐步将主要部分开源的。
>
>Best
>Yuan
>
>On Wed, Jul 13, 2022 at 6:10 PM Hangxiang Yu  wrote:
>
>>
>> 你好,是从阿里云产品的Blink迁移到Flink吗?这个迁移过程可以参考[1],Gemini在“实时计算Flink版”中的介绍和使用方式可以参考[2][3]。
>> 关于合并到社区,短期内没有相关计划。
>>
>> [1] https://help.aliyun.com/document_detail/421043.html
>> [2] https://help.aliyun.com/document_detail/414255.html
>> [3] https://help.aliyun.com/document_detail/414256.html
>>
>> Best,
>> Hangxiang.
>>
>> On Wed, Jul 13, 2022 at 3:40 PM 蔡荣  wrote:
>>
>> > 看到一篇文章《数据处理能力相差 2.4 倍?Flink 使用 RocksDB 和 Gemini 的性能对比实验》,
>> > https://developer.aliyun.com/article/770793,
>> > 请问是否有计划合并gemini到flink的计划?
>> >
>> >
>>


Re:Re: ​请问是否有计划合并blink gemini到flink的计划

2022-07-13 文章 mack143
退订
在 2022-07-13 19:14:02,"Yuan Mei"  写道:
>目前Gemini只用在 ververica platform 中作为 aliyun 商业化版本的 default statebackend
>使用,短期还是会以提升 Gemini 更方面性能为主,后续会逐步将主要部分开源的。
>
>Best
>Yuan
>
>On Wed, Jul 13, 2022 at 6:10 PM Hangxiang Yu  wrote:
>
>>
>> 你好,是从阿里云产品的Blink迁移到Flink吗?这个迁移过程可以参考[1],Gemini在“实时计算Flink版”中的介绍和使用方式可以参考[2][3]。
>> 关于合并到社区,短期内没有相关计划。
>>
>> [1] https://help.aliyun.com/document_detail/421043.html
>> [2] https://help.aliyun.com/document_detail/414255.html
>> [3] https://help.aliyun.com/document_detail/414256.html
>>
>> Best,
>> Hangxiang.
>>
>> On Wed, Jul 13, 2022 at 3:40 PM 蔡荣  wrote:
>>
>> > 看到一篇文章《数据处理能力相差 2.4 倍?Flink 使用 RocksDB 和 Gemini 的性能对比实验》,
>> > https://developer.aliyun.com/article/770793,
>> > 请问是否有计划合并gemini到flink的计划?
>> >
>> >
>>


Re: ​请问是否有计划合并blink gemini到flink的计划

2022-07-13 文章 Yuan Mei
目前Gemini只用在 ververica platform 中作为 aliyun 商业化版本的 default statebackend
使用,短期还是会以提升 Gemini 更方面性能为主,后续会逐步将主要部分开源的。

Best
Yuan

On Wed, Jul 13, 2022 at 6:10 PM Hangxiang Yu  wrote:

>
> 你好,是从阿里云产品的Blink迁移到Flink吗?这个迁移过程可以参考[1],Gemini在“实时计算Flink版”中的介绍和使用方式可以参考[2][3]。
> 关于合并到社区,短期内没有相关计划。
>
> [1] https://help.aliyun.com/document_detail/421043.html
> [2] https://help.aliyun.com/document_detail/414255.html
> [3] https://help.aliyun.com/document_detail/414256.html
>
> Best,
> Hangxiang.
>
> On Wed, Jul 13, 2022 at 3:40 PM 蔡荣  wrote:
>
> > 看到一篇文章《数据处理能力相差 2.4 倍?Flink 使用 RocksDB 和 Gemini 的性能对比实验》,
> > https://developer.aliyun.com/article/770793,
> > 请问是否有计划合并gemini到flink的计划?
> >
> >
>


Re: ​请问是否有计划合并blink gemini到flink的计划

2022-07-13 文章 Hangxiang Yu
你好,是从阿里云产品的Blink迁移到Flink吗?这个迁移过程可以参考[1],Gemini在“实时计算Flink版”中的介绍和使用方式可以参考[2][3]。
关于合并到社区,短期内没有相关计划。

[1] https://help.aliyun.com/document_detail/421043.html
[2] https://help.aliyun.com/document_detail/414255.html
[3] https://help.aliyun.com/document_detail/414256.html

Best,
Hangxiang.

On Wed, Jul 13, 2022 at 3:40 PM 蔡荣  wrote:

> 看到一篇文章《数据处理能力相差 2.4 倍?Flink 使用 RocksDB 和 Gemini 的性能对比实验》,
> https://developer.aliyun.com/article/770793,
> 请问是否有计划合并gemini到flink的计划?
>
>


​请问是否有计划合并blink gemini到flink的计划

2022-07-13 文章 蔡荣
看到一篇文章《数据处理能力相差 2.4 倍?Flink 使用 RocksDB 和 Gemini 的性能对比实验》,
https://developer.aliyun.com/article/770793,
请问是否有计划合并gemini到flink的计划?



Flink 1.12 Blink planner timestamp类型转换异常

2022-01-11 文章 张健
Hi, all




flink1.12 Blink planner有人遇到过这样的问题么:



下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 
即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 
中对应的位置也是Types.SQL_TIMESTAMP。
但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException


我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。


张健




 





 

Flink 1.12 Blink planner timestamp类型转换异常

2022-01-11 文章 张健
Hi, all




flink1.12 Blink planner有人遇到过这样的问题么:



下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 
即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 
中对应的位置也是Types.SQL_TIMESTAMP。
但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException


我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。


张健




 

Flink 1.12 Blink planner timestamp类型转换异常

2022-01-11 文章 张健
Hi, all




flink1.12 Blink planner有人遇到过这样的问题么:



下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 
即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 
中对应的位置也是Types.SQL_TIMESTAMP。
但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException


我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。


张健

关于Flink 非blink planer下自持listagg

2021-04-21 文章 张海深


你好,请问Flink 是否支持非blink planer下的 listagg,有计划支持吗。现阶段如果想使用listagg,请问有什么好的方法支持吗



blink planner里的Scala代码,未来会由Java改写吗?

2021-04-01 文章 Luna Wong
目前blink planner中有大量Scala代码,Scala在这方面写起来确实简单不少。未来不需要用Java重写是吗?


Re: Blink Planner构造Remote Env

2020-12-17 文章 莫失莫忘
我现在也碰到了这个问题,也是删除了方法中检查模式的代码 settings.isStreamingMode()。源码为什么要加这样一个检测呢? 感觉
StreamTableEnvironmentImpl 原来是能跑批的,现在加上这个检测 反而不能了。我看最新版的 1.12 还有这个检测代码呢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink??????planner??blink??????????????????cep sql??a->b

2020-12-08 文章 ??????
flink cep sql blinkPATTERN (e1{3 } - e2{1 }?)??

java.lang.IncompatibleClassChangeError: Implementing class (using blink-planner)

2020-11-16 文章 norman
Issue when integrate with hive 2.1.1

Exception in thread "main" java.lang.IncompatibleClassChangeError:
Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at
org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:112)
at
org.apache.flink.table.planner.delegation.StreamPlanner.(StreamPlanner.scala:48)
at
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
at
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:289)
at
org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:462)


code is straighforward:
 val bs =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, bs)


tEnv.registerCatalog(catalog, hive)
tEnv.useCatalog(catalog)

tEnv.executeSql(
  """SET table.sql-dialect=hive;
|CREATE TABLE wap_nohe_2 (
|  user_id STRING,
|  order_amount DOUBLE
|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
TBLPROPERTIES (
|  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
|  'sink.partition-commit.trigger'='partition-time',
|  'sink.partition-commit.delay'='1 h',
|  'sink.partition-commit.policy.kind'='metastore,success-file'
|);
|""".stripMargin)


build.sbt is following:

  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion %
"provided",
 //"org.apache.flink" %% "flink-table-runtime-blink" % flinkVersion %
"provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion %
"provided",
  //"org.apache.flink" %% "flink-table-api-java-bridge" % flinkVersion %
"provided",

  "org.apache.flink" %% "flink-connector-jdbc" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-hive" % flinkVersion % "provided",
  "org.apache.hive" % "hive-exec" % hiveVersion,
  "org.postgresql" % "postgresql" % "42.2.18",

 "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided",




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象

2020-11-15 文章 Danny Chan
DataSet 已经是社区准备 deprecate 的 API 了,不建议再使用。1.12 版本后推荐统一使用 DataStream,使用
sqlQuery 接口拿到 table 对象后转成 DataStream。

Asahi Lee <978466...@qq.com> 于2020年11月13日周五 下午4:05写道:

> BatchTableEnvironment对象可以进行table to dataset; dataset to table
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> danny0...@apache.org;
> 发送时间:2020年11月10日(星期二) 下午2:43
> 收件人:"user-zh"
> 主题:Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
>
>
>
> 拿 BatchTableEnvironment 对象作什么用处呢 ? 我们有 TableEnvironmentInternal 但是不推荐使用。
>
> Asahi Lee <978466...@qq.com 于2020年11月9日周一 下午5:09写道:
>
>  是的,BatchTableEnvironment 对象
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "user-zh"
> 
> <
>  danny0...@apache.orggt;;
>  发送时间:nbsp;2020年11月9日(星期一) 中午12:34
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
> 
> 
> 
>  gt;
>  gt; BatchTableEnvironment 环境
> 
> 
>  是说nbsp; BatchTableEnvironment 对象吗
> 
>  Asahi Lee <978466...@qq.comgt; 于2020年11月9日周一 上午10:48写道:
> 
>  gt; 你好!
>  gt; amp;nbsp; amp;nbsp; amp;nbsp; 我使用的是flink
>  1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取:
>  gt; // ** // BLINK BATCH QUERY //
> **
>  import
>  gt; org.apache.flink.table.api.EnvironmentSettings; import
>  gt; org.apache.flink.table.api.TableEnvironment;
> EnvironmentSettings
>  bbSettings
>  gt; =
>  gt;
> 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>  gt; TableEnvironment bbTableEnv =
>  gt;
> 
> TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?


Blink 1.11 create view是不是没有办法把rowtime带下去?

2020-11-15 文章 周虓岗
通过table api的// declare an additional logical field as an event time attribute

Tabletable=tEnv.fromDataStream(stream,$("user_name"),$("data"),$("user_action_time").rowtime()");


可以把eventtime往后传,
如果使用createview的话怎么把这个time attribute往后带吗?


不往后传的话可能会


这个有什么方法吗?



?????? flink 1.11.2 ????????blink????????BatchTableEnvironment????

2020-11-13 文章 Asahi Lee
BatchTableEnvironmenttable to dataset; dataset to table




----
??: 
   "user-zh"



Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象

2020-11-09 文章 Danny Chan
拿 BatchTableEnvironment 对象作什么用处呢 ? 我们有 TableEnvironmentInternal 但是不推荐使用。

Asahi Lee <978466...@qq.com> 于2020年11月9日周一 下午5:09写道:

> 是的,BatchTableEnvironment 对象
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> danny0...@apache.org;
> 发送时间:2020年11月9日(星期一) 中午12:34
> 收件人:"user-zh"
> 主题:Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
>
>
>
> 
>  BatchTableEnvironment 环境
>
>
> 是说 BatchTableEnvironment 对象吗
>
> Asahi Lee <978466...@qq.com 于2020年11月9日周一 上午10:48写道:
>
>  你好!
>  nbsp; nbsp; nbsp; 我使用的是flink
> 1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取:
>  // ** // BLINK BATCH QUERY // **
> import
>  org.apache.flink.table.api.EnvironmentSettings; import
>  org.apache.flink.table.api.TableEnvironment; EnvironmentSettings
> bbSettings
>  =
> 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>  TableEnvironment bbTableEnv =
> 
> TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?


?????? flink 1.11.2 ????????blink????????BatchTableEnvironment????

2020-11-09 文章 Asahi Lee
??BatchTableEnvironment 




----
??: 
   "user-zh"



Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象

2020-11-08 文章 Danny Chan
>
> BatchTableEnvironment 环境


是说  BatchTableEnvironment 对象吗

Asahi Lee <978466...@qq.com> 于2020年11月9日周一 上午10:48写道:

> 你好!
>我使用的是flink 1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取:
> // ** // BLINK BATCH QUERY // ** import
> org.apache.flink.table.api.EnvironmentSettings; import
> org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings
> =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv =
> TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?


flink 1.11.2 ????????blink????????BatchTableEnvironment????

2020-11-08 文章 Asahi Lee
??
   ??flink 
1.11.2??blink??batch
// ** // BLINK BATCH QUERY // ** import 
org.apache.flink.table.api.EnvironmentSettings; import 
org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); 
TableEnvironment bbTableEnv = 
TableEnvironment.create(bbSettings);??blink??batch??BatchTableEnvironment??

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-26 文章 Tianwang Li
目前,观察到另外一个现象,
如果任务出现了异常,例如写Kafka失败,任务自动重启,这个时候就会突然飙升。
应该是任务失败之后,关闭重启,rocksdb占用到内存没有回收。
通过pmap查看,占用比较多内存多是很多个(128MB 和 64MB 内存块)。

另外,失败重启和如下多jira 描述重启任务多时候比较类似。
https://issues.apache.org/jira/browse/FLINK-7289


pmap图:
[image: image.png]

[image: image.png]

Tianwang Li  于2020年9月23日周三 下午9:11写道:

> 使用的是 `RocksDBStateBackend`, 是什么超用了内存, 配置了“taskmanager.memory.process.size:
> 4g”,
> 并且有预留 1G 用于jvm-overhead。
> 现在超了2.8G,是什么超用的,我想了解一下。
> 如果控制不了,很容易被资源系统(yarn、k8s等) kill 了。
>
>
> 有没有,其他人有这方面的经验。
>
>
>
> Benchao Li  于2020年9月23日周三 下午1:12写道:
>
>> 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
>> 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
>>
>> 郑斌斌  于2020年9月23日周三 下午12:29写道:
>>
>> >  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
>> > KILL 。
>> > 单流跑的话,比较正常。
>> > JOB的内存是4G。版本1.11.1
>> > --
>> > 发件人:Benchao Li 
>> > 发送时间:2020年9月23日(星期三) 10:50
>> > 收件人:user-zh 
>> > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>> >
>> > Hi Tianwang,
>> >
>> > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>> >
>> > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
>> > join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
>> > `Math.max(leftRelativeSize, rightRelativeSize) +
>> > allowedLateness`,根据你的SQL,这个值应该是6h
>> > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
>> > 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
>> > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
>> > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
>> > 2;`,在你的SQL来讲,就是3h,也就是说
>> > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>> >
>> > 希望这个可以解答你的疑惑~
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-18996
>> >
>> > Tianwang Li  于2020年9月22日周二 下午8:26写道:
>> >
>> > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>> > >
>> > >
>> > > 【join】
>> > >
>> > > > SELECT `b`.`rowtime`,
>> > > > `a`.`c_id`,
>> > > > `b`.`openid`
>> > > > FROM `test_table_a` AS `a`
>> > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
>> > > > AND `a`.`openid` = `b`.`openid`
>> > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
>> > SECOND
>> > > > AND `a`.`rowtime` + INTERVAL '6' HOUR
>> > > >
>> > > >
>> > > 【window】
>> > >
>> > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6'
>> HOUR)
>> > AS
>> > > > `rowtime`,
>> > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
>> > > > `__windoow_start__`,
>> > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
>> > > > `__window_end__`,
>> > > > `c_id`,
>> > > > COUNT(`openid`) AS `cnt`
>> > > > FROM `test_table_in_6h`
>> > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
>> > > > `c_id`
>> > > >
>> > >
>> > >
>> > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
>> > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>> > >
>> > > 【配置】
>> > >
>> > > > cat conf/flink-conf.yaml
>> > > > jobmanager.rpc.address: flink-jobmanager
>> > > > taskmanager.numberOfTaskSlots: 1
>> > > > blob.server.port: 6124
>> > > > jobmanager.rpc.port: 6123
>> > > > taskmanager.rpc.port: 6122
>> > > > jobmanager.heap.size: 6144m
>> > > > taskmanager.memory.process.size: 4g
>> > > > taskmanager.memory.jvm-overhead.min: 1024m
>> > > > taskmanager.memory.jvm-overhead.max: 2048m
>> > > > taskmanager.debug.memory.log-interval: 1
>> > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
>> > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
>> > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
>> > > -XX:NumberOfGCLogFiles=10
>> > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > **
>> > >  tivanli
>> > > **
>> > >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>> >
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
>
> --
> **
>  tivanli
> **
>


-- 
**
 tivanli
**


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 文章 Tianwang Li
使用的是 `RocksDBStateBackend`, 是什么超用了内存, 配置了“taskmanager.memory.process.size:
4g”,
并且有预留 1G 用于jvm-overhead。
现在超了2.8G,是什么超用的,我想了解一下。
如果控制不了,很容易被资源系统(yarn、k8s等) kill 了。


有没有,其他人有这方面的经验。



Benchao Li  于2020年9月23日周三 下午1:12写道:

> 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
> 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
>
> 郑斌斌  于2020年9月23日周三 下午12:29写道:
>
> >  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> > KILL 。
> > 单流跑的话,比较正常。
> > JOB的内存是4G。版本1.11.1
> > --
> > 发件人:Benchao Li 
> > 发送时间:2020年9月23日(星期三) 10:50
> > 收件人:user-zh 
> > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
> >
> > Hi Tianwang,
> >
> > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
> >
> > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> > join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> > `Math.max(leftRelativeSize, rightRelativeSize) +
> > allowedLateness`,根据你的SQL,这个值应该是6h
> > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> > 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> > 2;`,在你的SQL来讲,就是3h,也就是说
> > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
> >
> > 希望这个可以解答你的疑惑~
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18996
> >
> > Tianwang Li  于2020年9月22日周二 下午8:26写道:
> >
> > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> > >
> > >
> > > 【join】
> > >
> > > > SELECT `b`.`rowtime`,
> > > > `a`.`c_id`,
> > > > `b`.`openid`
> > > > FROM `test_table_a` AS `a`
> > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > > AND `a`.`openid` = `b`.`openid`
> > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> > SECOND
> > > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > > >
> > > >
> > > 【window】
> > >
> > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> > AS
> > > > `rowtime`,
> > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > > `__windoow_start__`,
> > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > > `__window_end__`,
> > > > `c_id`,
> > > > COUNT(`openid`) AS `cnt`
> > > > FROM `test_table_in_6h`
> > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > > `c_id`
> > > >
> > >
> > >
> > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> > >
> > > 【配置】
> > >
> > > > cat conf/flink-conf.yaml
> > > > jobmanager.rpc.address: flink-jobmanager
> > > > taskmanager.numberOfTaskSlots: 1
> > > > blob.server.port: 6124
> > > > jobmanager.rpc.port: 6123
> > > > taskmanager.rpc.port: 6122
> > > > jobmanager.heap.size: 6144m
> > > > taskmanager.memory.process.size: 4g
> > > > taskmanager.memory.jvm-overhead.min: 1024m
> > > > taskmanager.memory.jvm-overhead.max: 2048m
> > > > taskmanager.debug.memory.log-interval: 1
> > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > > -XX:NumberOfGCLogFiles=10
> > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > > >
> > >
> > >
> > >
> > > --
> > > **
> > >  tivanli
> > > **
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
> >
>
> --
>
> Best,
> Benchao Li
>


-- 
**
 tivanli
**


回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 文章 郑斌斌
  谢谢Peidian ,我试一下
--
发件人:Peidian Li 
发送时间:2020年9月23日(星期三) 14:02
收件人:user-zh ; 郑斌斌 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

我也遇到过类似的问题,也是使用rocksdb,flink 1.10版本,我理解的是block 
cache超用,我这边的解决办法是增大了taskmanager.memory.jvm-overhead.fraction,如果仍然出现内存超用这个问题,可以尝试调大taskmanager.memory.task.off-heap.size,我这边这两个参数配置分别为taskmanager.memory.jvm-overhead.fraction=0.2,taskmanager.memory.task.off-heap.size=128m

可以尝试一下。
郑斌斌  于2020年9月23日周三 下午1:33写道:
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, 
checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

 java.lang.Exception: [2020-09-14 09:27:20.431]Container 
[pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 
36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB 
physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
 Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 
-Djobmanager.rpc.address=njdev-nn03.nj 
-Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c 
-Djobmanager.rpc.port=30246 -Drest.address=flink-node03
  |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' 
-Djobmanager.rpc.address='flink-node03' 
-Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' 
-Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
 2> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 
 [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
 [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. 

  at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 --
 发件人:Benchao Li 
 发送时间:2020年9月23日(星期三) 13:12
 收件人:user-zh ; 郑斌斌 
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
 郑斌斌  于2020年9月23日周三 下午12:29写道:
  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
  单流跑的话,比较正常。
  JOB的内存是4G。版本1.11.1
  --
  发件人:Benchao Li 
  发送时间:2020年9月23日(星期三) 10:50
  收件人:user-zh 
  主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

  Hi Tianwang,

  不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

  1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
  join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
  `Math.max(leftRelativeSize, rightRelativeSize) +
  allowedLateness`,根据你的SQL,这个值应该是6h
  2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
  清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
  数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
  `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
  2;`,在你的SQL来讲,就是3h,也就是说
  状态会*多保存*这么久。这个我曾经建过一个issue来优

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 文章 Peidian Li
我也遇到过类似的问题,也是使用rocksdb,flink 1.10版本,我理解的是block cache超用,我这边的解决办法是增大了
taskmanager.memory.jvm-overhead.fraction
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-fraction>
,如果仍然出现内存超用这个问题,可以尝试调大taskmanager.memory.task.off-heap.size
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-task-off-heap-size>
,我这边这两个参数配置分别为taskmanager.memory.jvm-overhead.fraction=0.2,taskmanager.memory.task.off-heap.size=128m

可以尝试一下。

郑斌斌  于2020年9月23日周三 下午1:33写道:

> 谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state,
> checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来
>
> java.lang.Exception: [2020-09-14 09:27:20.431]Container
> [pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is
> running 36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of
> 4 GB physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
>  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>  |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239
> /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798
> -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0
> -Djobmanager.rpc.address=njdev-nn03.nj
> -Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c
> -Djobmanager.rpc.port=30246 -Drest.address=flink-node03
>  |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c
> /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798
> -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0'
> -Djobmanager.rpc.address='flink-node03'
> -Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c'
> -Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1>
> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
> 2>
> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
>
> [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
> [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143.
>
>  at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> --
> 发件人:Benchao Li 
> 发送时间:2020年9月23日(星期三) 13:12
> 收件人:user-zh ; 郑斌斌 
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
> 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
> 郑斌斌  于2020年9月23日周三 下午12:29写道:
>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
>  单流跑的话,比较正常。
>  JOB的内存是4G。版本1.11.1
>  --
>  发件人:Benchao Li 
>  发送时间:2020年9月23日(星期三) 10:50
>  收件人:user-zh 
>  主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
>  Hi Tianwang,
>
>  不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
>  1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
>  join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> 

回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 郑斌斌
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, 
checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

java.lang.Exception: [2020-09-14 09:27:20.431]Container 
[pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 
36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB 
physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 
-Djobmanager.rpc.address=njdev-nn03.nj 
-Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c 
-Djobmanager.rpc.port=30246 -Drest.address=flink-node03
 |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' 
-Djobmanager.rpc.address='flink-node03' 
-Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' 
-Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
 2> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 
[2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
[2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. 

 at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 13:12
收件人:user-zh ; 郑斌斌 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
郑斌斌  于2020年9月23日周三 下午12:29写道:
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
 单流跑的话,比较正常。
 JOB的内存是4G。版本1.11.1
 --
 发件人:Benchao Li 
 发送时间:2020年9月23日(星期三) 10:50
 收件人:user-zh 
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 Hi Tianwang,

 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
 `Math.max(leftRelativeSize, rightRelativeSize) +
 allowedLateness`,根据你的SQL,这个值应该是6h
 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
 2;`,在你的SQL来讲,就是3h,也就是说
 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

 希望这个可以解答你的疑惑~

 [1] https://issues.apache.org/jira/browse/FLINK-18996

 Tianwang Li  于2020年9月22日周二 下午8:26写道:

 > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
 >
 >
 > 【join】
 >
 > > SELECT `b`.`rowtime`,
 > > `a`.`c_id`,
 > > `b`.`openid`
 > > FROM `test_table_a` AS `a`
 > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
 > > AND `a`.`openid` = `b`.`openid`
 > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。

郑斌斌  于2020年9月23日周三 下午12:29写道:

>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
> 单流跑的话,比较正常。
> JOB的内存是4G。版本1.11.1
> --
> 发件人:Benchao Li 
> 发送时间:2020年9月23日(星期三) 10:50
> 收件人:user-zh 
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
> 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li  于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 1
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>
>
> --
>
> Best,
> Benchao Li
>
>

-- 

Best,
Benchao Li


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Tianwang Li
Hi Benchao, 感谢你的回复

我使用的RocksDB,内存 overhead 太多了, 什么地方超用了那么多内存,好像不受控制了。
另外,我也测试过hop窗口,也是超用内存比较。没有使用增量checkpoint。


最后,我这边的interval join 是 inner join ,使用的 b 表的rowtime作为时间,没有观察到延迟数据的情况。

[image: image.png]




Benchao Li  于2020年9月23日周三 上午10:50写道:

> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
> 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li  于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 1
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 
**
 tivanli
**


回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 郑斌斌
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
单流跑的话,比较正常。
JOB的内存是4G。版本1.11.1
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 10:50
收件人:user-zh 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li



Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li


[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Tianwang Li
使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。


【join】

> SELECT `b`.`rowtime`,
> `a`.`c_id`,
> `b`.`openid`
> FROM `test_table_a` AS `a`
> INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> AND `a`.`openid` = `b`.`openid`
> AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> AND `a`.`rowtime` + INTERVAL '6' HOUR
>
>
【window】

> SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `rowtime`,
> HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__windoow_start__`,
> HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> `__window_end__`,
> `c_id`,
> COUNT(`openid`) AS `cnt`
> FROM `test_table_in_6h`
> GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> `c_id`
>


我配置了Fink的内存是4G, 实际使用达到了6.8 G。
同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右

【配置】

> cat conf/flink-conf.yaml
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 6144m
> taskmanager.memory.process.size: 4g
> taskmanager.memory.jvm-overhead.min: 1024m
> taskmanager.memory.jvm-overhead.max: 2048m
> taskmanager.debug.memory.log-interval: 1
> env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
> -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
>



-- 
**
 tivanli
**


Re: 多线程模式下使用Blink TableEnvironment

2020-09-18 文章 Jeff Zhang
Hi jun su,

如果是自建平台的话,可以考虑用zeppelin的sdk 来提交作业
https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh





jun su  于2020年9月18日周五 上午10:59写道:

> hi godfrey,
>
> 我们的用法类似zeppelin, 项目形式类似notebook, 在第一次运行笔记时创建env,
> 再次运行notebook时会创建新线程来构建job运行, 所以我参考了zepplin的做法暂时fix了这个问题
>
> godfrey he  于2020年9月17日周四 下午10:07写道:
>
> > TableEnvironment 不是多线程安全的。
> >
> > btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?
> >
> > Jeff Zhang  于2020年9月14日周一 下午12:10写道:
> >
> > > 参考zeppelin的做法,每个线程里都调用这个
> > >
> > >
> > >
> >
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
> > >
> > >
> > > jun su  于2020年9月14日周一 上午11:54写道:
> > >
> > > > hi all,
> > > >
> > > > 多线程模式下执行sql , 在非聚合sql时报了如下错误:
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > >   at java.util.Objects.requireNonNull(Objects.java:203)
> > > >   at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > > >   at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > > >
> > > >
> > > >
> > > >
> > >
> >
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > > > 解决
> > > >
> > > >
> > > > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > > at scala.Predef$.Double2double(Predef.scala:365)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > > > at
> > > >
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > > > Source)
> > > > at
> > > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > > > Source)
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>
>
> --
> Best,
> Jun Su
>


-- 
Best Regards

Jeff Zhang


Re: 多线程模式下使用Blink TableEnvironment

2020-09-17 文章 jun su
hi godfrey,

我们的用法类似zeppelin, 项目形式类似notebook, 在第一次运行笔记时创建env,
再次运行notebook时会创建新线程来构建job运行, 所以我参考了zepplin的做法暂时fix了这个问题

godfrey he  于2020年9月17日周四 下午10:07写道:

> TableEnvironment 不是多线程安全的。
>
> btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?
>
> Jeff Zhang  于2020年9月14日周一 下午12:10写道:
>
> > 参考zeppelin的做法,每个线程里都调用这个
> >
> >
> >
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
> >
> >
> > jun su  于2020年9月14日周一 上午11:54写道:
> >
> > > hi all,
> > >
> > > 多线程模式下执行sql , 在非聚合sql时报了如下错误:
> > >
> > > Caused by: java.lang.NullPointerException
> > >   at java.util.Objects.requireNonNull(Objects.java:203)
> > >   at
> > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > >   at
> > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > >
> > >
> > >
> > >
> >
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > > 解决
> > >
> > >
> > > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
> > >
> > > Caused by: java.lang.NullPointerException
> > > at scala.Predef$.Double2double(Predef.scala:365)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > > at
> > >
> > >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > > at
> > >
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > > Source)
> > > at
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > > Source)
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


-- 
Best,
Jun Su


Re: 多线程模式下使用Blink TableEnvironment

2020-09-17 文章 godfrey he
TableEnvironment 不是多线程安全的。

btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?

Jeff Zhang  于2020年9月14日周一 下午12:10写道:

> 参考zeppelin的做法,每个线程里都调用这个
>
>
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
>
>
> jun su  于2020年9月14日周一 上午11:54写道:
>
> > hi all,
> >
> > 多线程模式下执行sql , 在非聚合sql时报了如下错误:
> >
> > Caused by: java.lang.NullPointerException
> >   at java.util.Objects.requireNonNull(Objects.java:203)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> >
> >
> >
> >
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > 解决
> >
> >
> > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
> >
> > Caused by: java.lang.NullPointerException
> > at scala.Predef$.Double2double(Predef.scala:365)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > at
> >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > at
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > Source)
> > at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > Source)
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: 多线程模式下使用Blink TableEnvironment

2020-09-13 文章 Jeff Zhang
参考zeppelin的做法,每个线程里都调用这个

https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111


jun su  于2020年9月14日周一 上午11:54写道:

> hi all,
>
> 多线程模式下执行sql , 在非聚合sql时报了如下错误:
>
> Caused by: java.lang.NullPointerException
>   at java.util.Objects.requireNonNull(Objects.java:203)
>   at
>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
>   at
>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
>
>
>
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> 解决
>
>
> 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
>
> Caused by: java.lang.NullPointerException
> at scala.Predef$.Double2double(Predef.scala:365)
> at
>
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> at
>
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> Source)
>
> --
> Best,
> Jun Su
>


-- 
Best Regards

Jeff Zhang


多线程模式下使用Blink TableEnvironment

2020-09-13 文章 jun su
hi all,

多线程模式下执行sql , 在非聚合sql时报了如下错误:

Caused by: java.lang.NullPointerException
  at java.util.Objects.requireNonNull(Objects.java:203)
  at
org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
  at
org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)


已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
解决


但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?

Caused by: java.lang.NullPointerException
at scala.Predef$.Double2double(Predef.scala:365)
at
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
at
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
at
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
Source)
at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
Source)

-- 
Best,
Jun Su


Re: Blink的Batch模式的并行度问题

2020-07-27 文章 jun su
hi,

如果底层是FileInputFormat ,默认就是1个并行度, 这个参数我尝试了并不起作用,
看代码是创建了一个SingleOutputStreamOperator , 感觉得重写下我使用的OrcInputFormat ,
让他不继承FileInputFormat , 像源码里的HiveInputFormat一样

Caizhi Weng  于2020年7月27日周一 下午5:31写道:

> Hi,
>
> 可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>
> jun su  于2020年7月27日周一 下午3:50写道:
>
> > hi all,
> >
> > Flink 目前的blink table planner batch mode
> > (读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource,
> > 但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction ,
> > 那么如何能扩大并行度来优化性能呢?
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


Re: Blink的Batch模式的并行度问题

2020-07-27 文章 Caizhi Weng
Hi,

可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism

jun su  于2020年7月27日周一 下午3:50写道:

> hi all,
>
> Flink 目前的blink table planner batch mode
> (读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource,
> 但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction ,
> 那么如何能扩大并行度来优化性能呢?
>
> --
> Best,
> Jun Su
>


Blink的Batch模式的并行度问题

2020-07-27 文章 jun su
hi all,

Flink 目前的blink table planner batch mode
(读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource,
但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction ,
那么如何能扩大并行度来优化性能呢?

-- 
Best,
Jun Su


Re: Blink Planner构造Remote Env

2020-07-27 文章 jun su
是依赖问题,解决了

jun su  于2020年7月27日周一 下午2:29写道:

> hi Jark,
>
> 抱歉这么晚回复邮件, 在flink 1.11.0版本上按照你的方式试验了下,
> 创建StreamTableEnvironmentImpl的方式参照了StreamTableEnvironmentImpl#create,
> 只是删除了方法中检查模式的代码 settings.isStreamingMode() , 结果报以下错误:
>
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot instantiate user function.
> at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:291)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
> OperatorChain.java:126)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:453)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:522)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2125)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:576)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:562)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:550)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:511)
> at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:276)
> ... 6 more
>
>
> Jark Wu  于2020年5月20日周三 下午2:30写道:
>
>> Hi,
>>
>> 因为 Blink planner
>> 不支持 org.apache.flink.table.api.java.BatchTableEnvironment,所以无法对接
>> ExecutionEnvironment。
>> Blink planner 的 batch 模式,目前只支持 TableEnvironemnt,不过也可以通过 hack 的方式去使用
>> StreamTableEnvironment,
>> 需要直接去构造 StreamTableEnvironmentImpl:
>>
>> StreamExecutionEnvironment execEnv =
>> StreamExecutionEnvironment.createRemoteEnvironment(...);
>> StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(..
>> execEnv, .., false); // 构造的参数可以参考 StreamTableEnvironmentImpl#create 的实现
>>
>> Best,
>> Jark
>>
>> On Tue, 19 May 2020 at 15:27, jun su  wrote:
>>
>> > hi all,
>> >
>> > 过去在ide中想连接远程flink集群可以 ExecutionEnvironment.createRemoteEnvironment()
>> >
>> > 官网Blink构建方式是:
>> >
>> > val bbSettings =
>> >
>> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
>> > val bbTableEnv = TableEnvironment.create(bbSettings)
>> >
>> >
>> > 请问如何连接远程集群呢?
>> >
>> > --
>> > Best,
>> > Jun Su
>> >
>>
>
>
> --
> Best,
> Jun Su
>


-- 
Best,
Jun Su


Re: Blink Planner构造Remote Env

2020-07-27 文章 jun su
hi Jark,

抱歉这么晚回复邮件, 在flink 1.11.0版本上按照你的方式试验了下,
创建StreamTableEnvironmentImpl的方式参照了StreamTableEnvironmentImpl#create,
只是删除了方法中检查模式的代码 settings.isStreamingMode() , 结果报以下错误:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig
.getStreamOperatorFactory(StreamConfig.java:291)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
OperatorChain.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2125)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig
.getStreamOperatorFactory(StreamConfig.java:276)
... 6 more


Jark Wu  于2020年5月20日周三 下午2:30写道:

> Hi,
>
> 因为 Blink planner
> 不支持 org.apache.flink.table.api.java.BatchTableEnvironment,所以无法对接
> ExecutionEnvironment。
> Blink planner 的 batch 模式,目前只支持 TableEnvironemnt,不过也可以通过 hack 的方式去使用
> StreamTableEnvironment,
> 需要直接去构造 StreamTableEnvironmentImpl:
>
> StreamExecutionEnvironment execEnv =
> StreamExecutionEnvironment.createRemoteEnvironment(...);
> StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(..
> execEnv, .., false); // 构造的参数可以参考 StreamTableEnvironmentImpl#create 的实现
>
> Best,
> Jark
>
> On Tue, 19 May 2020 at 15:27, jun su  wrote:
>
> > hi all,
> >
> > 过去在ide中想连接远程flink集群可以 ExecutionEnvironment.createRemoteEnvironment()
> >
> > 官网Blink构建方式是:
> >
> > val bbSettings =
> > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
> > val bbTableEnv = TableEnvironment.create(bbSettings)
> >
> >
> > 请问如何连接远程集群呢?
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


?????? Blink

2020-06-29 文章 ????
??17610775726??
"org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
"org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided",
"org.apache.flink" % "flink-table" % "1.10.1" % "provided",
"flink-table"??
----
??:"17610775726"<17610775...@163.com;
:2020??6??29??(??) 10:09
??:"user-zh"

Re: Blink

2020-06-29 文章 17610775726
使用row number设置成blink的planner就行了 依赖也只用加blink的




| |
17610775726
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

On 06/29/2020 17:19, xuhaiLong wrote:


hello,请教下


 "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
 "org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided",
 "org.apache.flink" % "flink-table" % "1.10.1" % "provided",


我在项目中添加了这三个依赖,在idea 中 运行的时候出现异常
`Could not instantiate the executor. Makesure a planner module is on the 
classpath`


而我添加上这个依赖
`"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",` 就可以了


但是我要使用 blink 中的ROW_NUMBER() 函数,是我的引入错了吗?


猜测是我没有正确引入 blink  ?

Blink

2020-06-29 文章 xuhaiLong


hello,请教下


  "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
  "org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided",
  "org.apache.flink" % "flink-table" % "1.10.1" % "provided",


我在项目中添加了这三个依赖,在idea 中 运行的时候出现异常
`Could not instantiate the executor. Makesure a planner module is on the 
classpath`


而我添加上这个依赖
`"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",` 就可以了


但是我要使用 blink 中的ROW_NUMBER() 函数,是我的引入错了吗?


猜测是我没有正确引入 blink  ?

Re: Blink Planner构造Remote Env

2020-05-20 文章 Jark Wu
Hi,

因为 Blink planner
不支持 org.apache.flink.table.api.java.BatchTableEnvironment,所以无法对接
ExecutionEnvironment。
Blink planner 的 batch 模式,目前只支持 TableEnvironemnt,不过也可以通过 hack 的方式去使用
StreamTableEnvironment,
需要直接去构造 StreamTableEnvironmentImpl:

StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.createRemoteEnvironment(...);
StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(..
execEnv, .., false); // 构造的参数可以参考 StreamTableEnvironmentImpl#create 的实现

Best,
Jark

On Tue, 19 May 2020 at 15:27, jun su  wrote:

> hi all,
>
> 过去在ide中想连接远程flink集群可以 ExecutionEnvironment.createRemoteEnvironment()
>
> 官网Blink构建方式是:
>
> val bbSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
> val bbTableEnv = TableEnvironment.create(bbSettings)
>
>
> 请问如何连接远程集群呢?
>
> --
> Best,
> Jun Su
>


Blink Planner构造Remote Env

2020-05-19 文章 jun su
hi all,

过去在ide中想连接远程flink集群可以 ExecutionEnvironment.createRemoteEnvironment()

官网Blink构建方式是:

val bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)


请问如何连接远程集群呢?

-- 
Best,
Jun Su


Re: Re: flink1.9 Blink sql 丢失主键+去重和时态表联合使用吞吐量低

2020-05-10 文章 刘大龙
Hi,
我觉得你可以多试几种方式,比如先关联再去重,测试一下性能呢?另外,去重的话,你这个业务逻辑是用es字段排序取最新,不能用procetime去重取最新吗?你的sql中rowNum<=1实际上生成的应该是Rank算子,不是Deduplication算子吧。我在业务中对订单类型的数据是这样用去重算子的,select
 * from (select *, row_number() over(partition by id order by proctime desc as 
rowNum from xxx) tmp where rowNum = 1,这样的语法才会转换去Deduplication算子。


> -原始邮件-
> 发件人: "宇张" 
> 发送时间: 2020-05-11 11:40:37 (星期一)
> 收件人: user-zh@flink.apache.org
> 抄送: 
> 主题: Re: flink1.9 Blink sql 丢失主键+去重和时态表联合使用吞吐量低
> 
> hi、
> 我这面state backend用的是FsStateBackend,状态保存在hdfs
> 
> On Mon, May 11, 2020 at 11:19 AM Benchao Li  wrote:
> 
> > Hi,
> >
> > 你用的是什么state backend呢?看你的情况很有可能跟这个有关系。比如用的是rocksdb,然后是普通磁盘的话,很容易遇到IO瓶颈。
> >
> > 宇张  于2020年5月11日周一 上午11:14写道:
> >
> > > hi、
> > > 我这面使用flink1.9的Blink sql完成数据转换操作,但遇到如下问题:
> > > 1、使用row_number函数丢失主键
> > > 2、row_number函数和时态表关联联合使用程序吞吐量严重降低,对应sql如下:
> > > // 理论上这里面是不需要 distinct的,但sql中的主键blink提取不出来导致校验不通过,所以加了一个
> > > SELECT distinct t1.id as
> > order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> > > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > > data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY
> > data.index0.id
> > > ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> > > rowNum<=1) t1 left join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF
> > > t1.proctime t2 on t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR
> > > SYSTEM_TIME AS OF t1.proctime t4 ON t1.so_id =t4.ID
> > >
> > >
> > 上面的sql吞吐率很低,每秒就处理几条数据,而下面两种情况分开跑,吞吐量都能达标,仅时态表关联能到到几千条,仅rownumber能达到几万条,但不知道为什么他们俩联合后就只有几条了
> > >
> > > SELECT distinct t1.id as
> > order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> > > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > > data.index0.id,...,proctime from installmentdb_t_line_item)tmp ) t1 left
> > > join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF t1.proctime t2 on
> > > t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR SYSTEM_TIME AS OF
> > > t1.proctime t4 ON t1.so_id =t4.ID
> > >
> > > SELECT distinct t1.id as
> > order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> > > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > > data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY
> > data.index0.id
> > > ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> > > rowNum<=1) t1
> > >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >


--
刘大龙

浙江大学 控制系 智能系统与控制研究所 工控新楼217
地址:浙江省杭州市浙大路38号浙江大学玉泉校区
Tel:18867547281


Re: flink1.9 Blink sql 丢失主键+去重和时态表联合使用吞吐量低

2020-05-10 文章 宇张
hi、
我这面state backend用的是FsStateBackend,状态保存在hdfs

On Mon, May 11, 2020 at 11:19 AM Benchao Li  wrote:

> Hi,
>
> 你用的是什么state backend呢?看你的情况很有可能跟这个有关系。比如用的是rocksdb,然后是普通磁盘的话,很容易遇到IO瓶颈。
>
> 宇张  于2020年5月11日周一 上午11:14写道:
>
> > hi、
> > 我这面使用flink1.9的Blink sql完成数据转换操作,但遇到如下问题:
> > 1、使用row_number函数丢失主键
> > 2、row_number函数和时态表关联联合使用程序吞吐量严重降低,对应sql如下:
> > // 理论上这里面是不需要 distinct的,但sql中的主键blink提取不出来导致校验不通过,所以加了一个
> > SELECT distinct t1.id as
> order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY
> data.index0.id
> > ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> > rowNum<=1) t1 left join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF
> > t1.proctime t2 on t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR
> > SYSTEM_TIME AS OF t1.proctime t4 ON t1.so_id =t4.ID
> >
> >
> 上面的sql吞吐率很低,每秒就处理几条数据,而下面两种情况分开跑,吞吐量都能达标,仅时态表关联能到到几千条,仅rownumber能达到几万条,但不知道为什么他们俩联合后就只有几条了
> >
> > SELECT distinct t1.id as
> order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > data.index0.id,...,proctime from installmentdb_t_line_item)tmp ) t1 left
> > join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF t1.proctime t2 on
> > t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR SYSTEM_TIME AS OF
> > t1.proctime t4 ON t1.so_id =t4.ID
> >
> > SELECT distinct t1.id as
> order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> > HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> > data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY
> data.index0.id
> > ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> > rowNum<=1) t1
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: flink1.9 Blink sql 丢失主键+去重和时态表联合使用吞吐量低

2020-05-10 文章 Benchao Li
Hi,

你用的是什么state backend呢?看你的情况很有可能跟这个有关系。比如用的是rocksdb,然后是普通磁盘的话,很容易遇到IO瓶颈。

宇张  于2020年5月11日周一 上午11:14写道:

> hi、
> 我这面使用flink1.9的Blink sql完成数据转换操作,但遇到如下问题:
> 1、使用row_number函数丢失主键
> 2、row_number函数和时态表关联联合使用程序吞吐量严重降低,对应sql如下:
> // 理论上这里面是不需要 distinct的,但sql中的主键blink提取不出来导致校验不通过,所以加了一个
> SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
> ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> rowNum<=1) t1 left join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF
> t1.proctime t2 on t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR
> SYSTEM_TIME AS OF t1.proctime t4 ON t1.so_id =t4.ID
>
> 上面的sql吞吐率很低,每秒就处理几条数据,而下面两种情况分开跑,吞吐量都能达标,仅时态表关联能到到几千条,仅rownumber能达到几万条,但不知道为什么他们俩联合后就只有几条了
>
> SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> data.index0.id,...,proctime from installmentdb_t_line_item)tmp ) t1 left
> join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF t1.proctime t2 on
> t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR SYSTEM_TIME AS OF
> t1.proctime t4 ON t1.so_id =t4.ID
>
> SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
> HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
> data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
> ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
> rowNum<=1) t1
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


flink1.9 Blink sql 丢失主键+去重和时态表联合使用吞吐量低

2020-05-10 文章 宇张
hi、
我这面使用flink1.9的Blink sql完成数据转换操作,但遇到如下问题:
1、使用row_number函数丢失主键
2、row_number函数和时态表关联联合使用程序吞吐量严重降低,对应sql如下:
// 理论上这里面是不需要 distinct的,但sql中的主键blink提取不出来导致校验不通过,所以加了一个
SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
rowNum<=1) t1 left join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF
t1.proctime t2 on t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR
SYSTEM_TIME AS OF t1.proctime t4 ON t1.so_id =t4.ID
上面的sql吞吐率很低,每秒就处理几条数据,而下面两种情况分开跑,吞吐量都能达标,仅时态表关联能到到几千条,仅rownumber能达到几万条,但不知道为什么他们俩联合后就只有几条了

SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
data.index0.id,...,proctime from installmentdb_t_line_item)tmp ) t1 left
join SNAP_T_OPEN_PAY_ORDER FOR SYSTEM_TIME AS OF t1.proctime t2 on
t2.LI_ID= t1.id left join SNAP_T_SALES_ORDER FOR SYSTEM_TIME AS OF
t1.proctime t4 ON t1.so_id =t4.ID

SELECT distinct t1.id as order_id,...,DATE_FORMAT(t1.proctime,'-MM-dd
HH:mm:ss') as etl_time FROM (select id,...,proctime from (select
data.index0.id,...,proctime,ROW_NUMBER() OVER (PARTITION BY data.index0.id
ORDER BY es desc) AS rowNum from installmentdb_t_line_item)tmp where
rowNum<=1) t1


Re: Blink模式下运用collect方法快速获取结果

2020-04-24 文章 jun su
非常感谢, 我用的flink-1.9.2 , 但是直接将代码copy过来可以用了!

Jingsong Li  于2020年4月24日周五 下午3:02写道:

> 1.10里面有TableUtils了,里面有collectToList
>
>
> Best,
> Jingsong Lee
>
> On Fri, Apr 24, 2020 at 2:49 PM jun su  wrote:
>
> > hi all,
> >
> > 找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
> > 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:
> >
> > def collect[T](
> > tEnv: TableEnvironment,
> > table: Table,
> > sink: CollectTableSink[T],
> > jobName: Option[String]): Seq[T] = {
> >   val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
> > .asInstanceOf[TypeInformation[T]]
> > .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
> >   .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
> >   val id = new AbstractID().toString
> >   sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
> >   val sinkName = UUID.randomUUID().toString
> >   tEnv.registerTableSink(sinkName, sink)
> >   tEnv.insertInto(table, sinkName)
> >
> >   val res = tEnv.execute("test")
> >   val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
> >   SerializedListAccumulator.deserializeList(accResult, typeSerializer)
> > }
> >
> >
> > jun su  于2020年4月24日周五 下午2:05写道:
> >
> > > hi all,
> > >
> > > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> > > 结果用于代码调试么?
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best,
Jun Su


Re: Blink模式下运用collect方法快速获取结果

2020-04-24 文章 Jingsong Li
1.10里面有TableUtils了,里面有collectToList


Best,
Jingsong Lee

On Fri, Apr 24, 2020 at 2:49 PM jun su  wrote:

> hi all,
>
> 找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
> 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:
>
> def collect[T](
> tEnv: TableEnvironment,
> table: Table,
> sink: CollectTableSink[T],
> jobName: Option[String]): Seq[T] = {
>   val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
> .asInstanceOf[TypeInformation[T]]
> .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
>   .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
>   val id = new AbstractID().toString
>   sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
>   val sinkName = UUID.randomUUID().toString
>   tEnv.registerTableSink(sinkName, sink)
>   tEnv.insertInto(table, sinkName)
>
>   val res = tEnv.execute("test")
>   val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
>   SerializedListAccumulator.deserializeList(accResult, typeSerializer)
> }
>
>
> jun su  于2020年4月24日周五 下午2:05写道:
>
> > hi all,
> >
> > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> > 结果用于代码调试么?
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best,
> Jun Su
>


-- 
Best, Jingsong Lee


Re: Blink模式下运用collect方法快速获取结果

2020-04-24 文章 jun su
hi all,

找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:

def collect[T](
tEnv: TableEnvironment,
table: Table,
sink: CollectTableSink[T],
jobName: Option[String]): Seq[T] = {
  val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
.createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
  .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
  val id = new AbstractID().toString
  sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
  val sinkName = UUID.randomUUID().toString
  tEnv.registerTableSink(sinkName, sink)
  tEnv.insertInto(table, sinkName)

  val res = tEnv.execute("test")
  val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
  SerializedListAccumulator.deserializeList(accResult, typeSerializer)
}


jun su  于2020年4月24日周五 下午2:05写道:

> hi all,
>
> blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> 结果用于代码调试么?
>
> --
> Best,
> Jun Su
>


-- 
Best,
Jun Su


Blink模式下运用collect方法快速获取结果

2020-04-24 文章 jun su
hi all,

blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
结果用于代码调试么?

-- 
Best,
Jun Su


Re: 【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner

2020-04-03 文章 Kurt Young
你好,这个是预期中的。在新的类型系统下,我们将使用 LocalDateTime 作为 TIMESTAMP 类型的默认对象。
同时我们还禁用了 long 和 TIMESTAMP 对象的直接互转。
具体的原因和细节可以看:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System

Best,
Kurt


On Fri, Apr 3, 2020 at 4:58 PM 1193216154 <1193216...@qq.com> wrote:

>
> 你好,最近改成blinkplanner发现了两个问题。及时两者生成的proctime的时间类型不同,一个是TimeStamp,一个是LocalDateTime。
>
>
> org.apache.flink.table.dataformat.DataFormatConverters中TimestampConverter 的
>
> toInternalImpl方法只支持TimeStamp的参数,而我遇见的情况是传进了long类型,导致类转换异常,如果能重载toInternalImpl方法加一个long,或许可以解决我的问题
> --原始邮件--
> 发件人:"Kurt Young" 发送时间:2020年4月1日(星期三) 上午9:22
> 收件人:"user-zh"
> 主题:【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner
>
>
>
> 大家好,
>
> 正如大家所知,Blink planner 是 Flink 1.9 版本中引入的一个全新的 Table API 和 SQL 的翻译优化
> 器,并且我们已经在 1.10 版本中将其作为 SQL CLI 的默认 planner。由于社区很早就已经决定不再
> 针对老的优化器去增添任何新的功能,所以从功能和性能上来说,老的 flink planner 已经缺了很多
> 现在社区最新的 SQL 相关的功能,比如新的类型系统,更多的DDL的支持,以及即将在 1.11 发布
> 的新的 TableSource 和 TableSink 接口和随之而来的对 Binlog 类日志的解析。
>
> 因此我们打算尝试在接下来这个 1.11 版本发布时将 blink planner 升级为默认的 planner。但在此之
> 前,我们希望听听您的反馈。尤其是一些使用过程的体验,和您观察到的或者经历过的 blink planner
> 做不了的事情从而导致您退回了老的 flink planner 的原因,可能是两边功能不对等,或者 blink planner
> 有什么新的 bug 而老的 planner 没有。我们距离 1.11 的需求冻结还有差不多一个月的时间,收到
> 您的反馈之后,我们有足够的时间进行修复和完善。
>
> 希望听到您宝贵的声音和意见,谢谢。
>
> Best,
> Kurt


???????????????????? 1.11 ???????? blink planner ?????????? planner

2020-04-03 文章 1193216154
??blinkplanner??proctime??TimeStampLocalDateTime??


org.apache.flink.table.dataformat.DataFormatConverters??TimestampConverter ??
toInternalImpl??TimeStamp??longtoInternalImpl??long??
----
??:"Kurt Young"

【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner

2020-03-31 文章 Kurt Young
大家好,

正如大家所知,Blink planner 是 Flink 1.9 版本中引入的一个全新的 Table API 和 SQL 的翻译优化
器,并且我们已经在 1.10 版本中将其作为 SQL CLI 的默认 planner。由于社区很早就已经决定不再
针对老的优化器去增添任何新的功能,所以从功能和性能上来说,老的 flink planner 已经缺了很多
现在社区最新的 SQL 相关的功能,比如新的类型系统,更多的DDL的支持,以及即将在 1.11 发布
的新的 TableSource 和 TableSink 接口和随之而来的对 Binlog 类日志的解析。

因此我们打算尝试在接下来这个 1.11 版本发布时将 blink planner 升级为默认的 planner。但在此之
前,我们希望听听您的反馈。尤其是一些使用过程的体验,和您观察到的或者经历过的 blink planner
做不了的事情从而导致您退回了老的 flink planner 的原因,可能是两边功能不对等,或者 blink planner
有什么新的 bug 而老的 planner 没有。我们距离 1.11 的需求冻结还有差不多一个月的时间,收到
您的反馈之后,我们有足够的时间进行修复和完善。

希望听到您宝贵的声音和意见,谢谢。

Best,
Kurt


Re: 使用blink planner读取mysql数据

2020-03-23 文章 Jark Wu
Hi,

DDL 是定义了元数据,首先你需要先在 Flink SQL 中用 DDL 定义你在 mysql 中的 student 表。比如
CREATE TABLE student (
  id BIGINT,
  score INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
  'connector.table' = 'student',
  ...
)

然后,如果你想要查询数据,可以通过  Flink SQL query 来查询,如:
SELECT * from student WHERE score > 60

注:以上命令都可以在 Flink SQL CLI 中运行 [1]。

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html


On Mon, 23 Mar 2020 at 16:30, 烟虫李彦卓 <1229766...@qq.com> wrote:

> hi,All请问,在blink planner的batch mode下,读取mysql数据,依照官网的JDBC Connector的操作:
> CREATE TABLE MyUserTable (   ... ) WITH (   'connector.type' = 'jdbc', --
> required: specify this table type is jdbc  'connector.url' =
> 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
> 'connector.table' = 'jdbc_table_name',  -- required: jdbc table name
> 'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name
> of the JDBC driver to use to connect to this URL.
> -- If not set, it will automatically be derived
> from the URL.'connector.username' = 'name', -- optional: jdbc user name
> and password   'connector.password' = 'password',
> 'connector.write.flush.max-rows' = '5000'
>
>
> 我理解的是创建一个名字叫“ MyUserTable”的临时表,这个表是属于flink env的。但是从MySQL读取的sql(比如说“select
> * from student where score 
> 60")在哪儿能体现呢?看上面的用法好像是把全部的数据都取出来或者取多少条?这是让我困惑的地方,另外blink planner 的batch
> mode读取JDBC有别的方法吗?Thanks!


????blink planner????mysql????

2020-03-23 文章 ??????????
hi??Allblink planner??batch modemysqlJDBC 
Connector
CREATE TABLE MyUserTable (   ... ) WITH (   'connector.type' = 'jdbc', -- 
required: specify this table type is jdbc  'connector.url' = 
'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url  
'connector.table' = 'jdbc_table_name',  -- required: jdbc table name  
'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of 
the JDBC driver to use to connect to this URL.  
-- If not set, it will automatically be derived from the URL.   
 'connector.username' = 'name', -- optional: jdbc user name and password   
'connector.password' = 'password',
'connector.write.flush.max-rows' = '5000'


?? MyUserTableflink 
env??MySQL??sql??select * from student where score  
60"??blink
 planner ??batch modeJDBC??Thanks!

Re: ParquetTableSource在blink table planner下的使用问题

2020-02-17 文章 jun su
hi Jark,

就是因为我的数据里 event_name 字段的value 没有 "没有这个值" , 所以才比较奇怪

Jark Wu  于2020年2月18日周二 下午12:15写道:

> Hi jun,
>
> 这个是符合预期的行为哈。这说明你的 source 中有4条 event_name 的值是  '没有这个值'
>
> Best,
> Jark
>
> On Mon, 17 Feb 2020 at 23:26, jun su  wrote:
>
>> hi Jark Wu,
>>
>> 感谢你的帮助 , 我在之前的问询中还发现了一些别的问题:
>>
>> 发现ParquetTableSource在flink table planner下, stream/batch 两个模式下都有这个情况:
>> 当select一个字段, 并且where条件有 = 判断的话, 输出结果是将where条件
>> 直接赋值给了select字段并返回,以下是简单描述:
>>
>> sql = select event_name from source where event_name = '没有这个值'
>>
>> 输出结果为:
>>
>> 没有这个值
>> 没有这个值
>> 没有这个值
>> 没有这个值
>>
>>
>> Jark Wu  于2020年2月17日周一 下午5:03写道:
>>
>>> 排查了下,确实是个 bug,我开了个 issue 来跟进解决:
>>> https://issues.apache.org/jira/browse/FLINK-16113
>>>
>>> 当前的 workaround 可以将常量放到 selelct 中,比如 select a,b,'windows进程创建' from
>>> MyTable where c = 'windows进程创建'
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 17 Feb 2020 at 15:15, jun su  wrote:
>>>
>>>> 上一个问题补充, 在blink table planner下:
>>>>
>>>> select event_name from table => 中文编码没问题
>>>>
>>>> select event_name from table where event_name = 'windows进程创建'=>
>>>> 此时中文编码有问题, 应该是在底层userFunction里将where条件为true时, 直接将where条件赋值给了
>>>> 输出列,此时出现了编码问题
>>>>
>>>> 麻烦查证下
>>>>
>>>> jun su  于2020年2月17日周一 下午1:28写道:
>>>>
>>>>> hi Jark Wu,
>>>>>
>>>>> 又发现了一个blink table planner的问题,中文显示乱码,麻烦也查证下, 以下是代码:
>>>>>
>>>>> ParquetTableSource parquetTableSource = ParquetTableSource
>>>>> .builder()
>>>>> .path("/Users/sujun/Downloads/edr/EDR")
>>>>> .forParquetSchema(new 
>>>>> AvroSchemaConverter().convert(org.apache.avro.Schema.parse(schema, true)))
>>>>> .build();
>>>>>
>>>>> Table source = bsTableEnv.fromTableSource(parquetTableSource);
>>>>> bsTableEnv.createTemporaryView("source",source);
>>>>>
>>>>> Table t1 = bsTableEnv.sqlQuery("select event_name from source where 
>>>>> event_name = 'windows进程创建'");
>>>>> bsTableEnv.toAppendStream(t1,Row.class).print();
>>>>>
>>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>>>
>>>>>
>>>>> jun su  于2020年2月14日周五 下午6:54写道:
>>>>>
>>>>>> 1. 发现ParquetTableSource在flink table planner下, stream/batch
>>>>>> 两个模式下都有以上提出的问题,
>>>>>> 2. blink table planner下没有以上问题, 但是中文print方法有编码问题
>>>>>>
>>>>>> 不清数是不是我使用问题,麻烦查证下
>>>>>>
>>>>>> jun su  于2020年2月14日周五 下午6:30写道:
>>>>>>
>>>>>>> hi Jark Wu,
>>>>>>>
>>>>>>> 抱歉以下是我的代码和结果:
>>>>>>>
>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>> ExecutionEnvironment fbEnv = 
>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>> BatchTableEnvironment fbTableEnv = 
>>>>>>> BatchTableEnvironment.create(fbEnv);
>>>>>>>
>>>>>>> String schema = 
>>>>>>> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"parent_process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"dst_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"technique_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_path\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_pid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"windows_event_id\",\"type\":[

Re: ParquetTableSource在blink table planner下的使用问题

2020-02-17 文章 Jark Wu
Hi jun,

这个是符合预期的行为哈。这说明你的 source 中有4条 event_name 的值是  '没有这个值'

Best,
Jark

On Mon, 17 Feb 2020 at 23:26, jun su  wrote:

> hi Jark Wu,
>
> 感谢你的帮助 , 我在之前的问询中还发现了一些别的问题:
>
> 发现ParquetTableSource在flink table planner下, stream/batch 两个模式下都有这个情况:
> 当select一个字段, 并且where条件有 = 判断的话, 输出结果是将where条件
> 直接赋值给了select字段并返回,以下是简单描述:
>
> sql = select event_name from source where event_name = '没有这个值'
>
> 输出结果为:
>
> 没有这个值
> 没有这个值
> 没有这个值
> 没有这个值
>
>
> Jark Wu  于2020年2月17日周一 下午5:03写道:
>
>> 排查了下,确实是个 bug,我开了个 issue 来跟进解决:
>> https://issues.apache.org/jira/browse/FLINK-16113
>>
>> 当前的 workaround 可以将常量放到 selelct 中,比如 select a,b,'windows进程创建' from MyTable
>> where c = 'windows进程创建'
>>
>> Best,
>> Jark
>>
>> On Mon, 17 Feb 2020 at 15:15, jun su  wrote:
>>
>>> 上一个问题补充, 在blink table planner下:
>>>
>>> select event_name from table => 中文编码没问题
>>>
>>> select event_name from table where event_name = 'windows进程创建'=>
>>> 此时中文编码有问题, 应该是在底层userFunction里将where条件为true时, 直接将where条件赋值给了
>>> 输出列,此时出现了编码问题
>>>
>>> 麻烦查证下
>>>
>>> jun su  于2020年2月17日周一 下午1:28写道:
>>>
>>>> hi Jark Wu,
>>>>
>>>> 又发现了一个blink table planner的问题,中文显示乱码,麻烦也查证下, 以下是代码:
>>>>
>>>> ParquetTableSource parquetTableSource = ParquetTableSource
>>>> .builder()
>>>> .path("/Users/sujun/Downloads/edr/EDR")
>>>> .forParquetSchema(new 
>>>> AvroSchemaConverter().convert(org.apache.avro.Schema.parse(schema, true)))
>>>> .build();
>>>>
>>>> Table source = bsTableEnv.fromTableSource(parquetTableSource);
>>>> bsTableEnv.createTemporaryView("source",source);
>>>>
>>>> Table t1 = bsTableEnv.sqlQuery("select event_name from source where 
>>>> event_name = 'windows进程创建'");
>>>> bsTableEnv.toAppendStream(t1,Row.class).print();
>>>>
>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>> windows\u8FDB\u7A0B\u521B\u5EFA
>>>>
>>>>
>>>> jun su  于2020年2月14日周五 下午6:54写道:
>>>>
>>>>> 1. 发现ParquetTableSource在flink table planner下, stream/batch
>>>>> 两个模式下都有以上提出的问题,
>>>>> 2. blink table planner下没有以上问题, 但是中文print方法有编码问题
>>>>>
>>>>> 不清数是不是我使用问题,麻烦查证下
>>>>>
>>>>> jun su  于2020年2月14日周五 下午6:30写道:
>>>>>
>>>>>> hi Jark Wu,
>>>>>>
>>>>>> 抱歉以下是我的代码和结果:
>>>>>>
>>>>>> public static void main(String[] args) throws Exception {
>>>>>> ExecutionEnvironment fbEnv = 
>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>> BatchTableEnvironment fbTableEnv = 
>>>>>> BatchTableEnvironment.create(fbEnv);
>>>>>>
>>>>>> String schema = 
>>>>>> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"parent_process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"dst_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"technique_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_path\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_pid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"windows_event_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_rule_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dev_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sa_da\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tran_protocol\"

Re: ParquetTableSource在blink table planner下的使用问题

2020-02-17 文章 Jark Wu
排查了下,确实是个 bug,我开了个 issue 来跟进解决:
https://issues.apache.org/jira/browse/FLINK-16113

当前的 workaround 可以将常量放到 selelct 中,比如 select a,b,'windows进程创建' from MyTable
where c = 'windows进程创建'

Best,
Jark

On Mon, 17 Feb 2020 at 15:15, jun su  wrote:

> 上一个问题补充, 在blink table planner下:
>
> select event_name from table => 中文编码没问题
>
> select event_name from table where event_name = 'windows进程创建'=>
> 此时中文编码有问题, 应该是在底层userFunction里将where条件为true时, 直接将where条件赋值给了
> 输出列,此时出现了编码问题
>
> 麻烦查证下
>
> jun su  于2020年2月17日周一 下午1:28写道:
>
>> hi Jark Wu,
>>
>> 又发现了一个blink table planner的问题,中文显示乱码,麻烦也查证下, 以下是代码:
>>
>> ParquetTableSource parquetTableSource = ParquetTableSource
>> .builder()
>> .path("/Users/sujun/Downloads/edr/EDR")
>> .forParquetSchema(new 
>> AvroSchemaConverter().convert(org.apache.avro.Schema.parse(schema, true)))
>> .build();
>>
>> Table source = bsTableEnv.fromTableSource(parquetTableSource);
>> bsTableEnv.createTemporaryView("source",source);
>>
>> Table t1 = bsTableEnv.sqlQuery("select event_name from source where 
>> event_name = 'windows进程创建'");
>> bsTableEnv.toAppendStream(t1,Row.class).print();
>>
>> windows\u8FDB\u7A0B\u521B\u5EFA
>> windows\u8FDB\u7A0B\u521B\u5EFA
>> windows\u8FDB\u7A0B\u521B\u5EFA
>> windows\u8FDB\u7A0B\u521B\u5EFA
>> windows\u8FDB\u7A0B\u521B\u5EFA
>> windows\u8FDB\u7A0B\u521B\u5EFA
>>
>>
>> jun su  于2020年2月14日周五 下午6:54写道:
>>
>>> 1. 发现ParquetTableSource在flink table planner下, stream/batch
>>> 两个模式下都有以上提出的问题,
>>> 2. blink table planner下没有以上问题, 但是中文print方法有编码问题
>>>
>>> 不清数是不是我使用问题,麻烦查证下
>>>
>>> jun su  于2020年2月14日周五 下午6:30写道:
>>>
>>>> hi Jark Wu,
>>>>
>>>> 抱歉以下是我的代码和结果:
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>> ExecutionEnvironment fbEnv = 
>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>> BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
>>>>
>>>> String schema = 
>>>> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"parent_process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"dst_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"technique_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_path\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_pid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"windows_event_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_rule_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dev_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sa_da\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tran_protocol\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"src_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"domain_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"operation_object\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"protocol\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"src_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"source_user\",\"type\":[\"null\",\"string\&q

Re:Re: 使用Flink 1.10 blink planner写ES的异常问题

2020-02-15 文章 sunfulin
好的,感谢。我在user里附加了query SQL。











在 2020-02-15 16:14:56,"Jark Wu"  写道:
>Hi sunfulin,
>
>这个异常是说通过 query 推断不出 query 的 primary key,不是说 sink 没有 primary key。至于为什么 query
>推断不出 pk,可能要结合 query 看一下。
>我看到你在 user@ 里面也发邮件了,我已经在那下面回复了,我们要不在 user@ 邮件下面继续讨论吧。可以将你们的 SQL 补充一下,包括
>DDL。
>
>Best,
>Jark
>
>On Fri, 14 Feb 2020 at 23:03, sunfulin  wrote:
>
>> Hi,
>> 我使用Flink 1.10,开启了Blink Planner,在尝试写入ES且使用UpsertMode时(sql就是insert into
>> table select xxx group by x),抛出了如下异常:
>> 我通过DDL尝试定义一个ESTableSink,且声名primary key时,运行时又说Primary key和unique
>> key目前不支持。。那这就是个悖论啊。。真的不科学。
>>
>>
>> 关键问题:我切换回使用老的planner时,是没问题的。。这可能是Blink Planner的bug么?真心请教。
>>
>>
>> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
>> that Table has a full primary keys if it is updated.
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:82)
>> at
>> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.doJob(ZhangleClientComputeTask.java:80)
>> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
>> at
>> com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.main(ZhangleClientComputeTask.java:27)
>>
>>


Re: ParquetTableSource在blink table planner下的使用问题

2020-02-14 文章 jun su
1. 发现ParquetTableSource在flink table planner下, stream/batch 两个模式下都有以上提出的问题,
2. blink table planner下没有以上问题, 但是中文print方法有编码问题

不清数是不是我使用问题,麻烦查证下

jun su  于2020年2月14日周五 下午6:30写道:

> hi Jark Wu,
>
> 抱歉以下是我的代码和结果:
>
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment fbEnv = 
> ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
>
> String schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"parent_process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"dst_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"technique_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_path\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tgt_pid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"windows_event_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_rule_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dev_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sa_da\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tran_protocol\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"src_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"domain_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"operation_object\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"protocol\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"src_address\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"source_user\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"image\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"parent_command_line\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"product\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sa_sp_ap_da_dp\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rule_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"receive_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"collector_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"source_process_id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"src_ad\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rule_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"src_port\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\&qu

Re: ParquetTableSource在blink table planner下的使用问题

2020-02-14 文章 Jark Wu
Hi Jun,

你上传的图片失败了,你可以选择用一些图床工具上传然后将链接贴在这里。或者直接贴文本。

Best,
Jark

On Fri, 14 Feb 2020 at 18:16, jun su  wrote:

> hi JingsongLee,
> 我在测试ParquetTableSource时遇到一个问题:  我的数据中没有where条件设置的值, 但是打印的结果,
> 是将where条件直接赋值给了该字段
>
> [image: image.png]
>
> JingsongLee  于2020年2月14日周五 下午5:05写道:
>
>> Hi jun,
>>
>> pushdown逻辑是批流复用的,应该work的很愉快。
>>
>> Best,
>> Jingsong Lee
>>
>>
>> --
>> From:jun su 
>> Send Time:2020年2月14日(星期五) 17:00
>> To:user-zh 
>> Subject:ParquetTableSource在blink table planner下的使用问题
>>
>> 你好:
>>官网文档中说明Blink Table Planner并不支持BatchTableSource,
>>
>> 目前最新版1.10代码中的ParquetTableSource还是BatchTableSource,那么说明目前的ParquetTableSource还不支持blink
>> table planner ?如果将现有的ParquetTableSource改成StreamTableSource后,
>> pushdown逻辑会不会出现bug?
>>
>


Re: ParquetTableSource在blink table planner下的使用问题

2020-02-14 文章 jun su
hi JingsongLee,
我在测试ParquetTableSource时遇到一个问题:  我的数据中没有where条件设置的值, 但是打印的结果,
是将where条件直接赋值给了该字段

[image: image.png]

JingsongLee  于2020年2月14日周五 下午5:05写道:

> Hi jun,
>
> pushdown逻辑是批流复用的,应该work的很愉快。
>
> Best,
> Jingsong Lee
>
>
> --
> From:jun su 
> Send Time:2020年2月14日(星期五) 17:00
> To:user-zh 
> Subject:ParquetTableSource在blink table planner下的使用问题
>
> 你好:
>官网文档中说明Blink Table Planner并不支持BatchTableSource,
>
> 目前最新版1.10代码中的ParquetTableSource还是BatchTableSource,那么说明目前的ParquetTableSource还不支持blink
> table planner ?如果将现有的ParquetTableSource改成StreamTableSource后,
> pushdown逻辑会不会出现bug?
>


ParquetTableSource在blink table planner下的使用问题

2020-02-14 文章 jun su
你好:
   官网文档中说明Blink Table Planner并不支持BatchTableSource,
目前最新版1.10代码中的ParquetTableSource还是BatchTableSource,那么说明目前的ParquetTableSource还不支持blink
table planner ?如果将现有的ParquetTableSource改成StreamTableSource后,
pushdown逻辑会不会出现bug?


Re: blink(基于flink1.5.1版本)可以使用两个hadoop集群吗?

2020-01-26 文章 Yun Tang
Hi Yong

首先,这封邮件就不要抄送开发者邮件列表了,中文的邮件只需要发中文邮件列表。

Flink当然可以用两个YARN集群,关键在于Flink提交作业到YARN的时候,读取的HADDOP配置是什么,其实官方文档[1] 有相关的介绍,主要是 
YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH 
这些环境变量的配置是什么,在你提交的终端内配置一个你搭建的集群环境变量即可。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#background--internals

祝好
唐云

From: Yong 
Sent: Wednesday, January 22, 2020 14:53
To: dev ; user-zh 
Subject: blink(基于flink1.5.1版本)可以使用两个hadoop集群吗?

大家好,
  flink可以使用两个hadoop集群吗?
背景如下:
  目前我这边基于blink搭建了flink standalone集群,状态存储使用公司的hadoop hdfs 
并且使用了kerberos认证,目前是可以正常运行,这个模式需要事先预留一部分资源(TM)保证job故障转移。考虑后续集群扩大,为了节约资源,想使用Hadoop
 YARN来管理flink资源,但是公司的YARN不对外开放,所以自己搭建了一个小的Hadoop 
YARN来管理flink资源,测试时候发现还是走了公司的hadoop集群了,该如何修改配置使用自己的YARN呢?
报错如下:
[qateadmin@UAT14475 bin]$ ./yarn-session.sh
忽略kerberos日志信息
2020-01-22 14:47:36,993 INFO 
org.apache.hadoop.security.UserGroupInformation
   - Login successful for user 
htlapi...@dc.sh.ctripcorp.com using keytab file /opt/data/blink/htlapidev.keytab
2020-01-22 14:47:36,994 INFO 
org.apache.flink.runtime.security.modules.HadoopModule   
 - Hadoop user set to htlapi...@dc.sh.ctripcorp.com (auth:KERBEROS)
2020-01-22 14:47:37,296 INFO 
org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   - No path for the flink jar passed. Using the 
location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-01-22 14:47:37,450 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing 
over to rm2
2020-01-22 14:47:37,491 INFO 
org.apache.hadoop.io.retry.RetryInvocationHandler
  - Exception while invoking getClusterNodes of class 
ApplicationClientProtocolPBClientImpl over rm2 after 1 fail over attempts. 
Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Server 
asks us to fall back to SIMPLE auth, but this client is configured to only 
allow secure connections.; Host Details : local host is: "UAT14475/10.5.119.0"; 
destination host is: "uat14476":8032;
at 
org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1475)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1408)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy16.getClusterNodes(Unknown 
Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:262)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy17.getClusterNodes(Unknown 
Source)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:488)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:318)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:539)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:448)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:659)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$7(FlinkYarnSessionCli.java:887)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:887)
Caused by: java.io.IOException: Server asks us to fall back to SIMPLE auth, but 
this client is configured to only allow secure connections.
at 
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:755)
at 
org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
at 
org.apache.hadoop.ipc.Client.getConnection(Client.java:1524)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1447)
... 22 more
2020-01-22 14:47:37,495 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing 
over to rm1
2020-01-22 14:47:37,498 INFO 
org.apache.hadoop.io.retry.RetryInvocationHandler
  - Exception while invoking getClusterNodes of class 
ApplicationClientProtocolPBClientImpl over rm1 after 2 fail over attempts. 
Trying to fail over after 

blink(????flink1.5.1????)????????????hadoop????????

2020-01-21 文章 Yong

  flinkhadoop
??
  ??blink??flink 
standalonehadoop hdfs 
??kerberos??TM??jobHadoop
 YARN??flinkYARN??Hadoop 
YARN??flinkhadoopYARN
??
[qateadmin@UAT14475 bin]$ ./yarn-session.sh
kerberos
2020-01-22 14:47:36,993 INFO 
org.apache.hadoop.security.UserGroupInformation
   - Login successful for user 
htlapi...@dc.sh.ctripcorp.com using keytab file /opt/data/blink/htlapidev.keytab
2020-01-22 14:47:36,994 INFO 
org.apache.flink.runtime.security.modules.HadoopModule   
 - Hadoop user set to htlapi...@dc.sh.ctripcorp.com (auth:KERBEROS)
2020-01-22 14:47:37,296 INFO 
org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   - No path for the flink jar passed. Using the 
location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-01-22 14:47:37,450 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing 
over to rm2
2020-01-22 14:47:37,491 INFO 
org.apache.hadoop.io.retry.RetryInvocationHandler
  - Exception while invoking getClusterNodes of class 
ApplicationClientProtocolPBClientImpl over rm2 after 1 fail over attempts. 
Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Server 
asks us to fall back to SIMPLE auth, but this client is configured to only 
allow secure connections.; Host Details : local host is: "UAT14475/10.5.119.0"; 
destination host is: "uat14476":8032;
at 
org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1475)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1408)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy16.getClusterNodes(Unknown 
Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:262)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy17.getClusterNodes(Unknown 
Source)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:488)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:318)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:539)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:448)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:659)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$7(FlinkYarnSessionCli.java:887)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:887)
Caused by: java.io.IOException: Server asks us to fall back to SIMPLE auth, but 
this client is configured to only allow secure connections.
at 
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:755)
at 
org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
at 
org.apache.hadoop.ipc.Client.getConnection(Client.java:1524)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1447)
... 22 more
2020-01-22 14:47:37,495 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing 
over to rm1
2020-01-22 14:47:37,498 INFO 
org.apache.hadoop.io.retry.RetryInvocationHandler
  - Exception while invoking getClusterNodes of class 
ApplicationClientProtocolPBClientImpl over rm1 after 2 fail over attempts. 
Trying to fail over after sleeping for 36261ms.
java.net.ConnectException: Call From UAT14475/10.5.119.0 to 
uat14475.novalocal:8032 failed on connection exception: 
java.net.ConnectException: Connection refused; For more details see: 
http://wiki.apache.org/hadoop/ConnectionRefused
at 
sun.reflect.NativeConstructorAccessorImpl.newIns

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-19 文章 Kevin Liao
改用最新 master 代码编译(打包后版本 1.11-SNAPSHOT)

将这段

.withSchema(new Schema()
//.field("logger_name", Types.STRING)
//.field("host", Types.STRING)
//.field("@timestamp", Types.SQL_TIMESTAMP)
//.field("_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)


改成使用 DataTypes 后可以跑通


Kevin Liao  于2020年1月14日周二 上午11:52写道:

> 我用的是
> https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
> 官网下载的
>
> 您说的 master 最新的版本我稍后试一下,谢谢
>
> JingsongLee  于2020年1月14日周二 上午11:51写道:
>
>> 谢谢,
>> 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。
>>
>> Best,
>> Jingsong Lee
>>
>> --
>> From:Kevin Liao 
>> Send Time:2020年1月14日(星期二) 11:38
>> To:user-zh ; JingsongLee <
>> lzljs3620...@aliyun.com>
>> Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错
>>
>> flink 版本是 1.9.1 release
>>
>> Doc
>> 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约
>> 30 多个字段,我理解这跟字段数关系不大
>>
>> ```
>>
>> import org.apache.commons.lang3.builder.ToStringBuilder;
>> import 
>> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
>>
>> /**
>>  * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
>>  */
>> @JsonIgnoreProperties(ignoreUnknown = true)
>> public class Doc {
>>
>>   private String suv;
>>   private Float factor = 1F;
>>   private String st;
>>   private String agentId;
>>   private Long timestamp;
>>
>>   ... // omit some, omit getters and setters
>>
>> ```
>>
>> 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
>>
>> JingsongLee  于2020年1月14日周二 上午11:25写道:
>> Hi Kevin,
>>
>> 这是什么版本?
>> Doc类能完整提供下吗?方便我们复现。
>>
>> Best,
>> Jingsong Lee
>>
>>
>> --
>> From:Kevin Liao 
>> Send Time:2020年1月13日(星期一) 17:37
>> To:user-zh 
>> Subject:blink planner的org.apache.flink.table.api.ValidationException报错
>>
>> tEnv.connect(new Kafka()
>> .version("universal")
>> .topic("xxx")
>> .startFromLatest()
>> .property("bootstrap.servers",
>> "")
>> .property("group.id", ""))
>> .withFormat(new Json().failOnMissingField(false).deriveSchema())
>> .withSchema(new Schema()
>> //.field("logger_name", Types.STRING)
>> //.field("host", Types.STRING)
>> //.field("@timestamp", Types.SQL_TIMESTAMP)
>> //.field("_rowtime", Types.SQL_TIMESTAMP)
>> //.rowtime(
>> //new
>>
>> Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
>> .field("doc", Types.POJO(Doc.class))
>> )
>> .inAppendMode()
>> .registerTableSource("xxx");
>>
>> Table result = tEnv.sqlQuery(
>> "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");
>>
>> //result.printSchema();
>> tEnv.toAppendStream(result,
>> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
>> STRING, STRING, STRING,
>> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
>> STRING, STRING, STRING,
>> STRING, LONG, STRING, INT, STRING, INT)).print();
>>
>>
>>
>> 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:
>>
>>
>> 、、、
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type
>> LEGACY(PojoType) of
>> table field 'doc' does not match with type
>> PojoType of the field
>> 'doc' of the TableSource return type.
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>>  at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at
>&g

求助帖:flink tpc-ds中加入blink的runtime filter问题

2020-01-16 文章 zhaoyunpython . d . 1


Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 Kevin Liao
我用的是
https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
官网下载的

您说的 master 最新的版本我稍后试一下,谢谢

JingsongLee  于2020年1月14日周二 上午11:51写道:

> 谢谢,
> 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。
>
> Best,
> Jingsong Lee
>
> --
> From:Kevin Liao 
> Send Time:2020年1月14日(星期二) 11:38
> To:user-zh ; JingsongLee <
> lzljs3620...@aliyun.com>
> Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错
>
> flink 版本是 1.9.1 release
>
> Doc
> 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约
> 30 多个字段,我理解这跟字段数关系不大
>
> ```
>
> import org.apache.commons.lang3.builder.ToStringBuilder;
> import 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
>
> /**
>  * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
>  */
> @JsonIgnoreProperties(ignoreUnknown = true)
> public class Doc {
>
>   private String suv;
>   private Float factor = 1F;
>   private String st;
>   private String agentId;
>   private Long timestamp;
>
>   ... // omit some, omit getters and setters
>
> ```
>
> 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
>
> JingsongLee  于2020年1月14日周二 上午11:25写道:
> Hi Kevin,
>
> 这是什么版本?
> Doc类能完整提供下吗?方便我们复现。
>
> Best,
> Jingsong Lee
>
>
> --
> From:Kevin Liao 
> Send Time:2020年1月13日(星期一) 17:37
> To:user-zh 
> Subject:blink planner的org.apache.flink.table.api.ValidationException报错
>
> tEnv.connect(new Kafka()
> .version("universal")
> .topic("xxx")
> .startFromLatest()
> .property("bootstrap.servers",
> "")
> .property("group.id", ""))
> .withFormat(new Json().failOnMissingField(false).deriveSchema())
> .withSchema(new Schema()
> //.field("logger_name", Types.STRING)
> //.field("host", Types.STRING)
> //.field("@timestamp", Types.SQL_TIMESTAMP)
> //.field("_rowtime", Types.SQL_TIMESTAMP)
> //.rowtime(
> //new
>
> Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
> .field("doc", Types.POJO(Doc.class))
> )
> .inAppendMode()
> .registerTableSource("xxx");
>
> Table result = tEnv.sqlQuery(
> "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");
>
> //result.printSchema();
> tEnv.toAppendStream(result,
> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, LONG, STRING, INT, STRING, INT)).print();
>
>
>
> 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:
>
>
> 、、、
>
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Type
> LEGACY(PojoType) of
> table field 'doc' does not match with type
> PojoType of the field
> 'doc' of the TableSource return type.
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.pl

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 JingsongLee
谢谢,
你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。

Best,
Jingsong Lee


--
From:Kevin Liao 
Send Time:2020年1月14日(星期二) 11:38
To:user-zh ; JingsongLee 
Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错

flink 版本是 1.9.1 release

Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 
30 多个字段,我理解这跟字段数关系不大

```
import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;
  ... // omit some, omit getters and setters
```

希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
JingsongLee  于2020年1月14日周二 上午11:25写道:
Hi Kevin,

 这是什么版本?
 Doc类能完整提供下吗?方便我们复现。

 Best,
 Jingsong Lee


 --
 From:Kevin Liao 
 Send Time:2020年1月13日(星期一) 17:37
 To:user-zh 
 Subject:blink planner的org.apache.flink.table.api.ValidationException报错

 tEnv.connect(new Kafka()
 .version("universal")
 .topic("xxx")
 .startFromLatest()
 .property("bootstrap.servers",
 "")
 .property("group.id", ""))
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(new Schema()
 //.field("logger_name", Types.STRING)
 //.field("host", Types.STRING)
 //.field("@timestamp", Types.SQL_TIMESTAMP)
 //.field("_rowtime", Types.SQL_TIMESTAMP)
 //.rowtime(
 //new
 Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
 .field("doc", Types.POJO(Doc.class))
 )
 .inAppendMode()
 .registerTableSource("xxx");

 Table result = tEnv.sqlQuery(
 "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

 //result.printSchema();
 tEnv.toAppendStream(result,
 new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, LONG, STRING, INT, STRING, INT)).print();



 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


 、、、

 Exception in thread "main"
 org.apache.flink.table.api.ValidationException: Type
 LEGACY(PojoType) of
 table field 'doc' does not match with type
 PojoType of the field
 'doc' of the TableSource return type.
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
  at 
org.apache.flink.table.planner.plan.nodes.physica

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 Kevin Liao
flink 版本是 1.9.1 release

Doc
完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约
30 多个字段,我理解这跟字段数关系不大

```

import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;

  ... // omit some, omit getters and setters

```

希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)

JingsongLee  于2020年1月14日周二 上午11:25写道:

> Hi Kevin,
>
> 这是什么版本?
> Doc类能完整提供下吗?方便我们复现。
>
> Best,
> Jingsong Lee
>
>
> --
> From:Kevin Liao 
> Send Time:2020年1月13日(星期一) 17:37
> To:user-zh 
> Subject:blink planner的org.apache.flink.table.api.ValidationException报错
>
> tEnv.connect(new Kafka()
> .version("universal")
> .topic("xxx")
> .startFromLatest()
> .property("bootstrap.servers",
> "")
> .property("group.id", ""))
> .withFormat(new Json().failOnMissingField(false).deriveSchema())
> .withSchema(new Schema()
> //.field("logger_name", Types.STRING)
> //.field("host", Types.STRING)
> //.field("@timestamp", Types.SQL_TIMESTAMP)
> //.field("_rowtime", Types.SQL_TIMESTAMP)
> //.rowtime(
> //new
>
> Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
> .field("doc", Types.POJO(Doc.class))
> )
> .inAppendMode()
> .registerTableSource("xxx");
>
> Table result = tEnv.sqlQuery(
> "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");
>
> //result.printSchema();
> tEnv.toAppendStream(result,
> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, LONG, STRING, INT, STRING, INT)).print();
>
>
>
> 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:
>
>
> 、、、
>
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Type
> LEGACY(PojoType) of
> table field 'doc' does not match with type
> PojoType of the field
> 'doc' of the TableSource return type.
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
>  at
> org.apache.flink.table.planner.pla

blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 Kevin Liao
tEnv.connect(new Kafka()
.version("universal")
.topic("xxx")
.startFromLatest()
.property("bootstrap.servers",
"")
.property("group.id", ""))
.withFormat(new Json().failOnMissingField(false).deriveSchema())
.withSchema(new Schema()
//.field("logger_name", Types.STRING)
//.field("host", Types.STRING)
//.field("@timestamp", Types.SQL_TIMESTAMP)
//.field("_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)
.inAppendMode()
.registerTableSource("xxx");

Table result = tEnv.sqlQuery(
"SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

//result.printSchema();
tEnv.toAppendStream(result,
new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, LONG, STRING, INT, STRING, INT)).print();



以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


、、、

Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
LEGACY(PojoType) of
table field 'doc' does not match with type
PojoType of the field
'doc' of the TableSource return type.
at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  

Re: 如何限制blink中资源使用上限(perjob模式)

2019-10-20 文章 Xintong Song
你好,

blink perjob模式是根据job的资源需求按需申请资源的,不能限制整个job的资源上限。
你列出来的这几个参数,只能控制单个TM的资源上限,但是单个TM的资源上限减少了,整个job的资源需求并不会变,只是会申请更多的TM。

Thank you~

Xintong Song



On Sat, Oct 19, 2019 at 3:56 PM 蒋涛涛  wrote:

> Hi all,
>
> 我在使用blink提交的任务的时候(perjob模式),如何限制任务的资源使用上限啊,有个任务使用yarn的vcores特别多
> 曾使用:
> 1. taskmanager.multi-slots.max.cpu.core
> 2. taskmanager.cpu.core
> 3. taskmanager.capacity.cpu.core
>  这些参数都是无效
>
> 如何在perjob模式下控制住资源的使用上限?
>
>
> 祝好
>


Re: blink SQL从kafka中获取rowtime

2019-10-17 文章 Jark Wu
Hi Zijie,

应该是你的 sqlTimestamp 字段中有 null 的数据,在去取 ts 的时候报 NPE 了。
目前 watermark assigner 要求每条数据的 ts 都是有值的。

Best,
Jark 

> 在 2019年10月17日,20:25,Zijie Lu  写道:
> 
> CREATE TABLE requests(
> `rowtime` TIMESTAMP,
> `requestId` VARCHAR,
> `algoExtent` ROW(`mAdId` VARCHAR))
> with (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'test_request',
>  'connector.startup-mode' = 'latest-offset',
>  'connector.properties.0.key' = 'zookeeper.connect',
>  'connector.properties.0.value' = '10.107.116.42:2181',
>  'connector.properties.1.key' = 'bootstrap.servers',
>  'connector.properties.1.value' = '10.107.116.42:9092',
>  'connector.properties.2.key' = 'group.id',
>  'connector.properties.2.value' = 'test_request',
>  'update-mode' = 'append','format.type' = 'json',
>  'format.json-schema': '{type: "object", properties: {sqlTimestamp: {
> type: "string"}, requestId: { type: "string"}, "algoExtent": {type:
> "object", "properties": {"mAdId": {type: "string"}'
>  'schema.0.rowtime.timestamps.type' = 'from-field',
>  'schema.0.rowtime.timestamps.from' = 'sqlTimestamp',
>  'schema.0.rowtime.watermarks.type' = 'periodic-ascending')
> 尝试过这样的定义也是报同样的错
> 
> On Thu, 17 Oct 2019 at 20:22, Zijie Lu  wrote:
> 
>> 而这个定义在old planner里是可以用的
>> 
>> On Thu, 17 Oct 2019 at 19:49, Zijie Lu  wrote:
>> 
>>> 我使用blink planner来定义了下面的表
>>> CREATE TABLE requests(
>>> `rowtime` TIMESTAMP,
>>> `requestId` VARCHAR,
>>> `algoExtent` ROW(`mAdId` VARCHAR))
>>> with (
>>>  'connector.type' = 'kafka',
>>>  'connector.version' = 'universal',
>>>  'connector.topic' = 'test_request',
>>>  'connector.startup-mode' = 'latest-offset',
>>>  'connector.properties.0.key' = 'zookeeper.connect',
>>>  'connector.properties.0.value' = '10.107.116.42:2181',
>>>  'connector.properties.1.key' = 'bootstrap.servers',
>>>  'connector.properties.1.value' = '10.107.116.42:9092',
>>>  'connector.properties.2.key' = 'group.id',
>>>  'connector.properties.2.value' = 'test_request',
>>>  'update-mode' = 'append','format.type' = 'json',
>>>  'format.derive-schema' = 'true',
>>>  'schema.0.rowtime.timestamps.type' = 'from-field',
>>>  'schema.0.rowtime.timestamps.from' = 'sqlTimestamp',
>>>  'schema.0.rowtime.watermarks.type' = 'periodic-ascending')
>>> 然后kafka里消息的格式如下
>>> {"requestId":  "","algoExtent": {"duration": 12,"adType ":
>>> "FEED_568_320","mAdId":  "1910141050233527", "sqlTimestamp":"2019-10-17
>>> 19:08:01" }}
>>> 但是运行时报错
>>> Caused by:
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>> Could not forward element to next operator
>>>at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>>at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>>at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>>at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>>at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>>at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:10

Re: blink SQL从kafka中获取rowtime

2019-10-17 文章 Zijie Lu
CREATE TABLE requests(
`rowtime` TIMESTAMP,
`requestId` VARCHAR,
`algoExtent` ROW(`mAdId` VARCHAR))
with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test_request',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '10.107.116.42:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '10.107.116.42:9092',
  'connector.properties.2.key' = 'group.id',
  'connector.properties.2.value' = 'test_request',
  'update-mode' = 'append','format.type' = 'json',
  'format.json-schema': '{type: "object", properties: {sqlTimestamp: {
type: "string"}, requestId: { type: "string"}, "algoExtent": {type:
"object", "properties": {"mAdId": {type: "string"}'
  'schema.0.rowtime.timestamps.type' = 'from-field',
  'schema.0.rowtime.timestamps.from' = 'sqlTimestamp',
  'schema.0.rowtime.watermarks.type' = 'periodic-ascending')
尝试过这样的定义也是报同样的错

On Thu, 17 Oct 2019 at 20:22, Zijie Lu  wrote:

> 而这个定义在old planner里是可以用的
>
> On Thu, 17 Oct 2019 at 19:49, Zijie Lu  wrote:
>
>> 我使用blink planner来定义了下面的表
>> CREATE TABLE requests(
>> `rowtime` TIMESTAMP,
>> `requestId` VARCHAR,
>> `algoExtent` ROW(`mAdId` VARCHAR))
>> with (
>>   'connector.type' = 'kafka',
>>   'connector.version' = 'universal',
>>   'connector.topic' = 'test_request',
>>   'connector.startup-mode' = 'latest-offset',
>>   'connector.properties.0.key' = 'zookeeper.connect',
>>   'connector.properties.0.value' = '10.107.116.42:2181',
>>   'connector.properties.1.key' = 'bootstrap.servers',
>>   'connector.properties.1.value' = '10.107.116.42:9092',
>>   'connector.properties.2.key' = 'group.id',
>>   'connector.properties.2.value' = 'test_request',
>>   'update-mode' = 'append','format.type' = 'json',
>>   'format.derive-schema' = 'true',
>>   'schema.0.rowtime.timestamps.type' = 'from-field',
>>   'schema.0.rowtime.timestamps.from' = 'sqlTimestamp',
>>   'schema.0.rowtime.watermarks.type' = 'periodic-ascending')
>> 然后kafka里消息的格式如下
>> {"requestId":  "","algoExtent": {"duration": 12,"adType ":
>> "FEED_568_320","mAdId":  "1910141050233527", "sqlTimestamp":"2019-10-17
>> 19:08:01" }}
>> 但是运行时报错
>> Caused by:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>> Caused by:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput

Re: blink SQL从kafka中获取rowtime

2019-10-17 文章 Zijie Lu
而这个定义在old planner里是可以用的

On Thu, 17 Oct 2019 at 19:49, Zijie Lu  wrote:

> 我使用blink planner来定义了下面的表
> CREATE TABLE requests(
> `rowtime` TIMESTAMP,
> `requestId` VARCHAR,
> `algoExtent` ROW(`mAdId` VARCHAR))
> with (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test_request',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.properties.0.key' = 'zookeeper.connect',
>   'connector.properties.0.value' = '10.107.116.42:2181',
>   'connector.properties.1.key' = 'bootstrap.servers',
>   'connector.properties.1.value' = '10.107.116.42:9092',
>   'connector.properties.2.key' = 'group.id',
>   'connector.properties.2.value' = 'test_request',
>   'update-mode' = 'append','format.type' = 'json',
>   'format.derive-schema' = 'true',
>   'schema.0.rowtime.timestamps.type' = 'from-field',
>   'schema.0.rowtime.timestamps.from' = 'sqlTimestamp',
>   'schema.0.rowtime.watermarks.type' = 'periodic-ascending')
> 然后kafka里消息的格式如下
> {"requestId":  "","algoExtent": {"duration": 12,"adType ":
> "FEED_568_320","mAdId":  "1910141050233527", "sqlTimestamp":"2019-10-17
> 19:08:01" }}
> 但是运行时报错
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at SourceConversion$4.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> ... 13 more
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.table.dataformat.GenericRow.getLong(GenericRow.java:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecTableSourceScan.scala:202)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecTableSourceScan.scala:194)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:64)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> ... 19 more
> 请问在blink里应该如何定义rowtime呢?
>


blink SQL从kafka中获取rowtime

2019-10-17 文章 Zijie Lu
我使用blink planner来定义了下面的表
CREATE TABLE requests(
`rowtime` TIMESTAMP,
`requestId` VARCHAR,
`algoExtent` ROW(`mAdId` VARCHAR))
with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test_request',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '10.107.116.42:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '10.107.116.42:9092',
  'connector.properties.2.key' = 'group.id',
  'connector.properties.2.value' = 'test_request',
  'update-mode' = 'append','format.type' = 'json',
  'format.derive-schema' = 'true',
  'schema.0.rowtime.timestamps.type' = 'from-field',
  'schema.0.rowtime.timestamps.from' = 'sqlTimestamp',
  'schema.0.rowtime.watermarks.type' = 'periodic-ascending')
然后kafka里消息的格式如下
{"requestId":  "","algoExtent": {"duration": 12,"adType ":
"FEED_568_320","mAdId":  "1910141050233527", "sqlTimestamp":"2019-10-17
19:08:01" }}
但是运行时报错
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at SourceConversion$4.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 13 more
Caused by: java.lang.NullPointerException
at
org.apache.flink.table.dataformat.GenericRow.getLong(GenericRow.java:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecTableSourceScan.scala:202)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecTableSourceScan.scala:194)
at
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:64)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 19 more
请问在blink里应该如何定义rowtime呢?


Re:Re: Flink 1.9 Blink planner 时间字段问题

2019-09-06 文章 hb
不行, 
Caused by: org.apache.flink.table.api.ValidationException: Rowtime attribute 
'_rowtime' is not of type SQL_TIMESTAMP.



在 2019-09-06 10:48:02,"Jark Wu"  写道:
>可能是因为你在 schema 中把 eventTime 声明成了 timestamp 类型,你可以声明成 long 试试。 
>.field("_rowtime", Types.LONG())
>
>> 在 2019年9月5日,15:11,hb <343122...@163.com> 写道:
>> 
>> 实际应用中, 时间字段最常用的就是Long类型的毫秒时间戳, 难道这个不支持么.
>


Re: Flink 1.9 Blink planner 时间字段问题

2019-09-05 文章 Jark Wu
可能是因为你在 schema 中把 eventTime 声明成了 timestamp 类型,你可以声明成 long 试试。 
.field("_rowtime", Types.LONG())

> 在 2019年9月5日,15:11,hb <343122...@163.com> 写道:
> 
> 实际应用中, 时间字段最常用的就是Long类型的毫秒时间戳, 难道这个不支持么.



Re:回复: Flink 1.9 Blink planner 时间字段问题

2019-09-05 文章 hb
实际应用中, 时间字段最常用的就是Long类型的毫秒时间戳, 难道这个不支持么.




在 2019-09-05 14:06:08,"pengcheng...@bonc.com.cn"  写道:
>FLINK 应该不能把输入的eventTime的long类型转成SQL_TIMESTAMP类型
>
> 
>发件人: hb
>发送时间: 2019-09-05 14:24
>收件人: user-zh
>主题: Flink 1.9 Blink planner 时间字段问题
>代码里定义了kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table
>schema
>.field("_rowtime", Types.SQL_TIMESTAMP())
>.rowtime(
>new Rowtime()
>.timestampsFromField("eventTime")
>.watermarksPeriodicBounded(1000))
>kafka输入:  {"eventTime": 10, "id":1,"name":"hb"}  会报错,
> 
>输入  {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} 结果显示正确,
>eventTime 字段怎么不支持数值输入呢.
> 
> 
>错误提示:
>```
>Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize 
>JSON object.
>at 
>org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>at 
>org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>at java.lang.Thread.run(Thread.java:748)
>Caused by: java.io.IOException: Failed to deserialize JSON object.
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
>at 
>org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>at 
>org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
>at 
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>at 
>org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>at 
>org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>at 
>org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>Caused by: java.time.format.DateTimeParseException: Text '10' could not be 
>parsed at index 0
>at 
>java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
>... 7 more
>```
> 
> 
> 
> 
>源码:
>```
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val conf = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv = StreamTableEnvironment.create(env, conf)
>  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
>  val kafkaIn = new Kafka()
>.version("0.11")
>.topic("hbtest111")
>.property("bootstrap.servers", "192.168.1.160:19092")
>.property("group.id", "test2")
> 
> 
>  val json = new Json().deriveSchema()
> 
> 
>  val schema = new Schema()
>.field("id", Types.INT())
>.field("name", Types.STRING())
> 
> 
>  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
>  schema
>.field("_rowtime", Types.SQL_TIMESTAMP())
>.rowtime(
>  new Rowtime()
>.timestampsFromField("eventTime")
>.watermarksPeriodicBounded(1000)
>)
> 
> 
>  
> tEnv.connect(kafkaIn).withFormat(json).withSchema(schema).inAppendMode().registerTableSource("table_from_kafka")
>  val t = tEnv.sqlQuery("select * from table_from_kafka")
>  t.printSchema()
> 
> 
>  t.toRetractStream[Row].print()
>  tEnv.execute("")
>```


Flink 1.9 Blink planner 时间字段问题

2019-09-05 文章 hb
代码里定义了kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table
schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000))
kafka输入:  {"eventTime": 10, "id":1,"name":"hb"}  会报错,

输入  {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} 结果显示正确,
eventTime 字段怎么不支持数值输入呢.


错误提示:
```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON 
object.
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '10' could not be 
parsed at index 0
at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more
```




源码:
```
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val conf = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv = StreamTableEnvironment.create(env, conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val kafkaIn = new Kafka()
.version("0.11")
.topic("hbtest111")
.property("bootstrap.servers", "192.168.1.160:19092")
.property("group.id", "test2")


  val json = new Json().deriveSchema()


  val schema = new Schema()
.field("id", Types.INT())
.field("name", Types.STRING())


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
  schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
  new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)


  
tEnv.connect(kafkaIn).withFormat(json).withSchema(schema).inAppendMode().registerTableSource("table_from_kafka")
  val t = tEnv.sqlQuery("select * from table_from_kafka")
  t.printSchema()


  t.toRetractStream[Row].print()
  tEnv.execute("")
```

Re: flink1.9 Blink planner create view 问题

2019-08-27 文章 Jark Wu
1.9 还不支持 create view 语法。如果要注册一个 view,可以通过下面的办法:

Table table = tEnv.sqlQuery(“select * from T”)
tEnv.registerTable(“v1”, table);

然后你就可以在之后的sql 中直接查询 v1了

Best,
Jark


> 在 2019年8月28日,11:39,hb <343122...@163.com> 写道:
> 
> 



flink1.9 Blink planner create view 问题

2019-08-27 文章 hb
注册了T表后,创建view报错
 tEnv.sqlUpdate(s"create view v1 as select * from T")
Exception in thread "main" org.apache.flink.table.api.TableException: 
Unsupported node type SqlCreateView


是用错方法了,还是不支持



Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 文章 徐骁
珞感谢

Jark Wu  于2019年8月27日周二 下午6:49写道:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
> >
>
> > 在 2019年8月27日,17:59,徐骁  写道:
> >
> > 这部分有文档吗,看了好几圈没看到
> >
> > hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道:
> >
> >> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
> >> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
> >>
> >> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
> >>> kafka版本是 kafka_2.11-1.1.0,
> >>> 支持的kafka版本有哪些
> >>> 在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <
> >> pengcheng...@bonc.com.cn> 写道:
> >>>> 检查一下代码的kafka版本,可能是这方面的错误
> >>>>
> >>>>
> >>>>
> >>>> pengcheng...@bonc.com.cn
> >>>>
> >>>> 发件人: hb
> >>>> 发送时间: 2019-08-26 15:14
> >>>> 收件人: user-zh
> >>>> 主题: Re:Re: flink1.9 blink planner table ddl 使用问题
> >>>> 之前少了 flink-connector-kafka_2.11 依赖,
> >>>> 现在错误变成  Caused by: java.lang.NoSuchMethodError:
> >>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
> >>>> 了
> >>>>
> >>>>
> >>>> pom依赖:
> >>>> ```
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-core
> >>>>   ${flink.version}
> >>>>
> >>>>
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-clients_2.11
> >>>>   ${flink.version}
> >>>>
> >>>>
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-scala_2.11
> >>>>   ${flink.version}
> >>>>
> >>>>
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-streaming-scala_2.11
> >>>>   ${flink.version}
> >>>>
> >>>>
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-table
> >>>>   1.9.0
> >>>>   pom
> >>>>   provided
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-table-common
> >>>>   ${flink.version}
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-cep-scala_2.11
> >>>>   ${flink.version}
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-connector-filesystem_2.11
> >>>>   ${flink.version}
> >>>>   
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-table-api-scala-bridge_2.11
> >>>>   ${flink.version}
> >>>> 
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-table-api-java-bridge_2.11
> >>>>   ${flink.version}
> >>>>   
> >>>>   
> >>>>
> >>>>
> >>>>   
> >>>>   org.apache.flink
> >>>>   flink-table-planner_2.11
> >>>>   ${flink.version}
> >>>&

Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 文章 Jark Wu
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector>

> 在 2019年8月27日,17:59,徐骁  写道:
> 
> 这部分有文档吗,看了好几圈没看到
> 
> hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道:
> 
>> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
>> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
>> 
>> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
>>> kafka版本是 kafka_2.11-1.1.0,
>>> 支持的kafka版本有哪些
>>> 在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <
>> pengcheng...@bonc.com.cn> 写道:
>>>> 检查一下代码的kafka版本,可能是这方面的错误
>>>> 
>>>> 
>>>> 
>>>> pengcheng...@bonc.com.cn
>>>> 
>>>> 发件人: hb
>>>> 发送时间: 2019-08-26 15:14
>>>> 收件人: user-zh
>>>> 主题: Re:Re: flink1.9 blink planner table ddl 使用问题
>>>> 之前少了 flink-connector-kafka_2.11 依赖,
>>>> 现在错误变成  Caused by: java.lang.NoSuchMethodError:
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
>>>> 了
>>>> 
>>>> 
>>>> pom依赖:
>>>> ```
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-core
>>>>   ${flink.version}
>>>> 
>>>> 
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-clients_2.11
>>>>   ${flink.version}
>>>> 
>>>> 
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-scala_2.11
>>>>   ${flink.version}
>>>> 
>>>> 
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-streaming-scala_2.11
>>>>   ${flink.version}
>>>> 
>>>> 
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-table
>>>>   1.9.0
>>>>   pom
>>>>   provided
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-table-common
>>>>   ${flink.version}
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-cep-scala_2.11
>>>>   ${flink.version}
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-connector-filesystem_2.11
>>>>   ${flink.version}
>>>>   
>>>> 
>>>> 
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-table-api-scala-bridge_2.11
>>>>   ${flink.version}
>>>> 
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-table-api-java-bridge_2.11
>>>>   ${flink.version}
>>>>   
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-table-planner_2.11
>>>>   ${flink.version}
>>>>   
>>>>   
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-table-runtime-blink_2.11
>>>>   ${flink.version}
>>>>   
>>>> 
>>>> 
>>>> 
>>>> 
>>>>   
>>>>   org.apache.flink
>>>>   flink-table-planner-blink_2.11
>>>>   ${flink.version}
>>>>   
>>>>   
>>>> 
>>>> 
>>>>   
>>>> 

Re: Re:回复: Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 文章 徐骁
这部分有文档吗,看了好几圈没看到

hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道:

> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
>
> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
> >kafka版本是 kafka_2.11-1.1.0,
> >支持的kafka版本有哪些
> >在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <
> pengcheng...@bonc.com.cn> 写道:
> >>检查一下代码的kafka版本,可能是这方面的错误
> >>
> >>
> >>
> >>pengcheng...@bonc.com.cn
> >>
> >>发件人: hb
> >>发送时间: 2019-08-26 15:14
> >>收件人: user-zh
> >>主题: Re:Re: flink1.9 blink planner table ddl 使用问题
> >>之前少了 flink-connector-kafka_2.11 依赖,
> >>现在错误变成  Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
> >>了
> >>
> >>
> >>pom依赖:
> >>```
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-core
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-clients_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-scala_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-streaming-scala_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table
> >>1.9.0
> >>pom
> >>provided
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-common
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-cep-scala_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-filesystem_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-api-scala-bridge_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-api-java-bridge_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-planner_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-runtime-blink_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-table-planner-blink_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-elasticsearch6_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-kafka-0.11_2.11
> >>${flink.version}
> >>
> >>
> >>
> >>
> >>org.apache.flink
> >>flink-connector-kafka_2.11
> >>${flink.version}
> >>
> >>
> >>org.apache.flink
> >>flink-json
> >>  

Re: flink1.9中关于blink的文档在哪看呀

2019-08-26 文章 Jark Wu
Blink 合并到 flink 后,是作为一种 planner 的实现存在,所以文档是和 flink 在一起的。
如何使用 blink 
planner,可以看这里:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#create-a-tableenvironment
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#create-a-tableenvironment>


Best,
Jark




> 在 2019年8月27日,10:25,Zili Chen  写道:
> 
> Blink 的文档应该都在 [1] 了,并没有跟着 Flink 版本变化而变化的意思呀(x
> 
> Best,
> tison.
> 
> [1] https://github.com/apache/flink/blob/blink/README.md
> 
> 
> rockey...@163.com  于2019年8月27日周二 上午10:18写道:
> 
>> 
>> hi,all
>>flink1.9中关于blink的文档在哪看呀?找了半天找不到 0.0
>> 
>> 
>> rockey...@163.com
>> Have a good day !
>> 



Re: flink1.9中关于blink的文档在哪看呀

2019-08-26 文章 Zili Chen
Blink 的文档应该都在 [1] 了,并没有跟着 Flink 版本变化而变化的意思呀(x

Best,
tison.

[1] https://github.com/apache/flink/blob/blink/README.md


rockey...@163.com  于2019年8月27日周二 上午10:18写道:

>
> hi,all
> flink1.9中关于blink的文档在哪看呀?找了半天找不到 0.0
>
>
> rockey...@163.com
> Have a good day !
>


flink1.9中关于blink的文档在哪看呀

2019-08-26 文章 rockey...@163.com

hi,all
flink1.9中关于blink的文档在哪看呀?找了半天找不到 0.0


rockey...@163.com 
Have a good day !


Re:Re:回复: Re: flink1.9 blink planner table ddl 使用问题

2019-08-26 文章 hb
感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.

在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
>kafka版本是 kafka_2.11-1.1.0,
>支持的kafka版本有哪些
>在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn"  写道:
>>检查一下代码的kafka版本,可能是这方面的错误
>>
>>
>>
>>pengcheng...@bonc.com.cn
>> 
>>发件人: hb
>>发送时间: 2019-08-26 15:14
>>收件人: user-zh
>>主题: Re:Re: flink1.9 blink planner table ddl 使用问题
>>之前少了 flink-connector-kafka_2.11 依赖,
>>现在错误变成  Caused by: java.lang.NoSuchMethodError: 
>>org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
>>了
>> 
>> 
>>pom依赖:
>>```
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-core
>>${flink.version}
>> 
>> 
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-clients_2.11
>>${flink.version}
>> 
>> 
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-scala_2.11
>>${flink.version}
>> 
>> 
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-streaming-scala_2.11
>>${flink.version}
>> 
>> 
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-table
>>1.9.0
>>pom
>>provided
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-table-common
>>${flink.version}
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-cep-scala_2.11
>>${flink.version}
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-connector-filesystem_2.11
>>${flink.version}
>>
>> 
>> 
>> 
>> 
>>
>>org.apache.flink
>>flink-table-api-scala-bridge_2.11
>>${flink.version}
>>
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-table-api-java-bridge_2.11
>>${flink.version}
>>
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-table-planner_2.11
>>${flink.version}
>>
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-table-runtime-blink_2.11
>>${flink.version}
>>
>> 
>> 
>> 
>> 
>>
>>org.apache.flink
>>flink-table-planner-blink_2.11
>>${flink.version}
>>
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-connector-elasticsearch6_2.11
>>${flink.version}
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-connector-kafka-0.11_2.11
>>${flink.version}
>>
>> 
>> 
>>
>>org.apache.flink
>>flink-connector-kafka_2.11
>>${flink.version}
>>
>>
>>org.apache.flink
>>flink-json
>>${flink.version}
>>
>>
>>org.apache.flink
>>flink-runtime-web_2.11
>>${flink.version}
>>
>>
>> 
>> 
>>```
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>在 2019-08-26 13:37:51,"Jark Wu"  写道:
>>>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
>>>
>>>Best,
>>>Jark
>>>
>>>> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道:
>>>> 
>>>> 使用了你的ddl语句,还是报一样的错误.
>>>> 我是在idea里面执行的,maven 配置的依赖.
>>>> 
>>>> 在 2019-08-26 11:22:20,"Jark Wu"  写道:
>>>>> Hi,
>>>>> 
>>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。
>>>>> 
&g

Re:Re: flink1.9 blink planner table ddl 使用问题

2019-08-25 文章 hb
使用了你的ddl语句,还是报一样的错误.
我是在idea里面执行的,maven 配置的依赖.

在 2019-08-26 11:22:20,"Jark Wu"  写道:
>Hi,
>
>初步看下来你的 DDL 中有这几部分定义的有问题。
>
>1. 缺少format properties
>2. 缺少 connector.version
>3. bootstrap.severs 的配置方式写的不对...
>
>
>你可以参考下面这个作为example:
>
>
>CREATE TABLE kafka_json_source (
>rowtime TIMESTAMP,
>user_name VARCHAR,
>event ROW
>) WITH (
>'connector.type' = 'kafka',
>'connector.version' = 'universal',
>'connector.topic' = 'test-json',
>'connector.startup-mode' = 'earliest-offset',
>'connector.properties.0.key' = 'zookeeper.connect',
>'connector.properties.0.value' = 'localhost:2181',
>'connector.properties.1.key' = 'bootstrap.servers',
>'connector.properties.1.value' = 'localhost:9092',
>'update-mode' = 'append',
>'format.type' = 'json',
>'format.derive-schema' = 'true'
>);
>
>
>Kafka 中的数据长这个样子:
>
>{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { 
>"message_type": "WARNING", "message": "This is a warning."}}
>
>
>Best,
>Jark
>
>
>> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道:
>> 
>> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
>> 需要实现TableSourceFactory,还是其他什么.
>> 
>> 
>> 提示:  
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> SQL validation failed. findAndCreateTableSource failed.
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
>> not find a suitable table factory for 
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>> 
>> 
>> 
>> 
>> 代码:
>> ```
>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>> import org.apache.flink.types.Row
>> 
>> 
>> object KafkaInDDL extends App {
>>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>>  val settings: EnvironmentSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
>> settings)
>> 
>> 
>>  val sourceDDL =
>>"""create table sourceTable(
>>id int,
>>name varchar
>>  ) with (
>>'connector.type' = 'kafka',
>>'connector.property-version' = '1',
>>'update-mode' = 'append',
>>'bootstrap.servers' = '192.168.1.160:19092',
>>'connector.topic' = 'hbtest1',
>>'connector.startup-mode' = 'earliest-offset'
>>  )
>>"""
>>  tEnv.sqlUpdate(sourceDDL)
>>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>  tEnv.execute("")
>> }
>> ```
>


Re: flink1.9 blink planner table ddl 使用问题

2019-08-25 文章 Jark Wu
Hi,

初步看下来你的 DDL 中有这几部分定义的有问题。

1. 缺少format properties
2. 缺少 connector.version
3. bootstrap.severs 的配置方式写的不对...


你可以参考下面这个作为example:


CREATE TABLE kafka_json_source (
rowtime TIMESTAMP,
user_name VARCHAR,
event ROW
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test-json',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);


Kafka 中的数据长这个样子:

{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { 
"message_type": "WARNING", "message": "This is a warning."}}


Best,
Jark


> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道:
> 
> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
> 需要实现TableSourceFactory,还是其他什么.
> 
> 
> 提示:  
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. findAndCreateTableSource failed.
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> 
> 
> 
> 
> 代码:
> ```
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.table.api.{EnvironmentSettings, Types}
> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
> import org.apache.flink.types.Row
> 
> 
> object KafkaInDDL extends App {
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val settings: EnvironmentSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
> settings)
> 
> 
>  val sourceDDL =
>"""create table sourceTable(
>id int,
>name varchar
>  ) with (
>'connector.type' = 'kafka',
>'connector.property-version' = '1',
>'update-mode' = 'append',
>'bootstrap.servers' = '192.168.1.160:19092',
>'connector.topic' = 'hbtest1',
>'connector.startup-mode' = 'earliest-offset'
>  )
>"""
>  tEnv.sqlUpdate(sourceDDL)
>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>  tEnv.execute("")
> }
> ```



flink1.9 blink planner table ddl 使用问题

2019-08-25 文章 hb
flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
需要实现TableSourceFactory,还是其他什么.


提示:  
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.




代码:
```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.{EnvironmentSettings, Types}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row


object KafkaInDDL extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
settings)


  val sourceDDL =
"""create table sourceTable(
id int,
name varchar
  ) with (
'connector.type' = 'kafka',
'connector.property-version' = '1',
'update-mode' = 'append',
'bootstrap.servers' = '192.168.1.160:19092',
'connector.topic' = 'hbtest1',
'connector.startup-mode' = 'earliest-offset'
  )
"""
  tEnv.sqlUpdate(sourceDDL)
  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
  tEnv.execute("")
}
```

Re: blink 版本 消费kafka 看不到group id

2019-06-24 文章 Biao Liu
抱歉,没看懂你的问题
1. group id 和 offset 有什么关系?
2. "在kafka 里看不到group id" 指什么?

雷水鱼  于2019年6月24日周一 下午5:05写道:

> 现象
> 使用这个pom ,在kafka 里看不到group id
> 
>  com.alibaba.blink
>  flink-streaming-scala_2.11
>  blink-3.2.2
>  
>  
>  org.slf4j
>  slf4j-api
>  
>  
> 
> 
>  com.alibaba.blink
>  flink-connector-kafka-0.11_2.11
>  blink-3.2.0
>  
>  
>  org.slf4j
>  slf4j-api
>  
>  
> 
> 使用开源版本,可以看到在kafka 看到group id
> 
>  org.apache.flink
>  flink-streaming-java_2.12
>  1.7.1
> 
>
> 
> 
>  org.apache.flink
>  flink-connector-kafka-0.11_2.12
>  1.7.1
> 
>
> 有没有大佬能看一下,在业务场景 中 还是需要看下 offset 的
>
>
>
>


blink 版本 消费kafka 看不到group id

2019-06-24 文章 雷水鱼
现象 
使用这个pom ,在kafka 里看不到group id 

 com.alibaba.blink
 flink-streaming-scala_2.11
 blink-3.2.2
 
 
 org.slf4j
 slf4j-api
 
 


 com.alibaba.blink
 flink-connector-kafka-0.11_2.11
 blink-3.2.0
 
 
 org.slf4j
 slf4j-api
 
 

使用开源版本,可以看到在kafka 看到group id 

 org.apache.flink
 flink-streaming-java_2.12
 1.7.1




 org.apache.flink
 flink-connector-kafka-0.11_2.12
 1.7.1


有没有大佬能看一下,在业务场景 中 还是需要看下 offset 的





Re: Blink在Hive表没有统计信息的情况下如何优化

2019-05-28 文章 Kurt Young
你先试试把HashJoin这个算子禁用看看,TableConfig里添加这个配置

sql.exec.disabled-operators: HashJoin

Best,
Kurt


On Tue, May 28, 2019 at 3:23 PM bigdatayunzhongyan <
bigdatayunzhong...@aliyun.com> wrote:

> 感谢 @Kurt Young 大神的回复,报错信息在附件。谢谢!
>
>
> 在2019年05月28日 14:10,Kurt Young  写道:
>
> 没有统计信息确实很难生成比较靠谱的执行计划,这也是之前很多DBA的工作 ;-)
>
> 你可以试试以下以下操作:
> 1. 如果是join顺序不合理,可以手动调整sql中的join顺序,并且关闭join reorder
> 2.
> 看看fail的具体原因,如果是个别比较激进的算子表现不好,比如HashAggregate、HashJoin,你可以手动禁止掉这些算子,选择性能稍差但可能执行起来更稳健的算子,比如SortMergeJoin
>
> 这是我拍脑袋想的,具体的建议你先分析一下SQL为什么会fail,然后贴出具体的问题来。
>
> 另外,我们正在开发SQL hint功能,可以有效缓解类似问题。
>
> Best,
> Kurt
>
>
> On Tue, May 28, 2019 at 12:32 PM bigdatayunzhongyan <
> bigdatayunzhong...@aliyun.com> wrote:
>
>>  @Kurt Young  @Jark Wu  @Bowen Li
>> 先描述下我这边的情况,一些简单的SQL无论是否有统计的信息的情况下都能执行成功,
>> 但是一些复杂的SQL在没有统计信息的情况下一直执行失败,尝试开启各种参数,都失败了,很痛苦,
>> 不过在设置统计信息及开启相关参数后很轻松就能执行成功(点赞)。
>> 我们线上的情况是很多表信息都没有统计信息,请问有哪些优化的办法。
>>
>> 启动命令:./bin/yarn-session.sh -n 50 -s 20 -jm 3072 -tm 6144 -d
>> 配置见附件
>> 谢谢!
>>
>>


回复: Blink在Hive表没有统计信息的情况下如何优化

2019-05-28 文章 bigdatayunzhongyan
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Fatal error at remote task manager '/xx:14941'.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:276)
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:182)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.runtime.io.network.netty.ZeroCopyNettyMessageDecoder.channelRead(ZeroCopyNettyMessageDecoder.java:128)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.nio.channels.ClosedChannelException
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:297)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:119)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.addCredit(PartitionRequestQueue.java:181)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:140)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:43)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
... 13 more
Caused by: 
org.apache.flink.runtime.io.network.partition.DataConsumptionException: 
java.nio.channels.ClosedChannelException
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView.getNextBuffer(SpilledSubpartitionView.java:150)
at 
org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:162)





 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  - 
Encountered error while consuming partitions
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 

blink访问带kerberos认证kafka的问题

2019-05-24 文章 苏 欣
大家好,我使用blink集群访问带认证的kafka,按照文档上的方式配置flink-conf.yaml文件,配置如下:
 security.kerberos.login.use-ticket-cache: false
 security.kerberos.login.keytab: /home/kafka/kafka.keytab
 security.kerberos.login.principal: kafka/slave2@MYCDH
 security.kerberos.login.contexts: Client,KafkaClient

blink集群总共三台节点,把配置文件依次覆盖后,启动集群出现只能启动standalone主节点而启动不了taskManager的情况。
票据每台机器上都有,去掉上述配置后集群可正常启动。
请问有人遇到过这种情况吗?

sean...@live.com


  1   2   >