[
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708392#comment-16708392
]
Dmitry Buykin edited comment on KAFKA-7695 at 12/4/18 8:40 AM:
---------------------------------------------------------------
[~mjsax] I think it's a bug because KStreams derives its settings from the
Consumer but does not honor the Consumer's contract for those settings. A
"try and fail" approach is not the best way to find out which set of settings
KStreams supports.
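As an alternative to "try and fail", here is a minimal Java sketch of a fail-fast check. It reports every user override of a key the library wants to control, before any client is created. The class `ReservedConfigCheck` is hypothetical. The reserved-key set is supplied by the caller because the real list of settings KStreams controls is not stated here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical fail-fast validator: finds user-supplied keys that the library
// intends to control itself, so the problem surfaces at startup, not at runtime.
final class ReservedConfigCheck {

    private ReservedConfigCheck() {}

    // Returns the conflicting keys in sorted order (empty if the config is acceptable).
    static List<String> conflicts(Properties userProps, Set<String> reservedKeys) {
        TreeSet<String> found = new TreeSet<>();
        for (String key : userProps.stringPropertyNames()) {
            if (reservedKeys.contains(key)) {
                found.add(key);
            }
        }
        return new ArrayList<>(found);
    }

    // Throws once, naming every conflict, instead of failing deep inside a client later.
    static void requireNoConflicts(Properties userProps, Set<String> reservedKeys) {
        List<String> bad = conflicts(userProps, reservedKeys);
        if (!bad.isEmpty()) {
            throw new IllegalArgumentException("Unsupported overrides: " + bad);
        }
    }
}
```

With `partition.assignment.strategy` in the reserved set, the configuration from the original report would be rejected at startup with a message naming that key.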
About your question "How partition assignment could help with throttling?":
my idea was to use the same approach as in
[KIP-169|https://cwiki.apache.org/confluence/display/KAFKA/KIP-169+-+Lag-Aware+Partition+Assignment+Strategy],
with restrictions specific to KStreams, such as sticking partitions (tasks) to
valid local state. It would be extended by temporarily enabling/disabling some
topics as a function of consumer lag (during some stage of processing) and by
redistributing topics between stream threads/consumer clients inside the same
KStreams instance (currently this is round-robin). More topics per
thread/consumer slows processing down; fewer topics per thread/consumer speeds
it up. So switching off the fast base topic would help pre-load data into the
RocksDB stores for the side streams of left joins. After a restart, processing
would continue with all topics enabled. Yes, it's complex, but it would help
solve the current problem of disproportionate processing between "slow" and
"fast" streams.
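Below is a minimal sketch of the lag-weighted redistribution idea, compared with the round-robin baseline. It uses a greedy largest-lag-first heuristic swapped in for illustration. It is not the algorithm specified in KIP-169. `LagAwareDistributor` and the per-topic lag map are assumptions, and a real implementation would work on partitions/tasks rather than whole topics.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical distributor of topics across stream threads.
final class LagAwareDistributor {

    private LagAwareDistributor() {}

    // Round-robin baseline: topic i goes to thread i % threads, regardless of lag.
    static List<List<String>> roundRobin(List<String> topics, int threads) {
        List<List<String>> out = emptyBuckets(threads);
        for (int i = 0; i < topics.size(); i++) {
            out.get(i % threads).add(topics.get(i));
        }
        return out;
    }

    // Greedy lag-weighted assignment: largest lag first (ties by name), each topic
    // placed on the thread with the smallest accumulated lag (ties: lowest index).
    static List<List<String>> byLag(Map<String, Long> lagByTopic, int threads) {
        List<List<String>> out = emptyBuckets(threads);
        long[] load = new long[threads];
        List<Map.Entry<String, Long>> entries = new ArrayList<>(lagByTopic.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        for (Map.Entry<String, Long> e : entries) {
            int target = 0;
            for (int t = 1; t < threads; t++) {
                if (load[t] < load[target]) {
                    target = t;
                }
            }
            out.get(target).add(e.getKey());
            load[target] += e.getValue();
        }
        return out;
    }

    private static List<List<String>> emptyBuckets(int n) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buckets.add(new ArrayList<>());
        }
        return buckets;
    }
}
```

For lags of 100, 60, 50 and 10 on two threads, round-robin (in that order) puts 150 units of lag on one thread and 70 on the other. The greedy version ends at 110/110.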
I think it would be hard to make this dynamic, but in my project that is
acceptable. I am working on a project that loads data from legacy systems using
CDC. So I deal with a huge number of events (~one year's worth) on the initial
refresh and large join windows to accommodate reshuffled timestamps from the
source system, and then switch to operational mode with small windows and fast
processing.
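The preload-then-operate flow can be sketched as a small controller. It keeps the fast base topic paused until every side topic has caught up, then switches once to operational mode with all topics enabled. `PreloadController`, the single lag threshold and the topic names in the test are hypothetical. With a plain consumer, the returned set could drive `KafkaConsumer.pause(...)`/`resume(...)`. In KStreams, this is exactly the kind of hook being asked for.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical two-phase controller: INITIAL_LOAD pauses the base topic so side
// topics can fill their state stores; OPERATIONAL pauses nothing.
final class PreloadController {

    enum Phase { INITIAL_LOAD, OPERATIONAL }

    private final String baseTopic;
    private final long lagThreshold;
    private Phase phase = Phase.INITIAL_LOAD;

    PreloadController(String baseTopic, long lagThreshold) {
        this.baseTopic = baseTopic;
        this.lagThreshold = lagThreshold;
    }

    Phase phase() {
        return phase;
    }

    // Given the current lag per topic, returns the set of topics to keep paused.
    Set<String> topicsToPause(Map<String, Long> lagByTopic) {
        if (phase == Phase.INITIAL_LOAD) {
            boolean sidesCaughtUp = true;
            for (Map.Entry<String, Long> e : lagByTopic.entrySet()) {
                if (!e.getKey().equals(baseTopic) && e.getValue() > lagThreshold) {
                    sidesCaughtUp = false;
                    break;
                }
            }
            if (sidesCaughtUp) {
                phase = Phase.OPERATIONAL; // one-way switch: never return to preloading
            }
        }
        Set<String> paused = new TreeSet<>();
        if (phase == Phase.INITIAL_LOAD) {
            paused.add(baseTopic);
        }
        return paused;
    }
}
```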
One additional comment about the StreamsPartitionAssignor implementation and
its ugly design: I think it mixes three different behaviors that should be
separated: a) sticking partitions (as tasks) to local state storage; b)
balancing workload between StreamThreads in the same KStreams instance; c)
balancing partitions proportionally between Kafka consumers. And it does this by
extending PartitionAssignor, which was designed only to balance consumers.
Sometimes it behaves really strangely. For example, when some metadata has to
be loaded (and filtered) from several "slow" topics with few partitions,
StreamsPartitionAssignor assigns these topics to some unlucky StreamThread,
which has to retire after it finishes processing these "slow" topics.
I think the distribution of topics between threads should be balanced
dynamically, with respect to local state storage.
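The following is a minimal sketch of thread-level assignment that keeps behaviors a) and b) in separate steps. First, each task stays on the thread that already holds its local state, up to a per-thread cap. Then the remaining tasks are spread onto the least-loaded threads. `StateAwareThreadAssignor`, the task-count cap, the `stateOwner` map and the task ids in the test are assumptions. Load is measured here as task count only, which a lag-aware variant could replace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical assignor separating stickiness (step 1) from balancing (step 2).
final class StateAwareThreadAssignor {

    private StateAwareThreadAssignor() {}

    // stateOwner maps a task id to the index of the thread holding its local state;
    // tasks without an entry have no local state anywhere.
    static List<List<String>> assign(List<String> tasks, Map<String, Integer> stateOwner, int threads) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            out.add(new ArrayList<>());
        }
        int cap = (tasks.size() + threads - 1) / threads; // ceil(tasks / threads)
        List<String> leftovers = new ArrayList<>();

        // Step 1 (stickiness): keep a task with its state, unless that thread is full.
        for (String task : tasks) {
            Integer owner = stateOwner.get(task);
            if (owner != null && owner >= 0 && owner < threads && out.get(owner).size() < cap) {
                out.get(owner).add(task);
            } else {
                leftovers.add(task);
            }
        }
        // Step 2 (balance): each remaining task goes to the thread with the fewest tasks.
        for (String task : leftovers) {
            int target = 0;
            for (int t = 1; t < threads; t++) {
                if (out.get(t).size() < out.get(target).size()) {
                    target = t;
                }
            }
            out.get(target).add(task);
        }
        return out;
    }
}
```

If thread 0 holds state for three of four tasks, it keeps two of them (the cap). The third stateful task and the stateless one move to thread 1, so no single thread ends up with all of the work.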
{quote}Last, for KStream-GlobalKTable joins, there is no timestamp
synchronization by design.
{quote}
I was referring to KStream-KStream joins only.
> Cannot override StreamsPartitionAssignor in configuration
> ----------------------------------------------------------
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Dmitry Buykin
> Priority: Major
> Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing the property
> partition.assignment.strategy in KStreams 2.0.1, because the streams
> crash inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer
> works only with RangeAssignor, which is configured by default.
> This can be reproduced by setting
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
> StreamsPartitionAssignor.class.getName());`
> To me it looks like a bug.
> I opened a discussion here:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>
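One possible workaround is sketched below. It assumes, as the report suggests, that the override reaches the global consumer's config and that only the default assignor works there. The idea is to strip `partition.assignment.strategy` from that config so the consumer falls back to its default (RangeAssignor in 2.0). This is not a confirmed fix. The helper `GlobalConsumerConfigs.withoutAssignorOverride` is hypothetical. In an application, it would be called from a custom `KafkaClientSupplier.getGlobalConsumer(Map<String, Object>)` implementation before the consumer is constructed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: copies the global consumer's config and removes the
// assignor override so the consumer uses its default assignor instead.
final class GlobalConsumerConfigs {

    // Literal key behind ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG.
    static final String ASSIGNMENT_STRATEGY = "partition.assignment.strategy";

    private GlobalConsumerConfigs() {}

    static Map<String, Object> withoutAssignorOverride(Map<String, Object> config) {
        Map<String, Object> copy = new HashMap<>(config); // never mutate the caller's map
        copy.remove(ASSIGNMENT_STRATEGY);
        return copy;
    }
}
```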
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)