Flink与Yarn的状态一致性问题

2020-11-11 文章 amen...@163.com
hi everyone,


最近在使用Flink-1.11.1 On Yarn Per 
Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn 
application仍处于运行状态


疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢


best,
amenhub

Re: Flink sql查询NULL值错误

2020-11-11 文章 Danny Chan
是的 Flink SQL 现在还不支持隐式类型,需要手动设置 NULL 的类型 SQL 才能通过编译。

丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午8:52写道:

> 感谢大佬!!!
>
> > 在 2020年11月10日,下午8:22,hailongwang <18868816...@163.com> 写道:
> >
> > Hi,
> >
> >
> > 需要将 null cast 成某个具体的值,比如:
> > if(type=1,2,cast(null as int))
> >
> >
> > Best,
> > Hailong
> > 在 2020-11-10 19:14:44,"丁浩浩" <18579099...@163.com> 写道:
> >> Select
> >>  id,
> >>  name,
> >>  if(type=1,2,null)
> >> From
> >>  user ;
> >> 当我执行上面的sql的时候提示我
> >> [ERROR] Could not execute SQL statement. Reason:
> >> org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of
> ‘NULL’
> >> 是无法将null展示吗?
> >
> >
> >
>
>
>


Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-11 文章 Danny Chan
感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑

Lei Wang  于2020年11月11日周三 下午2:03写道:

> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
>
> 比如
> robot1   2020-11-11 12:00:00 msginfo
> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?
>
> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
>
> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
>
> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> 我们必须 按 robotId 做 keyBy
>
> 求大神指教。
>
> 谢谢,
> 王磊
>


Re: 关于filesystem connector的一点疑问

2020-11-11 文章 Jingsong Li
Hi admin,

不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作)

On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:

> 补充一下不用partition time trigger的原因,partition
> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的
>
> > 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> >
> > Hi ,kandy
> > 我没有基于partition time 提交分区,我是基于默认的process
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> >
> >> 2020年11月12日 下午12:46,kandy.wang  写道:
> >>
> >> hi:
> >> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  +
> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay
> >> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> >>
> >>
> >> https://cloud.tencent.com/developer/article/1707182
> >>
> >> 这个连接可以看一下
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
> >>> Hi,all
> >>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
> >>> 现在有这样的场景:
> >>> 消费kafka数据写入hdfs中,分区字段是 day + hour
> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
> >>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
> >>> 有大佬知道吗,有实际验证过吗
> >>> 感谢
> >>>
> >>> 附上简单sql:
> >>> CREATE TABLE kafka (
> >>>  a STRING,
> >>>  b STRING,
> >>>  c BIGINT,
> >>>  process_time BIGINT,
> >>>  e STRING,
> >>>  f STRING,
> >>>  g STRING,
> >>>  h INT,
> >>>  i STRING
> >>> ) WITH (
> >>>  'connector' = 'kafka',
> >>>  'topic' = 'topic',
> >>>  'properties.bootstrap.servers' = 'x',
> >>>  'properties.group.id' = 'test-1',
> >>>  'scan.startup.mode' = 'latest-offset',
> >>>  'format' = 'json',
> >>>  'properties.flink.partition-discovery.interval-millis' = '30'
> >>> );
> >>>
> >>> CREATE TABLE filesystem (
> >>>  `day` STRING,
> >>>  `hour` STRING,
> >>>  a STRING,
> >>>  b STRING,
> >>>  c BIGINT,
> >>>  d BIGINT,
> >>>  e STRING,
> >>>  f STRING,
> >>>  g STRING,
> >>>  h INT,
> >>>  i STRING
> >>> ) PARTITIONED BY (`day`, `hour`) WITH (
> >>>  'connector' = 'filesystem',
> >>>  'format' = 'parquet',
> >>>  'path' = 'hdfs://xx',
> >>>  'parquet.compression'='SNAPPY',
> >>>  'sink.partition-commit.policy.kind' = 'success-file'
> >>> );
> >>>
> >>> insert into filesystem
> >>> select
> >>>  from_unixtime(process_time,'-MM-dd') as `day`,
> >>>  from_unixtime(process_time,'HH') as `hour`,
> >>>  a,
> >>>  b,
> >>>  c,
> >>>  d,
> >>>  e,
> >>>  f,
> >>>  g,
> >>>  h,
> >>>  i
> >>> from kafka;
> >>>
> >>>
> >>>
> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> >
>
>

