Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
看起来你的DDL写的没有什么问题。

你用的是哪个Flink版本呢?
此外就是可以发下更完整的异常栈么?

zilong xiao  于2020年11月24日周二 下午2:54写道:

> Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
>
> Benchao Li  于2020年11月24日周二 下午2:49写道:
>
> > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> >
> > zilong xiao  于2020年11月24日周二 上午10:49写道:
> >
> > > [image: image.png]
> > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: 退订

2020-11-23 文章 Xingbo Huang
Hi,

退订请发邮件到  user-zh-unsubscr...@flink.apache.org

详细的可以参考 [1]

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

李军  于2020年11月24日周二 下午2:49写道:

>
>


Re: ProcessingTime下的watermark

2020-11-23 文章 Xingbo Huang
Hi,

watermark是对于数据的eventTime没有顺序到来帮助何时触发计算用的,你如果用processingTime来,processingTime肯定是递增的,就不存在乱序这个概念了,就不需要watermark了。

Best,
Xingbo


Kyle Zhang  于2020年11月24日周二 下午1:34写道:

> Hi,
> 使用flink1.11,在SQL ddl中基于process time声明watermark报错
>
> SQL validation failed. Watermark can not be defined for a processing time
> attribute column.
>
> 文档里关于watermark的解释也基本是跟eventTime在一起[1]
> 我想问的是基于processingTime的流处理是不需要watermark,还是被flink优化,不需要我们关心?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks
>
> Best
>


FlinkSQL导致Prometheus内存暴涨

2020-11-23 文章 Luna Wong
FlinkSQL 生成的Metrics数据 task_name名字超长,导致Prometheus查询的时候内存暴涨,生产环境接受不了。
下面只是一个最简单的例子,复杂的SQL生成的task_name长达9000字节。这会导致Prometheus内存暴涨,我该怎么办。

task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~

Benchao Li  于2020年11月24日周二 下午2:49写道:

> 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
>
> zilong xiao  于2020年11月24日周二 上午10:49写道:
>
> > [image: image.png]
> > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> >
>
>
> --
>
> Best,
> Benchao Li
>


退订

2020-11-23 文章 李军



Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。

zilong xiao  于2020年11月24日周二 上午10:49写道:

> [image: image.png]
> 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
>


-- 

Best,
Benchao Li


flink table api 或者 sql 使用 自定义含有的state方法

2020-11-23 文章 戴嘉诚
大家好:
请问,因为当前flink sql或者flink table
中,不支持自定义的udf中使用有state的逻辑,所以,当我们自己任务中,如果统计需要聚集型指标的情况下,就不能用上flink
sql了,只能自己使用flink datastream去硬编码,请问,flink
sql中,能否有其他方式,可以调用我们自己定义的有state的udf,并且可以不让再解析执行的时候,多次出现呢?还是说,只能一个指标一个flink
job?


Re: 关于Catalog的建议

2020-11-23 文章 admin
感谢jark大佬,试过了确实可以
我是先用hive的catalog+dialect 建了 hive表,
然后切换到default catalog 建了 kafka source表,
在insert into hive select from 
kafka时需要指定hive_catalog.hive_db.hive_table,否则会报表不存在,因为当前是在default catalog 
下。大家注意一下

> 2020年11月24日 上午11:41,Jark Wu  写道:
> 
> 1. 可以的
> 2. 是的。见文档
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/use.html#use-catloag
> 3. 是的。
> 
> Hive metastore catalog 就是 Flink 官方提供的通用 catalog(可以存任何 connector 类型)。
> 
> Best,
> Jark
> 
> 
> On Tue, 24 Nov 2020 at 10:58, admin <17626017...@163.com> wrote:
> 
>> Hi Rui Li,
>>> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
>> 
>> 一个job里面可以切换catalog的是吧,比如从读kafka中 写hive 的 db1.hive_table。
>> 几个问题请教一下:
>> 1.create kafka source 使用  memory catalog,hive table 使用hive catalog,这样是可以的吧
>> 2.在sql里面切换catalog的语法是什么,在[1]里面没看到,是这样吗 USE CATALOG
>> catalogName(default_catalog/hive_catalog)
>> 
>> 3.在注册hivecatalog时,需要指定一个默认的database,比如指定了默认test,然后要写到db1的hive_table,是不是切换一下database即可。
>>USE db1;
>> 感谢
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html
>> 
>>> 2020年11月23日 下午8:52,Rui Li  写道:
>>> 
>>> Hi,
>>> 
>>> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
>>> 
>>> 关于你的两个问题:
>>> 1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog
>>> [1],文档也许是可以说的更明确一些。
>>> 2.
>>> 
>> 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive
>>> metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。
>>> 
>>> [1]
>>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
>>> 
>>> On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
>>> 
 目前Flink提供memory、jdbc、hive这3种catalog。
 感觉实际使用中,可以使用如下几种方案。
 
 (1)选择memory catalog,然后每次sql都带上自己的相关DDL。
 (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。
 
 方案1和方案2各有优缺点。
 方案1的优点:
   比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka
 
 
>> topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的)
 方案1的缺点:
   很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。
 
 -然后,我的问题来了。
 
 
>> 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。
 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。
 
 
 
>> 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。
 
 
 
>> 当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。
 
>>> 
>>> 
>>> --
>>> Best regards!
>>> Rui Li
>> 
>> 



ProcessingTime下的watermark

2020-11-23 文章 Kyle Zhang
Hi,
使用flink1.11,在SQL ddl中基于process time声明watermark报错

SQL validation failed. Watermark can not be defined for a processing time
attribute column.

文档里关于watermark的解释也基本是跟eventTime在一起[1]
我想问的是基于processingTime的流处理是不需要watermark,还是被flink优化,不需要我们关心?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks

Best


Re:Flink SQL 对延迟数据怎么处理?

2020-11-23 文章 hailongwang
Hi,
  据我所知,FlinkSQL 不支持将迟到的数据输出到侧流中。
如果你下游使用的是 window 的话,可以通过设置
`table.exec.emit.late-fire.enabled` 和 `table.exec.emit.late-fire.delay` 来触发晚于 
watermark 到达的数据。
其中允许等待晚与 watermark 的数据的时间由 `table.exec.state.ttl` 控制,等价于 Datastream 中的 
allowedLateness,
故 window 的最大等待时间为 watermark 的 outOfOrder + allowedLateness。


Best,
Hailong
在 2020-11-24 09:03:13,"jy l"  写道:
>Hi:
>请教一下,FlinkSQL中,我在创建表时设置了watermark并设置了最大延迟,可是还是有数据依旧会迟到晚到,对于这样的数据我们又不想直接丢弃,那这个依旧迟到的数据我该怎么收集?是否有与StreamAPI一样可以将依旧迟到的数据进行分流的方案?
>
>祝好!


Re:关于flink实时写入hbase用flush方法频繁报操作超时问题

2020-11-23 文章 hailongwang



