hello, Yang Wang
Is there an issue tracking this problem? I still hit it when testing on 1.12. Is there a way to work around it?
Best,
zhisheng
Yang Wang wrote on Thu, Sep 3, 2020 at 11:15 AM:
> At the moment, application mode with HA enabled cannot support multiple jobs, so the job ID is fixed to 0.
> The main reason is that the recovery problem has not been solved properly yet.
>
>
> Best,
> Yang
>
> chenkaibit wrote on Wed, Sep 2, 2020 at 7:29 PM:
>
> > hi:
> > While testing flink-1.11 application mode, I found that after enabling HA
hi
Adding the -d parameter and starting in detached mode should do it.
-
Best Wishes
JasonLee
Hi,
it seems that YARN has a feature for targeting specific hardware:
https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html
In any case, you'll need enough spare resources for some time to be able to
run your job twice for this kind of "zero downtime
As far as I know, a "timeout" does not increment the failure counter; that is, a timeout is not a "failure", or rather, not an Exception.
I would check what exception the checkpoint threw to exceed the tolerable failure number (by default, any checkpoint exception restarts the job).
If that exception is expected, or unavoidable for reasons such as HDFS, then you can raise tolerableCpFailureNumber accordingly.
On 2020-11-13 09:13:34, "史 正超" wrote:
>That is a good idea, thanks for the reply. I will try it first.
>
>From:
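For reference, a minimal hedged sketch of the knob discussed above, via the DataStream API (the checkpoint interval and threshold values are illustrative assumptions, not taken from this thread):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerantCheckpointsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60s
        // The default tolerable failure number is 0, so the first checkpoint
        // exception fails the job. Raising it tolerates expected or transient
        // failures (e.g. HDFS hiccups) without restarting the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
    }
}
```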
I do not know why emails I send to the community now keep triggering this bounce; it turns out the community never received my emails at all.
newxmmxszc44.qq.com rejected your message to the following email addresses:
m...@zhangzuofeng.cn
Your message couldn't be delivered because the recipient's email system wasn't
able to confirm that your message came from a trusted location.
For Email
hi
From your description it is indeed per-job mode, and per-job mode should not have this problem. Could you check the value of execution.attached
on your web UI? Also, did you add the -d parameter when starting the job?
-
Best Wishes
JasonLee
Source table test:
CREATE TABLE test (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
Thank you. I looked into that, but it does not initialize any values in
keyed state; rather, it uses keyed state, and lines 407-412 show that it is
not setting keyed-state values in advance, instead handling null values when
the state has not been set yet.
public void processElement(String value, Context ctx,
Hi, Marco
I think you could look at testScalingUp() at
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
Best,
Guowei
On Fri, Nov 13, 2020 at 10:36 AM Marco Villalobos
wrote:
> Hi,
>
> I would
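For anyone following this thread, a hedged, self-contained sketch of exercising keyed state through Flink's keyed test harness; the CountFn function and its key selector are hypothetical stand-ins for the function under test:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

public class KeyedHarnessSketch {

    // Hypothetical function under test: counts the elements seen per key.
    static class CountFn extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Integer> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.INT));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            Integer c = count.value(); // null on the first element for this key
            int next = (c == null ? 0 : c) + 1;
            count.update(next);
            out.collect(ctx.getCurrentKey() + "=" + next);
        }
    }

    public static void main(String[] args) throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new CountFn()),
                        value -> value, // key selector: key by the element itself
                        Types.STRING);
        harness.open();
        // Keyed state is scoped to the current key, so there is nothing to
        // pre-initialize: processing an element for a key creates its state.
        harness.processElement(new StreamRecord<>("a", 1L));
        harness.processElement(new StreamRecord<>("a", 2L));
        System.out.println(harness.getOutput()); // [Record @ 1 : a=1, Record @ 2 : a=2]
    }
}
```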
Detached mode is another pitfall: in attached mode, the status change is only triggered when the client requests the job status, whereas a plain
execute should also fetch the result automatically.
Check whether the following key log lines are printed:
- log.info("Job {} reached globally terminal state {}.", ...)
- LOG.debug("Shutting down cluster because someone retrieved the job
result.");
- LOG.info("Shutting {} down with
hi
1. It is definitely per-job mode: the submit command is ./bin/flink run -m yarn-cluster xxx, and the Flink web UI (Job
Manager -> Configuration) shows execution.target = yarn-per-job.
2. The overall job status is Failed; the TM is gone but the JM is not (my guess is that the JM survives because the YARN application is still
Running, which is also why the failure logs can still be viewed under Job Manager -> logs).
best,
amenhub
From: JasonLee
Sent: 2020-11-13
hi
1. First, are you sure you submitted in per-job mode?
2. By "job status", do you mean the JM is still up and the job keeps failing over, or that the job is really dead and the JM has exited?
-
Best Wishes
JasonLee
I have run into this problem too.
On 1.11, if you submit without the -d parameter and the Flink program dies, the YARN application stays in RUNNING, effectively becoming a long-lived YARN
session.
Only with -d are the lifecycles of the Flink program and the YARN application bound together.
--
kingdomad
On 2020-11-13 11:16:02, "amen...@163.com" wrote:
>>>Of course, there is some delay between Flink noticing it has FAILED and reporting that to YARN, and the report may also fail due to network issues and the like.
Going by that, it should be sporadic behavior, yet I have been waiting for Flink to report for hours and the YARN state is still RUNNING..
>>>Is this a short window of inconsistency, or has the Flink cluster already exited while the YARN state has not changed?
I do not quite follow this question. My submit command is ./bin/flink run -m yarn-cluster xxx, and the Flink version is 1.11.1.
Yesterday I found in the mailing list archives that before Flink 1.10 the -d parameter could work around the per-job-mode Flink web
In per-job mode, once the job has completely failed, it does report its failed status to the YARN RM.
Of course, there is some delay between Flink noticing it has FAILED and reporting that to YARN, and the report may also fail due to network issues and the like.
Is what you see a short window of inconsistency, or has the Flink cluster already exited while the YARN state has not changed?
Best,
tison.
zhisheng wrote on Thu, Nov 12, 2020 at 8:17 PM:
> I have run into this problem too, so for job monitoring and alerting we usually look at the task-level states of the job rather than the plain YARN state.
>
> hdxg1101300123
If only a small fraction of records exceed the gap, a window will indeed perform poorly; consider keeping the state in a traditional key-value record instead.
Lei Wang wrote on Thu, Nov 12, 2020 at 9:17 PM:
> A session window does satisfy the functional requirement:
>
>
> robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
> y) -> y);
>
> With this formulation, I understand the window state only keeps the most recent record.
>
>
> Normally, a robot
Hi,
I would like to add keyed state to the test harness before calling the process
function.
I am using the OneInputStreamOperatorTestHarness.
I can't find any examples online on how to do that, and I am struggling to
figure this out.
Can somebody please provide guidance? My test case has keyed
That is a good idea, thanks for the reply. I will try it first.
From: 赵一旦
Sent: 2020-11-13 02:05
To: user-zh@flink.apache.org
Subject: Re: flink-1.11.2 checkpoint failures
If timeouts also count as failures, I suggest setting the tolerable number to Integer.MAX_VALUE, because a failed or timed-out checkpoint does not necessarily mean backpressure; at least that is the case in my production jobs.
Sometimes the load is high but just barely manageable, and some checkpoints fail even though the job is in fact keeping up. There is no need to fail the job because of that.
史 正超 wrote on Fri, Nov 13, 2020
If timeouts also count as failures, I suggest setting the tolerable number to Integer.MAX_VALUE, because a failed or timed-out checkpoint does not necessarily mean backpressure; at least that is the case in my production jobs.
Sometimes the load is high but just barely manageable, and some checkpoints fail even though the job is in fact keeping up. There is no need to fail the job because of that.
史 正超 wrote on Fri, Nov 13, 2020 at 10:01 AM:
> From the code, it looks like it does.
>
> public void handleJobLevelCheckpointException(CheckpointException
> exception, long checkpointId) {
>
From the code, it looks like it does.
public void handleJobLevelCheckpointException(CheckpointException exception,
        long checkpointId) {
    checkFailureCounter(exception, checkpointId);
    if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
        clearCount();
        failureCallback.failJob(new FlinkRuntimeException(
                "Exceeded checkpoint tolerable failure threshold."));
    }
}
A follow-up question along these lines: does a checkpoint "timeout" count toward checkpoint failures?
史 正超 wrote on Thu, Nov 12, 2020 at 9:23 PM:
> Checkpointing fails with the error below.
> 2020-11-12 21:04:56
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
>
The SQL, simplified, looks roughly like this; the checkpoint times out:
CREATE VIEW fcbox_send_fat_view AS
SELECT
REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate,
disCode,
IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
dislv,
IF(express_company_id IS NOT NULL, express_company_id, 'NA') AS
expressCompanyId,
No, it is a JDBC sink: first a left join of three changelog streams, then
joins against two MySQL dimension tables, and the checkpoint times out. The SQL is so complex that the more I write, the less confident I get...
```
CREATE VIEW fcbox_send_fat_view AS
SELECT
REPLACE(SUBSTR(client_post_time, 1, 10), '-', '') AS staDate,
disCode,
IF(dis_name IS NOT NULL, dis_name, 'NA') AS disName,
dislv,
First, I would like to know whether you are using a Kafka sink?
On Nov 12, 2020 at 21:16, 史 正超 wrote:
Checkpointing fails with the error below. 2020-11-12 21:04:56
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold. at
I have some updates. Some weird behaviours were found. Please refer to the
attached photo.
All requests were sent via REST API
The status of the savepoint triggered by that stop request (ID 11018) is
"COMPLETED [Savepoint]", however, no checkpoint data has been persisted (in
S3).
The folder
The test workflow attachment was not added in the previous email, sorry
for the confusion; please refer to the described text workflow. Thanks.
On 11/12/20 16:17, fuyao...@oracle.com wrote:
Hi All,
Just to add a little more context to the problem. I have a full outer
join operation before
Hi All,
Just to add a little more context to the problem. I have a full outer
join operation before this stage. The source data stream for full outer
join is a Kafka Source. I also added timestamp and watermarks to the
FlinkKafkaConsumer. After that, it makes no difference to the result,
I'm now trying with a MATCH_RECOGNIZE:
SELECT *
FROM customers
MATCH_RECOGNIZE (
    PARTITION BY client_number
    ORDER BY proctime()
    MEASURES
        LAST(B.client_number) AS client_number,
        LAST(B.address) AS address
    PATTERN (A* B)
    DEFINE
        B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
)
Actually, what I'm experiencing is that the JobListener is executed
successfully if I run my main class from the IDE, while the job listener is
not fired at all if I submit the JobGraph of the application to a cluster
using the RestClusterClient.
Am I doing something wrong?
My main class ends
Hi Aljoscha,
You're right, I had a misunderstanding about how unions without window
operations work.
Thanks!
On Thu, Nov 12, 2020, 18:37 Aljoscha Krettek wrote:
> Hi,
>
> I think if you don't do any operations that are sensitive to event-time
> then just using a UNION/UNION ALL should work
Hi Aljoscha,
Yes, correct: I would like to have more windows when there are more events for a
given time frame, that is, when
the events are more dense in time. I can calculate the time difference between
each event and create a parameter that can create windows of different sizes
dynamically
Awesome, thanks! Is there a way to run the new yarn job only on the new
hardware? Or would the two jobs have to run on intersecting hardware and
then be switched on/off, which means we'll need a buffer of resources
for our orchestration?
Also, good point on recovery. I'll spend some time
Hi,
I think if you don't do any operations that are sensitive to event-time
then just using a UNION/UNION ALL should work because then there won't
be any buffering by event time which could delay your output.
Have you tried this and have you seen an actual delay in your output?
Best,
Hi,
I'm not sure that what you want is possible. You say you want more
windows when there are more events for a given time frame? That is when
the events are more dense in time?
Also, using the event timestamp as the gap doesn't look correct. The gap
basically specifies the timeout for a
Hi Arvid,
Thank you so much for your detailed reply. I think I will go with one schema
per topic using GenericRecordAvroTypeInfo for genericRecords and not do any
custom magic.
Approach of sending records as byte array also seems quite interesting.
Right now I am deserializing avro records so
I see what my mistake was: I was using a field in my ORDER BY, while it
only supports proctime() for now.
That allows me to create an append only stream, thanks a lot!
However, it still does not allow me to do what I need:
*If I use both my primary key and changing column in PARTITION BY, then
You need to differentiate two serialization abstractions (which I guess you
already know). One is coming from reading the source, where the
DeserializationSchema is used, and it translates the bytes of Kafka into
something that Flink can handle.
The second serialization occurs within Flink
So in this case, Flink will fall back to the default Kryo serializer, right?
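On the Kryo question: when the whole topic shares one schema, a hedged sketch of a GenericRecord source that carries proper Avro type information, so Flink's internal serialization does not fall back to Kryo (broker, group, and topic names are hypothetical):

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GenericRecordSourceSketch {

    public static FlinkKafkaConsumer<GenericRecord> build(Schema schema) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // hypothetical broker
        props.setProperty("group.id", "generic-record-demo");  // hypothetical group
        // AvroDeserializationSchema.forGeneric produces GenericRecords and
        // exposes a GenericRecordAvroTypeInfo as the produced type, so Flink
        // serializes records with its Avro serializer instead of Kryo.
        return new FlinkKafkaConsumer<>(
                "events", // hypothetical topic
                AvroDeserializationSchema.forGeneric(schema),
                props);
    }
}
```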
Hi Tim,
AFAIK we are confusing two things here: there is the transaction timeout,
i.e. how long the transaction may last until it is aborted, and what you see here is some
timeout while creating the transaction in the first place.
A quick google search turned up [1], from which I'd infer that you need to
set
Hi All,
I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I
have understood that the gap is computed dynamically by a function on each
element. What I should be able to obtain is a Flink application that can
automatically manage the windows based on the frequency of
To further add to this problem, I've now got our ops team to set
transaction.max.timeout.ms on our Kafka brokers to 1 hour (as suggested by
the Flink docs). However the problem persists and I'm still getting
the same error message.
I've confirmed that this config setting is actually set on the
Data loss exception using hash join in batch mode
Also realized I had a typo in the config dump I did in the previous email
(the one from the 10th). If I don't do
Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", "90");
Then the value reported from the ProducerConfig is 360 and not 6 as
I
I have created an issue for supporting the merge method for first_value and last_value [1].
I also created an issue to fix the documentation describing when a custom UDAF needs a merge method; a hop window example could be added there [2].
@李世钰 feel free to pick these up if you are interested.
[1] https://issues.apache.org/jira/browse/FLINK-20110
[2] https://issues.apache.org/jira/browse/FLINK-20111
At 2020-11-12 19:51:29, "Jark Wu" wrote:
Checkpointing fails with the error below.
2020-11-12 21:04:56
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at
A session window does satisfy the functional requirement:
robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
y) -> y);
With this formulation, I understand the window state only keeps the most recent record.
Normally the robots keep reporting logs, which means this window would be kept alive indefinitely. I am not sure whether that hurts performance.
On Thu, Nov 12, 2020 at 5:25 PM hailongwang
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink
1.11.0).
It works great, but I have the problem that logs inside the onJobExecuted
are not logged anywhere... is that normal?
Best,
Flavio
If you follow the best practices, then topics should never have different
schemas as you can't enforce schema compatibility. You also have very
limited processing capabilities and clumsy workflows attached to it.
If you want to encode different kinds of events, then the common approach
is to use
This is already supported on the master branch; take a look at the flink-connector-es7 source.
Luna Wong wrote on Wed, Nov 11, 2020 at 9:16 PM:
> Why are the constructors of the ElasticsearchApiCallBridge implementations not public?
> I want to extend Elasticsearch6ApiCallBridge and add username/password
> authentication before creating the RestHighLevelClient, i.e. implement a subclass that supports credentials.
>
> Without a public constructor, the subclass has to live in the same package as its parent. Why are the ElasticsearchApiCallBridge implementations designed this way?
Feel free to open an issue for supporting the merge method for first_value and last_value.
On Thu, 12 Nov 2020 at 20:37, hailongwang <18868816...@163.com> wrote:
> Hi me,
>
>
> A UDAF used on a HOP window needs to implement the merge method, because when a HOP window fires, multiple panes
> are merged, so the UDAF's merge method is called to combine multiple accumulators into one. first_value and last_value
> do not support merge.
>
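For illustration, a hedged sketch of a table AggregateFunction that does implement merge; the function itself, a trivial sum, is hypothetical and only demonstrates the method a HOP window calls:

```java
import org.apache.flink.table.functions.AggregateFunction;

public class SumWithMerge extends AggregateFunction<Long, SumWithMerge.Acc> {

    public static class Acc {
        public long sum;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.sum;
    }

    public void accumulate(Acc acc, Long value) {
        if (value != null) {
            acc.sum += value;
        }
    }

    // Required for HOP (sliding) windows: combines the accumulators of the
    // panes that make up one window into a single accumulator.
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.sum += other.sum;
        }
    }
}
```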
Hi Arvid,
Thanks a lot for your reply. And yes, we do use confluent schema registry
extensively. But the `ConfluentRegistryAvroDeserializationSchema` expects
reader schema to be provided. That means it reads the message using writer
schema and converts to reader schema. But this is not what I want
Sure, I've attached it to this email. The process seems to restart once the
TimeoutException happens so it's repeated a couple of times.
Thanks for looking at it!
/Tim
On Wed, 11 Nov 2020 at 10:37, Aljoscha Krettek wrote:
> Hmm, could you please post the full stack trace that leads to the
>
Hi me,
A UDAF used on a HOP window needs to implement the merge method, because when a HOP window fires, multiple panes are merged, so the UDAF's
merge method is called to combine multiple accumulators into one. first_value and last_value do not support merge.
Best,
hailong
On 2020-11-12 17:07:58, "李世钰" wrote:
>1. Flink version: flink1.11
>
>
>
>
>2. Using useBlinkPlanner
>
>
>
>
>3. SQL executed:
>
>SELECT
Hi,
I'm on 1.11.0, with a streaming job running on a YARN session, reading from
Kinesis.
I tried to stop the job using REST, with "drain=false". After that POST
request, I got back a request_id (not sure what I should use it for).
Checking the job in the GUI, I could see that a savepoint has been
You can read http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ for an explanation.
hl9...@126.com wrote on Thu, Nov 12, 2020 at 4:47 PM:
> It is a Flink standalone cluster.
> The job parallelism is set in the job's Java code via streamExecutionEnvironment.setParallelism(15).
>
>
>
> hl9...@126.com
>
> From: Xintong Song
> Sent: 2020-11-12 13:18
> To:
I have run into this problem too, so for job monitoring and alerting we usually look at the task-level states of the job rather than the plain YARN state.
hdxg1101300123 wrote on Thu, Nov 12, 2020 at 8:07 PM:
> You can configure the job to fail when checkpoints fail
>
>
>
> Sent from a vivo smartphone
> > hi everyone,
> >
> > Recently, when submitting simple kafka->mysql jobs with Flink-1.11.1 on YARN in per-job
> mode, I found that when dirty data or MySQL primary key constraints and the like put the Flink job into the Failed state, the YARN
> application stays in the running state
> >
> >
hi
Take a look at the Timer mechanism; it may solve your problem.
Best, zhisheng
hailongwang <18868816...@163.com> wrote on Thu, Nov 12, 2020 at 5:25 PM:
>
>
>
> This scenario does resemble the session-window semantics, but using a session window feels wrong here.
> If no alert is ever triggered, all the historical data stays in the window, or rather in state, while really only the latest record needs to be kept.
>
>
>
>
> On 2020-11-12 14:34:59, "Danny Chan" wrote:
> >This looks like a session
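As a hedged sketch of the Timer approach suggested above: keep only the pending timer per robot in state and alert when no report arrives within the gap (the class, the String key/report types, and the 30-second gap are assumptions based on this thread):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Alerts when a robot has not reported for 30 seconds; only the pending
// timer (not the historical reports) is kept in state per key.
public class IdleRobotAlert extends KeyedProcessFunction<String, String, String> {

    private static final long GAP_MS = 30_000L;
    private transient ValueState<Long> lastTimer;

    @Override
    public void open(Configuration parameters) {
        lastTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastTimer", Types.LONG));
    }

    @Override
    public void processElement(String report, Context ctx, Collector<String> out)
            throws Exception {
        Long pending = lastTimer.value();
        if (pending != null) {
            ctx.timerService().deleteProcessingTimeTimer(pending); // reset the gap
        }
        long fireAt = ctx.timerService().currentProcessingTime() + GAP_MS;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        lastTimer.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        out.collect("robot " + ctx.getCurrentKey() + " silent for 30s");
        lastTimer.clear();
    }
}
```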
You can configure the job to fail when checkpoints fail
Sent from a vivo smartphone
> hi everyone,
>
> Recently, when submitting simple kafka->mysql jobs with Flink-1.11.1 on YARN in per-job
> mode, I found that when dirty data or MySQL primary key constraints and the like put the Flink job into the Failed state, the YARN
> application stays in the running state
>
> My question: when the Flink job is Failed or Finished, does it not report its own state back to YARN? Hoping someone can clarify, thanks
>
> best,
> amenhub
Hi,
I want to create an abstraction over N source tables (streams), and unify
them into 1 table. I know UNION and UNION ALL exist, but I'm looking for
DataStream.connect like semantics in regards to watermark control. I don't
want to take the minimum watermark over all N streams, as I know for
I see. Now it has different query plans. It was documented on another
page so I got confused. Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
On Thu, Nov 12, 2020 at 12:41 PM Jark Wu wrote:
>
> Hi Felipe,
>
> The default value of
Hi Niklas,
The Python DataStream API will also be supported in the coming 1.12.0 release [1].
However, its functionality is still limited for the time being compared to
the Java DataStream API; e.g. it will only support stateless operations,
such as map, flat_map, etc.
[1]
Hi Laurent,
1. Currently, it's impossible to convert deduplicate with last row into an
append-only stream.
2. Yes, I think Ververica platform doesn't support 'changelog-json' format
natively.
However, regarding your case, I think you can use keep first row on
client_number+address key.
SELECT *
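A hedged reconstruction of the suggested keep-first-row query, following the documented ROW_NUMBER deduplication pattern; it assumes the customers table is registered and declares a processing-time attribute column named proctime:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DedupFirstRowSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Keep the first row per (client_number, address); since a kept row is
        // never updated or retracted afterwards, the result stays append-only.
        // Assumes a `customers` table with a proctime attribute is registered.
        tEnv.executeSql(
                "SELECT client_number, address FROM ("
                + " SELECT *, ROW_NUMBER() OVER ("
                + "   PARTITION BY client_number, address"
                + "   ORDER BY proctime ASC) AS rn"
                + " FROM customers"
                + ") WHERE rn = 1").print();
    }
}
```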
Hi Dian,
thank you very much for this valuable response. I already read about the UDAF,
but I wasn't aware of the fact that it is possible to return and UNNEST an
array. I will definitely have a try and hopefully this will solve my issue.
Another question that came up to my mind is whether
Hi Felipe,
The default value of `table.optimizer.agg-phase-strategy` is AUTO: if
mini-batch is enabled,
it will use TWO-PHASE, otherwise ONE-PHASE.
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
On Thu, 12 Nov 2020 at 17:52,
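For completeness, a hedged sketch of enabling the two-phase strategy explicitly; since mini-batch is its prerequisite, the mini-batch options are set as well (all values are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TwoPhaseAggSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Mini-batch is a prerequisite for the two-phase (local-global) aggregation.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "1 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");
        tEnv.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    }
}
```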
Hi Suchithra,
I'm not sure you can actually pass passwords in any other way. I'm also not
sure this is needed if these are job-/cluster-specific because then, an
attacker would have to have access to that first in order to get these
credentials. And if the attacker has access to the
Hi Theo,
We had a very similar problem with one of our spark streaming jobs. Best
solution was to create a custom source having all external records in
cache, periodically reading external data and comparing it to cache. All
changed records were then broadcasted to task managers. We tried to
hi,
My version is flink-1.11.1 without the -d parameter, and I ran into the same problem. What could be going on?
best,
amenhub
From: Yang Wang
Sent: 2020-08-05 10:28
To: user-zh
Subject: Re: flink1.9.1 job has failed, but the application is still running on yarn
Your Flink job was probably started in attached mode, i.e. without -d. Before 1.10, a job started that way is essentially a session,
The common solution is to use a schema registry, like Confluent schema
registry [1]. All records have a small 5 byte prefix that identifies the
schema and that gets fetched by deserializer [2]. Here are some resources
on how to properly secure communication if needed [3].
[1]
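A hedged sketch of the registry-aware deserializer this refers to; the registry URL is a hypothetical placeholder:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class RegistrySketch {
    // Reads the writer schema id from the 5-byte record prefix, fetches that
    // schema from the registry, and converts each record to the reader schema.
    public static DeserializationSchema<GenericRecord> forTopic(Schema readerSchema) {
        return ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://registry:8081"); // hypothetical registry URL
    }
}
```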
Created an issue for this: https://issues.apache.org/jira/browse/BEAM-11251
On 11.11.20 19:09, Aljoscha Krettek wrote:
Hi,
nice work on debugging this!
We need the synchronized block in the source because the call to
reader.advance() (via the invoker) and reader.getCurrent() (via
1. Flink version: flink1.11
2. Using useBlinkPlanner
3. SQL executed:
SELECT FIRST_VALUE(kafka_table.src_ip) AS
kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS
kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30'
SECOND(2), INTERVAL '2' MINUTE(1)) AS
Hi Jack,
I don't get the difference between the "MiniBatch Aggregation" and
the "Local-Global Aggregation". On the web page [1] it
says that I have to enable the TWO_PHASE parameter. So I compared the
query plan from both, with and without the TWO_PHASE parameter. And
they are the same.
Hi Jark,
thanks again for your quick response!
I tried multiple variants of my query by:
- specifying only the primary key in the PARTITION BY clause
- changing the order to DESC to keep the last row
--> I unfortunately always get the same error message.
If I try to make a simple select on the
To make partitions queryable as early as possible, just set the delay to 0 (leave everything else at the defaults).
On Thu, Nov 12, 2020 at 5:17 PM admin <17626017...@163.com> wrote:
> Hi,jingsong
> So with partition-time, a partition can be committed again even when data is very late, and no data is lost, right?
> For hourly partitions, what is the best configuration to make a partition queryable as early as possible?
> For example sink.partition-commit.trigger = partition-time
> sink.partition-commit.delay = 10 min
>
> >
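To make that concrete, a hedged DDL sketch of an hourly-partitioned filesystem sink with the delay set to 0; the table schema, path, and format are hypothetical:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionCommitSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // With trigger = partition-time and delay = 0s, a partition is
        // committed as soon as the watermark passes the partition's end time.
        tEnv.executeSql(
                "CREATE TABLE hourly_sink ("
                + "  msg STRING, dt STRING, hr STRING"
                + ") PARTITIONED BY (dt, hr) WITH ("
                + "  'connector' = 'filesystem',"
                + "  'path' = '/tmp/hourly_sink',"
                + "  'format' = 'parquet',"
                + "  'sink.partition-commit.trigger' = 'partition-time',"
                + "  'sink.partition-commit.delay' = '0 s',"
                + "  'sink.partition-commit.policy.kind' = 'success-file'"
                + ")");
    }
}
```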
This should be the connection becoming unusable after a long period without data transfer, possibly because:
1. the Kafka data is sparse and the interval between arriving records exceeds "wait_timeout";
2. the records simply never join against the MySQL data.
Try tuning the database's wait_timeout.
PS: in this scenario automatic recovery should be fine, but you need to pin down the root cause, see whether it is normal or abnormal, and decide how to avoid it.
It is best to enable checkpointing: the Kafka offsets are only acked when a checkpoint succeeds, so records
will not be auto-acked and then dropped.
If checkpointing is enabled and the downstream supports upsert
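A hedged sketch of the checkpoint-coupled offset handling described above (broker, group, and topic names are hypothetical):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointedOffsetsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed on completed checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // hypothetical broker
        props.setProperty("group.id", "my-group");             // hypothetical group

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        // With checkpointing enabled, offsets are committed back to Kafka only
        // when a checkpoint completes, so records are not acked and then lost.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("checkpointed-offsets");
    }
}
```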
This scenario does resemble the session-window semantics, but using a session window feels wrong here.
If no alert is ever triggered, all the historical data stays in the window, or rather in state, while really only the latest record needs to be kept.
On 2020-11-12 14:34:59, "Danny Chan" wrote:
>This looks like a session window requirement: the timeout is the session gap, and the moment the session fires is where the alerting logic goes
>
>Lei Wang wrote on Wed, Nov 11, 2020 at 2:03 PM:
>
>> There are many edge robot devices (which we call robots) sending
Hi,jingsong
So with partition-time, a partition can be committed again even when data is very late, and no data is lost, right?
For hourly partitions, what is the best configuration to make a partition queryable as early as possible?
For example sink.partition-commit.trigger = partition-time
sink.partition-commit.delay = 10 min
> On Nov 12, 2020 at 3:22 PM, Jingsong Li wrote:
>
> Hi admin,
>
> No data is dropped; the partition is committed again (which is why partition commits are all idempotent operations now)
>
> On Thu,
Hi,
Thanks a lot for the reply. And you both are right. Serializing
GenericRecord without specifying schema was indeed a HUGE bottleneck in my
app. I got to know it through jfr analysis and then read the blog post you
mentioned. Now I am able to pump in lot more data per second. (In my test
setup
Hi,
from my experience serialization contributes a lot to the maximum
achievable throughput. I can strongly recommend checking out this blog
post, which has a lot of details on the topic:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
On Tue, Nov 10, 2020 at 9:46
It is a Flink standalone cluster.
The job parallelism is set in the job's Java code via streamExecutionEnvironment.setParallelism(15).
hl9...@126.com
From: Xintong Song
Sent: 2020-11-12 13:18
To: user-zh
Subject: Re: relationship between slot count and parallelism
Are you deploying a Flink standalone cluster? How is the job parallelism of 15 currently specified?
By default, a streaming job must acquire at least as many slots as its parallelism before it can run. Via [3], mentioned by Shawn,
Hey Tim,
what is your Flink job doing? Is it restarting from time to time?
Is the JobManager crashing, or the TaskManager?
On Tue, Nov 10, 2020 at 6:01 PM Matthias Pohl
wrote:
> Hi Tim,
> I'm not aware of any memory-related issues being related to the deployment
> mode used. Have you checked
Hey Josh,
As far as I understand the code CompletedCheckpoint.discard(), Flink is
removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then
deleting the directory.
Which files are left over in your case?
Do you see any exceptions on the TaskManagers?
Best,
Robert
On Wed, Nov
Hi Jiahui,
using the yarn.container-start-command-template is indeed a good idea.
I was also wondering whether the Flink YARN client that submits the Flink
cluster to YARN has knowledge of the host where the ApplicationMaster gets
deployed to. But that doesn't seem to be the case.
On Wed, Nov