-- 
Best, Jingsong Lee


Re: Re:flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 xiexinyuan341
source组件使用的是 kafka connector ,  使用的JdbcRowDataLookupFunction 作为维表查询.
报错信息是下面这种:
2020-11-12 00:10:00.153 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure

The last packet successfully received from the server was 277,705
milliseconds ago.  The last packet sent successfully to the server was 0
milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
at
com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
at
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
at LookupFunction$2.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: java.io.EOFException: Can not read response from server. Expected
to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
... 26 common frames omitted
2020-11-12 00:10:01.261 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于filesystem connector的一点疑问

2020-11-11 文章 admin
补充一下不用partition time trigger的原因,partition 
time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的

> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> 
> Hi ,kandy
> 我没有基于partition time 提交分区,我是基于默认的process 
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> 
>> 2020年11月12日 下午12:46,kandy.wang  写道:
>> 
>> hi:
>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
>> delay 时机触发分区提交,得看你的sink.partition-commit.delay
>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
>> 
>> 
>> https://cloud.tencent.com/developer/article/1707182
>> 
>> 这个连接可以看一下 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>>> Hi,all
>>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>>> 现在有这样的场景:
>>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>>> 有大佬知道吗,有实际验证过吗
>>> 感谢
>>> 
>>> 附上简单sql:
>>> CREATE TABLE kafka (
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  process_time BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'topic',
>>>  'properties.bootstrap.servers' = 'x',
>>>  'properties.group.id' = 'test-1',
>>>  'scan.startup.mode' = 'latest-offset',
>>>  'format' = 'json',
>>>  'properties.flink.partition-discovery.interval-millis' = '30'
>>> );
>>> 
>>> CREATE TABLE filesystem (
>>>  `day` STRING,
>>>  `hour` STRING,
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  d BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>>  'connector' = 'filesystem',
>>>  'format' = 'parquet',
>>>  'path' = 'hdfs://xx',
>>>  'parquet.compression'='SNAPPY',
>>>  'sink.partition-commit.policy.kind' = 'success-file'
>>> );
>>> 
>>> insert into filesystem
>>> select
>>>  from_unixtime(process_time,'-MM-dd') as `day`,
>>>  from_unixtime(process_time,'HH') as `hour`,
>>>  a,
>>>  b,
>>>  c,
>>>  d,
>>>  e,
>>>  f,
>>>  g,
>>>  h,
>>>  i
>>> from kafka;
>>> 
>>> 
>>> 
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> 



Re: Re:flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 xiexinyuan341
souce是kafka,使用JdbcRowDataLookupFunction作为维表.异常信息是这样的,看了下日志,这种异常基本上每10多分钟就会有一次.
2020-11-12 01:00:09.028 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure

The last packet successfully received from the server was 815,816
milliseconds ago.  The last packet sent successfully to the server was 1
milliseconds ago.
at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown 
Source)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
at
com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
at
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
at LookupFunction$10.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$7.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:75)
at JoinTableFuncCollector$6.collect(Unknown Source)
at
org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:203)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:162)
at LookupFunction$2.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at

Re: 关于filesystem connector的一点疑问

2020-11-11 文章 admin
sink.partition-commit.trigger 

 process-timeString  Trigger type for partition commit: 'process-time': 
based on the time of the machine, it neither requires partition time extraction 
nor watermark generation. Commit partition once the 'current system time' 
passes 'partition creation system time' plus 'delay'. 'partition-time': based 
on the time that extracted from partition values, it requires watermark 
generation. Commit partition once the 'watermark' passes 'time extracted from 
partition values' plus 'delay'.
sink.partition-commit.delay 

 0 s DurationThe partition will not commit until the delay 
time. If it is a daily partition, should be '1 d', if it is a hourly partition, 
should be '1 h'.
这两个参数都没有设置,都是默认值

> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> 
> Hi ,kandy
> 我没有基于partition time 提交分区,我是基于默认的process 
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> 
>> 2020年11月12日 下午12:46,kandy.wang  写道:
>> 
>> hi:
>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
>> delay 时机触发分区提交,得看你的sink.partition-commit.delay
>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
>> 
>> 
>> https://cloud.tencent.com/developer/article/1707182
>> 
>> 这个连接可以看一下 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>>> Hi,all
>>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>>> 现在有这样的场景:
>>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>>> 有大佬知道吗,有实际验证过吗
>>> 感谢
>>> 
>>> 附上简单sql:
>>> CREATE TABLE kafka (
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  process_time BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'topic',
>>>  'properties.bootstrap.servers' = 'x',
>>>  'properties.group.id' = 'test-1',
>>>  'scan.startup.mode' = 'latest-offset',
>>>  'format' = 'json',
>>>  'properties.flink.partition-discovery.interval-millis' = '30'
>>> );
>>> 
>>> CREATE TABLE filesystem (
>>>  `day` STRING,
>>>  `hour` STRING,
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  d BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>>  'connector' = 'filesystem',
>>>  'format' = 'parquet',
>>>  'path' = 'hdfs://xx',
>>>  'parquet.compression'='SNAPPY',
>>>  'sink.partition-commit.policy.kind' = 'success-file'
>>> );
>>> 
>>> insert into filesystem
>>> select
>>>  from_unixtime(process_time,'-MM-dd') as `day`,
>>>  from_unixtime(process_time,'HH') as `hour`,
>>>  a,
>>>  b,
>>>  c,
>>>  d,
>>>  e,
>>>  f,
>>>  g,
>>>  h,
>>>  i
>>> from kafka;
>>> 
>>> 
>>> 
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> 



Re: 关于filesystem connector的一点疑问

2020-11-11 文章 admin
Hi ,kandy
我没有基于partition time 提交分区,我是基于默认的process 
time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区

> 2020年11月12日 下午12:46,kandy.wang  写道:
> 
> hi:
> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
> delay 时机触发分区提交,得看你的sink.partition-commit.delay
> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> 
> 
> https://cloud.tencent.com/developer/article/1707182
> 
> 这个连接可以看一下 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>> Hi,all
>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>> 现在有这样的场景:
>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>> 有大佬知道吗,有实际验证过吗
>> 感谢
>> 
>> 附上简单sql:
>> CREATE TABLE kafka (
>>   a STRING,
>>   b STRING,
>>   c BIGINT,
>>   process_time BIGINT,
>>   e STRING,
>>   f STRING,
>>   g STRING,
>>   h INT,
>>   i STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic',
>>   'properties.bootstrap.servers' = 'x',
>>   'properties.group.id' = 'test-1',
>>   'scan.startup.mode' = 'latest-offset',
>>   'format' = 'json',
>>   'properties.flink.partition-discovery.interval-millis' = '30'
>> );
>> 
>> CREATE TABLE filesystem (
>>   `day` STRING,
>>   `hour` STRING,
>>   a STRING,
>>   b STRING,
>>   c BIGINT,
>>   d BIGINT,
>>   e STRING,
>>   f STRING,
>>   g STRING,
>>   h INT,
>>   i STRING
>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>   'connector' = 'filesystem',
>>   'format' = 'parquet',
>>   'path' = 'hdfs://xx',
>>   'parquet.compression'='SNAPPY',
>>   'sink.partition-commit.policy.kind' = 'success-file'
>> );
>> 
>> insert into filesystem
>> select
>>   from_unixtime(process_time,'-MM-dd') as `day`,
>>   from_unixtime(process_time,'HH') as `hour`,
>>   a,
>>   b,
>>   c,
>>   d,
>>   e,
>>   f,
>>   g,
>>   h,
>>   i
>> from kafka;
>> 
>> 
>> 
>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger



Re: Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 Xintong Song
Flink on Yarn 废除了 `-n` 参数后,不再支持指定固定数量的 TM。Flink 会根据作业的并行度,按需向 Yarn
申请资源。所以你说的没错,session 模式下提交新的 job 时 flink 会向 yarn 申请更多的资源。