这个错误感觉是 Hbase 的错误。具体实现的话,你可以参考社区的 HBaseSinkFunction[1] 的实现。
[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
Best,
Hailong




在 2020-11-24 09:32:55,"bradyMk"  写道:
>请教各位:
>我用flink实时写入hbase,继承RichSinkFunction后用的hbase的BufferedMutator,每当写入一定量的数据后,就用flush的方法,类似这样:
> 
>但是我的任务会频繁报出如下错误:
> 
>感觉貌似是我代码的问题导致的,但又不知道原因,希望得到指导,感激不尽~
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:flink sql 中是否可以使用 mysql 的存储过程和函数?

2020-11-23 文章 hailongwang
Hi,
  不可以的,其中链接[1] 是Flink SQL 支持的所有内置函数,链接[2] 是 Flink SQL 允许自己定义函数,来满足个性化需求。
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html


Best,
Hailong

在 2020-11-24 00:41:49,"macdoor"  写道:
>需求是这样,mysql中使用 binary(16) 存储 uuid,读取到 flink中需要转换成文本串的uuid,sql是这样
>
>select bin_to_uuid(id, true) as text_uuid from usertable
>
>我尝试使用,报错说  bin_to_uuid 找不到
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


flink使用hive udf函数

2020-11-23 文章 酷酷的浑蛋
Flink-1.11.1,  hive-2.2.0
在使用current_timestamp或者current_date函数时会报
Caused by: java.lang.NullPointerException
at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51)
at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)





Re: Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 jy l
我使用的是release-1.12.0-rc1

Best

Jark Wu  于2020年11月24日周二 上午11:42写道:

> 看报错像是一个 bug。 请问使用的是哪个版本呢?
> 可以去 JIRA issue 提个 issue。
>
> Best,
> Jark
>
> On Tue, 24 Nov 2020 at 11:27, jy l  wrote:
>
> > Hi:
> > FlinkSQL我在使用时发生一件很诡异的事件。具体如下:
> >
> > 我的DDL:
> > create table if not exists t_order(
> > id int PRIMARY KEY comment '订单id',
> > timestamps bigint comment '订单创建时间',
> > orderInformationId string comment '订单信息ID',
> > userId string comment '用户ID',
> > categoryId int comment '商品类别',
> > productId int comment '商品ID',
> > price decimal(10,2) comment '单价',
> > productCount int comment '购买数量',
> > priceSum decimal(10,2) comment '订单总价',
> > shipAddress string comment '商家地址',
> > receiverAddress string comment '收货地址',
> > ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
> > WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> > )with(
> > 'connector' = 'kafka',
> > 'format' = 'debezium-avro-confluent',
> > 'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081
> 
> > 
> > ',
> > 'topic' = 'ods.userAnalysis.order',
> > 'properties.bootstrap.servers' = '手动打码ip:9092',
> > 'properties.group.id' = 'flink-analysis',
> > 'scan.startup.mode' = 'latest-offset'
> > )
> >
> > 我在查询该表时,使用如下查询语句能够正常查询出来:
> >
> >- select * from t_order
> >- select receiverAddress from t_order
> >- select
> >id,
> >timestamps,
> >orderInformationId,
> >userId,
> >categoryId,
> >productId,
> >price,
> >productCount,
> >priceSum,
> >shipAddress
> >from t_order
> >
> > 但是我在第三条语句中加上receiverAddress字段时,就查询不出来了,sql如下:
> > select
> > id,
> > timestamps,
> > orderInformationId,
> > userId,
> > categoryId,
> > productId,
> > price,
> > productCount,
> > priceSum,
> > shipAddress,
> > receiverAddress
> > from t_order,
> > 报错信息如下:
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> This
> > calc has no useful projection and no filter. It should be removed by
> > CalcRemoveRule.
> > at
> >
> >
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
> > at
> >
> >
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> > ...
> >
> > receiverAddress这个字段明明是在我的DDL中具体申明了的,且单独查询也能出来。
> > 这具体是什么原因呢?望各位大佬告知。
> >
> >
> > 祝好!
> >
>


Re: Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 Jark Wu
看报错像是一个 bug。 请问使用的是哪个版本呢?
可以去 JIRA issue 提个 issue。

Best,
Jark

On Tue, 24 Nov 2020 at 11:27, jy l  wrote:

> Hi:
> FlinkSQL我在使用时发生一件很诡异的事件。具体如下:
>
> 我的DDL:
> create table if not exists t_order(
> id int PRIMARY KEY comment '订单id',
> timestamps bigint comment '订单创建时间',
> orderInformationId string comment '订单信息ID',
> userId string comment '用户ID',
> categoryId int comment '商品类别',
> productId int comment '商品ID',
> price decimal(10,2) comment '单价',
> productCount int comment '购买数量',
> priceSum decimal(10,2) comment '订单总价',
> shipAddress string comment '商家地址',
> receiverAddress string comment '收货地址',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
> WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> )with(
> 'connector' = 'kafka',
> 'format' = 'debezium-avro-confluent',
> 'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081
> 
> ',
> 'topic' = 'ods.userAnalysis.order',
> 'properties.bootstrap.servers' = '手动打码ip:9092',
> 'properties.group.id' = 'flink-analysis',
> 'scan.startup.mode' = 'latest-offset'
> )
>
> 我在查询该表时,使用如下查询语句能够正常查询出来:
>
>- select * from t_order
>- select receiverAddress from t_order
>- select
>id,
>timestamps,
>orderInformationId,
>userId,
>categoryId,
>productId,
>price,
>productCount,
>priceSum,
>shipAddress
>from t_order
>
> 但是我在第三条语句中加上receiverAddress字段时,就查询不出来了,sql如下:
> select
> id,
> timestamps,
> orderInformationId,
> userId,
> categoryId,
> productId,
> price,
> productCount,
> priceSum,
> shipAddress,
> receiverAddress
> from t_order,
> 报错信息如下:
> Exception in thread "main" org.apache.flink.table.api.TableException: This
> calc has no useful projection and no filter. It should be removed by
> CalcRemoveRule.
> at
>
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
> at
>
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> ...
>
> receiverAddress这个字段明明是在我的DDL中具体申明了的,且单独查询也能出来。
> 这具体是什么原因呢?望各位大佬告知。
>
>
> 祝好!
>


Re: 关于Catalog的建议

2020-11-23 文章 Jark Wu
1. 可以的
2. 是的。见文档
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/use.html#use-catloag
3. 是的。

Hive metastore catalog 就是 Flink 官方提供的通用 catalog(可以存任何 connector 类型)。

Best,
Jark


On Tue, 24 Nov 2020 at 10:58, admin <17626017...@163.com> wrote:

