Re: flink-benchmarks使用求助

2020-07-13 文章 Congxian Qiu
Hi zilong 之前没有使用 `-t max` 跑过,你可以分享一下你使用的全部命令么?我可以本地看看 Best, Congxian zilong xiao 于2020年7月14日周二 上午10:16写道: > `-t max`之后出现的~ 改小并发后貌似没问题 > > Congxian Qiu 于2020年7月13日周一 下午8:14写道: > > > Hi > > > > 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢? > > > > Best, > > Congxian > > > > > > zilong xiao

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Congxian Qiu
Hi 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢? 另外你可以看下 tm log 看看有没有其他异常 Best, Congxian Yun Tang 于2020年7月14日周二 上午11:57写道: > Hi Peihui > > 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root > cause。 > > [1] >

Re: 退订

2020-07-13 文章 Congxian Qiu
Hi 退订需要发送邮件到 user-zh-unsubscr...@flink.apache.org Best, Congxian 成欢晴 于2020年7月14日周二 下午12:44写道: > 退订 > > > | | > chq19970719 > | > | > 邮箱:chq19970...@163.com > | > > Signature is customized by Netease Mail Master

Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-13 文章 Congxian Qiu
Hi 如果可以的话,建议先调用 RestClient 的 stop 等命令(这样可以在最后做一次 savepoint,或者 checkpoint -- 这个 FLINK-12619 想做),然后失败再使用 yarn 的 kill 命令,这样能够减少后续启动时的回放数据量 Best, Congxian zhisheng 于2020年7月14日周二 下午12:53写道: > 如果是 on yarn 的话,也可以直接调用 yarn 的 kill 命令停止作业 > > Jeff Zhang 于2020年7月11日周六 下午11:23写道: > > > Zeppelin

Re: State 0点清除的问题

2020-07-13 文章 Congxian Qiu
Hi 如果你需要精确的控制每天 0 点清除 state 的话,或许你可以考虑使用 processFunction[1], 然后自己使用 timer 实现相关逻辑 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/process_function.html Best, Congxian ゞ野蠻遊戲χ 于2020年7月14日周二 下午1:10写道: > 大家好: >

Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
没有使用窗口呢,就多表关联,涉及到流表join流表,流表join维表,group by 、topN等 -- Sent from: http://apache-flink.147419.n8.nabble.com/

State 0????????????

2020-07-13 文章 ?g???U?[????
ProcessAllWindowFunction0??state?? Thanks

Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-13 文章 zhisheng
如果是 on yarn 的话,也可以直接调用 yarn 的 kill 命令停止作业 Jeff Zhang 于2020年7月11日周六 下午11:23写道: > Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient > api来做到的,对zeppelin感兴趣的话,可以参考这个视频 > > https://www.bilibili.com/video/BV1Te411W73b?p=21 > > > jianxu 于2020年7月11日周六 下午4:52写道: > > > Hi: > > > > >

退订

2020-07-13 文章 成欢晴
退订 | | chq19970719 | | 邮箱:chq19970...@163.com | Signature is customized by Netease Mail Master

Re: (无主题)

2020-07-13 文章 Jingsong Li
Hi 退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org Best Jingsong On Tue, Jul 14, 2020 at 12:36 PM 成欢晴 wrote: > 退订 > > > | | > chq19970719 > | > | > 邮箱:chq19970...@163.com > | > > Signature is customized by Netease Mail Master -- Best, Jingsong Lee

(无主题)

2020-07-13 文章 成欢晴
退订 | | chq19970719 | | 邮箱:chq19970...@163.com | Signature is customized by Netease Mail Master

Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 zhisheng
有没有窗口啊? Robin Zhang 于2020年7月14日周二 上午11:48写道: > > 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by 和多次 流表的关联 。 > 代码如下: >tEnv.getConfig() > .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime), > >

Re: Re: flink on yarn日志问题

2020-07-13 文章 zhisheng
知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager 的日志(可以拼出路径),然后复制到本地去查看 Yangze Guo 于2020年7月14日周二 上午11:58写道: > Hi, 王松 > > 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。 > > Best, > Yangze Guo > > On Tue, Jul 14, 2020 at 8:26 AM 王松 wrote: > > > > 我们也有问题 1,和 Yangze Guo

