回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 Thread
  谢谢Peidian ,我试一下
--
发件人:Peidian Li 
发送时间:2020年9月23日(星期三) 14:02
收件人:user-zh ; 郑斌斌 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

我也遇到过类似的问题,也是使用rocksdb,flink 1.10版本,我理解的是block 
cache超用,我这边的解决办法是增大了taskmanager.memory.jvm-overhead.fraction,如果仍然出现内存超用这个问题,可以尝试调大taskmanager.memory.task.off-heap.size,我这边这两个参数配置分别为taskmanager.memory.jvm-overhead.fraction=0.2,taskmanager.memory.task.off-heap.size=128m

可以尝试一下。
郑斌斌  于2020年9月23日周三 下午1:33写道:
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, 
checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

 java.lang.Exception: [2020-09-14 09:27:20.431]Container 
[pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 
36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB 
physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
 Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 
-Djobmanager.rpc.address=njdev-nn03.nj 
-Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c 
-Djobmanager.rpc.port=30246 -Drest.address=flink-node03
  |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' 
-Djobmanager.rpc.address='flink-node03' 
-Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' 
-Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
 2> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 
 [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
 [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. 

  at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 --
 发件人:Benchao Li 
 发送时间:2020年9月23日(星期三) 13:12
 收件人:user-zh ; 郑斌斌 
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
 郑斌斌  于2020年9月23日周三 下午12:29写道:
  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
  单流跑的话,比较正常。
  JOB的内存是4G。版本1.11.1
  --
  发件人:Benchao Li 
  发送时间:2020年9月23日(星期三) 10:50
  收件人:user-zh 
  主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

  Hi Tianwang,

  不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

  1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
  join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
  `Math.max(leftRelativeSize, rightRelativeSize) +
  allowedLateness`,根据你的SQL,这个值应该是6h
  2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
  清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
  数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
  `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
  2;`,在你的SQL来讲,就是3h,也就是说
  状态会*多保存*这么久。这个我曾经建过一个issue来优

回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread
谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state, 
checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来

java.lang.Exception: [2020-09-14 09:27:20.431]Container 
[pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 
36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB 
physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 
-Djobmanager.rpc.address=njdev-nn03.nj 
-Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c 
-Djobmanager.rpc.port=30246 -Drest.address=flink-node03
 |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' 
-Djobmanager.rpc.address='flink-node03' 
-Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' 
-Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
 2> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 
[2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
[2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. 

 at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 13:12
收件人:user-zh ; 郑斌斌 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
郑斌斌  于2020年9月23日周三 下午12:29写道:
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
 单流跑的话,比较正常。
 JOB的内存是4G。版本1.11.1
 --
 发件人:Benchao Li 
 发送时间:2020年9月23日(星期三) 10:50
 收件人:user-zh 
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 Hi Tianwang,

 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
 `Math.max(leftRelativeSize, rightRelativeSize) +
 allowedLateness`,根据你的SQL,这个值应该是6h
 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
 2;`,在你的SQL来讲,就是3h,也就是说
 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

 希望这个可以解答你的疑惑~

 [1] https://issues.apache.org/jira/browse/FLINK-18996

 Tianwang Li  于2020年9月22日周二 下午8:26写道:

 > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
 >
 >
 > 【join】
 >
 > > SELECT `b`.`rowtime`,
 > > `a`.`c_id`,
 > > `b`.`openid`
 > > FROM `test_table_a` AS `a`
 > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
 > > AND `a`.`openid` = `b`.`openid`
 > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND

回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 Thread
 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
单流跑的话,比较正常。
JOB的内存是4G。版本1.11.1
--
发件人:Benchao Li 
发送时间:2020年9月23日(星期三) 10:50
收件人:user-zh 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li



Flink 维表延迟join

2020-08-26 Thread
小伙伴们:
  
大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。

Thanks

回复:flink 1.11 order by rowtime报错

2020-08-19 Thread
   非常感谢,按照您给出的jira,我修改源码后好用了。
--
发件人:Benchao Li 
发送时间:2020年8月19日(星期三) 19:48
收件人:user-zh ; 郑斌斌 
主 题:Re: flink 1.11 order by rowtime报错

Hi 斌斌,

感觉你应该是遇到了一个已知的bug[1]

[1] https://issues.apache.org/jira/browse/FLINK-16827
郑斌斌  于2020年8月19日周三 下午1:20写道:

 报下面的这个错误,并行度设置为1就没有问题了,不知道为什么

 java.lang.NullPointerExcpetion
   at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
   at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
   at 
org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:101)

 --
 发件人:china_tao 
 发送时间:2020年8月19日(星期三) 00:17
 收件人:user-zh 
 主 题:Re: flink 1.11 order by rowtime报错

 错误呢?没看到。把代码贴出来看一下,是不是processtime没有设置或者设置不对



 --
 Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li



回复:flink 1.11 order by rowtime报错

2020-08-18 Thread

报下面的这个错误,并行度设置为1就没有问题了,不知道为什么

java.lang.NullPointerExcpetion
  at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
  at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
  at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:101)

--
发件人:china_tao 
发送时间:2020年8月19日(星期三) 00:17
收件人:user-zh 
主 题:Re: flink 1.11 order by rowtime报错

错误呢?没看到。把代码贴出来看一下,是不是processtime没有设置或者设置不对



--
Sent from: http://apache-flink.147419.n8.nabble.com/



flink 1.11 order by rowtime报错

2020-08-18 Thread
小伙伴们 :
   大家好,请教个问题,执行order by SQL文时,为什么报下面的错误:
SQL文:select  order_id,product_id FROM kafka_order order by  rowtime


Thanks & Regards

回复:flink 聚合 job 重启问题

2020-07-28 Thread
 需要通过checkpoint恢复启动才没有问题,不知道为什么是这样
--
发件人:RS 
发送时间:2020年7月27日(星期一) 15:50
收件人:user-zh@flink.apache.org ; 郑斌斌 

主 题:Re:flink 聚合 job 重启问题

伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了

在 2020-07-27 11:33:31,"郑斌斌"  写道:
>hi all :
>
> 请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt 
> from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
>但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.
>
>Thanks
>
>





flink 聚合 job 重启问题

2020-07-26 Thread
hi all :

 请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt 
from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.

Thanks




FlinkKafkaConsumer API 维表关联

2020-07-15 Thread
各位好:

请教一下,用FlinkKafkaConsumer API的话,如何支持SQL的方式,和维表关联。(之前用Kafka 
API API是可以的)
 "select  a.id,b.name from kafka_table a "
+ "join dim_table FOR SYSTEM_TIME AS OF a.proctime as b on a.id = 
b.user_id";

thanks & Regards

回复:kafkaf To mysql 写入问题

2020-07-03 Thread

谢谢了,查了下jira, No.1在1.11中才用修复, 另外,目前我用的版本就是1.10
https://issues.apache.org/jira/browse/FLINK-15396
--
发件人:Jingsong Li 
发送时间:2020年7月3日(星期五) 14:29
收件人:user-zh ; 郑斌斌 
主 题:Re: kafkaf To mysql 写入问题

Hi,

估计需要使用Flink 1.11。

1.JSON Format有参数控制 [1]
2.是之前的bug,Flink 1.11应该是不会存在了,不确定1.10.1有没有修。

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-ignore-parse-errors

Best,
Jingsong
On Fri, Jul 3, 2020 at 1:38 PM 郑斌斌  wrote:
dear:
请教两个问题
 1)  用下面的代码消费kafka 发生序列化异常时,会发生JOB反复重试,重启后也是这样,
 
改用FlinkKafkaConsumer010类的话,有相关的解决方法,参照https://stackoverflow.com/questions/51301549/how-to-handle-exception-while-parsing-json-in-flink/51302225
 不知道,用Kafka类的话,如何解决
 .connect(
 new Kafka()
   .version("0.10")
   .topic("test-input")
 2)  对于timestamp类型字段,用JDBCAppendTableSink 
把DataStream写入到mysql时,会发下面的错误LocalTimeStamp到Timestamp的转型错误
 kafka消息是avro格式,字段类型设置为timestamp(3),我是把System.currentTimeMillis()写入到kafka中的
 jdbc参数类型设置为Types.SQL_TIMESTAMP
 thanks


-- 
Best, Jingsong Lee



kafkaf To mysql 写入问题

2020-07-02 Thread
dear:
   请教两个问题
1)  用下面的代码消费kafka 发生序列化异常时,会发生JOB反复重试,重启后也是这样,
改用FlinkKafkaConsumer010类的话,有相关的解决方法,参照https://stackoverflow.com/questions/51301549/how-to-handle-exception-while-parsing-json-in-flink/51302225
不知道,用Kafka类的话,如何解决
.connect(
new Kafka()
  .version("0.10")
  .topic("test-input")
2)  对于timestamp类型字段,用JDBCAppendTableSink 
把DataStream写入到mysql时,会发下面的错误LocalTimeStamp到Timestamp的转型错误
kafka消息是avro格式,字段类型设置为timestamp(3),我是把System.currentTimeMillis()写入到kafka中的
jdbc参数类型设置为Types.SQL_TIMESTAMP
thanks