> Hi Rui Li,
> > FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
>
> 一个job里面可以切换catalog的是吧,比如从读kafka中 写hive 的 db1.hive_table。
> 几个问题请教一下:
> 1.create kafka source 使用  memory catalog,hive table 使用hive catalog,这样是可以的吧
> 2.在sql里面切换catalog的语法是什么,在[1]里面没看到,是这样吗 USE CATALOG
> catalogName(default_catalog/hive_catalog)
>
> 3.在注册hivecatalog时,需要指定一个默认的database,比如指定了默认test,然后要写到db1的hive_table,是不是切换一下database即可。
> USE db1;
> 感谢
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html
>
> > 2020年11月23日 下午8:52,Rui Li  写道:
> >
> > Hi,
> >
> > FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
> >
> > 关于你的两个问题:
> > 1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog
> > [1],文档也许是可以说的更明确一些。
> > 2.
> >
> 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive
> > metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
> >
> > On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
> >
> >> 目前Flink提供memory、jdbc、hive这3种catalog。
> >> 感觉实际使用中,可以使用如下几种方案。
> >>
> >> (1)选择memory catalog,然后每次sql都带上自己的相关DDL。
> >> (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。
> >>
> >> 方案1和方案2各有优缺点。
> >> 方案1的优点:
> >>比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka
> >>
> >>
> topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的)
> >> 方案1的缺点:
> >>很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。
> >>
> >> -然后,我的问题来了。
> >>
> >>
> 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。
> >> 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。
> >>
> >>
> >>
> 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。
> >>
> >>
> >>
> 当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。
> >>
> >
> >
> > --
> > Best regards!
> > Rui Li
>
>


Re: 关于flink实时写入hbase用flush方法频繁报操作超时问题

2020-11-23 文章 bradyMk
补充下上个问题中图片的文字版:
图一:
 if (count > 300) {
mutator.flush()
count = 0
  }
  count = count + 1

图二:
Caused by:
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed
101 actions: Operation Timeout: 101 times, servers with issues:
172.xx.x.x,16020,1601173606933
at
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:297)
at
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2300(AsyncProcess.java:273)
at
org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1906)
at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:250)
at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:213)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 jy l
Hi:
FlinkSQL我在使用时发生一件很诡异的事件。具体如下:

我的DDL:
create table if not exists t_order(
id int PRIMARY KEY comment '订单id',
timestamps bigint comment '订单创建时间',
orderInformationId string comment '订单信息ID',
userId string comment '用户ID',
categoryId int comment '商品类别',
productId int comment '商品ID',
price decimal(10,2) comment '单价',
productCount int comment '购买数量',
priceSum decimal(10,2) comment '订单总价',
shipAddress string comment '商家地址',
receiverAddress string comment '收货地址',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
)with(
'connector' = 'kafka',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081
',
'topic' = 'ods.userAnalysis.order',
'properties.bootstrap.servers' = '手动打码ip:9092',
'properties.group.id' = 'flink-analysis',
'scan.startup.mode' = 'latest-offset'
)

我在查询该表时,使用如下查询语句能够正常查询出来:

   - select * from t_order
   - select receiverAddress from t_order
   - select
   id,
   timestamps,
   orderInformationId,
   userId,
   categoryId,
   productId,
   price,
   productCount,
   priceSum,
   shipAddress
   from t_order

但是我在第三条语句中加上receiverAddress字段时,就查询不出来了,sql如下:
select
id,
timestamps,
orderInformationId,
userId,
categoryId,
productId,
price,
productCount,
priceSum,
shipAddress,
receiverAddress
from t_order,
报错信息如下:
Exception in thread "main" org.apache.flink.table.api.TableException: This
calc has no useful projection and no filter. It should be removed by
CalcRemoveRule.
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
...

receiverAddress这个字段明明是在我的DDL中具体申明了的,且单独查询也能出来。
这具体是什么原因呢?望各位大佬告知。


祝好!


Re: 关于Catalog的建议

2020-11-23 文章 admin
Hi Rui Li,
> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。

一个job里面可以切换catalog的是吧,比如从读kafka中 写hive 的 db1.hive_table。
几个问题请教一下:
1.create kafka source 使用  memory catalog,hive table 使用hive catalog,这样是可以的吧
2.在sql里面切换catalog的语法是什么,在[1]里面没看到,是这样吗 USE CATALOG 
catalogName(default_catalog/hive_catalog)
3.在注册hivecatalog时,需要指定一个默认的database,比如指定了默认test,然后要写到db1的hive_table,是不是切换一下database即可。
USE db1;
感谢

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html

> 2020年11月23日 下午8:52,Rui Li  写道:
> 
> Hi,
> 
> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
> 
> 关于你的两个问题:
> 1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog
> [1],文档也许是可以说的更明确一些。
> 2.
> 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive
> metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
> 
> On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
> 
>> 目前Flink提供memory、jdbc、hive这3种catalog。
>> 感觉实际使用中,可以使用如下几种方案。
>> 
>> (1)选择memory catalog,然后每次sql都带上自己的相关DDL。
>> (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。
>> 
>> 方案1和方案2各有优缺点。
>> 方案1的优点:
>>比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka
>> 
>> topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的)
>> 方案1的缺点:
>>很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。
>> 
>> -然后,我的问题来了。
>> 
>> 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。
>> 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。
>> 
>> 
>> 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。
>> 
>> 
>> 当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。
>> 
> 
> 
> -- 
> Best regards!
> Rui Li



Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
[image: image.png]
如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。


关于flink实时写入hbase用flush方法频繁报操作超时问题

2020-11-23 文章 bradyMk
请教各位:
我用flink实时写入hbase,继承RichSinkFunction后用的hbase的BufferedMutator,每当写入一定量的数据后,就用flush的方法,类似这样:
 
但是我的任务会频繁报出如下错误:
 
感觉貌似是我代码的问题导致的,但又不知道原因,希望得到指导,感激不尽~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL 对延迟数据怎么处理?

2020-11-23 文章 jy l
Hi:
请教一下,FlinkSQL中,我在创建表时设置了watermark并设置了最大延迟,可是还是有数据依旧会迟到晚到,对于这样的数据我们又不想直接丢弃,那这个依旧迟到的数据我该怎么收集?是否有与StreamAPI一样可以将依旧迟到的数据进行分流的方案?

祝好!


flink sql 中是否可以使用 mysql 的存储过程和函数?

2020-11-23 文章 macdoor
需求是这样,mysql中使用 binary(16) 存储 uuid,读取到 flink中需要转换成文本串的uuid,sql是这样

select bin_to_uuid(id, true) as text_uuid from usertable

我尝试使用,报错说  bin_to_uuid 找不到





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

2020-11-23 文章 macdoor
自己回答一下,供其他人参考。

换成flink 1.12.0-rc1,用相同sql处理相同数据,结果跟 hive 计算的结果相同,确认是 1.11.2
的一个bug,1.12应该已经改正了



--
Sent from: http://apache-flink.147419.n8.nabble.com/

statementset下source怎么完全复用

2020-11-23 文章 Jeff
请问一下,flink 1.11statement set 怎么复用同一个source呢? 
希望同一个job里不同sink使用完全相同的数据,不是默认的用hash分流,这个有地方设置么?

关于Catalog的建议

2020-11-23 文章 赵一旦
目前Flink提供memory、jdbc、hive这3种catalog。
感觉实际使用中,可以使用如下几种方案。

