Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-08 Thread Jiayi Liao
Hi Yun,

Thanks for your detailed description of the progress in the Flink and
RocksDB communities. There are more than 1,200 jobs using RocksDB as the
state backend at Bytedance, and we have met several of the problems
mentioned in the JIRA issues you referred to:

(1) Memory Management: for large-scale jobs (10TB+ state), it's hard to tune
the memory usage due to the non-strict memory control in RocksDB. Currently
we have to estimate the memory usage manually based on RocksDB's wiki, which
increases our maintenance cost a lot.
(2) DeleteRange Support: we've run a few benchmarks on rescaling performance
and found that the time cost goes up to several minutes when a task's state
is larger than 10GB. I'm glad to see such improvements being merged after
upgrading the RocksDB version (see the sketch after this list).
(3) ARM Support: we added support for the ARM platform ourselves last year
with some hacking on the code, and it's great to see that RocksDB now has an
official release for the ARM platform.
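
As a reference for (2): the cleanup that deleteRange replaces looks roughly
like the sketch below. This is illustrative RocksJava only, not Flink's
actual restore code, and it assumes a RocksJava build that exposes
deleteRange; the path and key prefixes are made up.

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class DeleteRangeSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/delete-range-demo")) {
                // Hypothetical key-group prefix range to drop during rescaling.
                byte[] begin = {0x00, 0x10};
                byte[] end = {0x00, 0x20};
                // Without deleteRange, the alternative is to iterate this range
                // and delete key by key, which is what makes rescaling slow for
                // 10GB+ state. With deleteRange it is a single range tombstone.
                db.deleteRange(begin, end);
            }
        }
    }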

I think the new features (and bug fixes) are more important for us, and I'm
+1 for this.


Best,
Jiayi Liao

On Thu, Aug 5, 2021 at 1:50 AM Yun Tang  wrote:

> Hi Yuval,
>
> Upgrading the RocksDB version is a long story dating back to Flink-1.10.
> When we first planned to introduce the write buffer manager to help control
> the memory usage of RocksDB, we actually wanted to bump from the
> then-current RocksDB-5.17 to RocksDB-5.18. However, we found a performance
> regression in our micro benchmark on state operations [1] when bumping to
> RocksDB-5.18. We could not figure out the root cause at that time and
> decided to cherry-pick the write buffer manager commits into our own
> FRocksDB [2]. We finally released our own frocksdbjni-5.17.2-artisans-2.0
> at that time.
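>
> For readers unfamiliar with it, the write buffer manager caps the total
> memtable memory across column families. In RocksJava terms it is roughly
> the sketch below (sizes are illustrative; the RocksJava setter only exists
> in newer releases, which is exactly why we had to cherry-pick it):
>
>     import org.rocksdb.Cache;
>     import org.rocksdb.DBOptions;
>     import org.rocksdb.LRUCache;
>     import org.rocksdb.RocksDB;
>     import org.rocksdb.WriteBufferManager;
>
>     public class WriteBufferManagerSketch {
>         public static void main(String[] args) {
>             RocksDB.loadLibrary();
>             // Cap all memtables at 256MB, accounted against a 512MB block cache.
>             try (Cache cache = new LRUCache(512 * 1024 * 1024);
>                  WriteBufferManager wbm =
>                          new WriteBufferManager(256 * 1024 * 1024, cache);
>                  DBOptions dbOptions =
>                          new DBOptions().setWriteBufferManager(wbm)) {
>                 // dbOptions would then be used to open the DB.
>             }
>         }
>     }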
>
> As time goes on, more and more bugs or missing features have been reported
> against the old RocksDB version, such as:
>
>1. Cannot support the ARM platform [3]
>2. Does not have a stable deleteRange API, which is useful for Flink
>scale-out [4]
>3. Cannot support strict block cache [5]
>4. Checkpoints might get stuck when using the UNIVERSAL compaction
>strategy [6]
>5. Uncontrolled log size made us disable the RocksDB internal LOG [7]
>6. RocksDB's optimizeForPointLookup option might cause data loss [8]
>7. The dummy entry used for memory control in RocksDB-5.17 is too large,
>leading to performance problems [9]
>8. Cannot support alpine-based images.
>9. ...
>
> Some of the bugs have been worked around, and some are still open.
>
> And we decided to make some changes starting from Flink-1.12. First of
> all, we reported the performance regression between RocksDB-5.18 and
> RocksDB-5.17 to the RocksDB community [10]. However, as the RocksDB-5.x
> versions are a bit old for the community, and RocksJava usage might not be
> the core concern for the Facebook folks, we did not get useful replies.
> Thus, we decided to figure out the root cause of the performance
> regression ourselves. Fortunately, we found the cause by binary-searching
> the commits between RocksDB-5.17 and RocksDB-5.18, and updated the
> original thread [10]. In short, the performance regression is due to the
> different implementations of `__thread` and `thread_local` in gcc, which
> has a larger impact under dynamic loading [11], and dynamic loading is
> exactly what the current RocksJava jar package does. With my patch [12],
> the performance regression disappears when comparing RocksDB-5.18 with
> RocksDB-5.17.
>
> Unfortunately, RocksDB-5.18 still has many bugs, and we wanted to bump to
> RocksDB-6.x. However, another performance regression appeared even with my
> patch [12]. With the previous experience, we knew we had to verify the
> built .so files with our own java-based benchmark instead of RocksDB's
> built-in db-bench, since db-bench never exercises the JNI path. I started
> to search the 1340+ commits from RocksDB-5.18 to RocksDB-6.11 for the
> performance problem. This time, however, I could not figure out the root
> cause even after spending several weeks. The performance goes up and down
> across those commits, and I could not pin down *the commit* that leads to
> the regression. Take the commit that integrates the block cache tracer
> into the block-based table reader [13] as an example: I noticed that this
> commit causes a slight performance regression, possibly from useless usage
> accounting in operations, but the problematic code was changed again in
> later commits. Thus, after several weeks of digging, I had to temporarily
> give up the endless search through those thousand-plus commits. As the
> RocksDB community does not seem to make its project management system
> public, unlike Apache's open JIRA systems, we do not know which benchmarks
> they actually run before releasing each version to guarantee the
> performance.
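>
> For reference, the java-based check meant here is essentially a RocksJava
> put/get loop like the sketch below (illustrative only, not our actual
> benchmark [1]; the path and sizes are placeholders). The point is that it
> loads the .so through JNI the same way a Flink job would:
>
>     import org.rocksdb.Options;
>     import org.rocksdb.RocksDB;
>     import org.rocksdb.RocksDBException;
>
>     public class RocksJavaSmokeBenchmark {
>         public static void main(String[] args) throws RocksDBException {
>             RocksDB.loadLibrary(); // loads the .so under test via JNI
>             try (Options opts = new Options().setCreateIfMissing(true);
>                  RocksDB db = RocksDB.open(opts, "/tmp/rocksjava-bench")) {
>                 byte[] value = new byte[100];
>                 long start = System.nanoTime();
>                 for (int i = 0; i < 1_000_000; i++) {
>                     byte[] key = Integer.toString(i).getBytes();
>                     db.put(key, value);
>                     db.get(key);
>                 }
>                 // 2,000,000 operations total (1M puts + 1M gets).
>                 long nsPerOp = (System.nanoTime() - start) / 2_000_000L;
>                 System.out.println("ns/op: " + nsPerOp);
>             }
>         }
>     }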
>
> With my patch [12] on the latest RocksDB-6.20.3, we get the nexmark
> results shown in the original thread sent by Stephan, and we can see that
> the performance is close in many real-world cases. And we also hope that
> new features, such as direct buffer support [14] in RocksJava, could help
> improve RocksDB's performance in 

Inspecting SST state of rocksdb

2021-08-08 Thread Kai Fu
Hi team,

I'm trying to inspect the SST files of Flink's state with SST-related tools
like sst_dump and ldb from the RocksDB wiki, but it seems I'm getting
meaningless results, as shown below. The tools I'm using are built from
source off RocksDB's trunk. Am I doing this the right way, or is there an
alternative way to inspect the state? We're aware of Flink's queryable
state, but it seems not well supported for SQL-generated operators.

$ ./sst_dump --file=../db/30.sst --command=scan --read_num=50
options.env is 0xba33e0
Process ../db/30.sst
Sst file format: block-based
from [] to []
'� => '� => '� => '� => '� => '� => '� => '� => '� => '� => '� =>
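
For what it's worth, the same scan can also be done programmatically with
RocksJava's SstFileReader (a minimal sketch, assuming a RocksJava version
that ships SstFileReader; the path is a placeholder). Note that the bytes
are still Flink-serialized (key-group prefix, key, and namespace on the key
side, TypeSerializer output on the value side), so they will print as binary
unless decoded with the job's serializers:

    import org.rocksdb.Options;
    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileReader;
    import org.rocksdb.SstFileReaderIterator;

    import java.util.Arrays;

    public class SstScan {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options();
                 SstFileReader reader = new SstFileReader(options)) {
                reader.open("../db/30.sst");
                try (ReadOptions readOptions = new ReadOptions();
                     SstFileReaderIterator it = reader.newIterator(readOptions)) {
                    for (it.seekToFirst(); it.isValid(); it.next()) {
                        // Raw bytes; decode with the job's serializers if needed.
                        System.out.println(Arrays.toString(it.key()) + " => "
                                + Arrays.toString(it.value()));
                    }
                }
            }
        }
    }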

-- 
Best wishes,
- Kai


Re: Some questions about unified batch and stream processing

2021-08-08 Thread Caizhi Weng
Hi!

If you observe that data is written to the sink node but the data in the
MySQL result table does not change, check whether sink.buffer-flush.interval
is configured; if this value is too large, data will not be flushed to MySQL
within the flush interval. Its default value is 1s. A minimal DDL sketch
follows below.

If that is not the problem, you may need to provide the detailed SQL so that
everyone can help locate the issue.
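
For example (a sketch with placeholder connection info and schema; only the
two sink.buffer-flush options matter here):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcSinkFlushExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            tEnv.executeSql(
                    "CREATE TABLE mysql_result_table (" +
                    "  id STRING," +
                    "  PRIMARY KEY (id) NOT ENFORCED" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:mysql://localhost:3306/db'," +
                    "  'table-name' = 'mysql_result_table'," +
                    // Flush buffered rows at least once per second...
                    "  'sink.buffer-flush.interval' = '1s'," +
                    // ...or as soon as 100 rows are buffered.
                    "  'sink.buffer-flush.max-rows' = '100'" +
                    ")");
        }
    }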

yanyunpeng  wrote on Mon, Aug 9, 2021 at 10:24 AM:

> Hi,
> The logic is roughly as follows:
> INSERT INTO mysql_result_table (the MySQL result table)
> SELECT * FROM id_all (a MySQL table with the full set of ids) WHERE id NOT
> IN (SELECT DISTINCT id FROM flink_view (the set of ids from the last 2
> hours))
>
>
> When inserting into MySQL, the result data does not change as the window
> moves.
>
>
> On Aug 9, 2021 at 10:19, Caizhi Weng wrote:
>
>
> Hi! I don't quite understand what "when the result is inserted into the
> database it becomes batch, and the results in MySQL do not change" means.
> Is the sink table the same table as the dimension table? Please describe
> the scenario and approach more clearly. yanyunpeng
> wrote on Mon, Aug 9, 2021 at 10:12 AM: > I've run into a problem; could
> anyone help explain? > 1. A supplementary table in the data (supplements
> the stream with the full set of device configurations) > 2. A data stream
> from the raw Kafka table > Goal: find devices that have not sent any
> message within a period of time. > Approach: > 1. Use a 2-hour sliding
> window to collect DISTINCT device ids. > 2. Query the MySQL table holding
> the full device set for ids NOT IN (DISTINCT id FROM the sliding window).
> > Querying directly works fine; batch and stream work together. > But when
> the result is inserted into the database it becomes batch, and the results
> in MySQL do not change. > > What mechanism is at work here? If this is
> treated as batch processing under unified batch/stream, why does the plain
> SELECT achieve the goal?


Re: KafkaDeserializationSchema.open() is not called after task state change

2021-08-08 Thread Caizhi Weng
Hi!

This does not sound like expected behavior. However, there can be many
reasons for some values to be uninitialized (for example, I once met a bug
where a thread was created and started running in the open method before
some values were initialized). You can always add some logging at the
beginning of the open method and check whether it is really not called; a
minimal sketch follows below. If it is really not called, it should be a
bug and the developers should look into it.
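
Something like this (a sketch; the String schema and the field are
placeholders for your own logic):

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingDeserializationSchema
            implements KafkaDeserializationSchema<String> {

        private static final Logger LOG =
                LoggerFactory.getLogger(LoggingDeserializationSchema.class);

        private transient String initializedValue; // set in open()

        @Override
        public void open(DeserializationSchema.InitializationContext context) {
            // If this line never appears after the restart, open() was skipped.
            LOG.info("open() called");
            initializedValue = "initialized";
        }

        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> record) {
            if (initializedValue == null) {
                LOG.warn("deserialize() called before open()");
            }
            return new String(record.value());
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }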

Gil Amsalem  wrote on Sun, Aug 8, 2021 at 7:54 PM:

> Hi,
>
> I have a class that implements KafkaDeserializationSchema and overrides
> the open() method.
> When deploying my job, everything seems to work as expected.
> But when my task hits an exception and switches to CANCELED -> CREATED ->
> DEPLOYING, it seems that the open method is not called, and I end up
> with uninitialized values.
>
> Is that expected? Is it a bug?
>
> --
> Gil Amsalem
>
>


Re: Some questions about unified batch and stream processing

2021-08-08 Thread yanyunpeng
Hi,
The logic is roughly as follows:
INSERT INTO mysql_result_table (the MySQL result table)
SELECT * FROM id_all (a MySQL table with the full set of ids) WHERE id NOT IN
(SELECT DISTINCT id FROM flink_view (the set of ids from the last 2 hours))


When inserting into MySQL, the result data does not change as the window
moves.


On Aug 9, 2021 at 10:19, Caizhi Weng wrote:


Hi! I don't quite understand what "when the result is inserted into the
database it becomes batch, and the results in MySQL do not change" means. Is
the sink table the same table as the dimension table? Please describe the
scenario and approach more clearly. yanyunpeng  wrote on Mon, Aug 9, 2021 at
10:12 AM: > I've run into a problem; could anyone help explain? > 1. A
supplementary table in the data (supplements the stream with the full set of
device configurations) > 2. A data stream from the raw Kafka table > Goal:
find devices that have not sent any message within a period of time. >
Approach: > 1. Use a 2-hour sliding window to collect DISTINCT device ids. >
2. Query the MySQL table holding the full device set for ids NOT IN
(DISTINCT id FROM the sliding window). > > Querying directly works fine;
batch and stream work together. > But when the result is inserted into the
database it becomes batch, and the results in MySQL do not change. > > What
mechanism is at work here? If this is treated as batch processing under
unified batch/stream, why does the plain SELECT achieve the goal?

Re: Some questions about unified batch and stream processing

2021-08-08 Thread Caizhi Weng
Hi!

I don't quite understand what "when the result is inserted into the database
it becomes batch, and the results in MySQL do not change" means. Is the sink
table the same table as the dimension table? Please describe the scenario
and approach more clearly.

yanyunpeng  wrote on Mon, Aug 9, 2021 at 10:12 AM:

> I've run into a problem; could anyone help explain?
> 1. A supplementary table in the data (supplements the stream with the full
> set of device configurations)
> 2. A data stream from the raw Kafka table
> Goal: find devices that have not sent any message within a period of time.
> Approach:
> 1. Use a 2-hour sliding window to collect DISTINCT device ids.
> 2. Query the MySQL table holding the full device set for ids NOT IN
> (DISTINCT id FROM the sliding window).
>
> Querying directly works fine; batch and stream can be used together.
> But when the result is inserted into the database it becomes batch, and
> the results in MySQL do not change.
>
> What mechanism is at work here? If this is treated as batch processing
> under unified batch/stream, why does the plain SELECT achieve the goal?


Some questions about unified batch and stream processing

2021-08-08 Thread yanyunpeng
I've run into a problem; could anyone help explain?
1. A supplementary table in the data (supplements the stream with the full
set of device configurations)
2. A data stream from the raw Kafka table
Goal: find devices that have not sent any message within a period of time.
Approach:
1. Use a 2-hour sliding window to collect DISTINCT device ids.
2. Query the MySQL table holding the full device set for ids NOT IN
(DISTINCT id FROM the sliding window).

Querying directly works fine; batch and stream can be used together.
But when the result is inserted into the database it becomes batch, and the
results in MySQL do not change.

What mechanism is at work here? If this is treated as batch processing
under unified batch/stream, why does the plain SELECT achieve the goal?

Re: How to monitor Kafka lag

2021-08-08 Thread Jimmy Zhang
Hi, I see you are using Kafka-related metrics, and I'd like to ask a
question. Have you ever seen the metrics reset to zero after restarting a
Kafka sink job? Doesn't that make it impossible to keep accumulating the
counts? We want to do data reconciliation by querying output statistics over
different time ranges, and a reset to zero in between would be a problem, so
I'd like to ask about it. Any reply is much appreciated!




Best,
Jimmy


On Jul 28, 2021 at 17:58, jie mei wrote:
Hi all,

We alert by configuring alerting rules in Grafana on the collected Flink
Kafka metrics (taskmanager_job_task_operator_KafkaConsumer_records).

xuhaiLong  wrote on Wed, Jul 28, 2021 at 5:46 PM:

> Take a look at kafka_exporter: it collects the consumption status of all
> groups, and you can then configure different rules to monitor them.
>
>
> On Jul 28, 2021 at 17:39, laohu <2372554...@qq.com.INVALID> wrote:
> Hi comsir
>
> Kafka's console is fairly limited; to know the lag you have to track it
> yourself.
>
> Ways to track it:
>
> 1. For each service's topic, subtract the group id's offset from the
> topic's offset.
>
> 2. Where possible, compute the various consumption rates.
>
> 3. The RocketMQ console shows consumption progress and can serve as a
> reference.
>
>
> On 2021/7/28 at 11:02 AM, 龙逸尘 wrote:
> Hi comsir,
> The lag obtained by subtracting the current group offset from the offset
> in the Kafka cluster metadata is fairly accurate; a rough sketch follows
> below.
> The group id needs to be maintained by yourself.
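>
> Something like this (a sketch; the group id and bootstrap servers are
> placeholders):
>
>     import org.apache.kafka.clients.admin.AdminClient;
>     import org.apache.kafka.clients.admin.AdminClientConfig;
>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.common.TopicPartition;
>     import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
>     import java.util.Map;
>     import java.util.Properties;
>
>     public class ConsumerLag {
>         public static void main(String[] args) throws Exception {
>             Properties adminProps = new Properties();
>             adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>                     "localhost:9092");
>             try (AdminClient admin = AdminClient.create(adminProps)) {
>                 // Committed offsets of the group (you maintain the group id).
>                 Map<TopicPartition, OffsetAndMetadata> committed = admin
>                         .listConsumerGroupOffsets("my-group")
>                         .partitionsToOffsetAndMetadata().get();
>                 Properties consumerProps = new Properties();
>                 consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>                         "localhost:9092");
>                 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                         ByteArrayDeserializer.class.getName());
>                 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                         ByteArrayDeserializer.class.getName());
>                 // Log-end offsets from the cluster metadata.
>                 try (KafkaConsumer<byte[], byte[]> consumer =
>                         new KafkaConsumer<>(consumerProps)) {
>                     Map<TopicPartition, Long> end =
>                             consumer.endOffsets(committed.keySet());
>                     committed.forEach((tp, om) -> System.out.printf(
>                             "%s lag=%d%n", tp, end.get(tp) - om.offset()));
>                 }
>             }
>         }
>     }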
>
> comsir <609326...@qq.com.invalid> wrote on Tue, Jul 20, 2021 at 12:41 PM:
>
> Hi all,
> For Flink jobs with Kafka as the source, how do you all monitor Kafka lag?
> The purpose of monitoring this lag: 1. dashboard display, 2. alerting when
> lag occurs.
> A few questions:
> 1. Flink has many native metrics related to this, but after studying them
> none seem very accurate. Which metric do you all use?
> 2. How do you obtain the group id? With multiple consuming groups, how do
> you tell them apart?
> 3. Can lag be computed by subtracting the current offset from the
> cluster-side metadata?
> 4. Is there an elegant way to implement this?
> Many thanks, looking forward to answers!
>


--

Best Regards,
Jeremy Mei


Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-08 Thread Rion Williams
Hi folks,

I have a use case that I wanted to pose to the mailing list first, as I'm
not terribly familiar with the Elasticsearch connector and want to make sure
I'm not going down the wrong path trying to accomplish this in Flink (or
whether something downstream might be a better option).

Basically, I have the following pieces to the puzzle:
- A stream of tenant-specific events
- An HTTP endpoint containing mappings for tenant-specific Elastic cluster
information (as each tenant has its own Elastic cluster/index)

What I'm hoping to accomplish is the following:
- One stream will periodically poll the HTTP endpoint and store these
cluster mappings in state (keyed by tenant, with cluster info as the value).
- The event stream will be keyed by tenant and connected to the
cluster-mappings stream.
- I'll need an Elasticsearch sink that can route the tenant-specific event
data to its corresponding cluster/index from the mapping source.

I know that the existing Elasticsearch sink supports dynamic indices, but I
didn't know whether it's possible to adjust the target cluster on a
per-tenant basis like I would need, or whether there's a better approach
here. The sketch below is roughly the shape I have in mind for the sink
side.
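
Hypothetical, with names made up; it assumes the connected stream emits a
tuple of (target host, index, JSON payload) per event, and it ignores the
bulking, retries, and flushing a real sink would need:

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;

    import java.util.HashMap;
    import java.util.Map;

    /** Routes each record to a tenant-specific cluster: f0=host, f1=index, f2=JSON. */
    public class TenantRoutingElasticsearchSink
            extends RichSinkFunction<Tuple3<String, String, String>> {

        private transient Map<String, RestHighLevelClient> clients;

        @Override
        public void open(Configuration parameters) {
            clients = new HashMap<>();
        }

        @Override
        public void invoke(Tuple3<String, String, String> record, Context context)
                throws Exception {
            // Lazily create and cache one client per target cluster.
            RestHighLevelClient client = clients.computeIfAbsent(record.f0,
                    host -> new RestHighLevelClient(
                            RestClient.builder(HttpHost.create(host))));
            client.index(
                    new IndexRequest(record.f1).source(record.f2, XContentType.JSON),
                    RequestOptions.DEFAULT);
        }

        @Override
        public void close() throws Exception {
            for (RestHighLevelClient client : clients.values()) {
                client.close();
            }
        }
    }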

Any advice would be appreciated.

Thanks,

Rion

KafkaDeserializationSchema.open() is not called after task state change

2021-08-08 Thread Gil Amsalem
Hi,

I have a class that implements KafkaDeserializationSchema and overrides the
open() method.
When deploying my job, everything seems to work as expected.
But when my task hits an exception and switches to CANCELED -> CREATED ->
DEPLOYING, it seems that the open method is not called, and I end up
with uninitialized values.

Is that expected? Is it a bug?

-- 
Gil Amsalem