Re: Re: Re: Re: Question about data loss when using interval join in Flink

2022-06-14 Thread Zhiwen Sun
I suspect a watermark problem. Looking at the original poster's settings, the watermark is -2s, which means that any data in the order header stream arriving more than 2s late is dropped. What the OP observed earlier was also that the order details arrive a few seconds later than the order header; that is just a difference in generation time between records of the same order. If that is the case, a plain inner join + TTL is enough to meet the requirement. BTW: I find watermarks very hard to use well; the practical scenarios are quite limited. Zhiwen Sun On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang wrote: > > In my sql inner
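A minimal sketch of the suggested inner join + TTL approach, assuming the Table API; the table and column names here are hypothetical:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Sketch: regular inner join with state TTL instead of an interval join.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Expire join state for keys that have not been touched within 20 s.
    tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "20 s");
    tEnv.executeSql(
        "SELECT h.order_id, d.item_id, d.amount "
            + "FROM order_header h "
            + "JOIN order_detail d ON h.order_id = d.order_id");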

Re: Does flink-connector-jdbc support multiple VALUES?

2022-06-14 Thread Zhiwen Sun
Writing multiple VALUES rows at once is supported; it is controlled by the JDBC URL: set *rewriteBatchedStatements=true*. The generated SQL looks like: INSERT INTO `order_summary`(`order_id`, `proctime`, `order_status`, `order_name`, `total`) VALUES (3, '2022-06-14 22:31:24.699', 'OK', 'order-name-1', 20), (2, '2022-06-14 22:31:21.496', 'OK',
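For reference, a sketch of where that URL option goes in a Flink SQL JDBC sink definition, assuming an existing TableEnvironment tEnv; host, database, and credentials are placeholders:

    // Sketch: pass rewriteBatchedStatements=true through the connector URL so
    // the MySQL driver rewrites batched INSERTs into one multi-row VALUES statement.
    tEnv.executeSql(
        "CREATE TABLE order_summary ("
            + "  order_id INT,"
            + "  proctime TIMESTAMP(3),"
            + "  order_status STRING,"
            + "  order_name STRING,"
            + "  total INT,"
            + "  PRIMARY KEY (order_id) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'jdbc',"
            + "  'url' = 'jdbc:mysql://localhost:3306/shop?rewriteBatchedStatements=true',"
            + "  'table-name' = 'order_summary',"
            + "  'username' = 'user',"
            + "  'password' = 'secret'"
            + ")");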

Re: Re: Re: Re: Question about data loss when using interval join in Flink

2022-06-14 Thread Shengkai Fang
> I set a 20s state TTL in my SQL inner join. Why does the 20s state retention keep data more accurately than an interval join with minute-level bounds? An unreasonable watermark setting will cause an interval join to drop data. With TTL, if a key's data is accessed frequently, that data never expires. > What I did was check the TM and JM logs; I did not see any logs for the configured sink table. As I recall, the relevant logs do get printed. Could you provide some of them? best, Shengkai lxk wrote on Tue, Jun 14, 2022 at 20:04: > Hi, > I am currently using sql interval
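The "frequently accessed keys never expire" behavior has a DataStream-level analogue in StateTtlConfig; a sketch of that analogue, not necessarily the SQL operator's exact mechanism:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    // Sketch: with OnReadAndWrite, every access resets the 20 s timer, so a
    // key whose state is read or written frequently never expires.
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(20))
        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();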

Re: Spike in checkpoint start delay every 15 minutes

2022-06-14 Thread Hangxiang Yu
Hi, Jai. Could you share your checkpoint configuration (interval, min-pause, and so on) and the checkpoint details from the Flink UI? I guess the delay of the checkpoint may be related to the last checkpoint's completion time, as you can see in the
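For reference, a sketch of where those settings live; the values below are placeholders, not recommendations:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: the checkpoint knobs asked about above.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L);                        // interval: 60 s
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setMinPauseBetweenCheckpoints(30_000L); // min-pause: 30 s
    checkpointConfig.setCheckpointTimeout(600_000L);         // timeout: 10 min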

Question about the PyFlink development environment

2022-06-14 Thread 张 兴博
Hello: I am a user learning PyFlink. I want to develop with PyFlink on Ubuntu 20.04, but when running my code I get the error: Traceback (most recent call last): File "/root/.py", line 6, in s_env = StreamExecutionEnvironment.get_execution_environment() File

Spike in checkpoint start delay every 15 minutes

2022-06-14 Thread Jai Patel
We've noticed a spike in the start delays in our incremental checkpoints every 15 minutes. The Flink job seems to start out smooth, with checkpoints in the 15s range and negligible start delays. Then every 3rd or 4th checkpoint has a long start delay (~2-3 minutes). The checkpoints in

Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-14 Thread Jing Ge
Hi Bastien, Thanks for asking. I didn't find any call to setStartFromGroupOffsets() within Flink in the master branch. Could you please point out the code where the committed offset is used as the default? W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets() is used, an exception will be
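A sketch of the committedOffsets variant with an explicit fallback, which avoids the runtime exception when a group has no committed offset yet; broker, topic, and group id are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    // Sketch: start from committed group offsets, falling back to EARLIEST
    // when the group has no committed offset yet.
    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("orders")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();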

Re: Apache Flink - Reading data from Scylla DB

2022-06-14 Thread Jing Ge
Hi, Please be aware that SourceFunction will be deprecated soon[1]. It is recommended to build a new source connector based on the new Source API design by FLIP-27[2]. You might take the Kafka connector as the reference implementation. Best regards, Jing [1]
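A sketch of how a FLIP-27 source plugs into a job once built; the source variable stands for any new-style Source constructed elsewhere (e.g. a KafkaSource), and the operator name is hypothetical:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: new-style sources attach via fromSource() rather than addSource().
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "scylla-source");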

Re: NegativeArraySizeException trying to take a savepoint

2022-06-14 Thread Mike Barborak
Thank you for your replies. Upgrading is in our plans but I think Yun is saying that might not help. We are still trying to find what part of the savepoint is causing the error. We will try removing pieces of the job graph until we are able to savepoint. From: Yun Tang Date: Tuesday, June 14,

Re: Kafka Consumer commit error

2022-06-14 Thread Martijn Visser
Hi Christian, There's another similar error reported by someone else. I've linked the tickets together and asked one of the Kafka maintainers to have a look at this. Best regards, Martijn On Tue, Jun 14, 2022 at 17:16, Christian Lorenz < christian.lor...@mapp.com> wrote: > Hi Alexander, > > >

Re: Kafka Consumer commit error

2022-06-14 Thread Christian Lorenz
Hi Alexander, I’ve created a Jira ticket here: https://issues.apache.org/jira/browse/FLINK-28060. Unfortunately this is causing some issues for us. I hope that with the attached demo project the root cause can be determined, as this is reproducible in Flink 1.15.0, but not in Flink

Re: Suspecting that a method in the source code is never-reached code

2022-06-14 Thread Jing Ge
Hi, a friendly reminder: when opening the ticket and in later discussion on dev, please remember to use English. Best, Jing On Tue, Jun 14, 2022 at 3:23 PM Yun Tang wrote: > Hi 育锋, > > I think your analysis is correct. You can create a ticket to fix this issue. Also, for detailed discussion of the code implementation, I suggest the dev mailing list. > > Best, > Yun Tang > > From: 朱育锋 > Sent: Tuesday, June 14, 2022 19:33 > To:

New KafkaSource API : Change in default behavior regarding starting offset

2022-06-14 Thread bastien dine
Hello everyone, Does someone know why the starting offset behaviour has changed in the new KafkaSource? It is now earliest (see the code in KafkaSourceBuilder); the doc says: "If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default." from:
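For contrast, a sketch of the old default next to an explicit choice in the new API; topic, schema, and properties are placeholders:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "my-group");

    // Old FlinkKafkaConsumer (now deprecated): committed group offsets were the default.
    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), props);
    consumer.setStartFromGroupOffsets(); // explicit form of the old default

    // New KafkaSource: earliest is the default unless overridden.
    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("orders")
        .setStartingOffsets(OffsetsInitializer.earliest()) // matches the documented default
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();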

Re: Flink running same task on different Task Manager

2022-06-14 Thread Weihua Hu
Hi, IMO, broadcast is a better way to do this, as it can reduce the QPS against the external system. If you do not want to use broadcast, try using a RichFunction: start a thread in the open() method to refresh the data regularly, but be careful to clean up your data and threads in the close() method,
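A sketch of the RichFunction variant described above; the class name, refresh interval, and loadFromExternalSystem() stub are all hypothetical:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class EnrichingMapper extends RichMapFunction<String, String> {
        private transient volatile Map<String, String> referenceData;
        private transient ScheduledExecutorService refresher;

        @Override
        public void open(Configuration parameters) {
            referenceData = new ConcurrentHashMap<>();
            refresher = Executors.newSingleThreadScheduledExecutor();
            // Refresh the cached data every 60 s; loadFromExternalSystem() is a stub.
            refresher.scheduleAtFixedRate(
                () -> referenceData = loadFromExternalSystem(), 0L, 60L, TimeUnit.SECONDS);
        }

        @Override
        public String map(String value) {
            return value + "|" + referenceData.getOrDefault(value, "unknown");
        }

        @Override
        public void close() {
            // Clean up the background thread, as advised above.
            if (refresher != null) {
                refresher.shutdownNow();
            }
        }

        private Map<String, String> loadFromExternalSystem() {
            return new ConcurrentHashMap<>(); // placeholder for the real lookup
        }
    }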

Re: Suspecting that a method in the source code is never-reached code

2022-06-14 Thread Yun Tang
Hi 育锋, I think your analysis is correct. You can create a ticket to fix this issue. Also, for detailed discussion of the code implementation, I suggest the dev mailing list. Best, Yun Tang From: 朱育锋 Sent: Tuesday, June 14, 2022 19:33 To: user-zh@flink.apache.org Subject: Suspecting that a method in the source code is never-reached code Hello Everyone

Re: NegativeArraySizeException trying to take a savepoint

2022-06-14 Thread Yun Tang
Hi Mike, I think the root cause is that the size of the Java byte array still exceeds the VM limit. The exception message is not friendly and is not covered by the sanity check [1], as it uses a different code path [2]: the native method org.rocksdb.RocksIterator.$$YJP$$value0 would allocate the byte array

context.timestamp null in keyedprocess function

2022-06-14 Thread bat man
Hi, We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and an enrichment stream from another topic. We extend AssignerWithPeriodicWatermarks to assign watermarks, extract timestamps from the events, and handle idle source partitions. AutoWatermarkInterval is set to 5000L. The timestamp
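For reference, a sketch of the newer WatermarkStrategy equivalent of that setup, including idleness handling; the 5 s bound and the Tuple2 layout (f0 holding epoch millis) are hypothetical:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Sketch: assign event-time timestamps from f0, tolerate 5 s of
    // out-of-orderness, and mark partitions idle after 1 min so they
    // do not hold back the watermark.
    WatermarkStrategy<Tuple2<Long, String>> strategy =
        WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.f0)
            .withIdleness(Duration.ofMinutes(1));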

Re: Re: Re: Re: Question about data loss when using interval join in Flink

2022-06-14 Thread lxk
Hi, I am currently using a SQL interval join with the window bounds widened to the minute level: -2 minutes and +4 minutes. So far the data volume is roughly the same as with an inner join. The code follows: Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream, Schema.newBuilder() .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
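For context, a sketch of what such an interval join looks like in SQL form, assuming an existing TableEnvironment tEnv; the table and column names are hypothetical:

    // Sketch: interval join with the -2/+4 minute bounds mentioned above.
    tEnv.executeSql(
        "SELECT h.order_id, d.item_id "
            + "FROM order_header h JOIN order_detail d "
            + "ON h.order_id = d.order_id "
            + "AND d.rowtime BETWEEN h.rowtime - INTERVAL '2' MINUTE "
            + "AND h.rowtime + INTERVAL '4' MINUTE");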

Re: How to handle deletion of items using PyFlink SQL?

2022-06-14 Thread John Tipper
Yes, I’m interested in the best pattern to follow with SQL so that a downstream DB, written via the JDBC SQL connector, reflects the state of rows added and deleted upstream. So imagine there is a crawl event at t=C1 with an associated timestamp which finds resources A, B, C. Is
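One common pattern here: give the JDBC sink a primary key so it runs in upsert mode and applies retractions from the upstream query as DELETEs on the target rows. A sketch under that assumption, with a TableEnvironment tEnv and all names as placeholders:

    // Sketch: an upsert JDBC sink; rows retracted by the upstream query are
    // deleted from the target table.
    tEnv.executeSql(
        "CREATE TABLE resources_sink ("
            + "  resource_id STRING,"
            + "  last_seen TIMESTAMP(3),"
            + "  PRIMARY KEY (resource_id) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'jdbc',"
            + "  'url' = 'jdbc:postgresql://localhost:5432/crawldb',"
            + "  'table-name' = 'resources'"
            + ")");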

Suspecting that a method in the source code is never-reached code

2022-06-14 Thread 朱育锋
Hello Everyone While reading the code of the ProcessMemoryUtils class, I suspect that the main logic of the sanityCheckTotalProcessMemory method [1] can never execute: 1. The deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory method already checks whether TotalProcessMemory is explicitly configured [2] 2.

Re: Flink operator deletes the FlinkDeployment after a while

2022-06-14 Thread Gyula Fóra
Hi Sigalit, This could be related to https://issues.apache.org/jira/browse/FLINK-27889 We have fixed this issue already (after the release); you could simply use the latest operator image from `release-1.0`: *ghcr.io/apache/flink-kubernetes-operator:cc8207c