退出邮件组

2020-07-13 文章 成欢晴
| | chq19970719 | | 邮箱:chq19970...@163.com | Signature is customized by Netease Mail Master

回复:flink1.9状态及作业迁移

2020-07-13 文章 成欢晴
退订 | | chq19970719 | | 邮箱:chq19970...@163.com | Signature is customized by Netease Mail Master 在2020年07月14日 12:15,Yun Tang 写道: 对于Flink本身机制不支持的场景,可以通过直接修改Checkpoint meta 文件同时将meta以及data文件迁移到新HDFS集群也能做到,加载Checkpoint的具体代码可以参照Checkpoints#loadAndValidateCheckpoint

Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
对于Flink本身机制不支持的场景,可以通过直接修改Checkpoint meta 文件同时将meta以及data文件迁移到新HDFS集群也能做到,加载Checkpoint的具体代码可以参照Checkpoints#loadAndValidateCheckpoint [1],而存储Checkpoint的代码可以参照Checkpoints#storeCheckpointMetadata [2] [1]

Re: Re: flink on yarn日志问题

2020-07-13 文章 Yangze Guo
Hi, 王松 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。 Best, Yangze Guo On Tue, Jul 14, 2020 at 8:26 AM 王松 wrote: > > 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。 > > Yangze Guo 于2020年7月13日周一 下午5:03写道: > > > 1. > >

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Yun Tang
Hi Peihui 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root cause。 [1]

Re: flink1.9状态及作业迁移

2020-07-13 文章 Dream-底限
hi、 请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗 》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。 Yun Tang 于2020年7月14日周二 上午11:54写道: > Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。 > > Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。 > > >

Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。 Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。 [1] https://issues.apache.org/jira/browse/FLINK-5763 祝好 唐云 From: Dream-底限 Sent: Tuesday, July 14, 2020 11:07 To:

Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by 和多次 流表的关联 。 代码如下: tEnv.getConfig() .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),

回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi 那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 在导入Kafka,之后再FlinkSQL 处理。 可爱的木兰 发件人: Benchao Li 发送时间: 2020年7月14日 11:00 收件人: user-zh 主题: Re: Flink SQL处理Array型的JSON 我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了, 我建了一个issue[1]. [1]

回复: Flink SQL复杂JSON解析

2020-07-13 文章 hua mulan
Hi 那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 在导入Kafka,之后再FlinkSQL 处理。 可爱的木兰 发件人: Benchao Li 发送时间: 2020年7月8日 20:46 收件人: user-zh 主题: Re: Flink SQL复杂JSON解析 看代码应该是支持复合类型的,你可以试下。 hua mulan 于2020年7月8日周三 下午8:34写道: > 我试了下 Array里是基本元素可以CROSS

flink1.9状态及作业迁移

2020-07-13 文章 Dream-底限
hi: flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?

flink cep 如何处理超时事件?

2020-07-13 文章 drewfranklin
Hello all. 想请教下各位。 我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。 但是我定义pattern 后发现,我的这个没办法在一条事件数据上完成判定。必须借助和上一事件数据比较之后判断是不是超时。 想知道该如何定义pattern 能够,取到排序之后前后两个两个事件。

Re:回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
Hi, 感谢社区热心答疑! 在 2020-07-14 11:00:18,"夏帅" 写道: >你好, >本质还是StreamingFileSink,所以目前只能append > > >-- >发件人:Zhou Zach >发送时间:2020年7月14日(星期二) 10:56 >收件人:user-zh >主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录 > > > > >Hi

回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 夏帅
你好, 本质还是StreamingFileSink,所以目前只能append -- 发件人:Zhou Zach 发送时间:2020年7月14日(星期二) 10:56 收件人:user-zh 主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录 Hi Leonard, 原来是有重复key,hbase做了upsert,请问Hive Streaming

Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Benchao Li
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了, 我建了一个issue[1]. [1] https://issues.apache.org/jira/browse/FLINK-18590 Leonard Xu 于2020年7月14日周二 上午10:42写道: > Hello,可爱的木兰 > > 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1] > > SELECT users, tag > FROM Orders CROSS JOIN UNNEST(tags) AS

Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
Hi Leonard, 原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式 在 2020-07-14 09:56:00,"Leonard Xu" 写道: >Hi, > >> 在 2020年7月14日,09:52,Zhou Zach 写道: >> | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6),

