兄弟,感谢
在 2020-07-08 11:04:40,"夏帅" 写道:
你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
override def serialize(element: DemoBean, timestamp: lang.Long):
ProducerRecord[Array[Byte], Array[Byte]] = {
new
Hi,
我想先问一下你使用的是刚发布的 1.11.0 版本吗? 还是自己 build 的 release-1.11 分支呢?
另外,我理解下你的需求是 db1.test 同步到 db2.test, db1.status 同步到 db2.status?
多表的*有序*同步是指?
我理解你只需要像定义 db1.test -> db2.test 一样,定义好 db1.status binlog table 然后 insert
into 到 db2.status mysql table就行了。
感谢反馈使用体验。
Best,
Jark
On Wed, 8 Jul 2020 at
Hello,
我理解下你场景:d1的 test 表 和 status 表两者之间有关联,比如外键,比如 test 更新一条数据后 status也需要级联地更新一条数据。
希望通过 Flink 的CDC功能同步这两张表到db2后,任意时刻,这两张表的状态是原子的(两张表对应 d1中两张表的一个快照版本), 是这种场景吗?
如果是这种场景,现在是还没有支持的。
Best,
Leonard Xu
> 在 2020年7月8日,11:59,Jark Wu 写道:
>
> Hi,
>
> 我想先问一下你使用的是刚发布的 1.11.0 版本吗? 还是自己 build 的
hi, noake
感谢分享。我加了这个依赖后也OK了。周知下大家。
在 2020-07-07 22:15:05,"noake" 写道:
>我在1.11.0中遇到了同样的问题, pom中加了下面的依赖就没解决了
>dependency
> groupIdorg.apache.flink/groupId
> artifactIdflink-clients_${scala.binary.version}/artifactId
> version${flink.version}/version
>/dependency
>
>
>原始邮件
尝试加一下这个依赖
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}
Best,
Yangze Guo
On Wed, Jul 8, 2020 at 11:27 AM SmileSmile wrote:
>
>
> hi
>
> 作业的依赖从1.10.1升级到1.11.0,在idea运行的时候报错
>
> Exception in thread "main" java.lang.IllegalStateException: No
> ExecutorFactory found to
你好:
问题1,指定shuffle_mode
tEnv.getConfig.getConfiguration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
"pipeline")
问题2,mode是UNDEFINED的概念
使用UNDEFINED并不是说模式没有定义,而是由框架自己决定
The shuffle mode is undefined. It leaves it up to the framework to decide the
shuffle mode.
您说的这种方式,V1.10.1 不支持吧,我看参数只有一个String类型的
void sqlUpdate(String stmt);
--原始邮件--
发件人:"seeksst"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
gt; gt; x 35907418@qq.comamp;gt; 于2020年7月6日周一
hi Jark Wu.
??table.exec.source.idle-timeoutwatermarkwatermarkwatermarkwatermark??
Jark??flink??
----
??:"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
On Tue, 7 Jul 2020 at 17:35, noake
估计是这个导致的:
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.11.html#reversed-dependency-from-flink-streaming-java-to-flink-client-flink-15090
On Wed, 8 Jul 2020 at 09:21, sunfulin wrote:
> hi, noake
> 感谢分享。我加了这个依赖后也OK了。周知下大家。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在
场景:canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序;
若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢?
例如mysql实例db1中有表test, statusCREATE TABLE `test` ( `id` int(11) NOT NULL,
`name` varchar(255) NOT NULL, `time` datetime NOT NULL, `status` int(11)
NOT NULL,
ValueState[Cache]??value
map??cacheputupdatestate??cache??1
Hi,
感谢您的指导!
祝好!
Leonard Xu 于2020年7月7日周二 下午9:49写道:
> Hi,
>
> 看了下代码,这确实是Flink 1.9里面的一个bug[1], 原因没有 source 没有正确处理legacy type 和新的
> type,这个issue没有在1.9的分支上修复,可以升级到1.10.1试下。
>
> 祝好,
> Leonard Xu
> [1]https://issues.apache.org/jira/browse/FLINK-16622 <
>
Hi?? ??Flink
1.10.0??jobmanager??libflink-shaded-hadoop-2-uber-2.7.5-10.0.jar??webuicli??Could
not find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink
我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?
18579099...@163.com
只要你的hive目标表创建为Parquet格式就行了哈,INSERT语句上跟其他类型的表没有区别的
On Tue, Jul 7, 2020 at 10:05 AM lydata wrote:
> Hi,
>
> 可以提供一份flink1.10 入hive格式为parquet的例子吗?
>
> Best,
> lydata
--
Best regards!
Rui Li
添加依赖后正常了。应该是这个导致的
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.11.html#reversed-dependency-from-flink-streaming-java-to-flink-client-flink-15090
thanks
| |
a511955993
|
|
邮箱:a511955...@163.com
|
签名由 网易邮箱大师 定制
在2020年07月08日 11:30,Yangze Guo 写道:
尝试加一下这个依赖
hi all,
flink升级到1.11,flink-connector-jdbc
idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的
hi
作业的依赖从1.10.1升级到1.11.0,在idea运行的时候报错
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory
found to execute the application.
at
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
at
Hi,
夏帅的方案是ok的,因为Kafka 默认支持写入topic不存在时自动创建[1],
这个配置是默认开启的,所以只用实现下自定义KafkaSerializationSchema就可以满足你的需求。
祝好,
Leonard Xu
[1]
https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable
Hello,
我看下了maven仓库里有的[1], 官网文档里也有下载链接[2],是不是pom里的依赖没有写对?1.11 jdbc connector 的module名从
flink-jdbc 规范到了 flink-connector-jdbc。
祝好,
Leonard Xu
[1]
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.0/
补充: 1.11的shuffle-mode配置的默认值为ALL_EDGES_BLOCKING
共有
ALL_EDGES_BLOCKING(等同于batch)
FORWARD_EDGES_PIPELINEDPOINTWISE_EDGES_PIPELINED
ALL_EDGES_PIPELINED(等同于pipelined)对于pipelined多出了两种选择
--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月7日(星期二)
你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
override def serialize(element: DemoBean, timestamp: lang.Long):
ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic,
如果所有 partition 都没有数据,还希望 watermark 往前走,那 idle source 确实解决不了这个问题。
目前确实没有太好的解决办法。
Best,
Jark
On Wed, 8 Jul 2020 at 11:08, 1193216154 <1193216...@qq.com> wrote:
> hi Jark Wu.
>
> 我的理解是table.exec.source.idle-timeout只能解决watermark对齐的时候去忽略某个没有watermark的并行度。但是在每个并行度都没有watermark的时候,还是无法更新watermark。
>
hi
按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错
Starting Task Manager
sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only file
system
sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only file
system
/docker-entrypoint.sh: 72:
对的,只不过生产中,有些设计的时候外键没有显示声明,都是用流程保证更新表的顺序。
所以消费数据变化的时候,也是要按顺序消费。不然使用镜像数据的人,可能会出问题。
求教:除flink sql 的cdc功能外,flink的其它特性能否较好的支持这种场景呢? 需要写再底层点的api吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hello,
很有意思的话题,我理解这需要保证多个CDC数据源 的全局一致性, 多个业务表的 bin-log 通过 cdc接入flink后,得保证
每个数据源的写入目标库的时候有一个全局一致性的保证,这个底层的APi应该也支持不了的。
一种可能的思路是 抽取cdc 记录 的metadata里的 committed ts (原始数据库中每次变更的时间, debezuim
的source.ts_ms字段, canal的es 字段),通过这个时间来协调 多个 CDC 数据源的处理速度,这只是我的一个想法。
不过可以确定的是,目前的API应该拿不到这个信息,现在的 Flink
https://github.com/apache/flink/tree/release-1.11.0我看github上tag下已经发布了release-1.11.0,我就编了下tag下的release-1.11.0。最近在做实时计算的一些调研,我们第一步就是要做数据的实时搬运(异构存储),看flink
1.11有cdc功能,我关注了下。看发布了就立即试用了下,看看能不能用你们这个做变化数据的实时同步。1、体验了下,若mysql的binlog按单表有序到kafka,单topic,单分区,flink
cdc的同步确实很方便,几条sql语句就搞定了。2、若mysql
嗯, 可以在 JIRA 中开个 issue 描述下你的需求~
On Wed, 8 Jul 2020 at 12:01, 1193216154 <1193216...@qq.com> wrote:
> Jark,flink有没有必要去支持这个特性?我感觉还是有一些应用场景
>
>
>
>
> --原始邮件--
> 发件人:"Jark Wu" 发送时间:2020年7月8日(星期三) 中午11:48
> 收件人:"user-zh"
> 主题:Re: 如何在Flink SQL中使用周期性水印?
>
>
>
> 如果所有
Hi,
问一下,你是指用1.10去恢复 1.9 作业的 savepoint/checkpoint 吗?还是指迁移到 1.10 后,无法从 failover
中恢复?
如果是前者的话,Flink SQL 目前没有保证跨大版本的 state 兼容性。所以当你从 1.9 升级到 1.10 时,作业需要放弃状态重跑。
Best,
Jark
On Tue, 7 Jul 2020 at 15:54, 吴磊 wrote:
> 各位好:
> 当我把作业从flink1.9.0迁移到1.10.1,且作业中使用了'group by'形式的语法时,会导致无法从cp/sp恢复,
> 代码:
>
>
Hi,
你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
Best,
Jark
On Tue, 7 Jul 2020 at 15:31, Jun Zhang wrote:
> hi.sunfulin
> 你有没有导入blink的planner呢,加入这个试试
>
>
> org.apache.flink
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
>
>
>
> sunfulin 于2020年7月7日周二 下午3:21写道:
>
>>
>>
----
??:"Leonard
hi, jark
我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
configuration里的DeployOptions.TARGET (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
//构建StreamExecutionEnvironment
public static final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
hi??
flink sql ??kafka??key
kafka connectorkey??
hi
是的,想以下面这种方式获取
CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...)
On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu wrote:
> Hi,
> kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
> 如果是这些信息的话,
好的
On Tue, Jul 7, 2020 at 5:30 PM Leonard Xu wrote:
> 嗯,这个在FLIP-107里会支持,目前没法拿到这些meta数据,可以关注下FLIP-107的进展。
>
> Best,
> Leonard Xu
>
> > 在 2020年7月7日,17:26,Dream-底限 写道:
> >
> > hi
> > 是的,想以下面这种方式获取
> >
> > CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
> > ('connector.type'
Hi
你的jemalloc有带debug的重新编译么? 例如用下面的命令重新编译jemalloc得到相关的so文件
./configure --enable-prof --enable-stats --enable-debug --enable-fill
make
其次最好指定dump文件的输出地址,例如在 MALLOC_CONF中加上前缀的配置 prof_prefix:/tmp/jeprof.out
,以确保文件位置可写。
最后,由于你是在容器中跑,在容器退出前要保证相关文件能上传或者退出时候hang住一段时间,否则相关dump的文件无法看到了
祝好
唐云
flink1.9.0??1.10.1'group
by'??cp/sp??
??
??
switched from RUNNING to FAILED.switched from RUNNING to
FAILED.java.lang.Exception: Exception while creating
StreamOperatorStateContext. at
hi、
flink table/sql api中,有办法获取kafka元数据吗?
tableEnvironment.sqlUpdate(CREATE TABLE MyUserTable (...) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...))
Hi,
目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。
祝好,
Leonard Xu
[1]
嗯,这个在FLIP-107里会支持,目前没法拿到这些meta数据,可以关注下FLIP-107的进展。
Best,
Leonard Xu
> 在 2020年7月7日,17:26,Dream-底限 写道:
>
> hi
> 是的,想以下面这种方式获取
>
> CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
> ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
>
>
> On Tue, Jul 7, 2020
Dear All:
大佬们, 请教下如何在Flink SQL中使用周期性的水印。
我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。
hi.sunfulin
你有没有导入blink的planner呢,加入这个试试
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}
sunfulin 于2020年7月7日周二 下午3:21写道:
>
>
>
> hi, jark
> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> configuration里的DeployOptions.TARGET
>
source是kafka,有一个rowtime定义:
.field("rowtime", DataTypes.TIMESTAMP(0))
.rowtime(Rowtime()
.timestamps_from_field("actionTime")
.watermarks_periodic_bounded(6)
)
有两个sink,第一个sink是直接把kafa的数据保存到postgres。
第二个sink是定义一个1小时的tumble
Hi,
kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
祝好,
Leonard Xu
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
Hi all:
我使用 flink 1.9 处理嵌套 json, 它嵌套了一个string数组,构造出的 table schema结构为:
Row(parsedResponse: BasicArrayTypeInfo, timestamp: Long)
执行作业后会发生报错如下,出现 object 类型和string 类型的转换错误
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast
to [Ljava.lang.String;
at
watermark 的计算是跟数据上的 event-time 相关的。你的数据是不是间隔一小时来一波的呢?
比如 10:00 的数据之后,就是 11:00 的数据,但是要1小时后才到来?
Best,
Jark
On Tue, 7 Jul 2020 at 17:20, lgs <9925...@qq.com> wrote:
> source是kafka,有一个rowtime定义:
>
> .field("rowtime", DataTypes.TIMESTAMP(0))
> .rowtime(Rowtime()
>
hi,
我的pom文件本地执行时,scope的provided都是去掉的。
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}
确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
这个异常在啥情况下会触发到。
在
hi yun tang!
下午通过配置yaml的方式修改env成功生成内存文件,目前在重新复现和获取文件ing! tanks!具体内存dump在获取ing
| |
a511955993
|
|
邮箱:a511955...@163.com
|
签名由 网易邮箱大师 定制
在2020年07月07日 17:47,Yun Tang 写道:
Hi
你的jemalloc有带debug的重新编译么? 例如用下面的命令重新编译jemalloc得到相关的so文件
./configure --enable-prof --enable-stats --enable-debug
是1个小时才到来。10:00- 11:00的数据,11:01分到来。
但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble
window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi, Leonard Xu:
我使用的 sql 如下,
> SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
> COUNT(`fruit`) AS `cnt`
> FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
> GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit
从调试日志来看,应该是一开始就挂掉了,我贴一下相关的日志
INFO - Initializing heap
如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉 provided
再试试看?
Best,
Jark
On Tue, 7 Jul 2020 at 18:01, sunfulin wrote:
> hi,
> @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
>
> @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-07
Hi,
这个问题我理解其实和周期性水印没有关系,是属于 idle source
的问题,你可以尝试下加上配置 table.exec.source.idle-timeout = 10s 能不能解决你的问题。[1]
Best,
Jark
[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
On Tue, 7 Jul 2020 at 17:35, noake wrote:
> Dear All:
>
>
>
好的,感谢。
在2020年7月7日 10:28,Paul Lam 写道:
估计你是用同一个 Kafka Source 消费 A B 两个 Topic? 如果是,看起来像是 Kafka Connector 早期的一个问题。
作业停止的时候,Topic B 的 partition offset 被存储到 Savepoint 中,然后在恢复的时候,尽管代码中 Topic B
已经被移除,但它的 partition offset 还是被恢复了。
这个问题在后来的版本,估计是 1.8 或 1.9,被修复了。
Best,
Paul Lam
2020年7月6日
Hi,
可以描述下你的业务场景么? 为什么一定要去获取 key 的信息呢,因为按照我的理解,一般来说 key 的信息一般在 value 中也有。
Best,
Jark
On Tue, 7 Jul 2020 at 17:17, op <520075...@qq.com> wrote:
> 感谢
>
>
>
>
> --原始邮件--
> 发件人:"Leonard Xu" 发送时间:2020年7月7日(星期二) 下午5:15
> 收件人:"user-zh"
> 主题:Re: flink sql
hi,
@Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
@Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
在 2020-07-07 15:40:17,"Jark Wu" 写道:
>Hi,
>
>你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
>
>Best,
>Jark
>
>On Tue, 7 Jul 2020 at 15:31, Jun Zhang wrote:
>
>> hi.sunfulin
>>
Hi
从这个报错看上去是尝试通过 serviceLoader 加载一些 factory 的时候出错了(找不到),可以看看对应的 module 的
resources 文件下是否有对应的 resource 文件
Best,
Congxian
sunfulin 于2020年7月7日周二 下午6:29写道:
>
>
>
> hi,
> 我的pom文件本地执行时,scope的provided都是去掉的。
>
> org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
>
Hi,
看了下代码,这确实是Flink 1.9里面的一个bug[1], 原因没有 source 没有正确处理legacy type 和新的
type,这个issue没有在1.9的分支上修复,可以升级到1.10.1试下。
祝好,
Leonard Xu
[1]https://issues.apache.org/jira/browse/FLINK-16622
我在1.11.0中遇到了同样的问题, pom中加了下面的依赖就没解决了
dependency
groupIdorg.apache.flink/groupId
artifactIdflink-clients_${scala.binary.version}/artifactId
version${flink.version}/version
/dependency
原始邮件
发件人:Congxian qiuqcx978132...@gmail.com
收件人:user-zhuser...@flink.apache.org
抄送:Jark wuimj...@gmail.com; Jun
DataStream??apiUV??2
1Tumbling??1Time.days(1)??uv
trigger
hi all
本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:
@FunctionHint(
input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")},
output = @DataTypeHint("STRING")
)
public class Split extends TableFunction {
public Split(){}
public void eval(String str, String
DataStream??apiUV??2
1Tumbling??1Time.days(1)??uv
trigger
好的
| |
邹云鹤
|
|
邮箱:kevinyu...@163.com
|
签名由 网易邮箱大师 定制
在2020年07月07日 23:27,Benchao Li 写道:
我感觉这应该是新版本的udf的bug,我在本地也可以复现。
已经建了一个issue[1] 来跟进。
[1] https://issues.apache.org/jira/browse/FLINK-18520
邹云鹤 于2020年7月7日周二 下午9:43写道:
>
>
> hi all
> 本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF
??shuffle_mode???
pipeline.
??datastream??keyby??mode??UNDEFINED???
.
----
??:"Jingsong Li"
我感觉这应该是新版本的udf的bug,我在本地也可以复现。
已经建了一个issue[1] 来跟进。
[1] https://issues.apache.org/jira/browse/FLINK-18520
邹云鹤 于2020年7月7日周二 下午9:43写道:
>
>
> hi all
> 本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:
>
>
> @FunctionHint(
> input = {@DataTypeHint("STRING"),
65 matches
Mail list logo