(1)选择memory catalog,然后每次sql都带上自己的相关DDL。
(2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。

方案1和方案2各有优缺点。
方案1的优点:
比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka
topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的)
方案1的缺点:
很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。

-然后,我的问题来了。
在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。
问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。

问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。

当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。


Re: flink读mysql分库分表

2020-11-23 文章 Leonard Xu
Hi,
我没理解错的话你是想一次读出所有表(分库分表)的所有数据, 用一个DDL建表语句搞定,目前还不支持

祝好,
Leonard

> 在 2020年11月23日,17:22,酷酷的浑蛋  写道:
> 
> 
> 
> flink读mysql分库分表可以自动识别吗? 还是只能一个一个读?
> 



Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
哦哦, 好吧,我一直以为你说的“新旧”是是否指定了update-mode。理解错了。
good,那应该没问题了,我去改改。




Jark Wu  于2020年11月23日周一 下午5:18写道:

> 你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query
> 是否有更新来决定工作模式的。
> 如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 17:14, 赵一旦  wrote:
>
> > duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
> > values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。
> >
> > 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
> > 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。
> >
> >
> > 赵一旦  于2020年11月23日周一 下午5:09写道:
> >
> > > 总结下:
> > > (1)group
> > >
> >
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> > > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
> > >
> > >
> > >
> >
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> > > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
> > >
> > >
> > > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate
> > update方式输出。
> > > 甚至DDL中推荐可以搞个自定义on
> > > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on
> duplicate
> > > update功能。
> > >
> > >
> > >
> > >
> > > 赵一旦  于2020年11月23日周一 下午4:48写道:
> > >
> > >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> > >> 发现这种方式也不行,但是加了group by之后是可以的。
> > >>
> > >> (1)
> > >> 所以说是否还需要query带有key的语义才行呢?
> > >> 比如group by的结果是可能update的,并且基于group by key也指出了key。
> > >>
> > >> 那么group by + tumble
> window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
> > >>
> > >> (2)如JarkWu所说,是mysql表的DDL部分决定。
> > >>
> > >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
> > >>
> > >> Jark Wu  于2020年11月23日周一 下午4:28写道:
> > >>
> > >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
> > >>>
> > >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
> > >>>
> > >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> > >>> >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> > >>> > 页面。
> > >>> >
> > >>> >
> > >>> >
> > >>>
> >
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> > >>> >
> > >>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
> > >>> >
> > >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> > >>> > >
> > >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> > >>> > >
> > >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > >>> > > > <
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > >>> > > > >
> > >>> > > >
> > >>> > > > Flink uses the primary key that defined in DDL when writing
> data
> > to
> > >>> > > > external databases. The connector operate in upsert mode if the
> > >>> primary
> > >>> > > key
> > >>> > > > was defined, otherwise, the connector operate in append mode.
> > >>> > > >
> > >>> > > > In upsert mode, Flink will insert a new row or update the
> > existing
> > >>> row
> > >>> > > > according to the primary key, Flink can ensure the idempotence
> in
> > >>> this
> > >>> > > way.
> > >>> > > > To guarantee the output result is as expected, it’s recommended
> > to
> > >>> > define
> > >>> > > > primary key for the table and make sure the primary key is one
> of
> > >>> the
> > >>> > > > unique key sets or primary key of the underlying database
> table.
> > In
> > >>> > > append
> > >>> > > > mode, Flink will interpret all records as INSERT messages, the
> > >>> INSERT
> > >>> > > > operation may fail if a primary key or unique constraint
> > violation
> > >>> > > happens
> > >>> > > > in the underlying database.
> > >>> > > >
> > >>> > > > See CREATE TABLE DDL
> > >>> > > > <
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > >>> > > > >
> > >>> > > > for
> > >>> > > > more details about PRIMARY KEY syntax.
> > >>> > > >
> > >>> > > >
> > >>> > > > 这里也有一点,In append mode, Flink will interpret all records as
> INSERT
> > >>> > > messages,
> > >>> > > > the INSERT operation may fail if a primary key or unique
> > constraint
> > >>> > > > violation happens in the underlying database.  什么叫append
> > >>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > >>> > > >
> > >>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > >>> > > >
> > >>> > > >
> > >>> > > >
> > >>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> > >>> > > >
> > >>> > > > > 补充sql:
> > >>> > > > >
> > >>> > > > > DDL:
> > >>> > > > >
> > >>> > > > > CREATE TABLE flink_recent_pv_subid
> > >>> > > > > (
> > >>> > > > > `supply_id` STRING,
> > >>> > > > > `subid` STRING,
> > >>> > > > > `mark`  STRING,
> > >>> > > > > `time`  STRING,
> > >>> > > > > `pv`BIGINT,
> > >>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, 

Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-23 文章 赵一旦
@Yun Tang,应该是这个问题。
请教下这几个参数具体含义。

backoff in milliseconds for partition requests of input channels
是什么逻辑,以及initial和max分别表达含义。


akka.ask.timeout这个参数相对明显,就是超时,这个以前也涉及过,在cancel/submit/savepoint等情况都可能导致集群slot陆续没掉,然后再陆续回来(pass环境,基本就是部分机器失联,然后重新连接的case)。



Yun Tang  于2020年11月23日周一 下午5:11写道:

> Hi
>
> 集群负载比较大的时候,下游一直收不到request的partition,就会导致PartitionNotFoundException,建议增大
> taskmanager.network.request-backoff.max [1][2] 以增大重试次数
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-request-backoff-max
> [2] https://juejin.cn/post/6844904185347964942#heading-8
>
>
> 祝好
> 唐云
> 
> From: 赵一旦 
> Sent: Monday, November 23, 2020 13:08
> To: user-zh@flink.apache.org 
> Subject: Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。
>
> 这个报错和kafka没有关系的哈,我大概理解是提交任务的瞬间,jobManager/taskManager机器压力较大,存在机器之间心跳超时什么的?
> 这个partition应该是指flink运行图中的数据partition,我感觉。没有具体细看,每次提交的瞬间可能遇到这个问题,然后会自动重试成功。
>
> zhisheng  于2020年11月18日周三 下午10:51写道:
>
> > 是不是有 kafka 机器挂了?
> >
> > Best
> > zhisheng
> >
> > hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道:
> >
> > > 感觉还有其它 root cause,可以看下还有其它日志不?
> > >
> > >
> > > Best,
> > > Hailong
> > >
> > > At 2020-11-18 15:52:57, "赵一旦"  wrote:
> > > >2020-11-18 16:51:37
> > > >org.apache.flink.runtime.io
> > .network.partition.PartitionNotFoundException:
> > > >Partition
> > > b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> > > >not found.
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > > >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >
> > >
> >
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > > >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >
> > >
> >
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> > > >)
> > > >at
> > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> > > >.java:670)
> > > >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> > > >CompletableFuture.java:646)
> > > >at java.util.concurrent.CompletableFuture$Completion.run(
> > > >CompletableFuture.java:456)
> > > >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > > >at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > > >ForkJoinExecutorConfigurator.scala:44)
> > > >at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > >at
> > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> > > >.java:1339)
> > > >at
> > > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > >at
> > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> > > >.java:107)
> > > >
> > > >
> > > >请问这是什么问题呢?
> > >
> >
>