如果想要限制 session 使用的总资源、可以接受资源不足时后提交的 job 可能无法运行需要等待的话,可以配置
`slotmanager.number-of-slots.max`(默认是 Integer.MAX_VALUE)来限制总的 slot 数量。

如果不想 job 运行结束后 tm 很快被释放、下次提交作业又需要等待 tm 启动时间的话,可以配置
`resourcemanager.taskmanager-timeout`(默认 30s)调大 tm 因空闲而被释放的时间。

Thank you~

Xintong Song



On Wed, Nov 11, 2020 at 11:18 PM kingdomad  wrote:

> flink on yarn使用第一种方式yarn session,先创建一个yarn session,然后再提交job到这个session中。
> 您的意思是这个session所申请的资源会根据我后续提交的job的并发度去动态地无限地扩展?
> 如果我提交了一个并发度为10的job a到这个session,那这个session申请10个slot对应的资源,
> 我再提交一个并发度为10的job b到这个session,这个session再申请10个slot对应的资源?
> job需要多少并发度,session通通都向yarn去申请吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> kingdomad
>
>
>
>
>
>
>
> 在 2020-11-11 17:17:16,"hailongwang" <18868816...@163.com> 写道:
> >根据你 Job 的并发和指定的 TM 的规格来计算出 TM 的数量。
> >
> >
> >在 2020-11-11 16:14:41,"kingdomad"  写道:
> >>我发现1.11版本的yarn-session.sh 
> 废弃了-n参数,那如何指定taskmanager数量?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>--
> >>
> >>kingdomad
> >>
>


Re: slot数量与并行度的大小关系

2020-11-11 文章 Xintong Song
你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的?

流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3]
中的几种方式更改作业的并行度。另外,也可以通过配置 `taskmanager.numberOfTaskSlots` 来增加 flink 集群的
slot 数量。

Thank you~

Xintong Song



On Wed, Nov 11, 2020 at 7:54 PM Shawn Huang  wrote:

> Hi,
>
> Flink 的调度策略会保证一个job需要的slot数恰好等于该job所有算子的最大并行度。
> 如果slot数量小于算子的最大并行度,则该job无法执行。可以参考[1][2]中的文档描述。
>
> 目前没有方法让flink自动选择可用slot数量作为并行度,但可以通过[3]中的几种方法来设置。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/concepts/flink-architecture.html#task-slots-and-resources
> [3]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/parallel.html
>
> Best,
> Shawn Huang
>
>
> hl9...@126.com  于2020年11月11日周三 下午2:58写道:
>
> > Hi,all:
> > 我在flink
> > web面板上提交了1个job,job的并行度为15,flink集群slot总数为12,发现任务一直在created阶段等待,一段时间后报错:
> > Caused by:
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Could not allocate the required slot within slot request timeout.
> > Please make sure that the cluster has enough resources.
> >
> > 是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度?
> >
> >
> >
> > hl9...@126.com
> >
>


Re:关于filesystem connector的一点疑问

2020-11-11 文章 kandy.wang
hi:
按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
delay 时机触发分区提交,得看你的sink.partition-commit.delay
设置的多久,如果超过之后,应当默认是会丢弃的吧。


https://cloud.tencent.com/developer/article/1707182

这个连接可以看一下 







在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>Hi,all
>Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>现在有这样的场景:
>消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>有大佬知道吗,有实际验证过吗
>感谢
>
>附上简单sql:
>CREATE TABLE kafka (
>a STRING,
>b STRING,
>c BIGINT,
>process_time BIGINT,
>e STRING,
>f STRING,
>g STRING,
>h INT,
>i STRING
>) WITH (
>'connector' = 'kafka',
>'topic' = 'topic',
>'properties.bootstrap.servers' = 'x',
>'properties.group.id' = 'test-1',
>'scan.startup.mode' = 'latest-offset',
>'format' = 'json',
>'properties.flink.partition-discovery.interval-millis' = '30'
>);
>
>CREATE TABLE filesystem (
>`day` STRING,
>`hour` STRING,
>a STRING,
>b STRING,
>c BIGINT,
>d BIGINT,
>e STRING,
>f STRING,
>g STRING,
>h INT,
>i STRING
>) PARTITIONED BY (`day`, `hour`) WITH (
>'connector' = 'filesystem',
>'format' = 'parquet',
>'path' = 'hdfs://xx',
>'parquet.compression'='SNAPPY',
>'sink.partition-commit.policy.kind' = 'success-file'
>);
>
>insert into filesystem
>select
>from_unixtime(process_time,'-MM-dd') as `day`,
>from_unixtime(process_time,'HH') as `hour`,
>a,
>b,
>c,
>d,
>e,
>f,
>g,
>h,
>i
>from kafka;
>
>
>
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger


