Hi,
Flink有类似storm主动fail的机制吗?
没有的话,有什么好的实现方案吗?比如用状态存储失败的记录?
感谢您的回复
| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|
Signature is customized by Netease Mail Master
Hi, 大家好,
Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog
数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。
欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢!
http://apacheflink.mikecrm.com/wDivVQ1
也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。
Best,
Jark
Hi,大家好!
遇到一个问题,在使用flink run
提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。
Best
Aven
应该只能改ContinuousFileMonitoringFunction源码 , 支持多path
王智 于2020年3月4日周三 下午6:34写道:
> 我的需求是2,现在我使用的是execEnv.createInput(inputFormat()),
>
> 我先去试试 env.addSource(new InputFormatSourceFunction(..)...)。
>
> 多谢~
>
>
>
>
>
>
>
>
> 原始邮件
>
>
> 发件人:"JingsongLee"< lzljs3620...@aliyun.com.INVALID ;
>
>
我试了下,是可以的。
Thanks
wangl...@geekplus.com.cn
Sender: Kurt Young
Send Time: 2020-03-11 19:59
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
那有可能是可以的,你可以试试看
Best,
Kurt
On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn
wrote:
两个从 kafka 创建的表:
tableA: key valueA
tableB: key valueB
用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?
比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
谢谢,
王磊
应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
两个从 kafka 创建的表:
tableA: key valueA
tableB: key
Hi Aven,
静态字段的使用可能会很 tricky,因为只有同一个 task 的代码才运行在同一个 classloader 里。我见过想用静态字段做全局 Map
存储的,那个实际上只有并行度设置为 1 的时候语义才对。
你说启动的生命周期执行一些用户代码,那其实就是 RichFunction 的 open
方法,它就是设计来做这个的。具体可以看你的实际业务,未必要搞得这么奇怪(x
Best,
tison.
aven.wu 于2020年3月12日周四 上午10:54写道:
> Hello
>
>
Hi Benchao,
Great feedbacks!
1) 全量初始化能力:
第一版中社区有计划引入 flink sql 直连 mysql 获取 binlog 的方案,该方案可以获取全量+增量 binlog,且有一致性保证。
但是对接db 全量+ mq binlog,将会是未来的一个工作,主要的难点在于全量如何平滑切换到 增量的 mq offset 上。
2) 自动生成watermark:
这也是 roadmap 上的一个工作。
3) binlog以state的形式存储,只需全量加载,后续只接受增量:
我理解这个和 (1) 是类似的需求。Flink SQL 对接之后
Benchao is right.
嵌套字段是无法直接访问的,需要逐级引用到。
On Thu, 12 Mar 2020 at 00:45, Benchao Li wrote:
> Hi 周炎,
>
> 你的`date` 和 `time`都是在嵌套结构内部的字段,需要用*request.`value`.`date`*和*
> request.`value`.`time`*来使用它们。
>
> 周炎 于2020年3月11日周三 下午5:42写道:
>
> > DDL语句如下:
> > CREATE TABLE ods_usage_naga_dsp_filter (
> >
Hello
还有一个问题,除了在算子的Open方法中获取这个参数还有别的地方可以获取吗?或者在Gobgraph启动的生命周期里面有哪个阶段可以被调用提执行一些用户代码。
我的需求是需要通过命令行参数初始化一些静态类的属性,全局的静态类会被算子调用以执行某些通用的功能,如果我在open方法中去初始化的话是不太优雅,并且为了保证可用必须在每个算子的Open方法中都调用,对于一些非Rich的算子使用静态方法就会有问题。
Best
Aven
发件人: zhisheng
发送时间: 2020年3月11日 21:16
收件人: user-zh
主题: Re: 关于Flink 命令行参数广播的问题
Hi Flavio,
We have implemented our own flink operator, the operator will start a flink
job cluster (the application jar is already packaged together with flink in
the docker image). I believe Google's flink operator will start a session
cluster, and user can submit the flink job via REST. Not
DDL语句如下:
CREATE TABLE ods_usage_naga_dsp_filter (
request row<`value` row> >>>,
event_ts as to_timestamp(concat(`date`,`time`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60' second
)WITH (
'connector.type' = 'kafka',
'format.fail-on-missing-field'='false',
http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-11-133919.png
我看现在还不支持 per job 模式,哎
zhisheng 于2020年3月11日周三 下午9:31写道:
> 好的,我先去 look look,感谢
>
> Kurt Young 于2020年3月11日周三 下午9:30写道:
>
>> https://github.com/ververica/flink-sql-gateway 了解一下
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11,
hi,aven.wu
可以使用 ParameterTool
获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool);
在算子中可以在 open 方法里面通过
getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置
aven.wu 于2020年3月11日周三 下午3:42写道:
> Hi,大家好!
> 遇到一个问题,在使用flink run
>
好的,我先去 look look,感谢
Kurt Young 于2020年3月11日周三 下午9:30写道:
> https://github.com/ververica/flink-sql-gateway 了解一下
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 9:26 PM zhisheng wrote:
>
> > hi, Kurt Young
> >
> > 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
> >
https://github.com/ververica/flink-sql-gateway 了解一下
Best,
Kurt
On Wed, Mar 11, 2020 at 9:26 PM zhisheng wrote:
> hi, Kurt Young
>
> 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
> sql-client
>
> Kurt Young 于2020年3月11日周三 下午7:59写道:
>
> > 那有可能是可以的,你可以试试看
> >
> >
hi, Kurt Young
除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
sql-client
Kurt Young 于2020年3月11日周三 下午7:59写道:
> 那有可能是可以的,你可以试试看
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> > Hi Kurt,
> >
> >
Hi Lei,
The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong
field name in the DDL.
It should be "input_date", not "intput_date".
Best,
Jark
On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
>
> Sorry i sent the Chinese written
Hi Kurt,
如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗?
代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
存储并且再次提交任务可以被访问到直接用吗?
谢谢,
王磊
wangl...@geekplus.com.cn
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc:
那有可能是可以的,你可以试试看
Best,
Kurt
On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
> Hi Kurt,
>
> 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> 中恢复的功能吗?
> 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
>
Sorry i sent the Chinese written email to user@
Let me translate it to English.
I create a table using sql-client from kafka topic:
CREATE TABLE order_status (
out_order_code VARCHAR,
intput_date TIMESTAMP(3),
owner_code VARCHAR,
status INT
) WITH (
Thanks Jark,
No word to express my '囧'.
wangl...@geekplus.com.cn
Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Hi Lei,
The "2020-03-11T13:00:00.123Z" format is correct, but you
Hi 周炎,
你的`date` 和 `time`都是在嵌套结构内部的字段,需要用*request.`value`.`date`*和*
request.`value`.`time`*来使用它们。
周炎 于2020年3月11日周三 下午5:42写道:
> DDL语句如下:
> CREATE TABLE ods_usage_naga_dsp_filter (
> request row<`value` row varchar,reqid varchar,source varchar,`time` varchar,`filter`
> array> >>>,
>
Hi,
感谢Jark发起这个话题的讨论,这个功能对于Flink SQL来讲是一个非常重要的扩展。
问卷已填,再此再提几个小想法:
1. 希望对接binlog时可以有全量初始化的能力,这样在Flink中我们就有了一个全表的实时状态,方便其他表与之进行join。
2. 希望能够自动生成watermark,这样子可以尽可能的减少接入成本。因为有些场景是其他的append
log数据可以跟实时维护的表进行join;也有些场景是两个binlog形成的动态表互相join。
3.
25 matches
Mail list logo