Re: Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-14 文章 Zhiwen Sun
我猜测是 watermark 的问题, 看楼主的设置, watermark 是 -2s ,也就是说, order header 流,有数据晚了 2s
,就会被丢弃。

楼主之前看的也是 订单明细比订单主表晚几秒, 这只是同一个订单的数据生成时间差异。 如果是这样的话,使用一般的 inner join + ttl
就可以满足需求了。

BTW: watermark 我觉得很难使用好,实际使用场景非常有限。



Zhiwen Sun



On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang  wrote:

> > 我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval join开分钟级别的数据还要准确
>
> 不合理的 watermark 设置在 interval join 就会导致丢数据。设置 ttl 情况下,如果某个 key
> 的数据频繁访问情况下,那么这个数据就不会过期。
>
> > 我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志。
>
> 我记得日志是会打印相关的日志。能提一些相关的日志吗?
>
> best,
> Shengkai
>
> lxk  于2022年6月14日周二 20:04写道:
>
> > Hi,
> >   我目前使用sql interval join,窗口的上下界增加到分钟级别,分别是-2 minute 和 +4 minute
> > 目前来看数据量和使用inner join要差不多了。以下是代码
> > Table headerTable =
> > streamTableEnvironment.fromDataStream(headerFilterStream,
> >  Schema.newBuilder()
> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> > TIMESTAMP_LTZ(3))")
> > .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> > .build());
> > Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream,
> > Schema.newBuilder()
> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> > TIMESTAMP_LTZ(3))")
> > .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> > .build());
> >
> >
> > streamTableEnvironment.createTemporaryView("header",headerTable);
> > streamTableEnvironment.createTemporaryView("item",itemTable);
> > Table result = streamTableEnvironment.sqlQuery("select
> header.customer_id"
> > +
> > ",item.goods_id" +
> > ",header.id" +
> > ",header.order_status" +
> > ",header.shop_id" +
> > ",header.parent_order_id" +
> > ",header.order_at" +
> > ",header.pay_at" +
> > ",header.channel_id" +
> > ",header.root_order_id" +
> > ",item.id" +
> > ",item.row_num" +
> > ",item.p_sp_sub_amt" +
> > ",item.display_qty" +
> > ",item.qty" +
> > ",item.bom_type" +
> > " from item JOIN header on header.id = item.order_id and item.rowtime
> > BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime +
> INTERVAL
> > '4' MINUTE");
> >
> >   对此,我又有新的疑问了,我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval
> > join开分钟级别的数据还要准确?针对这个问题,不知道大家有什么看法和思路?
> >   我的一个猜测是我设置的表的ttl没有生效,inner join一直使用的是全量的数据,所以结果准确度要比interval
> > join高,我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志,我的配置如下:
> > Configuration conf = new Configuration();
> > conf.setString("table.exec.mini-batch.enabled","true");
> > conf.setString("table.exec.mini-batch.allow-latency","15 s");
> > conf.setString("table.exec.mini-batch.size","100");
> > conf.setString("table.exec.state.ttl","20 s");
> > env.configure(conf);
> > StreamTableEnvironment streamTableEnvironment =
> > StreamTableEnvironment.create(env,
> > EnvironmentSettings.fromConfiguration(conf));
> >
> >
> > 我想了解下,从tm和jm日志是否能正确反应我的配置生效?如果不行,那我要使用什么方法才能知道我的这个配置是否生效?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2022-06-13 21:12:48,"Xuyang"  写道:
> > >Hi,
> > >  1、理论上来说inner join关联的数据量应该比interval
> > join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。
> > >  2、inner
> >
> join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join
> >
> key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。
> > >
> > >
> > >如果我有不对的地方,请指正我哈。
> > >
> > >
> > >
> > >
> > >--
> > >
> > >Best!
> > >Xuyang
> > >
> > >
> > >
> > >
> > >
> > >在 2022-06-12 14:39:39,"lxk7...@163.com"  写道:
> > >>非常感谢回复
> > >>1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据
> > >>2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner
> >
> join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner
> > join应该也会受这样的影响
> > >>3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner
> > join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理时间来定义?
> > >>
> > >>
> > >>
> > >>lxk7...@163.com
> > >>
> > >>发件人: Shengkai Fang
> > >>发送时间: 2022-06-11 20:35
> > >>收件人: user-zh
> > >>主题: Re: Re: Flink 使用interval join数据丢失疑问
> > >>hi,
> > >>
> > >>对于第一点,丢数据的情况有很多。首先,要确认是不是 JOIN 算子丢数据(SINK 的使用不当也会丢数据)。如果明确了是 join
> > >>算子丢的数据,建议明确下丢的数据是咋样的,是不是 watermark 设置不合理,导致数据被误认为是晚到数据从而被丢了。例如,这里的是
> > `event
> > >>time` = `rowtime` - 2s,是不是不合适,我咋记得一般都是 +2 s 呢?
> > >>
> > >>对于第二点,interval join 我个人初步的理解是 state 的清理是根据两边的 event time,也就是说,如果右流的
> event
> > >>time 的更新会影响左流的数据清理。比如说右流的时间点到了 12:00,join 条件要求左流的时间不会晚于右流的时间 1h,那么左流
> > >>11:00之前的数据都可以被清理了。
> > >>
> > >>对于第三点,我觉得是不能的。目前的 inner join +  state 清理无法覆盖 event time 的window join 的。
> > >>
> > >>best,
> > >>Shengkai
> > >>
> > >>lxk7...@163.com  于2022年6月10日周五 23:03写道:
> > >>
> > >>> 对于这个问题,我还是有很大的疑问,再把我这个场景描述一下:
> > >>>
> > >>>
> >
> 目前是使用flink进行双流join,两个流的数据,一个流是订单主表,另一个流是订单明细表。我们探查了离线的数据,订单明细表一般会在订单主表生成后晚几秒内生成,这个差异在秒级别。
> > >>>
> > 我们做了以下几轮测试,并对比了另一个实时落的表数据量。(这个表就是基准参照数据,只是简单落表,没做任何处理,两边的数据源一致,对比的口径一致。)
> > >>> 1.使用datastream api,使用kafka自带的时间戳做水印,使用interval join。对比完结果,数据少。
> > >>> 2.使用流转表,sql inner join,没有设置watermark。对比完结果数据正常。
> > >>> 3.使用流转表,sql interval 

Re: flink-connector-jdbc是否支持多个values问题

2022-06-14 文章 Zhiwen Sun
支持同时写入多个 values ,这个是 jdbcurl 控制,设置 *rewriteBatchedStatements=true*

生成的 SQL 类似:

INSERT INTO `order_summary`(`order_id`, `proctime`, `order_status`,
> `order_name`, `total`)
>  VALUES
>   (3, '2022-06-14 22:31:24.699', 'OK', 'order-name-1', 20) ,
>   (2, '2022-06-14 22:31:21.496', 'OK', 'order-name-1', 131)
> ON DUPLICATE KEY UPDATE `order_id`=VALUES(`order_id`),
> `proctime`=VALUES(`proctime`), `order_status`=VALUES(`order_status`),
> `order_name`=VALUES(`order_name`), `total`=VALUES(`total`)



Zhiwen Sun



On Mon, Mar 7, 2022 at 5:07 PM 黑色  wrote:

> 你看一下底层的源码实现全知道了,它insert into x() values() ON duplicate
> Key实现Insert update,所以不会的
>
>
>
>
> --原始邮件--
> 发件人: "payne_z" 发送时间: 2022年3月7日(星期一) 下午3:49
> 收件人: "user-zh" 主题: flink-connector-jdbc是否支持多个values问题
>
>
>
> 请问flink-connector-jdbc是否支持同时写入多个values的用法?


Re: Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-14 文章 Shengkai Fang
> 我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval join开分钟级别的数据还要准确

不合理的 watermark 设置在 interval join 就会导致丢数据。设置 ttl 情况下,如果某个 key
的数据频繁访问情况下,那么这个数据就不会过期。

> 我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志。

我记得日志是会打印相关的日志。能提一些相关的日志吗?

best,
Shengkai

lxk  于2022年6月14日周二 20:04写道:

> Hi,
>   我目前使用sql interval join,窗口的上下界增加到分钟级别,分别是-2 minute 和 +4 minute
> 目前来看数据量和使用inner join要差不多了。以下是代码
> Table headerTable =
> streamTableEnvironment.fromDataStream(headerFilterStream,
>  Schema.newBuilder()
> .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> TIMESTAMP_LTZ(3))")
> .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> .build());
> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream,
> Schema.newBuilder()
> .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> TIMESTAMP_LTZ(3))")
> .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> .build());
>
>
> streamTableEnvironment.createTemporaryView("header",headerTable);
> streamTableEnvironment.createTemporaryView("item",itemTable);
> Table result = streamTableEnvironment.sqlQuery("select header.customer_id"
> +
> ",item.goods_id" +
> ",header.id" +
> ",header.order_status" +
> ",header.shop_id" +
> ",header.parent_order_id" +
> ",header.order_at" +
> ",header.pay_at" +
> ",header.channel_id" +
> ",header.root_order_id" +
> ",item.id" +
> ",item.row_num" +
> ",item.p_sp_sub_amt" +
> ",item.display_qty" +
> ",item.qty" +
> ",item.bom_type" +
> " from item JOIN header on header.id = item.order_id and item.rowtime
> BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime + INTERVAL
> '4' MINUTE");
>
>   对此,我又有新的疑问了,我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval
> join开分钟级别的数据还要准确?针对这个问题,不知道大家有什么看法和思路?
>   我的一个猜测是我设置的表的ttl没有生效,inner join一直使用的是全量的数据,所以结果准确度要比interval
> join高,我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志,我的配置如下:
> Configuration conf = new Configuration();
> conf.setString("table.exec.mini-batch.enabled","true");
> conf.setString("table.exec.mini-batch.allow-latency","15 s");
> conf.setString("table.exec.mini-batch.size","100");
> conf.setString("table.exec.state.ttl","20 s");
> env.configure(conf);
> StreamTableEnvironment streamTableEnvironment =
> StreamTableEnvironment.create(env,
> EnvironmentSettings.fromConfiguration(conf));
>
>
> 我想了解下,从tm和jm日志是否能正确反应我的配置生效?如果不行,那我要使用什么方法才能知道我的这个配置是否生效?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-06-13 21:12:48,"Xuyang"  写道:
> >Hi,
> >  1、理论上来说inner join关联的数据量应该比interval
> join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。
> >  2、inner
> join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join
> key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。
> >
> >
> >如果我有不对的地方,请指正我哈。
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >
> >在 2022-06-12 14:39:39,"lxk7...@163.com"  写道:
> >>非常感谢回复
> >>1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据
> >>2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner
> join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner
> join应该也会受这样的影响
> >>3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner
> join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理时间来定义?
> >>
> >>
> >>
> >>lxk7...@163.com
> >>
> >>发件人: Shengkai Fang
> >>发送时间: 2022-06-11 20:35
> >>收件人: user-zh
> >>主题: Re: Re: Flink 使用interval join数据丢失疑问
> >>hi,
> >>
> >>对于第一点,丢数据的情况有很多。首先,要确认是不是 JOIN 算子丢数据(SINK 的使用不当也会丢数据)。如果明确了是 join
> >>算子丢的数据,建议明确下丢的数据是咋样的,是不是 watermark 设置不合理,导致数据被误认为是晚到数据从而被丢了。例如,这里的是
> `event
> >>time` = `rowtime` - 2s,是不是不合适,我咋记得一般都是 +2 s 呢?
> >>
> >>对于第二点,interval join 我个人初步的理解是 state 的清理是根据两边的 event time,也就是说,如果右流的 event
> >>time 的更新会影响左流的数据清理。比如说右流的时间点到了 12:00,join 条件要求左流的时间不会晚于右流的时间 1h,那么左流
> >>11:00之前的数据都可以被清理了。
> >>
> >>对于第三点,我觉得是不能的。目前的 inner join +  state 清理无法覆盖 event time 的window join 的。
> >>
> >>best,
> >>Shengkai
> >>
> >>lxk7...@163.com  于2022年6月10日周五 23:03写道:
> >>
> >>> 对于这个问题,我还是有很大的疑问,再把我这个场景描述一下:
> >>>
> >>>
> 目前是使用flink进行双流join,两个流的数据,一个流是订单主表,另一个流是订单明细表。我们探查了离线的数据,订单明细表一般会在订单主表生成后晚几秒内生成,这个差异在秒级别。
> >>>
> 我们做了以下几轮测试,并对比了另一个实时落的表数据量。(这个表就是基准参照数据,只是简单落表,没做任何处理,两边的数据源一致,对比的口径一致。)
> >>> 1.使用datastream api,使用kafka自带的时间戳做水印,使用interval join。对比完结果,数据少。
> >>> 2.使用流转表,sql inner join,没有设置watermark。对比完结果数据正常。
> >>> 3.使用流转表,sql interval join,从数据中的事件时间提取水印,对比完结果数据,数据少。
> >>>  从结果上看,我不太明白为什么sql里inner join能保证数据准确,而interval
> >>> join不行?有什么好的方式或者思路能让我更好的去尝试了解这个问题产生的原因
> >>>
> >>>
> 针对第二种方式,我的疑问是,sql里没有设置水印,那么表的state过期是以处理时间来计算吗?针对这种设置了表state过期时间的join,我能理解为这个inner
> >>> join其实是一个window join吗?
> >>>
> >>>
> >>>
> >>> lxk7...@163.com
> >>>
> >>> 发件人: lxk
> >>> 发送时间: 2022-06-10 18:18
> >>> 收件人: user-zh
> >>> 主题: Re:Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问
> >>>
> >>>
> >>>
> >>> 现在改成了sql interval join,代码和执行计划如下,其他配置没变,数据量还是少,使用inner join就没问题
> >>>
> >>>
> >>>
> >>>
> >>> Table headerTable =
> >>> 

关于PyFlink的开发环境问题

2022-06-14 文章 张 兴博
您好:
   我是一名学习使用pyflink的用户,我想在ubuntu20.04上开发pyflink,但是在运行代码的时候,报错为:

Traceback (most recent call last):
  File "/root/.py", line 6, in 
s_env = StreamExecutionEnvironment.get_execution_environment()
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 805, in get_execution_environment
return StreamExecutionEnvironment(j_stream_exection_environment)
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 62, in __init__
self._open()
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 973, in _open
startup_loopback_server()
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
 line 963, in startup_loopback_server
from pyflink.fn_execution.beam.beam_worker_pool_service import \
  File 
"/usr/local/lib/python3.8/dist-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py",
 line 31, in 
from apache_beam.options.pipeline_options import DebugOptions
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/__init__.py", line 
96, in 
from apache_beam import io
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/__init__.py", 
line 23, in 
from apache_beam.io.avroio import *
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/avroio.py", line 
63, in 
from apache_beam.io import filebasedsink
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/io/filebasedsink.py", line 
36, in 
from apache_beam.io import iobase
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/iobase.py", line 
57, in 
from apache_beam.transforms import Impulse
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/__init__.py", 
line 25, in 
from apache_beam.transforms.external import *
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/external.py", 
line 45, in 
from apache_beam.runners import pipeline_context
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/runners/pipeline_context.py",
 line 51, in 
from apache_beam.transforms import environments
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/environments.py",
 line 54, in 
from apache_beam.runners.portability.sdk_container_builder import 
SdkContainerImageBuilder
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/sdk_container_builder.py",
 line 44, in 
from apache_beam.internal.gcp.auth import get_service_credentials
  File 
"/usr/local/lib/python3.8/dist-packages/apache_beam/internal/gcp/auth.py", line 
28, in 
from oauth2client.client import GoogleCredentials
  File "/usr/local/lib/python3.8/dist-packages/oauth2client/client.py", line 
39, in 
from oauth2client import transport
  File "/usr/local/lib/python3.8/dist-packages/oauth2client/transport.py", line 
17, in 
import httplib2
ModuleNotFoundError: No module named 'httplib2'

通过查询发现在python新版中,httplib2已经不用了?采用的名字是http.client?
我的python版本为3.8.10,jdk为openjdk 11.0.15(另一台为java 1.8)
我想知道这是什么原因造成的呢?怎么能解决这个问题呢?

感谢您在百忙之中解答我的问题,万分感谢~!

发送自 Windows 11 版邮件应用



Re: 怀疑源码中的一个方法是never reached code

2022-06-14 文章 Jing Ge
Hi,

友情提醒:开ticket以及以后在dev里讨论,记得用英语哈。

祝好
Jing


On Tue, Jun 14, 2022 at 3:23 PM Yun Tang  wrote:

> Hi,育锋
>
> 我觉得你的分析应该是没问题的。可以创建一个ticket来修复该问题。另外,关于代码实现的具体讨论,建议在dev邮件列表讨论。
>
> 祝好
> 唐云
> 
> From: 朱育锋 
> Sent: Tuesday, June 14, 2022 19:33
> To: user-zh@flink.apache.org 
> Subject: 怀疑源码中的一个方法是never reached code
>
> Hello Everyone
>
>
> 在阅读ProcessMemoryUtils类的代码时,我怀疑sanityCheckTotalProcessMemory方法[1]中的主体逻辑永远都不会执行:
>
> 1.
> 在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中,判断了是否显式配置了TotalProcessMemory[2]
> 2.
> false分支(也就是没有显式配置TotalProcessMemory)的逻辑中调用了sanityCheckTotalProcessMemory方法,而sanityCheckTotalProcessMemory方法的主体逻辑
>
> 只有在显式配置了TotalProcessMemory时[3]才会执行,所以sanityCheckTotalProcessMemory方法的主体逻辑应该永远不会执行
>
>
> 参照TaskExecutorFlinkMemoryUtils类中的sanityCheckTotalFlinkMemory方法(该方法与sanityCheckTotalProcessMemory方法逻辑类似,都是比较衍生的内存大小与显式配置的内存大小是否一致)的调用位置[4][5],
>
> 我猜测sanityCheckTotalProcessMemory方法是不是应该放在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中if分支的后面,而不是在分支里面
>
> 也有可能是对这段代码的理解不够,没有揣测到这么写的意图,希望大佬们帮忙确认下
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
> >
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
> >
> [3]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
> >
> [4]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
> >
> [5]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
> <
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
> >
>
> Best regards
> YuFeng
>


Re: 怀疑源码中的一个方法是never reached code

2022-06-14 文章 Yun Tang
Hi,育锋

我觉得你的分析应该是没问题的。可以创建一个ticket来修复该问题。另外,关于代码实现的具体讨论,建议在dev邮件列表讨论。

祝好
唐云

From: 朱育锋 
Sent: Tuesday, June 14, 2022 19:33
To: user-zh@flink.apache.org 
Subject: 怀疑源码中的一个方法是never reached code

Hello Everyone

在阅读ProcessMemoryUtils类的代码时,我怀疑sanityCheckTotalProcessMemory方法[1]中的主体逻辑永远都不会执行:

1. 
在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中,判断了是否显式配置了TotalProcessMemory[2]
2. 
false分支(也就是没有显式配置TotalProcessMemory)的逻辑中调用了sanityCheckTotalProcessMemory方法,而sanityCheckTotalProcessMemory方法的主体逻辑
只有在显式配置了TotalProcessMemory时[3]才会执行,所以sanityCheckTotalProcessMemory方法的主体逻辑应该永远不会执行

参照TaskExecutorFlinkMemoryUtils类中的sanityCheckTotalFlinkMemory方法(该方法与sanityCheckTotalProcessMemory方法逻辑类似,都是比较衍生的内存大小与显式配置的内存大小是否一致)的调用位置[4][5],
我猜测sanityCheckTotalProcessMemory方法是不是应该放在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中if分支的后面,而不是在分支里面

也有可能是对这段代码的理解不够,没有揣测到这么写的意图,希望大佬们帮忙确认下

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
 

[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
 

[3] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
 

[4] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
 

[5] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
 


Best regards
YuFeng


Re:Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-14 文章 lxk
Hi,
  我目前使用sql interval join,窗口的上下界增加到分钟级别,分别是-2 minute 和 +4 minute 目前来看数据量和使用inner 
join要差不多了。以下是代码
Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream,   
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS 
TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
.build());
Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream, 
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS 
TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
.build());


streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);
Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from item JOIN header on header.id = item.order_id and item.rowtime BETWEEN 
header.rowtime - INTERVAL '2' MINUTE AND header.rowtime + INTERVAL '4' MINUTE");

  对此,我又有新的疑问了,我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval 
join开分钟级别的数据还要准确?针对这个问题,不知道大家有什么看法和思路?
  我的一个猜测是我设置的表的ttl没有生效,inner join一直使用的是全量的数据,所以结果准确度要比interval 
join高,我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志,我的配置如下:
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15 s");
conf.setString("table.exec.mini-batch.size","100");
conf.setString("table.exec.state.ttl","20 s");
env.configure(conf);
StreamTableEnvironment streamTableEnvironment = 
StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));


我想了解下,从tm和jm日志是否能正确反应我的配置生效?如果不行,那我要使用什么方法才能知道我的这个配置是否生效?





















在 2022-06-13 21:12:48,"Xuyang"  写道:
>Hi,
>  1、理论上来说inner join关联的数据量应该比interval 
> join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。
>  2、inner 
> join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join
>  
> key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。
>
>
>如果我有不对的地方,请指正我哈。
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2022-06-12 14:39:39,"lxk7...@163.com"  写道:
>>非常感谢回复
>>1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据 
>>2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner 
>>join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner
>> join应该也会受这样的影响
>>3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner 
>>join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理时间来定义?
>>
>>
>>
>>lxk7...@163.com
>> 
>>发件人: Shengkai Fang
>>发送时间: 2022-06-11 20:35
>>收件人: user-zh
>>主题: Re: Re: Flink 使用interval join数据丢失疑问
>>hi,
>> 
>>对于第一点,丢数据的情况有很多。首先,要确认是不是 JOIN 算子丢数据(SINK 的使用不当也会丢数据)。如果明确了是 join
>>算子丢的数据,建议明确下丢的数据是咋样的,是不是 watermark 设置不合理,导致数据被误认为是晚到数据从而被丢了。例如,这里的是 `event
>>time` = `rowtime` - 2s,是不是不合适,我咋记得一般都是 +2 s 呢?
>> 
>>对于第二点,interval join 我个人初步的理解是 state 的清理是根据两边的 event time,也就是说,如果右流的 event
>>time 的更新会影响左流的数据清理。比如说右流的时间点到了 12:00,join 条件要求左流的时间不会晚于右流的时间 1h,那么左流
>>11:00之前的数据都可以被清理了。
>> 
>>对于第三点,我觉得是不能的。目前的 inner join +  state 清理无法覆盖 event time 的window join 的。
>> 
>>best,
>>Shengkai
>> 
>>lxk7...@163.com  于2022年6月10日周五 23:03写道:
>> 
>>> 对于这个问题,我还是有很大的疑问,再把我这个场景描述一下:
>>>
>>> 目前是使用flink进行双流join,两个流的数据,一个流是订单主表,另一个流是订单明细表。我们探查了离线的数据,订单明细表一般会在订单主表生成后晚几秒内生成,这个差异在秒级别。
>>> 我们做了以下几轮测试,并对比了另一个实时落的表数据量。(这个表就是基准参照数据,只是简单落表,没做任何处理,两边的数据源一致,对比的口径一致。)
>>> 1.使用datastream api,使用kafka自带的时间戳做水印,使用interval join。对比完结果,数据少。
>>> 2.使用流转表,sql inner join,没有设置watermark。对比完结果数据正常。
>>> 3.使用流转表,sql interval join,从数据中的事件时间提取水印,对比完结果数据,数据少。
>>>  从结果上看,我不太明白为什么sql里inner join能保证数据准确,而interval
>>> join不行?有什么好的方式或者思路能让我更好的去尝试了解这个问题产生的原因
>>>
>>> 针对第二种方式,我的疑问是,sql里没有设置水印,那么表的state过期是以处理时间来计算吗?针对这种设置了表state过期时间的join,我能理解为这个inner
>>> join其实是一个window join吗?
>>>
>>>
>>>
>>> lxk7...@163.com
>>>
>>> 发件人: lxk
>>> 发送时间: 2022-06-10 18:18
>>> 收件人: user-zh
>>> 主题: Re:Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问
>>>
>>>
>>>
>>> 现在改成了sql interval join,代码和执行计划如下,其他配置没变,数据量还是少,使用inner join就没问题
>>>
>>>
>>>
>>>
>>> Table headerTable =
>>> streamTableEnvironment.fromDataStream(headerFilterStream,
>>>  Schema.newBuilder()
>>> .columnByExpression("rowtime",
>>> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
>>> .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
>>> .build());
>>> Table itemTable =
>>> streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder()
>>> .columnByExpression("rowtime",
>>> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
>>> .watermark("rowtime", 

怀疑源码中的一个方法是never reached code

2022-06-14 文章 朱育锋
Hello Everyone

在阅读ProcessMemoryUtils类的代码时,我怀疑sanityCheckTotalProcessMemory方法[1]中的主体逻辑永远都不会执行:

1. 
在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中,判断了是否显式配置了TotalProcessMemory[2]
2. 
false分支(也就是没有显式配置TotalProcessMemory)的逻辑中调用了sanityCheckTotalProcessMemory方法,而sanityCheckTotalProcessMemory方法的主体逻辑
只有在显式配置了TotalProcessMemory时[3]才会执行,所以sanityCheckTotalProcessMemory方法的主体逻辑应该永远不会执行

参照TaskExecutorFlinkMemoryUtils类中的sanityCheckTotalFlinkMemory方法(该方法与sanityCheckTotalProcessMemory方法逻辑类似,都是比较衍生的内存大小与显式配置的内存大小是否一致)的调用位置[4][5],
我猜测sanityCheckTotalProcessMemory方法是不是应该放在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中if分支的后面,而不是在分支里面

也有可能是对这段代码的理解不够,没有揣测到这么写的意图,希望大佬们帮忙确认下

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
 

[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
 

[3] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
 

[4] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
 

[5] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
 


Best regards
YuFeng