关于filesystem connector的一点疑问

2020-11-11 文章 admin
Hi,all
Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
现在有这样的场景:
消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
有大佬知道吗,有实际验证过吗
感谢

附上简单sql:
CREATE TABLE kafka (
a STRING,
b STRING,
c BIGINT,
process_time BIGINT,
e STRING,
f STRING,
g STRING,
h INT,
i STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'x',
'properties.group.id' = 'test-1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '30'
);

CREATE TABLE filesystem (
`day` STRING,
`hour` STRING,
a STRING,
b STRING,
c BIGINT,
d BIGINT,
e STRING,
f STRING,
g STRING,
h INT,
i STRING
) PARTITIONED BY (`day`, `hour`) WITH (
'connector' = 'filesystem',
'format' = 'parquet',
'path' = 'hdfs://xx',
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file'
);

insert into filesystem
select
from_unixtime(process_time,'-MM-dd') as `day`,
from_unixtime(process_time,'HH') as `hour`,
a,
b,
c,
d,
e,
f,
g,
h,
i
from kafka;



[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger

Re:flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 hailongwang
Hi xiexinyuan341,


我理解这边有 2 个问题:
1. “偶尔会出现连接超时”,这个的话有具体的堆栈吗。如果是因为长时间没有数据的查询导致 connection invalid 
话,这个在1.12,1.11.3 中应该是解决了[1].
2. 你的 source 是什么组件呢?程序抛异常的话,自动重启或者手动重启话,如果是 “最少一次” 语义的话,应该还是会 join 上 sink 
到下游的;或者可以开启 checkpoint,保证 flink 内部的 “精确一次”。


[1] https://issues.apache.org/jira/browse/FLINK-16681


Best,
hailong
在 2020-11-12 09:52:43,"xiexinyuan341"  写道:
>在flink 1.11.1 使用mysql作为维表进行temporal 
>join时,大部分时间都能正常join,偶尔会出现mysql连接超时的情况,此时程序直接抛出异常,这条数据就不能被正确的更新到sink 
>表里面,请问对于这个情况有解决方案吗?


flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 xiexinyuan341
在flink 1.11.1 使用mysql作为维表进行temporal 
join时,大部分时间都能正常join,偶尔会出现mysql连接超时的情况,此时程序直接抛出异常,这条数据就不能被正确的更新到sink 
表里面,请问对于这个情况有解决方案吗?

Re:Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 kingdomad
flink on yarn使用第一种方式yarn session,先创建一个yarn session,然后再提交job到这个session中。
您的意思是这个session所申请的资源会根据我后续提交的job的并发度去动态地无限地扩展?
如果我提交了一个并发度为10的job a到这个session,那这个session申请10个slot对应的资源,
我再提交一个并发度为10的job b到这个session,这个session再申请10个slot对应的资源?
job需要多少并发度,session通通都向yarn去申请吗?















--

kingdomad







在 2020-11-11 17:17:16,"hailongwang" <18868816...@163.com> 写道:
>根据你 Job 的并发和指定的 TM 的规格来计算出 TM 的数量。
>
>
>在 2020-11-11 16:14:41,"kingdomad"  写道:
>>我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>kingdomad
>>


Re:Re: Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 hailongwang



会多一个 outputConversion 类型转换算子
如果是 DataStream 转 Table API,会多一个 inputConversion 类型转换算子




在 2020-11-11 20:25:31,"Luna Wong"  写道:
>Table API 转 DataStream为啥会出现性能损耗
>
>hailongwang <18868816...@163.com> 于2020年11月11日周三 下午6:28写道:
>>
>> 我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了?
>> 而不是再实现一个 Connector。
>>
>>
>>
>>
>> 在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道:
>> >明白了,多谢。
>> >
>> >是 Canal-Json 格式的 Kafka Connector.
>> >
>> >我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table
>> >api 对接 flink。
>> >
>> >现在是因为考虑到 Stream Api 能力比 Table Api 能力要强,所以在评估是否需要再实现一个 Stream Connector.
>> >
>> >
>> >
>> >
>> >--
>> >Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 Luna Wong
Table API 转 DataStream为啥会出现性能损耗

hailongwang <18868816...@163.com> 于2020年11月11日周三 下午6:28写道:
>
> 我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了?
> 而不是再实现一个 Connector。
>
>
>
>
> 在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道:
> >明白了,多谢。
> >
> >是 Canal-Json 格式的 Kafka Connector.
> >
> >我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table
> >api 对接 flink。
> >
> >现在是因为考虑到 Stream Api 能力比 Table Api 能力要强,所以在评估是否需要再实现一个 Stream Connector.
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/


ElasticsearchApiCallBridge相关类构造函数问题

2020-11-11 文章 Luna Wong
为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。
我还想继承Elasticsearch6ApiCallBridge类。在new
RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。

不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?


Re: UDTAGG在SQL中可以使用么,语法是什么

2020-11-11 文章 Jark Wu
目前 SQL 中不支持 UDTAF。

你使用 UDTAF 的场景是什么呢?

On Wed, 11 Nov 2020 at 20:27, Shuai Xia 
wrote:

> Hi,像TableAggregateFunction可以在SQL中使用么?
>
>


UDTAGG在SQL中可以使用么,语法是什么

2020-11-11 文章 Shuai Xia
Hi,像TableAggregateFunction可以在SQL中使用么?



Re: slot数量与并行度的大小关系

2020-11-11 文章 Shawn Huang
Hi,

Flink 的调度策略会保证一个job需要的slot数恰好等于该job所有算子的最大并行度。
如果slot数量小于算子的最大并行度,则该job无法执行。可以参考[1][2]中的文档描述。

目前没有方法让flink自动选择可用slot数量作为并行度,但可以通过[3]中的几种方法来设置。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/concepts/flink-architecture.html#task-slots-and-resources
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/parallel.html

Best,
Shawn Huang


hl9...@126.com  于2020年11月11日周三 下午2:58写道:

> Hi,all:
> 我在flink
> web面板上提交了1个job,job的并行度为15,flink集群slot总数为12,发现任务一直在created阶段等待,一段时间后报错:
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout.
> Please make sure that the cluster has enough resources.
>
> 是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度?
>
>
>
> hl9...@126.com
>


Re:Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 hailongwang
我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了?
而不是再实现一个 Connector。




在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道:
>明白了,多谢。
>
>是 Canal-Json 格式的 Kafka Connector.
>
>我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table
>api 对接 flink。
>
>现在是因为考虑到 Stream Api 能力比 Table Api 能力要强,所以在评估是否需要再实现一个 Stream Connector.
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 LittleFall
明白了,多谢。

是 Canal-Json 格式的 Kafka Connector.

我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table
api 对接 flink。

现在是因为考虑到 Stream Api 能力比 Table Api 能力要强,所以在评估是否需要再实现一个 Stream Connector.




--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql hbase维表关联性能上不去

2020-11-11 文章 kandy.wang
看了一下hbase的维表关联主要是通过org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction
 实现的,测试了一下性能tps只有大概3-4w, 经加本地cache之后性能仍然没有提升。 分析了一下flink ui LookupJoin 是与kafka 
source的算子 chain 在一起了,这样整个算子的并行度就受限于kafka分区的并行度。
1.想问一下这块的 hbase connector开发,是否有做过connector的性能测试。
2.想问一下,hbase维表关联还有没有性能提升手段。感觉这样的性能都达不到上生产应用的要求。



Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 jiangjiguang719



根据 -p 最大并行度  和-ys  每个TM的slot个数来计算  














在 2020-11-11 17:14:41,"kingdomad"  写道:
>我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量?
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>kingdomad
>


Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 hailongwang
根据你 Job 的并发和指定的 TM 的规格来计算出 TM 的数量。


在 2020-11-11 16:14:41,"kingdomad"  写道:
>我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量?
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>kingdomad
>


Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 hailongwang


可以的,将 table 转换成 datastream,但是会多一层转换的性能消耗。
方便说下哪个 Connector 有现成的 Table Connector 可以满足需求,但是 Datastream 
Connector不满足需求呢,具体是什么功能呢




在 2020-11-11 16:08:08,"LittleFall" <1578166...@qq.com> 写道:
>非常感谢你的回复!
>
>问下另一个问题,现在有这样一个场景:
>
>1. table api 的计算无法满足一些需求,需要使用 stream api 进行计算;
>2. 有现成可用的 table api connector;
>3. 没有现成可用的 stream api connector,需要进行一段时间的开发适配工作。
>
>那么是否存在一种方法,使用 table api connector 输入输出数据,但是使用 stream api 进行计算?
>
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
>文档中的这一节可以满足以上需求吗?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 kingdomad
我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量?













--

kingdomad



回复: Flink cdc mysql 字段是datetime类型时0000-00-00 00:00:00会被flink转成1970-01-01T00:00

2020-11-11 文章 史 正超
在flink sql 中用STRING表示datetime,这样的话后续的可操作性会比较大些。

发件人: 丁浩浩 <18579099...@163.com>
发送时间: 2020年11月11日 6:37
收件人: user-zh@flink.apache.org 
主题: Flink cdc mysql 字段是datetime类型时-00-00 00:00:00会被flink转成1970-01-01T00:00

当我mysql字段时datetime并且字段值是-00-00 
00:00:00时,会被转成1970-01-01T00:00,如果我应该如何操作才能保证跟原数值保持一致?
输出的结果:
2> (true,1,zhangsan,18,1970-01-01T00:00)
3> (true,2,lisi,20,2020-11-11T14:17:46)
4> (true,3,wangwu,99,1970-01-01T00:00)
1> (true,4,zhaoliu,77,1970-01-01T00:00)
日志信息:
2020-11-11 14:30:37,418 - 19755 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value




Re:Flink sql cdc 锁超时

2020-11-11 文章 hailongwang


有更完整的堆栈不?

在 2020-11-11 10:28:02,"丁浩浩" <18579099...@163.com> 写道:
>当我使用flink cdc 对多张表进行关联查询时其中的一张表总是会有锁超时的情况,导致任务无法正常启动,
>请问这种情况应该如何处理?
>org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; 
>try restarting transaction Error code: 1205; SQLSTATE: 40001.
>   at 
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
>   at 
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
>   at 
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>Caused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: 
>Lock wait timeout exceeded; try restarting transaction
>   at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
>   at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
>   at 
> com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
>   at 
> com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:782)
>   at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:666)
>   at 
> io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1201)
>   at 
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:465)
>   ... 3 more


Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 LittleFall
非常感谢你的回复!