flink读mysql分库分表

2020-11-23 文章 酷酷的浑蛋


flink读mysql分库分表可以自动识别吗? 还是只能一个一个读?



Re: pyflink 1.11.1 调用包含第三方依赖库的udf时报错

2020-11-23 文章 Xingbo Huang
Hi,

你可以帖下taskmanager的日志吗,这个日志只能看到启动Python进程的时候挂掉了,其他信息看不到。

Best,
Xingbo

fengmuqi...@ruqimobility.com  于2020年11月23日周一
下午4:11写道:

> hi.
> pyflink 1.11.1 调用包含第三方依赖库的udf时报错 :
>
> 运行环境:
> windows10
> python==3.7.9
> apache-flink==1.11.1
> apache-beam==2.19.0
>
> udf 依赖第三方库:
> h3==3.7.0
>
> pytest 通过。
>
> 运行时报错,报错信息如下
> 2020-11-23 14:20:51,656 WARN  org.apache.flink.runtime.taskmanager.Task
> [] - Source: TableSourceScan(table=[[default_catalog,
> default_database, order_info]], fields=[source, order_id, user_id, car_id,
> driver_name, driver_id, time_dispatch_done, time_start, time_cancel,
> status, start_city, end_city, start_ad_code, end_ad_code, cancel_reason_id,
> realStartLatitude, realStartLongitude]) -> StreamExecPythonCalc -> Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[hash_h3]) (3/8) (056a0a0cdf3838794f4023e61d04a690) switched from
> RUNNING to FAILED.
> java.lang.RuntimeException: Failed to create stage bundle factory!
>at
> org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.IllegalStateException: Process died with exit code 0
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:331)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> 

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 Jark Wu
你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query 是否有更新来决定工作模式的。
如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。

Best,
Jark

On Mon, 23 Nov 2020 at 17:14, 赵一旦  wrote:

> duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
> values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。
>
> 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
> 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。
>
>
> 赵一旦  于2020年11月23日周一 下午5:09写道:
>
> > 总结下:
> > (1)group
> >
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
> >
> >
> >
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
> >
> >
> > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate
> update方式输出。
> > 甚至DDL中推荐可以搞个自定义on
> > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
> > update功能。
> >
> >
> >
> >
> > 赵一旦  于2020年11月23日周一 下午4:48写道:
> >
> >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> >> 发现这种方式也不行,但是加了group by之后是可以的。
> >>
> >> (1)
> >> 所以说是否还需要query带有key的语义才行呢?
> >> 比如group by的结果是可能update的,并且基于group by key也指出了key。
> >>
> >> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
> >>
> >> (2)如JarkWu所说,是mysql表的DDL部分决定。
> >>
> >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
> >>
> >> Jark Wu  于2020年11月23日周一 下午4:28写道:
> >>
> >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
> >>>
> >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
> >>>
> >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> >>> >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> >>> > 页面。
> >>> >
> >>> >
> >>> >
> >>>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> >>> >
> >>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
> >>> >
> >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >>> > >
> >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> >>> > >
> >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> >>> > > > >
> >>> > > >
> >>> > > > Flink uses the primary key that defined in DDL when writing data
> to
> >>> > > > external databases. The connector operate in upsert mode if the
> >>> primary
> >>> > > key
> >>> > > > was defined, otherwise, the connector operate in append mode.
> >>> > > >
> >>> > > > In upsert mode, Flink will insert a new row or update the
> existing
> >>> row
> >>> > > > according to the primary key, Flink can ensure the idempotence in
> >>> this
> >>> > > way.
> >>> > > > To guarantee the output result is as expected, it’s recommended
> to
> >>> > define
> >>> > > > primary key for the table and make sure the primary key is one of
> >>> the
> >>> > > > unique key sets or primary key of the underlying database table.
> In
> >>> > > append
> >>> > > > mode, Flink will interpret all records as INSERT messages, the
> >>> INSERT
> >>> > > > operation may fail if a primary key or unique constraint
> violation
> >>> > > happens
> >>> > > > in the underlying database.
> >>> > > >
> >>> > > > See CREATE TABLE DDL
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> >>> > > > >
> >>> > > > for
> >>> > > > more details about PRIMARY KEY syntax.
> >>> > > >
> >>> > > >
> >>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> >>> > > messages,
> >>> > > > the INSERT operation may fail if a primary key or unique
> constraint
> >>> > > > violation happens in the underlying database.  什么叫append
> >>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> >>> > > >
> >>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> >>> > > >
> >>> > > > > 补充sql:
> >>> > > > >
> >>> > > > > DDL:
> >>> > > > >
> >>> > > > > CREATE TABLE flink_recent_pv_subid
> >>> > > > > (
> >>> > > > > `supply_id` STRING,
> >>> > > > > `subid` STRING,
> >>> > > > > `mark`  STRING,
> >>> > > > > `time`  STRING,
> >>> > > > > `pv`BIGINT,
> >>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT
> >>> ENFORCED
> >>> > > > > ) WITH (
> >>> > > > >   'connector.type' = 'jdbc',
> >>> > > > >
> >>> > > > >   ..
> >>> > > > >
> >>> > > > > );
> >>> > > > >
> >>> > > > >
> >>> > > > > 查询SQL:
> >>> > > > >
> >>> > > > > INSERT INTO
> >>> > > > > flink_recent_pv_subid
> >>> > > > > SELECT
> >>> > > > > `sid`,
> >>> > > > > `subid`,
> >>> > > > > `mark`,
> >>> > > > > 

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。

此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。


赵一旦  于2020年11月23日周一 下午5:09写道:

