I suspect this is a watermark problem. Looking at your settings, the watermark is -2s, which means that any data in the order-header stream arriving more than 2s late
will be dropped.
What you observed earlier is that the order details arrive a few seconds later than the order header, which is just the generation-time gap within the same order. If that is the case, a plain inner join + TTL
is enough to meet the requirement.
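The "-2s watermark drops data more than 2s late" behaviour above can be sketched with a small simulation. This is plain Python, not the Flink API; the tracker name and the rule "watermark = max event time seen minus the bound, events at or below the watermark are late" are the assumptions here.

```python
# Illustrative sketch (not Flink API): how a -2s out-of-orderness bound
# decides whether a late event would be dropped by a join/window.

def make_watermark_tracker(max_out_of_orderness_ms):
    state = {"max_ts": float("-inf")}

    def on_event(event_ts):
        # The watermark trails the highest event time seen so far.
        state["max_ts"] = max(state["max_ts"], event_ts)
        watermark = state["max_ts"] - max_out_of_orderness_ms
        # An event is on time only if it is still above the watermark.
        return event_ts > watermark

    return on_event

accept = make_watermark_tracker(2000)  # watermark = max event time - 2s
print(accept(10_000))   # True: advances the watermark to 8_000
print(accept(7_500))    # False: older than the watermark, would be dropped
print(accept(9_000))    # True: within the 2s bound
```

A detail row generated a few seconds after its header row can easily fall behind such a watermark, which matches the data loss described above.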
BTW: in my opinion watermarks are hard to use well, and the practical use cases are quite limited.
Zhiwen Sun
On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang wrote:
> > In my SQL inner
Writing multiple VALUES in one statement is supported; it is controlled by the JDBC URL: set *rewriteBatchedStatements=true*
The generated SQL looks like:
INSERT INTO `order_summary`(`order_id`, `proctime`, `order_status`,
> `order_name`, `total`)
> VALUES
> (3, '2022-06-14 22:31:24.699', 'OK', 'order-name-1', 20) ,
> (2, '2022-06-14 22:31:21.496', 'OK',
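The rewriting that `rewriteBatchedStatements=true` performs (single-row batched INSERTs collapsed into one multi-VALUES statement) can be sketched as a string transformation. This is an illustrative Python sketch of the idea, not the MySQL driver's actual implementation; the function name and quoting via `repr` are assumptions for the example.

```python
# Sketch: collapse batched single-row INSERTs into one multi-VALUES INSERT,
# roughly what the MySQL JDBC driver does with rewriteBatchedStatements=true.

def rewrite_batched(table, columns, rows):
    cols = ", ".join(f"`{c}`" for c in columns)
    values = ",\n".join(
        "(" + ", ".join(repr(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO `{table}`({cols})\nVALUES\n{values}"

sql = rewrite_batched(
    "order_summary",
    ["order_id", "proctime", "order_status", "order_name", "total"],
    [(3, "2022-06-14 22:31:24.699", "OK", "order-name-1", 20)],
)
print(sql)
```

One round trip carrying many rows is what makes the batched write cheaper than row-by-row INSERTs.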
> In my SQL inner join I set the state TTL to 20s. Why does state retained for only 20s produce more accurate data than an interval join with minute-level bounds?
An unreasonable watermark setting will cause an interval join to drop data. With TTL, if a key's
data is accessed frequently, that data never expires.
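The "frequently accessed keys never expire" point can be made concrete with a tiny model. This is an illustrative sketch of assumed TTL-on-access semantics, not Flink's state backend; the class and method names are made up for the example.

```python
# Minimal sketch (assumed semantics): an entry expires only if it is NOT
# touched for ttl_ms; every read or write renews its TTL.

class TtlState:
    def __init__(self, ttl_ms):
        self.ttl_ms = ttl_ms
        self.store = {}  # key -> (value, last_access_ts)

    def put(self, key, value, now_ms):
        self.store[key] = (value, now_ms)

    def get(self, key, now_ms):
        entry = self.store.get(key)
        if entry is None:
            return None
        value, last_access = entry
        if now_ms - last_access > self.ttl_ms:
            del self.store[key]            # expired: idle longer than TTL
            return None
        self.store[key] = (value, now_ms)  # access renews the TTL
        return value

s = TtlState(ttl_ms=20_000)
s.put("order-1", "header", now_ms=0)
print(s.get("order-1", now_ms=15_000))  # 'header': read renews the TTL
print(s.get("order-1", now_ms=30_000))  # still 'header': last access 15s ago
print(s.get("order-1", now_ms=60_000))  # None: expired after 20s idle
```

So a hot key stays joinable far beyond 20s of wall-clock time, while an interval join with a bad watermark drops late rows outright.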
> What I did was inspect the TM and JM logs, but I did not see any log about the configuration of the table being written to.
As far as I remember, the relevant information does get logged. Could you share some of the related logs?
best,
Shengkai
lxk wrote on Tue, Jun 14, 2022 at 20:04:
> Hi,
> I am currently using SQL interval
Hi, Jai.
Could you share your configuration for the checkpoints (interval,
min-pause, and so on) and the checkpoint details from the Flink UI?
I guess the delay of the checkpoint may be related to the last checkpoint
completion time, as you can see in the
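The interplay between the checkpoint interval and min-pause hinted at above can be sketched as a scheduling rule. This is an illustrative model, not Flink's actual scheduler code; the assumption is that the next checkpoint must honor both the interval since the last trigger and the min-pause since the last completion.

```python
# Sketch (assumed rule): the next checkpoint cannot start before both the
# configured interval and the min-pause after the last completion elapse.

def next_checkpoint_start(last_trigger_ms, last_completion_ms,
                          interval_ms, min_pause_ms):
    return max(last_trigger_ms + interval_ms,
               last_completion_ms + min_pause_ms)

# A checkpoint that completed late pushes the next start out via min-pause:
print(next_checkpoint_start(0, 170_000,
                            interval_ms=180_000, min_pause_ms=60_000))
# A fast checkpoint leaves the interval as the binding constraint:
print(next_checkpoint_start(0, 100_000,
                            interval_ms=180_000, min_pause_ms=60_000))
```

Under this rule, one slow checkpoint delays the start of the next, which would show up as a periodic spike in start delay.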
Hello:
I am a user learning PyFlink. I want to develop with PyFlink on Ubuntu 20.04, but when running my code I get the following error:
Traceback (most recent call last):
File "/root/.py", line 6, in
s_env = StreamExecutionEnvironment.get_execution_environment()
File
We've noticed a spike in the start delays in our incremental checkpoints
every 15 minutes. The Flink job seems to start out smooth, with
checkpoints in the 15s range and negligible start delays. Then every
3rd or 4th checkpoint has a long start delay (~2-3 minutes). The
checkpoints in
Hi Bastien,
Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within
Flink in the master branch. Could you please point out the code where the
committed offset is used as the default?
W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
is used, an exception will be
Hi,
Please be aware that SourceFunction will be deprecated soon[1]. It is
recommended to build a new source connector based on the new Source API
design by FLIP-27[2]. You might take the Kafka connector as the
reference implementation.
Best regards,
Jing
[1]
Thank you for your replies.
Upgrading is in our plans but I think Yun is saying that might not help.
We are still trying to find what part of the savepoint is causing the error. We
will try removing pieces of the job graph until we are able to savepoint.
From: Yun Tang
Date: Tuesday, June 14,
Hi Christian,
There's another similar error reported by someone else. I've linked the
tickets together and asked one of the Kafka maintainers to have a look at
this.
Best regards,
Martijn
On Tue, Jun 14, 2022 at 17:16, Christian Lorenz wrote: <
christian.lor...@mapp.com>:
> Hi Alexander,
>
>
>
Hi Alexander,
I’ve created a Jira ticket here
https://issues.apache.org/jira/browse/FLINK-28060.
Unfortunately this is causing some issues to us.
I hope with the attached demo project the root cause of this can also be
determined, as this is reproducible in Flink 1.15.0, but not in Flink
Hi,
Friendly reminder: when opening the ticket and in the later discussion on the dev list, please remember to use English.
Best,
Jing
On Tue, Jun 14, 2022 at 3:23 PM Yun Tang wrote:
> Hi 育锋,
>
> I think your analysis is correct. You could create a ticket to fix this issue. Also, for the detailed discussion of the code implementation, I suggest the dev mailing list.
>
> Best,
> 唐云
>
> From: 朱育锋
> Sent: Tuesday, June 14, 2022 19:33
> To:
Hello everyone,
Does anyone know why the starting-offset behaviour has changed in the new
Kafka source?
It now starts from earliest (code in KafkaSourceBuilder); the doc says:
"If offsets initializer is not specified, OffsetsInitializer.earliest() will
be used by default." from :
Hi,
IMO, broadcast is a better way to do this, as it reduces the QPS against the
external system.
If you do not want to use broadcast, try a RichFunction: start a thread
in the open() method to refresh the data regularly, but be careful to clean
up your data and threads in the close() method,
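The open()/close() lifecycle described above can be sketched in plain Python. This is not Flink's RichFunction API, just an illustrative class with the same shape; the names and the 0.05s refresh period are made up for the example.

```python
# Sketch: a lookup cache refreshed by a background thread started in open()
# and stopped (with its data cleared) in close().

import threading

class RefreshingLookup:
    def __init__(self, load_fn, refresh_secs):
        self.load_fn = load_fn
        self.refresh_secs = refresh_secs
        self.data = {}
        self._stop = threading.Event()
        self._thread = None

    def open(self):
        self.data = self.load_fn()       # initial load
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        # wait() doubles as a sleep that wakes up immediately on close()
        while not self._stop.wait(self.refresh_secs):
            self.data = self.load_fn()   # periodic refresh

    def close(self):
        self._stop.set()                 # signal the thread to exit
        if self._thread is not None:
            self._thread.join()
        self.data = {}                   # clean up cached data

lookup = RefreshingLookup(load_fn=lambda: {"k": 1}, refresh_secs=0.05)
lookup.open()
print(lookup.data["k"])   # 1
lookup.close()
print(lookup.data)        # {}
```

The important part is the close() path: without stopping the thread and dropping the cache, task restarts would leak threads and memory.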
Hi 育锋,
I think your analysis is correct. You could create a ticket to fix this issue. Also, for the detailed discussion of the code implementation, I suggest the dev mailing list.
Best,
唐云
From: 朱育锋
Sent: Tuesday, June 14, 2022 19:33
To: user-zh@flink.apache.org
Subject: Suspecting that a method in the source code is never-reached code
Hello Everyone
Hi Mike,
I think the root cause is that the size of the Java byte array still exceeds the VM
limit.
The exception message is not friendly and is not covered by the sanity check [1], as it
uses a different code path [2]:
The native method org.rocksdb.RocksIterator.$$YJP$$value0 would allocate the
byte array
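The VM limit being referred to can be sketched as a simple bound check. The exact headroom constant is an assumption (JVMs cap array length somewhat below Integer.MAX_VALUE; the commonly used margin of 8 is used here for illustration), and the function is hypothetical, not the sanity check in the linked code.

```python
# Sketch (assumed constant): a value larger than the JVM's maximum array
# length cannot be materialized as a byte[], however RocksDB stored it.

JVM_MAX_ARRAY = 2**31 - 1 - 8  # Integer.MAX_VALUE minus assumed VM headroom

def can_materialize(value_len):
    return value_len <= JVM_MAX_ARRAY

print(can_materialize(10_000))       # True: an ordinary value
print(can_materialize(2**31 - 1))    # False: exceeds the VM limit
```

A friendlier error would perform this check before the native allocation and report the offending value size.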
Hi,
We are using Flink 12.1 on AWS EMR. The job reads the event stream and an
enrichment stream from another topic.
We extend AssignerWithPeriodicWatermarks to assign watermarks and extract
timestamp from the event and handle idle source partitions.
AutoWatermarkInterval set to 5000L.
The timestamp
Hi,
I am currently using a SQL interval join with the window bounds widened to minute level, -2 minutes and +4 minutes respectively. So far the data volume is about the same as with the inner
join. The code is below:
Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
TIMESTAMP_LTZ(3))")
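The [-2 min, +4 min] interval-join bound described above reduces to a simple range check on the two rowtimes. This is an illustrative Python sketch of the assumed join condition, not the Table API code; the function name is made up.

```python
# Sketch: a detail row joins a header row only if its rowtime falls within
# [header.rowtime - 2 min, header.rowtime + 4 min].

def within_interval(header_ts_ms, detail_ts_ms,
                    lower_ms=-2 * 60_000, upper_ms=4 * 60_000):
    delta = detail_ts_ms - header_ts_ms
    return lower_ms <= delta <= upper_ms

print(within_interval(0, 3 * 60_000))    # True: detail 3 min after header
print(within_interval(0, -3 * 60_000))   # False: detail 3 min before header
```

Widening these bounds admits more late pairs, which is why the result volume approaches that of the plain inner join.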
Yes, I’m interested in the best pattern to follow with SQL to allow for a
downstream DB using the JDBC SQL connector to reflect the state of rows added
and deleted upstream.
So imagine there is a crawl event at t=C1 that happens with an associated
timestamp and which finds resources A,B,C. Is
Hello Everyone
While reading the code of the ProcessMemoryUtils class, I suspect that the main logic of the sanityCheckTotalProcessMemory method [1] is never executed:
1.
The deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory method already checks whether TotalProcessMemory was explicitly configured [2]
2.
Hi Sigalit,
This could be related to https://issues.apache.org/jira/browse/FLINK-27889
We have fixed this issue already (after the release); you could simply use
the latest operator image from `release-1.0:
*ghcr.io/apache/flink-kubernetes-operator:cc8207c