问下另一个问题,现在有这样一个场景:

1. table api 的计算无法满足一些需求,需要使用 stream api 进行计算;
2. 有现成可用的 table api connector;
3. 没有现成可用的 stream api connector,需要进行一段时间的开发适配工作。

那么是否存在一种方法,使用 table api connector 输入输出数据,但是使用 stream api 进行计算?

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset

文档中的这一节可以满足以上需求吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-11 文章 hailongwang
Hi Lei,
我理解这篇文章少介绍了 keyby 的逻辑。
可以keyby(robotId),然后在 processFunction 里面使用 ValueState 存储最近一次 robot 的到达时间,
同时注册一个 20min 的timer来触发检测,在检测时候,取出 ValueState 的值都是同一个 robotId的。


Best,
hailong




在 2020-11-11 12:54:22,"Lei Wang"  写道:
>有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
>
>比如
>robot1   2020-11-11 12:00:00 msginfo
>之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?
>
>flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
>
>我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
>
>这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
>我们必须 按 robotId 做 keyBy
>
>求大神指教。
>
>谢谢,
>王磊


[ANNOUNCE] Apache Flink Stateful Functions 2.2.1 released

2020-11-11 文章 Tzu-Li (Gordon) Tai
The Apache Flink community released the first bugfix release of the
Stateful Functions (StateFun) 2.2 series, version 2.2.1.