回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hello,Leonard Xu 我这边JSON 不是 { "id": 2, "heap": [ { "foo": 14, "bar": "foo" }, { "foo": 16, "bar": "bar" } ], } 而是直接一个Array [ { "foo": 14, "bar": "foo" },

Re: 【Flink Join内存问题】

2020-07-13 文章 admin
regular join会缓存两边流的所有数据,interval join只存一段时间内的,相比当然节省很大的状态存储 > 2020年7月13日 下午10:30,忝忝向仧 <153488...@qq.com> 写道: > > Hi: > > > interval join可以缓解key值过多问题么? > interval join不也是计算某段时间范围内的join么,跟regular join相比,如何做到避免某个stream的key过多问题? > 谢谢. > > > > > --原始邮件-- > 发件人:

自定义的sql connector在sql-cli中运行问题

2020-07-13 文章 admin
hi all, 我自定义了一个sql connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下 2020-07-14 10:36:29,148 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException:

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
好吧,感谢回答 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Peihui He
hello, 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 Caused by: java.nio.file.NoSuchFileException:

Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Leonard Xu
Hello,可爱的木兰 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1] SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 jindy_liu
本来还想尽最大可能的复用源码,看了下JdbcOutputFormat的源码实现,batch size是sql语句的个数据,kafka的batch size是字节数,两个协调不好,两个各sink各自的时间阈值也同步不了。 我准备按你的说的方式,用RichFlatMapFunction,里面实现实现一个buffer。 等buffer达阈值或定时时间条件满足时,一次性手动调用JdbcOutputFormat(可以设置更大的buffer值)的writeRecord和flush;不满足的时候,RichFlatMapFunction里不输出元素; 这样kafka的batch

Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi Kafka中的JSON结构是个Array例子如下。 [ { "id": 1}, { "id": 2} ] 读出来变成表的两行。Flink SQL层面最佳实践是什么? 如果没有办法是不是只能改JSON结构了。 可爱的木兰

flink1.9.1-消费kafka落pg库任务出错

2020-07-13 文章 nicygan
dear all: 我有一个消费kafka数据写到pg库的任务,任务发生过重启,yarn日志显示jobmanager发生oom,但找不到具体原因,因为数据量非常小,按道理不该发生oom。 详细如下: 1、部署方式: flink on yarn ,pre-job,每个container 1024 M jobmanager的jvmoption(默认的) -Xms424m-Xmx424m 2、数据情况: kafka数据,约1分钟1条,文本数据,每条数据都非常小。 3、任务情况:

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
这可能是 connect API 的某个 bug 吧。 建议先用 DDL 。 Best, Jark On Tue, 14 Jul 2020 at 08:54, Hito Zhu wrote: > rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。 > Rowtime rowtime = new Rowtime() > .timestampsFromField("searchTime") > .watermarksPeriodicBounded(5 *

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
补充一下,kubernetes版本是1.18 Yvette zhai 于2020年7月13日周一 下午9:10写道: > 1. 执行的脚本,产生的日志是: > 2020-07-13 21:00:25,248 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.rpc.address, localhost > 2020-07-13 21:00:25,251 INFO >

Re: flink-benchmarks使用求助

2020-07-13 文章 zilong xiao
`-t max`之后出现的~ 改小并发后貌似没问题 Congxian Qiu 于2020年7月13日周一 下午8:14写道: > Hi > > 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢? > > Best, > Congxian > > > zilong xiao 于2020年7月13日周一 下午2:32写道: > > > 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown > > timeout of 30 seconds

Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Leonard Xu
Hi, > 在 2020年7月14日,09:52,Zhou Zach 写道: > >>> | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), >>> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid, 看下这个抽取出来的rowkey是否有重复的呢? 祝好, Leonard Xu

Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
Hi, Leonard 我设置了 'connector.write.buffer-flush.interval' = ‘1s',然后重启运行程序, 再消息发送刚开始,比如说发送了4条,hive和hbase接收的消息都是4条,再消息发送48条的时候,我停止了producer, 再去查结果hbase是19条,hive是48条,如果说每1s钟flink查一下sink hbase buffer是不是到1mb,到了就sink,没到就不sink,但是这解释不了,为啥刚开始,hbase和hive接收到到数据是同步的,奇怪 在 2020-07-13

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。 Rowtime rowtime = new Rowtime() .timestampsFromField("searchTime") .watermarksPeriodicBounded(5 * 1000); -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink on yarn日志问题