> 总结下:
> (1)group
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
>
>
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
>
>
> 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate update方式输出。
> 甚至DDL中推荐可以搞个自定义on
> duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
> update功能。
>
>
>
>
> 赵一旦  于2020年11月23日周一 下午4:48写道:
>
>> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
>> 发现这种方式也不行,但是加了group by之后是可以的。
>>
>> (1)
>> 所以说是否还需要query带有key的语义才行呢?
>> 比如group by的结果是可能update的,并且基于group by key也指出了key。
>>
>> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
>>
>> (2)如JarkWu所说,是mysql表的DDL部分决定。
>>
>> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
>>
>> Jark Wu  于2020年11月23日周一 下午4:28写道:
>>
>>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>>>
>>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
>>>
>>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>>> >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
>>> > 页面。
>>> >
>>> >
>>> >
>>> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>>> >
>>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
>>> >
>>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>>> > >
>>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
>>> > >
>>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
>>> > > > <
>>> > > >
>>> > >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>> > > > >
>>> > > >
>>> > > > Flink uses the primary key that defined in DDL when writing data to
>>> > > > external databases. The connector operate in upsert mode if the
>>> primary
>>> > > key
>>> > > > was defined, otherwise, the connector operate in append mode.
>>> > > >
>>> > > > In upsert mode, Flink will insert a new row or update the existing
>>> row
>>> > > > according to the primary key, Flink can ensure the idempotence in
>>> this
>>> > > way.
>>> > > > To guarantee the output result is as expected, it’s recommended to
>>> > define
>>> > > > primary key for the table and make sure the primary key is one of
>>> the
>>> > > > unique key sets or primary key of the underlying database table. In
>>> > > append
>>> > > > mode, Flink will interpret all records as INSERT messages, the
>>> INSERT
>>> > > > operation may fail if a primary key or unique constraint violation
>>> > > happens
>>> > > > in the underlying database.
>>> > > >
>>> > > > See CREATE TABLE DDL
>>> > > > <
>>> > > >
>>> > >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
>>> > > > >
>>> > > > for
>>> > > > more details about PRIMARY KEY syntax.
>>> > > >
>>> > > >
>>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
>>> > > messages,
>>> > > > the INSERT operation may fail if a primary key or unique constraint
>>> > > > violation happens in the underlying database.  什么叫append
>>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>>> > > >
>>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>>> > > >
>>> > > >
>>> > > >
>>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
>>> > > >
>>> > > > > 补充sql:
>>> > > > >
>>> > > > > DDL:
>>> > > > >
>>> > > > > CREATE TABLE flink_recent_pv_subid
>>> > > > > (
>>> > > > > `supply_id` STRING,
>>> > > > > `subid` STRING,
>>> > > > > `mark`  STRING,
>>> > > > > `time`  STRING,
>>> > > > > `pv`BIGINT,
>>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT
>>> ENFORCED
>>> > > > > ) WITH (
>>> > > > >   'connector.type' = 'jdbc',
>>> > > > >
>>> > > > >   ..
>>> > > > >
>>> > > > > );
>>> > > > >
>>> > > > >
>>> > > > > 查询SQL:
>>> > > > >
>>> > > > > INSERT INTO
>>> > > > > flink_recent_pv_subid
>>> > > > > SELECT
>>> > > > > `sid`,
>>> > > > > `subid`,
>>> > > > > `mark`,
>>> > > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
>>> > > > 'MMddHHmm') as `time`,
>>> > > > > count(1) AS `pv`
>>> > > > > FROM baidu_log_view
>>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
>>> > > MINUTE);
>>> > > > >
>>> > > > >
>>> > > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
>>> > > > >
>>> > > > >> @hailongwang 一样的。
>>> > > > >>
>>> > > > >> 有个情况说明下,我是tumble
>>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>>> > > > >> 

Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-23 文章 Yun Tang
Hi

集群负载比较大的时候,下游一直收不到request的partition,就会导致PartitionNotFoundException,建议增大 
taskmanager.network.request-backoff.max [1][2] 以增大重试次数

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-request-backoff-max
[2] https://juejin.cn/post/6844904185347964942#heading-8


祝好
唐云

From: 赵一旦 
Sent: Monday, November 23, 2020 13:08
To: user-zh@flink.apache.org 
Subject: Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

这个报错和kafka没有关系的哈,我大概理解是提交任务的瞬间,jobManager/taskManager机器压力较大,存在机器之间心跳超时什么的?
这个partition应该是指flink运行图中的数据partition,我感觉。没有具体细看,每次提交的瞬间可能遇到这个问题,然后会自动重试成功。

zhisheng  于2020年11月18日周三 下午10:51写道:

> 是不是有 kafka 机器挂了?
>
> Best
> zhisheng
>
> hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道:
>
> > 感觉还有其它 root cause,可以看下还有其它日志不?
> >
> >
> > Best,
> > Hailong
> >
> > At 2020-11-18 15:52:57, "赵一旦"  wrote:
> > >2020-11-18 16:51:37
> > >org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > >Partition
> > b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> > >not found.
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> > >)
> > >at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> > >.java:670)
> > >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> > >CompletableFuture.java:646)
> > >at java.util.concurrent.CompletableFuture$Completion.run(
> > >CompletableFuture.java:456)
> > >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > >at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > >ForkJoinExecutorConfigurator.scala:44)
> > >at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> > >.java:1339)
> > >at
> > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >at
> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> > >.java:107)
> > >
> > >
> > >请问这是什么问题呢?
> >
>


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
总结下:
(1)group
by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
(2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。

如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。


说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate update方式输出。
甚至DDL中推荐可以搞个自定义on
duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
update功能。




赵一旦  于2020年11月23日周一 下午4:48写道:

> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> 发现这种方式也不行,但是加了group by之后是可以的。
>
> (1)
> 所以说是否还需要query带有key的语义才行呢?
> 比如group by的结果是可能update的,并且基于group by key也指出了key。
>
> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
>
> (2)如JarkWu所说,是mysql表的DDL部分决定。
>
> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
>
> Jark Wu  于2020年11月23日周一 下午4:28写道:
>
>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>>
>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>>
>> Best,
>> Jark
>>
>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
>>
>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
>> > 页面。
>> >
>> >
>> >
>> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>> >
>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
>> >
>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>> > >
>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
>> > >
>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
>> > > > <
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>> > > > >
>> > > >
>> > > > Flink uses the primary key that defined in DDL when writing data to
>> > > > external databases. The connector operate in upsert mode if the
>> primary
>> > > key
>> > > > was defined, otherwise, the connector operate in append mode.
>> > > >
>> > > > In upsert mode, Flink will insert a new row or update the existing
>> row
>> > > > according to the primary key, Flink can ensure the idempotence in
>> this
>> > > way.
>> > > > To guarantee the output result is as expected, it’s recommended to
>> > define
>> > > > primary key for the table and make sure the primary key is one of
>> the
>> > > > unique key sets or primary key of the underlying database table. In
>> > > append
>> > > > mode, Flink will interpret all records as INSERT messages, the
>> INSERT
>> > > > operation may fail if a primary key or unique constraint violation
>> > > happens
>> > > > in the underlying database.
>> > > >
>> > > > See CREATE TABLE DDL
>> > > > <
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
>> > > > >
>> > > > for
>> > > > more details about PRIMARY KEY syntax.
>> > > >
>> > > >
>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
>> > > messages,
>> > > > the INSERT operation may fail if a primary key or unique constraint
>> > > > violation happens in the underlying database.  什么叫append
>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>> > > >
>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>> > > >
>> > > >
>> > > >
>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
>> > > >
>> > > > > 补充sql:
>> > > > >
>> > > > > DDL:
>> > > > >
>> > > > > CREATE TABLE flink_recent_pv_subid
>> > > > > (
>> > > > > `supply_id` STRING,
>> > > > > `subid` STRING,
>> > > > > `mark`  STRING,
>> > > > > `time`  STRING,
>> > > > > `pv`BIGINT,
>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
>> > > > > ) WITH (
>> > > > >   'connector.type' = 'jdbc',
>> > > > >
>> > > > >   ..
>> > > > >
>> > > > > );
>> > > > >
>> > > > >
>> > > > > 查询SQL:
>> > > > >
>> > > > > INSERT INTO
>> > > > > flink_recent_pv_subid
>> > > > > SELECT
>> > > > > `sid`,
>> > > > > `subid`,
>> > > > > `mark`,
>> > > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
>> > > > 'MMddHHmm') as `time`,
>> > > > > count(1) AS `pv`
>> > > > > FROM baidu_log_view
>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
>> > > MINUTE);
>> > > > >
>> > > > >
>> > > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
>> > > > >
>> > > > >> @hailongwang 一样的。
>> > > > >>
>> > > > >> 有个情况说明下,我是tumble
>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
>> > > > >>
>> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
>> > > > >>>
>> > > > >>>
>> > > > >>> Best,
>> > > > >>> Hailong
>> > > > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
>> > > > >>> >如题,按照官方文档,当mysql表定义了primary
>> key的时候,会使用UpsertTableSink,并且会使用insert
>> > 

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
发现这种方式也不行,但是加了group by之后是可以的。