This release fixes a critical bug that causes restoring a Stateful
Functions cluster from snapshots (checkpoints or savepoints) to fail under
certain conditions.

*We strongly recommend all users to upgrade to this version.*

*Please check out the release announcement for details on upgrading to
2.2.1:*https://flink.apache.org/news/2020/11/11/release-statefun-2.2.1.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Dockerfiles for building Stateful Functions Docker images can be
found at:
https://github.com/apache/flink-statefun-docker

Alternatively, Ververica has volunteered to make Stateful Function's images
available for the community via their public Docker Hub registry:
https://hub.docker.com/r/ververica/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349291

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: 有没有办法把flink Row类转换为Object

2020-11-11 文章 Jark Wu
你要的是这个功能么? 1.11 上已经支持了。

CloseableIterator result = Table#execute#collect()

On Wed, 11 Nov 2020 at 15:23, ZT.Ren <18668118...@163.com> wrote:

> 基于flink做二次开发中,需要将flink SQL执行结果打印到会话中,会话中执行结果打印流程固定打印List类型数据。
> 大部分查询引擎(比如presto)都会在ResultSet中提供getObject方法,flink中如何实现?


Re: flink 1.11.1 使用sql ,使用hbase作为维表进行temporal join时无法获取数据

2020-11-11 文章 鱼子酱
重新搞了一下,找到原因了
1、没有配置hbase的host
Reading reply sessionid:0x3000484bfd0001d, packet:: clientPath:null
serverPath:null finish