2020-07-13 文章 王松
我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。 Yangze Guo 于2020年7月13日周一 下午5:03写道: > 1. > 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志 > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job > > [1] >

?????? ??Flink Join??????????

2020-07-13 文章 ????????
Hi: interval joinkey? interval join??join??regular join??stream??key? . ---- ??:

Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Leonard Xu
Hi, Zhou > 'connector.write.buffer-flush.max-size' = '1mb', > 'connector.write.buffer-flush.interval' = ‘0s' (1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 BufferredMutator

flink state

2020-07-13 文章 Robert.Zhang
Hello,all 目前stream中遇到一个问题, 想使用一个全局的state 在所有的keyed stream中使用,或者global parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽 Best regards

Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 godfrey he
1.11 对 StreamTableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 的执行方式有所调整, 简单概述为: 1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业; 2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业; 3. 新引入的 TableEnvironment.executeSql() 和

回复: 使用Flink Array Field Type

2020-07-13 文章 叶贤勋
谢谢 Leonard的解答。刚刚也看到了这个jira单[1] [1] https://issues.apache.org/jira/browse/FLINK-17847 | | 叶贤勋 | | yxx_c...@163.com | 签名由网易邮箱大师定制 在2020年07月13日 20:50,Leonard Xu 写道: Hi, SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where CARDINALITY(arr) >= 5 这种方式防止数组访问越界。 祝好, Leonard Xu 在

Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
??TableResult

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
你的源码中 new Schema().field("searchTime",DataTypes.TIMESTAMP()).rowtime(rowtime); 里面的 rowtime 的定义能贴下吗? On Mon, 13 Jul 2020 at 20:53, Hito Zhu wrote: > Hi Jark 异常信息如下: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Field null does not exist > at > >

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
1. 执行的脚本,产生的日志是: 2020-07-13 21:00:25,248 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost 2020-07-13 21:00:25,251 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration

flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
flink订阅kafka消息,同时sink到hbase和hive中, 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条 query: streamTableEnv.executeSql( """ | |CREATE TABLE hbase_table ( |rowkey VARCHAR, |cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |)

Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 Leonard Xu
Hi,知道了 source.execute_insert("g_source_tab”) 返回的结果是一个TableResult对象,如果不显示地等待任务的执行,这个任务会直接返回,你试下这个 result.execute_insert("g_source_tab") \ .get_job_client() \ .get_job_execution_result() \ .result() 这是Flip-84引入的一个改动,为了更好地处理table程序的返回值。 祝好, Leonard Xu > 在 2020年7月13日,20:57,小学生

Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
??1.10

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是 flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成 flink-sql-connector-kafka 后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下: org.apache.flink flink-table-planner-blink_${scala.binary.version}

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
Hi Jark 异常信息如下: Exception in thread "main" org.apache.flink.table.api.ValidationException: Field null does not exist at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85) at

回复: flink 1.11运算结果存mysql出错

2020-07-13 文章 Zhonghan Tang
有新数据进来吗,看起来和这个jira很像 https://issues.apache.org/jira/browse/FLINK-15262 在2020年07月13日 20:38,Leonard Xu 写道: Hi, 简单看了下代码应该没啥问题,alarm_test_g 这个kafka topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点 Best, Leonard Xu 在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道:

Re: 使用Flink Array Field Type

2020-07-13 文章 Leonard Xu
Hi, SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where CARDINALITY(arr) >= 5 这种方式防止数组访问越界。 祝好, Leonard Xu > 在 2020年7月13日,20:34,叶贤勋 写道: > > test_array_string[0]

Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
topic??,??flink1.10??insert_into

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Leonard Xu
Hi, zhai 可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao 祝好 > 在 2020年7月13日,20:11,Yvette zhai 写道: > > 报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap > "flink-config-k8s-session-1" not found > > > Leonard Xu 于2020年7月13日周一 下午8:03写道: > >> Hi, zhai >> >>

Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 Leonard Xu
Hi, 简单看了下代码应该没啥问题,alarm_test_g 这个kafka topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点 Best, Leonard Xu > 在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道: > > 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python > *.py执行的。完整代码如下 > > > from

使用Flink Array Field Type

2020-07-13 文章 叶贤勋
Flink 1.10.0 问题描述:source表中有个test_array_string ARRAY字段,在DML语句用test_array_string[0]获取数组中的值会报数组越界异常。另外测试过Array也是相同错误,Array,Array等类型也会报数组越界问题。 请问这是Flink1.10的bug吗? SQL: CREATETABLE source ( …… test_array_string ARRAY ) WITH ( 'connector.type'='kafka', 'update-mode'='append', 'format.type'='json'

Re:Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
好的,感谢答疑 在 2020-07-13 19:49:10,"Jingsong Li" 写道: >创建kafka_table需要在default dialect下。 > >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法) > >Best, >Jingsong > >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach wrote: > >> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK

Re: flink-benchmarks使用求助

2020-07-13 文章 Congxian Qiu
Hi 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢? Best, Congxian zilong xiao 于2020年7月13日周一 下午2:32写道: > 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown > timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗? > > Congxian Qiu 于2020年7月10日周五 下午7:18写道:

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-k8s-session-1" not found Leonard Xu 于2020年7月13日周一 下午8:03写道: > Hi, zhai > > 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。 > > Best, > Leonard Xu > > > 在 2020年7月13日,19:59,Yvette zhai 写道: > > > >

Re: Re: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-13 文章 Congxian Qiu
Hi 程龙 如果可以的话,也麻烦使用 1.11.0 测试下看问题是否还存在。 Best, Congxian 程龙 <13162790...@163.com> 于2020年7月13日周一 上午10:45写道: > > > > > > > 问题不是很常见 ,但是同一个任务,提交在flink1.10 和 flink1.10.1上都会复现, 准备尝试一下升级一下jdk试试 > > > > > > > > > > > > 在 2020-07-06 16:11:17,"Congxian Qiu" 写道: > >@chenkaibit 多谢你的回复~ > > > >Best, >

flink 1.11??????????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysqllinuxpython *.py from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Leonard Xu
Hi, zhai 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。 Best, Leonard Xu > 在 2020年7月13日,19:59,Yvette zhai 写道: > > 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。 > 下载的flink-1.11.0-bin-scala_2.11.tgz > > 执行命令是 > ./bin/kubernetes-session.sh \ > -Dkubernetes.cluster-id=k8s-session-1 \ >

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Congxian Qiu
Hi Zhefu 感谢你在邮件列表分享你的解决方法,这样其他人遇到类似问题也有一个参考。 Best, Congxian Zhefu PENG 于2020年7月13日周一 下午7:51写道: > Hi all, > > 这封邮件最开始发出已经一个月了,这一个月里尝试了很多朋友或者各位大佬的建议,目前经过一周末加上两个工作日的查看,问题看来是解决了。 > > >

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Leonard Xu
> 反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。 > > Best, > Zhefu 谢谢 zhefu, 给你大大点赞,很社区的方式,相信这样的积累越多,小伙伴们都能学习到更多。 祝好, Leonard Xu > > LakeShen 于2020年6月12日周五 上午9:49写道: > >> Hi ZheFu, >> >> 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink >> 的数据是否都已经 Sink 到了 kafka. >>

flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。 下载的flink-1.11.0-bin-scala_2.11.tgz 执行命令是 ./bin/kubernetes-session.sh \ -Dkubernetes.cluster-id=k8s-session-1 \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \

Re: 滑动窗口数据存储多份问题

2020-07-13 文章 Congxian Qiu
Hi 从 HeapListState#add 这里看是的,我跟了一个 WindowOperator 到最终 HeapListState 的逻辑,这里确实是只有一份数据,没有拷贝。这个东西的实现可能是因为性能好,我尝试确认下这个原因,多谢你的提问。 Best, Congxian Jimmy Zhang <13669299...@163.com> 于2020年7月12日周日 上午8:13写道: > Hi,all! > >

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Zhefu PENG
Hi all, 这封邮件最开始发出已经一个月了,这一个月里尝试了很多朋友或者各位大佬的建议,目前经过一周末加上两个工作日的查看,问题看来是解决了。 问题的根本原因:Kafka集群的性能不足(怀疑是CPU负荷过大)。问题出现的时候线上kakfa集群只有七台机器,在排除所有别的原因以及能进行到的尝试方案后,决定进行扩容。扩到15台机器。目前来看,平稳运行,没有再报出类似错误。 反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。 Best, Zhefu LakeShen 于2020年6月12日周五 上午9:49写道: > Hi ZheFu,

Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
创建kafka_table需要在default dialect下。 不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法) Best, Jingsong On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach wrote: > 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了, > 如果是default Dialect创建的表,是不是只是在临时会话有效 > > > > > > > > > > > >

Re:Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了, 如果是default Dialect创建的表,是不是只是在临时会话有效 在 2020-07-13 19:27:44,"Jingsong Li" 写道: >Hi, > >问题一: > >只要current catalog是HiveCatalog。 >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS. >

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 Jark Wu
你可以在 mysqlSinkFunction 中攒 buffer,在 timer trigger 或者 checkpoint 时 flush mysql database,以及 output。 On Mon, 13 Jul 2020 at 15:36, jindy_liu <286729...@qq.com> wrote: > > > 如果可以chain在一起,这个可以保证顺序性,我去试试。 > > 这里再追问下,实际中,如里单流源里的数据也要顺序处理,可以设置并行度为1; > >

Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
Hi, 问题一: 只要current catalog是HiveCatalog。 理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS. 明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗? 问题二: 用filesystem创建出来的是filesystem的表,它和hive metastore是没有关系的,你需要使用创建filesystem表的语法[1]。 filesystem的表数据是直接写到

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
能贴下完整的异常栈么? Btw,TableEnvironment上的 connect API 目前不建议使用,有许多已知的问题和缺失的 feature,建议用 executeSql(ddl) 来替代。 社区计划在 1.12 中系统地重构和修复 connect API 。 Best, Jark On Mon, 13 Jul 2020 at 17:24, Hito Zhu wrote: > 使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 > createTemporaryTable > 为事件时间,程序包 Field

Re: flink 1.11写入mysql问题

2020-07-13 文章 Jark Wu
请问你是怎么提交的作业呢? 是在本地 IDEA 里面执行的,还是打成 jar 包后提交到集群运行的呢? On Mon, 13 Jul 2020 at 17:58, 小学霸 wrote: > 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。 > from pyflink.datastream import StreamExecutionEnvironment, > TimeCharacteristic, CheckpointingMode > from pyflink.table import

Re:Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
尴尬 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li, @夏帅 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢! 还有两个问题问下, 问题1: 创建的kafka_table,在hive和Flink SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore 问题2:

回复: Flink es7 connector认证问题

2020-07-13 文章 李宇彬
感谢,已找到问题原因,这个provider变量应该放到setHttpClientConfigCallback内部,之前是作为私有成员变量transient声明的,会导致taskmanager无法拿到认证信息 String user = pt.get("es.user.name"); String password = pt.get("es.user.password"); esSinkBuilder.setRestClientFactory( (RestClientBuilder restClientBuilder) ->

Re: flink 1.11 es未定义pk的sink问题

2020-07-13 文章 Leonard Xu
HI,fulin 如 Yangze所说,这是es6 new connector 引入的一个bug, 你可以使用用old connector的语法绕过,就是connector.type=’xx’ ,这样代码路径还走之前的代码, 或者使用es7 nconnector。 祝好, Leonard Xu > 在 2020年7月13日,17:19,Yangze Guo 写道: > > 验证了一下,这确实是一个bug,原因出在这一行[1]。我会提一个ticket来解决它,争取在1.11.1修复。 > > [1] >

flink 1.11????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysql?? from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings source=""" CREATE TABLE

回复:退订

2020-07-13 文章 苑士旸
谢谢,已经找到 | | yuanshiyang | | 邮箱yuanshiy...@163.com | 签名由 网易邮箱大师 定制 在2020年07月13日 17:55,Jingsong Li 写道: Hi 退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org Best Jingsong On Mon, Jul 13, 2020 at 5:53 PM 苑士旸 wrote: > > > > | | > yuanshiyang > | > | > 邮箱yuanshiy...@163.com > | > > 签名由 网易邮箱大师 定制

Re: pyflink问题求助

2020-07-13 文章 Xingbo Huang
Hi hieule, This work around method is used in flink 1.10, in flink 1.11 you can use ddl directly (blink planner) which you can refer to [1]. For how to use blink planner in PyFlink, you can refer to following code: t_env = BatchTableEnvironment.create(

Re: 退订

2020-07-13 文章 Jingsong Li
Hi 退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org Best Jingsong On Mon, Jul 13, 2020 at 5:53 PM 苑士旸 wrote: > > > > | | > yuanshiyang > | > | > 邮箱yuanshiy...@163.com > | > > 签名由 网易邮箱大师 定制 -- Best, Jingsong Lee

Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
你把完整的程序再贴下呢 Best, Jingsong On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach wrote: > Hi, > > > 我现在改成了: > 'sink.partition-commit.delay'='0s' > > > checkpoint完成了20多次,hdfs文件也产生了20多个, > hive表还是查不到数据 > > > > > > > > > > > > > > 在 2020-07-13 17:23:34,"夏帅" 写道: > > 你好, > 你设置了1个小时的 >

退订

2020-07-13 文章 苑士旸
| | yuanshiyang | | 邮箱yuanshiy...@163.com | 签名由 网易邮箱大师 定制

flink 1.11????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysql?? from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings source=""" CREATE TABLE

Re:回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
Hi, 我现在改成了: 'sink.partition-commit.delay'='0s' checkpoint完成了20多次,hdfs文件也产生了20多个, hive表还是查不到数据 在 2020-07-13 17:23:34,"夏帅" 写道: 你好, 你设置了1个小时的 SINK_PARTITION_COMMIT_DELAY -- 发件人:Zhou Zach 发送时间:2020年7月13日(星期一) 17:09

flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 createTemporaryTable 为事件时间,程序包 Field null does not exist 错误,是我用法有问题? 看了下 https://issues.apache.org/jira/browse/FLINK-16160 这个 issue 是解决的这个问题吗? tableEnv.connect(kafka) .withSchema(

回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 夏帅
你好, 你设置了1个小时的 SINK_PARTITION_COMMIT_DELAY -- 发件人:Zhou Zach 发送时间:2020年7月13日(星期一) 17:09 收件人:user-zh 主 题:Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector. 开了checkpoint, val

flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 createTemporaryTable 为事件时间,程序包 Field null does not exist 错误,是我用法有问题? 看了下 https://issues.apache.org/jira/browse/FLINK-16160 这个 issue 是解决的这个问题吗? tableEnv.connect(kafka) .withSchema(

Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
有没有设置 sink.partition-commit.delay? Best, Jingsong On Mon, Jul 13, 2020 at 5:09 PM Zhou Zach wrote: > 开了checkpoint, > val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment > > streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >

Re: Re: Re: flink 1.11 es未定义pk的sink问题

2020-07-13 文章 Yangze Guo
验证了一下,这确实是一个bug,原因出在这一行[1]。我会提一个ticket来解决它,争取在1.11.1修复。 [1] https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285

pyflink connect mysql

2020-07-13 文章 hieule
Hi , I has problem when submit job ``` java.lang.AbstractMethodError: Method org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.consumeDataSet(Lorg/apache/flink/api/java/DataSet;)Lorg/apache/flink/api/java/operators/DataSink; is abstract ``` My code : ``` import logging import os import

Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
开了checkpoint, val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)

Re: Re: flink on yarn日志问题

2020-07-13 文章 Yangze Guo
1. 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html Best, Yangze Guo On Mon,

  1   2   >