(1)
所以说是否还需要query带有key的语义才行呢?
比如group by的结果是可能update的,并且基于group by key也指出了key。

那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?

(2)如JarkWu所说,是mysql表的DDL部分决定。

如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?

Jark Wu  于2020年11月23日周一 下午4:28写道:

> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>
> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
>
> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> > 页面。
> >
> >
> >
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> >
> > Jark Wu  于2020年11月23日周一 下午3:32写道:
> >
> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> > >
> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> > >
> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > > > >
> > > >
> > > > Flink uses the primary key that defined in DDL when writing data to
> > > > external databases. The connector operate in upsert mode if the
> primary
> > > key
> > > > was defined, otherwise, the connector operate in append mode.
> > > >
> > > > In upsert mode, Flink will insert a new row or update the existing
> row
> > > > according to the primary key, Flink can ensure the idempotence in
> this
> > > way.
> > > > To guarantee the output result is as expected, it’s recommended to
> > define
> > > > primary key for the table and make sure the primary key is one of the
> > > > unique key sets or primary key of the underlying database table. In
> > > append
> > > > mode, Flink will interpret all records as INSERT messages, the INSERT
> > > > operation may fail if a primary key or unique constraint violation
> > > happens
> > > > in the underlying database.
> > > >
> > > > See CREATE TABLE DDL
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > > > >
> > > > for
> > > > more details about PRIMARY KEY syntax.
> > > >
> > > >
> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> > > messages,
> > > > the INSERT operation may fail if a primary key or unique constraint
> > > > violation happens in the underlying database.  什么叫append
> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > > >
> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > > >
> > > >
> > > >
> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> > > >
> > > > > 补充sql:
> > > > >
> > > > > DDL:
> > > > >
> > > > > CREATE TABLE flink_recent_pv_subid
> > > > > (
> > > > > `supply_id` STRING,
> > > > > `subid` STRING,
> > > > > `mark`  STRING,
> > > > > `time`  STRING,
> > > > > `pv`BIGINT,
> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > > > ) WITH (
> > > > >   'connector.type' = 'jdbc',
> > > > >
> > > > >   ..
> > > > >
> > > > > );
> > > > >
> > > > >
> > > > > 查询SQL:
> > > > >
> > > > > INSERT INTO
> > > > > flink_recent_pv_subid
> > > > > SELECT
> > > > > `sid`,
> > > > > `subid`,
> > > > > `mark`,
> > > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > > > 'MMddHHmm') as `time`,
> > > > > count(1) AS `pv`
> > > > > FROM baidu_log_view
> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> > > MINUTE);
> > > > >
> > > > >
> > > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
> > > > >
> > > > >> @hailongwang 一样的。
> > > > >>
> > > > >> 有个情况说明下,我是tumble
> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> > > > >>
> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > > > >>>
> > > > >>>
> > > > >>> Best,
> > > > >>> Hailong
> > > > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> > > > >>> >如题,按照官方文档,当mysql表定义了primary
> key的时候,会使用UpsertTableSink,并且会使用insert
> > on
> > > > >>> >duplicate方式写入。
> > > > >>> >
> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key
> 'uniq_ssmt'
> > > > >>> >at
> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > > >>> Method)
> > > > >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > > > >>> >NativeConstructorAccessorImpl.java:62)
> > > > >>> >at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > > > >>> >DelegatingConstructorAccessorImpl.java:45)
> > > > >>> >at
> > > 

flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据

2020-11-23 文章 dpzhoufengdev
flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据,每条数据都有两条完全一样的数据。这个是什么原因导致的?
聚合计算的逻辑

Table tableoneHour = tableEnv.sqlQuery(

"select  appname" +

",productCode" +

",link" +

 ",count(case when nodeName = 'FailTerminateEndEvent' 
then 1 else null end) as errNum" +

",count(case when nodeName = 'EndEvent' and passStatus 
= 'Accept' then 1 else null end ) as passNum " +

",count(case when nodeName = 'EndEvent' and passStatus 
= 'Reject' then 1 else null end) as refNum " +

",count(case when nodeName = 'EndEvent' and passStatus 
<> 'Reject' and passStatus <> 'Accept' then 1 else null end) as processNum " +

",sum(case when nodeName = 'EndEvent' then loansum else 
0 end ) as loansum" +

",count(1) as allNum " +

",'OneHour' as windowType " +

",HOP_END(rowtime, INTERVAL '1'  HOUR, INTERVAL '1' 
HOUR) as inputtime " +

"from  table1_2 WHERE link in ('1','2','5')  GROUP BY 
HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) " +

",appname,productCode,link");


将table转成dataStream


//计算的多张表union到一起
Table tablesql = 
tableHalfHour.unionAll(tableoneHour).unionAll(tableoneDay);
DataStream> dataStream2 = 
tableEnv.toRetractStream(tablesql,Row.class);
DataStream> dataStream7Day = 
tableEnv.toRetractStream(table7Day,Row.class);


//将Table转成dataStream
DataStream reslut1 = dataStream2.map(new 
MapFunction, String>() {
@Override
public String map(Tuple2 tuple2) throws Exception {
Map json = new HashMap<>();
json.put("appname",tuple2.f1.getField(0));
json.put("productCode", tuple2.f1.getField(1));
json.put("link",tuple2.f1.getField(2));
json.put("errNum",tuple2.f1.getField(3));
json.put("passNum",tuple2.f1.getField(4));
json.put("refNum",tuple2.f1.getField(5));
json.put("processNum",tuple2.f1.getField(6));
json.put("loansum",tuple2.f1.getField(7));
json.put("allNum",tuple2.f1.getField(8));
json.put("windowType",tuple2.f1.getField(9));
json.put("inputtime",tuple2.f1.getField(10));
return JSON.toJSONString(json);
}
});






将结果sink到kafka中
reslut1.addSink(new FlinkKafkaProducer08<>("",new 
SimpleStringSchema(),props1));
 reslut2.addSink(new FlinkKafkaProducer08<>("",new 
SimpleStringSchema(),props1));