2、protobuf-java版本过高
java.lang.NoClassDefFoundError: com/google/protobuf/LiteralByteString


这个2个问题一直没发现,是因为报错的信息只有debug级别里面有,我之前的日志级别是info,希望后续如果可能的话,把相关的报错的信息提高级别就更好识别啦



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:flink1.11 读取kafka avro格式数据发序列化失败

2020-11-11 文章 Jark Wu
我估计你是用的 confluent schema registry 的 avro。
可以使用下在 master 分支提供的 avro-confluent format [1]。 需要自己 build 下源码。

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/avro-confluent.html

On Wed, 11 Nov 2020 at 14:20, 奔跑的小飞袁  wrote:

> 这是我尝试输出的message长度
> message length is: 529
> message length is: 212
> message length is: 391
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink cdc mysql 字段是datetime类型时0000-00-00 00:00:00会被flink转成1970-01-01T00:00

2020-11-11 文章 Jark Wu
写个 UDF 再把 1970-01-01T00:00 转回去?

On Wed, 11 Nov 2020 at 14:38, 丁浩浩 <18579099...@163.com> wrote:

> 当我mysql字段时datetime并且字段值是-00-00
> 00:00:00时,会被转成1970-01-01T00:00,如果我应该如何操作才能保证跟原数值保持一致?
> 输出的结果:
> 2> (true,1,zhangsan,18,1970-01-01T00:00)
> 3> (true,2,lisi,20,2020-11-11T14:17:46)
> 4> (true,3,wangwu,99,1970-01-01T00:00)
> 1> (true,4,zhaoliu,77,1970-01-01T00:00)
> 日志信息:
> 2020-11-11 14:30:37,418 - 19755 WARN
> [debezium-mysqlconnector-mysql_binlog_source-snapshot]
> io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value
> '-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos'
> converted to empty value
> 2020-11-11 14:30:37,424 - 19761 WARN
> [debezium-mysqlconnector-mysql_binlog_source-snapshot]
> io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value
> '-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos'
> converted to empty value
> 2020-11-11 14:30:37,424 - 19761 WARN
> [debezium-mysqlconnector-mysql_binlog_source-snapshot]
> io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value
> '-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos'
> converted to empty value
>
>
>