Thanks Vino and Rafi for your references.
Regarding push gateway recommendations for batch - I am following this
reference (https://prometheus.io/docs/practices/pushing/).
The scenario that I have is that we start Flink Apps on EMR whenever we need
them. Sometimes the task manager gets killed
Hi ,
I have the following pipeline :
1. single hour window that counts the number of records
2. single day window that accepts the aggregated data from #1 and emits the
highest hour count of that day
3. union #1 + #2
4. Logic operator that accepts the data from #3 and keeps a ListState of #2
and
Hi,
I'm trying to use Kafka as an event store and I want to create several
partitions to improve read/write throughput. Occasionally I need to rewind
offset to a previous position for recomputing. Since order isn't guaranteed
among partitions in Kafka, does this mean that Flink won't produce the
Hi community,when I write the flink ddl sql like this:
CREATE TABLE kafka_src (
id varchar,
a varchar,
b TIMESTAMP,
c TIMESTAMP
)
with (
...
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);
If the
Not sure that I can see how it is simpler. #2 is a per-day time window; it
emits the highest hour for that day. #4 is not a time window; it keeps a
history of several days. If I want to move the logic of #2 there, I will
need to manage it with timers, correct?
On Thu, Dec 26, 2019 at 6:28 AM Kurt Young
Hi Joe,
Your requirement is effectively exactly-once for an external sink. I think your
option 4 with TwoPhaseCommitSinkFunction is the right way to go.
Unfortunately I am not quite familiar with this part, so I cannot give you
specific suggestions for using it, especially for your concern of
Hi,
You can merge the logic of #2 into #4; it will be much simpler.
Best,
Kurt
On Wed, Dec 25, 2019 at 7:36 PM Avi Levi wrote:
> Hi ,
>
> I have the following pipeline :
> 1. single hour window that counts the number of records
> 2. single day window that accepts the aggregated data from #1
Let's say you keep your #1, which does the hourly counting, and emit its
results to the merged new #2. The new #2 would first keep all hourly results
in state, and also keep track of whether it has already received all 24
results belonging to the same day. Once you have received all 24 counts
belonging to the same day, you can
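The bookkeeping described above can be sketched in plain Java (names are hypothetical; in a real Flink job this map would live in keyed `MapState<Integer, Long>` inside a process function):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch: collect the 24 hourly counts of one day and, once
// the day is complete, emit the highest hourly count and reset for the
// next day. In a real Flink job this state would be keyed by day.
class DayMaxTracker {
    private final Map<Integer, Long> hourlyCounts = new HashMap<>();

    // Returns the day's maximum once all 24 hours have arrived, else empty.
    OptionalLong addHourlyCount(int hourOfDay, long count) {
        hourlyCounts.put(hourOfDay, count);
        if (hourlyCounts.size() < 24) {
            return OptionalLong.empty();
        }
        long max = hourlyCounts.values().stream()
                .mapToLong(Long::longValue).max().getAsLong();
        hourlyCounts.clear(); // reset for the next day
        return OptionalLong.of(max);
    }
}
```

A timer firing at end-of-day would still be needed in practice to handle hours with no records at all.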
If I understood correctly, different partitions of Kafka would be emitted by
different source tasks with different watermark progress. And the Flink
framework would align the different watermarks to only output the smallest
watermark among them, so the events from slow partitions would not be
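The alignment rule described above (output watermark = minimum over all input channels) can be sketched like this; the class and method names are made up for illustration:

```java
import java.util.Arrays;

// Hypothetical sketch of watermark alignment: an operator's output
// watermark is the minimum of the latest watermarks of its inputs,
// so one slow partition holds back event time downstream.
class WatermarkAligner {
    private final long[] channelWatermarks;

    WatermarkAligner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a new watermark from one channel; return the aligned output.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] =
                Math.max(channelWatermarks[channel], watermark);
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```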
I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method within
the `StreamPartitioner` interface like this:
```java
// Original: working for Flink 1.7
//@Override
public int[]
Hi LakeShen,
I'm sorry there is no such configuration for json format currently.
I think it makes sense to add such configuration like
'format.ignore-parse-errors' in csv format.
I created FLINK-15396[1] to track this.
Best,
Jark
[1]: https://issues.apache.org/jira/browse/FLINK-15396
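For comparison, the existing CSV option looks like this in a DDL (a sketch only; at the time of writing the JSON equivalent proposed in FLINK-15396 did not exist yet):

```sql
CREATE TABLE csv_src (
  id varchar,
  b TIMESTAMP
)
with (
  ...
  'format.type' = 'csv',
  'format.ignore-parse-errors' = 'true'
);
```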
On Thu,
Hi,
Take a look here: https://github.com/eastcirclek/flink-service-discovery
I used it successfully quite a while ago, so things might have changed
since.
Thanks,
Rafi
On Wed, Dec 25, 2019, 05:54 vino yang wrote:
> Hi Mans,
>
> IMO, the mechanism of metrics reporter does not depend on any
Hi,
Greetings! I hope this is the proper place to ask questions; apologies if not.
We have a shared Flink cluster running with Docker and want to set a
different heap size per task (some tasks require a larger heap, while most
only need a little). Is that feasible?
I've gone through [1], [2] and
Hi Li,
It is true that currently all the task managers have the same heap size,
and it is fixed once they have started, so unfortunately your needs cannot
be satisfied at the moment.
The heap size of a task manager cannot be changed once it has started,
because Flink task managers run in JVMs, and the JVM does not
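In Flink 1.9 the (cluster-wide) heap size is a single configuration value, set once in flink-conf.yaml before the task managers start; the value below is just an example:

```yaml
# flink-conf.yaml: one heap size shared by all task managers,
# applied as JVM -Xms/-Xmx at startup and fixed thereafter.
taskmanager.heap.size: 1024m
```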
Understood, thank you for the quick response!
Thanks,
Li
Xintong Song wrote on Wed, Dec 25, 2019 at 5:05 PM:
> Hi Li,
>
> It is true that currently all the task managers have the same heap size,
> and it's fixed ever since started. Unfortunately, your needs cannot be
> satisfied at the moment.
>
> Heap size
Hi Ruibin,
Are you asking how to generate watermarks per Kafka partition?
Flink provides Kafka-partition-aware watermark generation. [1]
Best,
Vino
[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
邢瑞斌
Happy Holidays everyone!
tl;dr: I need to aggregate movie rental information that is stored in one
DynamoDB table and store the running total of the aggregation in another
table. How do I ensure exactly-once aggregation?
I currently store movie rental information in a DynamoDB table named
The TM crashed. Check whether consecutive checkpoint failures led to an OOM,
or whether you are running large-window computations over a large dataset;
heavy data volume can also cause this problem.
Xintong Song wrote on Wed, Dec 25, 2019 at 10:28 AM:
> This is probably not the root cause. "slot was removed" is usually caused
> by the TM crashing; you need to find the corresponding TM log to see why
> it went down.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:
>
> >
Hi, lucas.wu:
Personally I think you can store the join conditions and the corresponding
stream records in MapState. Whenever the dimension-table cache is updated,
query the MapState; if a matching KV exists, emit the re-joined records
downstream. However, the state can take up a lot of memory this way, so you
need to pay attention to state cleanup.
lucas.wu wrote on Wed, Dec 25, 2019 at 5:13 PM:
> hi all:
> When a Flink Kafka stream table is joined with an HBase dimension table,
> and the dimension table changes later, how can previously joined records
> be updated?
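The suggested MapState approach can be sketched in plain Java (class and method names are hypothetical; in a real Flink job the buffer would be keyed MapState and the records real POJOs):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: buffer stream records by join key and, when the
// dimension table changes, re-emit the re-joined results for that key.
class DimensionRejoiner {
    private final Map<String, List<String>> bufferedByKey = new HashMap<>();

    void onStreamRecord(String joinKey, String record) {
        bufferedByKey.computeIfAbsent(joinKey, k -> new ArrayList<>())
                .add(record);
    }

    // Returns the records to re-emit, joined with the new dimension value.
    List<String> onDimensionUpdate(String joinKey, String newDimValue) {
        List<String> out = new ArrayList<>();
        for (String record
                : bufferedByKey.getOrDefault(joinKey, Collections.emptyList())) {
            out.add(record + "|" + newDimValue);
        }
        return out;
    }
}
```

As noted above, the buffer grows without bound unless old keys are cleaned up, e.g. with state TTL or timers.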
How much memory a slot needs depends on the specific job; it can vary a lot
between jobs.
A slot's resource requirement is the sum of the resource requirements of all
its operators, so if you know which operators your job uses, you can derive
it from the per-operator requirements.
The operators' default resource requirements are listed in [1]; there are
five "table.exec.resource.*" configuration options there, and you can also
adjust them to change how much memory the operators use.
If you are not familiar with the operators the job uses, the simpler
approach is to just submit the job and search the logs for "Request slot
with profile" to see the slot's resource requirement.
Thank you~
Xintong Song
[1]
Thanks for the reply. I confirmed that with -ys 10 the hash join requests
256m of slot memory, while my TM managed memory is 2g, i.e. about 200m per
slot on average, so the job could not be scheduled.
But I still have a question: for a batch job, how do I determine before
submission how much memory a single slot should get? Is there a general
method or rule of thumb? faaron zheng
Email: faaronzh...@gmail.com  On Dec 25, 2019 at 11:30, Xintong Song wrote: Hi
faaron, the -yn parameter has no effect in Flink 1.9 and has been removed in
later versions. Based on your parameters, in each
Hi faaron zheng,
As Kurt said, I strongly recommend using 1.10; the release branch has
already been cut.
A rule of thumb for TM sizing: a TM with 10 slots and 10G of memory: 4G JVM
heap, 1G network buffers, 5G managed memory (i.e. 500M of managed memory per
slot).
Best,
Jingsong Lee
--
From: Kurt Young
Send Time: Thu, Dec 26, 2019, 14:07
To: user-zh
Subject: Re:
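That rule of thumb could look roughly like this in flink-conf.yaml (a sketch only; option names are from the pre-1.10 memory model and the values are an example, not a recommendation, so check them against the docs for your version):

```yaml
taskmanager.numberOfTaskSlots: 10
taskmanager.heap.size: 4096m          # JVM heap
taskmanager.network.memory.min: 1gb   # network buffers
taskmanager.network.memory.max: 1gb
taskmanager.memory.size: 5120m        # managed memory (~500m per slot)
```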
Could you try idempotent writes to achieve end-to-end consistency?
Best wishes,
沈磊
卢伟楠 wrote on Wed, Dec 25, 2019 at 4:09 PM:
> Hi all:
>
> While implementing Kafka-to-MySQL End-To-End Exactly-Once, I recently ran
> into the following 2 problems:
> 1:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException:
> Communications link failure during commit(). Transaction resolution unknown.
>
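One common way to get idempotent writes into MySQL is an upsert keyed on a unique id, so a replay after a failed commit overwrites rather than duplicates (a sketch; the table and column names are made up):

```sql
-- Re-running this statement with the same id leaves one row, not two.
INSERT INTO results (id, cnt)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE cnt = VALUES(cnt);
```

This sidesteps the transaction-resolution problem above, at the cost of requiring a deterministic key per record.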
hi all:
You can also try the latest 1.10 release: in that version the SQL operators
no longer request a fixed, hard-coded amount of memory, but adapt to however
much managed memory the slot can provide.
Best,
Kurt
On Thu, Dec 26, 2019 at 1:36 PM Xintong Song wrote:
> How much memory a slot needs depends on the specific job; it can vary a
> lot between jobs.
>
> A slot's resource requirement is the sum of the resource requirements of
> all its operators, so if you know which operators your job uses, you can
> derive it from the per-operator requirements.
> The operators' default resource requirements are listed in
Hi all:
I started Flink's history server, but it shows no completed jobs.
My configuration is as follows:
The resulting web UI looks like this:
HDFS contents:
Please advise.
Department / Data Platform
Alias / 起子
Mobile: 159 8810 1848
WeChat: 159 8810 1848
Email: q...@dian.so
Address: 5#705, 998 Wenyi West Road, Yuhang District, Hangzhou, Zhejiang
Hi all:
While implementing Kafka-to-MySQL End-To-End Exactly-Once, I recently ran
into the following 2 problems:
1:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException:
Communications link failure during commit(). Transaction resolution unknown.
2:org.apache.flink.streaming.runtime.tasks.TimerException:
hi all:
When a Flink Kafka stream table is joined with an HBase dimension table, and
the dimension table changes later, how can previously joined records be
updated?
The stream can't be unbounded, right? There should be a window period, and
data outside the window could be handled offline?
xin Destiny wrote on Wed, Dec 25, 2019 at 6:13 PM:
> Hi, lucas.wu:
>
> Personally I think you can store the join conditions and the corresponding
> stream records in MapState. Whenever the dimension-table cache is updated,
> query the MapState; if a matching KV exists, emit the re-joined records
> downstream.
> However, the state can take up a lot of memory this way, so you need to
> pay attention to state cleanup.
>
> lucas.wu wrote on Wed, Dec 25, 2019 at 5:13 PM:
>
> > hi all:
> >
You can use Guava to cache the dimension-table data in the JVM, and set an
expiration time for the cached entries.
叶贤勋
yxx_c...@163.com
On Dec 26, 2019 at 08:44, 李现 wrote:
The stream can't be unbounded, right? There should be a window period, and
data outside the window could be handled offline?
xin Destiny wrote on Wed, Dec 25, 2019 at 6:13 PM:
Hi, lucas.wu:
Personally I think you can store the join conditions and the corresponding
stream records in MapState. Whenever the dimension-table cache is updated,
query the MapState; if a matching KV exists, emit the re-joined records
downstream.
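A toy sketch of the expiry idea in plain Java (Guava's CacheBuilder with expireAfterWrite does this properly, with size limits and eviction on top; this dependency-free version only illustrates the TTL check, and all names are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Toy TTL cache for dimension-table rows, as suggested above.
class TtlCache {
    private static class Entry {
        final String value;
        final long storedAtMillis;
        Entry(String value, long storedAtMillis) {
            this.value = value;
            this.storedAtMillis = storedAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMillis;

    TtlCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void put(String key, String value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis));
    }

    // Returns null when the entry is absent or older than the TTL, so
    // the caller falls back to a fresh dimension-table lookup.
    String get(String key, long nowMillis) {
        Entry e = entries.get(key);
        if (e == null || nowMillis - e.storedAtMillis > ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }
}
```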
Hi 李现,
In reality it is hard to do a full join against the stream table; keeping
everything would make the state very large and later migration difficult.
Could you give an example of the approach you mentioned?
Original message
From: 李现 stormallin2...@gmail.com
To: user-zh user...@flink.apache.org
Sent: Thu, Dec 26, 2019, 08:44
Subject: Re: flink dimension table join
The stream can't be unbounded, right? There should be a window period, and
data outside the window could be handled offline? xin Destiny
nj18652727...@gmail.com wrote on Wed, Dec 25, 2019 at 6:13 PM:
flink-conf.yaml needs these settings:
historyserver.web.port: 8082
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.refresh-interval: 1
historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/