sink到kafka的数据存在两条完全一样的数据




| |
dpzhoufengdev
|
|
dpzhoufeng...@163.com
|
签名由网易邮箱大师定制

flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据

2020-11-23 文章 dpzhoufengdev
flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据,每条数据都有两条完全一样的数据。这个是什么原因导致的?
聚合计算的逻辑

Table tableoneHour = tableEnv.sqlQuery(

"select  appname" +

",productCode" +

",link" +

 ",count(case when nodeName = 'FailTerminateEndEvent' 
then 1 else null end) as errNum" +

",count(case when nodeName = 'EndEvent' and passStatus 
= 'Accept' then 1 else null end ) as passNum " +

",count(case when nodeName = 'EndEvent' and passStatus 
= 'Reject' then 1 else null end) as refNum " +

",count(case when nodeName = 'EndEvent' and passStatus 
<> 'Reject' and passStatus <> 'Accept' then 1 else null end) as processNum " +

",sum(case when nodeName = 'EndEvent' then loansum else 
0 end ) as loansum" +

",count(1) as allNum " +

",'OneHour' as windowType " +

",HOP_END(rowtime, INTERVAL '1'  HOUR, INTERVAL '1' 
HOUR) as inputtime " +

"from  table1_2 WHERE link in ('1','2','5')  GROUP BY 
HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) " +

",appname,productCode,link");


将table转成dataStream


//计算的多张表union到一起
Table tablesql = 
tableHalfHour.unionAll(tableoneHour).unionAll(tableoneDay);
DataStream> dataStream2 = 
tableEnv.toRetractStream(tablesql,Row.class);
DataStream> dataStream7Day = 
tableEnv.toRetractStream(table7Day,Row.class);


//将Table转成dataStream
DataStream reslut1 = dataStream2.map(new 
MapFunction, String>() {
@Override
public String map(Tuple2 tuple2) throws Exception {
Map json = new HashMap<>();
json.put("appname",tuple2.f1.getField(0));
json.put("productCode", tuple2.f1.getField(1));
json.put("link",tuple2.f1.getField(2));
json.put("errNum",tuple2.f1.getField(3));
json.put("passNum",tuple2.f1.getField(4));
json.put("refNum",tuple2.f1.getField(5));
json.put("processNum",tuple2.f1.getField(6));
json.put("loansum",tuple2.f1.getField(7));
json.put("allNum",tuple2.f1.getField(8));
json.put("windowType",tuple2.f1.getField(9));
json.put("inputtime",tuple2.f1.getField(10));
return JSON.toJSONString(json);
}
});






将结果sink到kafka中
reslut1.addSink(new FlinkKafkaProducer08<>("",new 
SimpleStringSchema(),props1));
 reslut2.addSink(new FlinkKafkaProducer08<>("",new 
SimpleStringSchema(),props1));


sink到kafka的数据存在两条完全一样的数据




| |
dpzhoufengdev
|
|
dpzhoufeng...@163.com
|
签名由网易邮箱大师定制

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 Jark Wu
这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。

新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。

Best,
Jark

On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:

> 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> 页面。
>
>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>
> Jark Wu  于2020年11月23日周一 下午3:32写道:
>
> > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >
> > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> >
> > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > > >
> > >
> > > Flink uses the primary key that defined in DDL when writing data to
> > > external databases. The connector operate in upsert mode if the primary
> > key
> > > was defined, otherwise, the connector operate in append mode.
> > >
> > > In upsert mode, Flink will insert a new row or update the existing row
> > > according to the primary key, Flink can ensure the idempotence in this
> > way.
> > > To guarantee the output result is as expected, it’s recommended to
> define
> > > primary key for the table and make sure the primary key is one of the
> > > unique key sets or primary key of the underlying database table. In
> > append
> > > mode, Flink will interpret all records as INSERT messages, the INSERT
> > > operation may fail if a primary key or unique constraint violation
> > happens
> > > in the underlying database.
> > >
> > > See CREATE TABLE DDL
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > > >
> > > for
> > > more details about PRIMARY KEY syntax.
> > >
> > >
> > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> > messages,
> > > the INSERT operation may fail if a primary key or unique constraint
> > > violation happens in the underlying database.  什么叫append
> > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > >
> > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > >
> > >
> > >
> > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> > >
> > > > 补充sql:
> > > >
> > > > DDL:
> > > >
> > > > CREATE TABLE flink_recent_pv_subid
> > > > (
> > > > `supply_id` STRING,
> > > > `subid` STRING,
> > > > `mark`  STRING,
> > > > `time`  STRING,
> > > > `pv`BIGINT,
> > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > > ) WITH (
> > > >   'connector.type' = 'jdbc',
> > > >
> > > >   ..
> > > >
> > > > );
> > > >
> > > >
> > > > 查询SQL:
> > > >
> > > > INSERT INTO
> > > > flink_recent_pv_subid
> > > > SELECT
> > > > `sid`,
> > > > `subid`,
> > > > `mark`,
> > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > > 'MMddHHmm') as `time`,
> > > > count(1) AS `pv`
> > > > FROM baidu_log_view
> > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> > MINUTE);
> > > >
> > > >
> > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
> > > >
> > > >> @hailongwang 一样的。
> > > >>
> > > >> 有个情况说明下,我是tumble
> window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> > > >>
> > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > > >>>
> > > >>>
> > > >>> Best,
> > > >>> Hailong
> > > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> > > >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert
> on
> > > >>> >duplicate方式写入。
> > > >>> >
> > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> > > >>> >at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > >>> Method)
> > > >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > > >>> >NativeConstructorAccessorImpl.java:62)
> > > >>> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > > >>> >DelegatingConstructorAccessorImpl.java:45)
> > > >>> >at
> > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > > >>> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> > > >>> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
> > > >>> >at
> > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> > > >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> > > >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> > > >>> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> > > >>> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> > > >>> >at
> > 

pyflink 1.11.1 调用包含第三方依赖库的udf时报错

2020-11-23 文章 fengmuqi...@ruqimobility.com
hi.
pyflink 1.11.1 调用包含第三方依赖库的udf时报错 :

运行环境:
windows10 
python==3.7.9
apache-flink==1.11.1
apache-beam==2.19.0

udf 依赖第三方库:
h3==3.7.0

pytest 通过。

运行时报错,报错信息如下
2020-11-23 14:20:51,656 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, order_info]], fields=[source, order_id, user_id, car_id, 
driver_name, driver_id, time_dispatch_done, time_start, time_cancel, status, 
start_city, end_city, start_ad_code, end_ad_code, cancel_reason_id, 
realStartLatitude, realStartLongitude]) -> StreamExecPythonCalc -> Sink: 
Sink(table=[default_catalog.default_database.print_table], fields=[hash_h3]) 
(3/8) (056a0a0cdf3838794f4023e61d04a690) switched from RUNNING to FAILED.
java.lang.RuntimeException: Failed to create stage bundle factory! 
   at 
org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalStateException: Process died with exit code 0
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:331)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:320)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:250)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at