现在有个场景, 集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。
但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。
谢谢!!
Hi, LakeShen
图太大了,回复的邮件没有投递成功,图加到附件了。
补充一下任务信息,目前此任务使用的是Flink-1.5。
LakeShen 于2020年4月13日周一 上午10:15写道:
> Hi Junzhong ,
>
> 图片没有显示,能否把图片重新上传一下。
>
> Best,
> LakeShen
>
> Junzhong Qin 于2020年4月11日周六 上午10:38写道:
>
> > 在跑Flink任务时,遇到了operator反压问题,任务执行图如下,source(读Kafka),
> >
>
hi
你说的间隔自适应是指什么呢?是指做 checkpoint 的间隔自动调整吗?
Best,
Congxian
half coke 于2020年4月15日周三 下午12:24写道:
> 请问下为什么flink没有支持自适应检查点间隔呢?是出于什么样的考虑吗?
>
谢谢
| |
王志华
|
|
邮箱:a15733178...@163.com
|
签名由 网易邮箱大师 定制
在2020年04月15日 11:44,zhang...@lakala.com 写道:
昨天晚上看到一篇微信公众号文章,分享给你,希望对你有帮助。
“Flink 端到端 Exactly-once 机制剖析”
https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g
发件人: 阿华田
发送时间: 2020-04-15 11:00
收件人: user-zh@flink.apache.org
主题:
请问下为什么flink没有支持自适应检查点间隔呢?是出于什么样的考虑吗?
Hi,
我觉得你的需求是“使用系统时间关联维表变更日志”。
这种方式可以保证最低的延迟,而且能保持高吞吐。
不过这个功能目前还没有原生支持,Flink 1.11 会支持读取变更日志。但关联维表变更日志可能要等到1.12。
当前,可以通过 temporal table function join [1] 来满足需求,就是需要一定的开发量。需要你自己去将 mysql
binlog 数据(只能有 upsert 数据,不能有 delete)读进来构造成 Table。
Best,
Jark
[1]:
参考下这篇文章,里面有好多维度关联场景案例讲解!
https://ververica.cn/developers/flink-datastream-associated-dimension-table-practice/
发件人: tingli ke
发送时间: 2020-04-15 11:22
收件人: user-zh
主题: Re: JDBCLookupFunction被缓存导致数据的不及时性
是否有其他的方式来对mysql维表数据进行实时join
13122260...@163.com <13122260...@163.com> 于2020年4月15日周三
昨天晚上看到一篇微信公众号文章,分享给你,希望对你有帮助。
“Flink 端到端 Exactly-once 机制剖析”
https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g
发件人: 阿华田
发送时间: 2020-04-15 11:00
收件人: user-zh@flink.apache.org
主题: 自定义具有Exactly-Once语义的sink
有两个参数可以控制cache大小和cache失效时间 [1],你可以在性能和准确性上做权衡
-- lookup options, optional, used in temporary join
'connector.lookup.cache.max-rows' = '5000', -- optional, max number
of rows of lookup cache, over this value, the oldest rows will
-- be eliminated.
可以考虑调小cache.ttl
On Wed, Apr 15, 2020 at 11:22 AM tingli ke wrote:
> 是否有其他的方式来对mysql维表数据进行实时join
>
>
> 13122260...@163.com <13122260...@163.com> 于2020年4月15日周三 上午11:08写道:
>
> > 有个setCacheMaxSize(1000),可以改成 -1 表示不使用cache
> > org.apache.flink.api.java.io.jdbc.JDBCLookupFunction 这个方法有解释
> > The
FlinkKafkaProducer、StreamingFileSink的实现都支持Exactly-Once,可以研究下
Best Regards
jinhai...@gmail.com
> 2020年4月15日 上午11:00,阿华田 写道:
>
> 如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗?
> | |
> 王志华
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
Flink FlinkKafkaProducer??
----
??:"??"
是否有其他的方式来对mysql维表数据进行实时join
13122260...@163.com <13122260...@163.com> 于2020年4月15日周三 上午11:08写道:
> 有个setCacheMaxSize(1000),可以改成 -1 表示不使用cache
> org.apache.flink.api.java.io.jdbc.JDBCLookupFunction 这个方法有解释
> The cacheMaxSize is -1 means not use cache
>
>
>
> 13122260...@163.com
>
> 发件人: tingli ke
您好,不使用cache会导致每个记录都要查一次mysql,效率很低效
13122260...@163.com <13122260...@163.com> 于2020年4月15日周三 上午11:08写道:
> 有个setCacheMaxSize(1000),可以改成 -1 表示不使用cache
> org.apache.flink.api.java.io.jdbc.JDBCLookupFunction 这个方法有解释
> The cacheMaxSize is -1 means not use cache
>
>
>
> 13122260...@163.com
>
> 发件人:
有个setCacheMaxSize(1000),可以改成 -1 表示不使用cache
org.apache.flink.api.java.io.jdbc.JDBCLookupFunction 这个方法有解释
The cacheMaxSize is -1 means not use cache
13122260...@163.com
发件人: tingli ke
发送时间: 2020-04-15 10:55
收件人: user-zh
主题: JDBCLookupFunction被缓存导致数据的不及时性
Hi,
如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
Hi,
流表通过JDBCLookupFunction来对mysql的维表数据进行实时join,但是JDBCLookupFunction会对数据进行缓存,导致mysql的维表数据被更新,但是flink还是老的数据,(考虑性能需要被缓存)
是否有其他的方式来对mysql维表数据进行实时join
使用的是perjob模式提交作业,没有使用yarn-seesion。为什么perjob模式提交有这个-yd参数会有问题,还是没太懂。
在 2020-04-15 08:52:11,"tison" 写道:
>-yd 参数影响的是你是否使用 perjob 模式提交作业,简单地说
>
>with -yd 以 perjob 模式提交作业,即启动一个新集群
>without -yd 提交到一个现有的 Flink on YARN 集群
>
>哪个是你的需求呢?有没有实现用 yarn-session 启动 Flink on YARN 集群呢?
>
>Best,
>tison.
>
>
>guanyq
-yd 参数影响的是你是否使用 perjob 模式提交作业,简单地说
with -yd 以 perjob 模式提交作业,即启动一个新集群
without -yd 提交到一个现有的 Flink on YARN 集群
哪个是你的需求呢?有没有实现用 yarn-session 启动 Flink on YARN 集群呢?
Best,
tison.
guanyq 于2020年4月15日周三 上午8:46写道:
> 提交失败,本人测试与-yd参数有关系,这个参数去掉就可以提交了。但是不知道 -yd这个参数影响了什么?
> At 2020-04-14 15:31:00, "guanyq"
提交失败,本人测试与-yd参数有关系,这个参数去掉就可以提交了。但是不知道 -yd这个参数影响了什么?
At 2020-04-14 15:31:00, "guanyq" wrote:
>提交失败,yarn资源也还有很多,为什么会提交失败呢?
>
>提交脚本
>./bin/flink run -m yarn-cluster \
>-ynm TestDataProcess \
>-yd \
>-yn 2 \
>-ytm 1024 \
>-yjm 1024 \
>-c com.data.processing.unconditionalacceptance.TestDataProcess \
Hi,
是的,现在是不支持,老的sink没有使用这个primary key来做upsert,但是在1.11里新的sink接口会打通DDL的primary
key的。[1]
[1]https://issues.apache.org/jira/browse/FLINK-17030
Best,
Jingsong Lee
On Tue, Apr 14, 2020 at 5:38 PM 叶贤勋 wrote:
> Hi all:
> 我看源码在将sqlNode转换CreateTableOperator[1]时,还是不支持primary key配置,但是sql
>
Hi all:
我看源码在将sqlNode转换CreateTableOperator[1]时,还是不支持primary key配置,但是sql
parser是已经能够解析,请问下为何不放开这个限制。
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java#L178
| |
叶贤勋
|
|
PyFlink目前只支持Python Table
API,rabbitmq目前还没有提供Table/SQL的connector,如果想在PyFlink里使用rabbitmq,有以下几种解决方案:
1)实现Java的rabbitmq的TableSource/TableSink,可以参考Kafka等connector的实现,基本只需要在现有实现的基础上包装一下即可。
2)在PyFlink作业里使用rabbitmq的source/sink。目前在PyFlink里注册TableSource/TableSink有2种方式:
昨天测试了下,除了需要添加 flink-connector-kafka_2.11-1.10.0.jar
这个外,还需要flink-connector-kafka-base_2.11-1.10.0.jar,感觉Flink在添加依赖jar做的不是很好,添加也不够灵活!
发件人: zhisheng
发送时间: 2020-04-14 15:24
收件人: user-zh
主题: Re: Re: Flink
1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError:
Hi,
你的UDF应该要显示指定一下参数的类型,覆盖ScalarFunction的getParameterTypes方法。
因为UDF对于复杂类型的推导能力有限,这种复杂类型可以显示指定参数类型。
出发 <573693...@qq.com> 于2020年4月14日周二 下午3:37写道:
> 1.定义ddl解析array字段时候,假如select
> 那个字段可以解析出。2.当我去定义自己函数时候,会出现null,flink直接跳过解析array那个函数了吗?
> CREATE TABLE sourceTable (
> event_time_line array
您好~
我是一个flink的初学者。目前的需求是从rabbitmq读入流处理后存进mysql。
因为公司大家的技术栈是python,最近看到了中国社区在推广pyflink(apache-flink)所以就试了一下。但是我发现连接rabbitmq的时候,java有很多包支持,比如
org.apache.flink.streaming.connectors.rabbitmq.*
,想问一下pyflink有类似的连接支持么?
或者有什么推荐的解决方案么比如用pika连接等
多谢~
--
Ella Sun
1.ddlarray??select
2.??null??flinkarray??
CREATE TABLE sourceTable (
event_time_line array
这个问题定义了
我用了两个kafka包,其中红色的包是不需要的,非常感谢你们的帮助
flink-connector-kafka_2.11-1.10.0.jar
flink-sql-connector-kafka_2.11-1.10.0.jar
发件人: 秦寒
发送时间: 2020年4月10日 10:15
收件人: 'Hequn Cheng' ; 'user-zh'
主题: 回复: 关于kafka connector通过python链接
这个搞定了,pip3重装了一下apache flink,引入了jar包搞定
发件人: 秦寒 <
这个搞定了,pip3重装了一下apache flink,引入了jar包搞定
发件人: 秦寒
发送时间: 2020年4月9日 16:41
收件人: 'Hequn Cheng' ; 'user-zh'
主题: 回复: 关于kafka connector通过python链接
您好
根据你们的说明我做了如下配置,我用的是flink 1.10版本
1在pyflink/lib下面添加了kafka-clients-2.2.0.jar
2
谢谢
-- 原始邮件 --
发件人: Benchao Li
提交失败,yarn资源也还有很多,为什么会提交失败呢?
提交脚本
./bin/flink run -m yarn-cluster \
-ynm TestDataProcess \
-yd \
-yn 2 \
-ytm 1024 \
-yjm 1024 \
-c com.data.processing.unconditionalacceptance.TestDataProcess \
./tasks/UnconditionalAcceptanceDataProcess.jar \
yarn资源
Apps Submitted Apps PendingApps Running
可以试试设置 -ytm 2048m 看看是不是还这样
wangweigu...@stevegame.cn 于2020年4月14日周二
下午2:16写道:
>
> 应该是你设置的 -ytm 和 -yjm内存大小比yarn container最小容器内存都小吧!
> yarn最小容器内存的参数: yarn.scheduler.minimum-allocation-mb
> 容器内存增量: yarn.scheduler.increment-allocation-mb
>
> 发件人: guanyq
> 发送时间: 2020-04-14 14:05
> 收件人: user-zh
>
应该加了 flink-connector-kafka_2.11-1.10.0.jar 这个就行
wangweigu...@stevegame.cn 于2020年4月13日周一
下午3:09写道:
>
> 感谢flink道友解答,谢谢!
>
>
> 目前是通过maven来开发flink程序,只是编译打包到集群运行的时候缺少kafka依赖包,flink-connector-kafka_2.11-1.10.0.jar,flink-connector-kafka-base_2.11-1.10.0.jar,kafka-clients-1.0.1-kafka-3.1.1.jar
>
??flink1.9.1hdfs??keberos
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/abc.keytab
security.kerberos.login.principal: abc/ad...@test.com
2020-04-14 11:14:20,650 INFO
Hi,Congxian:
不好意思,邮件消失在了邮件海中...
我是这么复现的,直接重启运行这个任务的TM。然后就会出现KeyedProcessFunction恢复失败。只有RocksDB
StateBackend会出现这种错误,使用HDFS作为FsBackend可以正常恢复任务。一开始我以为是KeyedProcessFunction里面的自定义State恢复失败,最后写了一个空的KeyedProcessFunction也不能成功恢复任务。下面附上一个简单的Demo。
public class App {
public static void main(String[] args)
应该是你设置的 -ytm 和 -yjm内存大小比yarn container最小容器内存都小吧!
yarn最小容器内存的参数: yarn.scheduler.minimum-allocation-mb
容器内存增量: yarn.scheduler.increment-allocation-mb
发件人: guanyq
发送时间: 2020-04-14 14:05
收件人: user-zh
主题: Re:Re: 关于flink 提交job参数不生效的问题
./bin/flink run -m yarn-cluster \-ynm TestDataProcess \-ytm 666
脚本设置-ytm 666但是flink ui页面的,job manager--taskmanager.heap.size为1024
在 2020-04-14 14:10:31,"Xintong Song" 写道:
>启动命令看起来是对的。
>你说的不起作用,具体是什么现象呢?
>
>Thank you~
>
>Xintong Song
>
>
>
>On Tue, Apr 14, 2020 at 2:05 PM guanyq wrote:
>
>> ./bin/flink run -m yarn-cluster \-ynm TestDataProcess \-ytm 666
>>
启动命令看起来是对的。
你说的不起作用,具体是什么现象呢?
Thank you~
Xintong Song
On Tue, Apr 14, 2020 at 2:05 PM guanyq wrote:
> ./bin/flink run -m yarn-cluster \-ynm TestDataProcess \-ytm 666
> \-yjm 666 \-c
> com.data.processing.unconditionalacceptance.TestDataProcess
>
./bin/flink run -m yarn-cluster \-ynm TestDataProcess \-ytm 666
\-yjm 666 \-c
com.data.processing.unconditionalacceptance.TestDataProcess
\./tasks/UnconditionalAcceptanceDataProcess.jar \--group.id Test001
\--checkpoint.interval 5000
在 2020-04-14 14:00:59,"Xintong Song" 写道:
>你邮件里的图片没有显示出来。
你邮件里的图片没有显示出来。
建议把完整的启动命令贴一下。
Thank you~
Xintong Song
On Tue, Apr 14, 2020 at 1:11 PM guanyq wrote:
> flink 提交jar包是 指定-ytm不起作用。想知道什么原因?
>
>
>
>
40 matches
Mail list logo