[jira] [Created] (KAFKA-12387) Avoid unnecessary copy of FetchResponse data in handleFetchRequest

2021-02-27 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-12387:
--

 Summary: Avoid unnecessary copy of FetchResponse data in 
handleFetchRequest 
 Key: KAFKA-12387
 URL: https://issues.apache.org/jira/browse/KAFKA-12387
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


The inner methods "processResponseCallback" and "maybeConvertFetchedData" create 
many copies of the input data. After [https://github.com/apache/kafka/pull/9758] is 
merged, the data is changed to auto-generated data, which is mutable. Hence, we 
can mutate the input data to avoid the copies.
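
For illustration, a minimal Java sketch of the idea (the PartitionData class and
its fields below are hypothetical stand-ins, not the actual generated
FetchResponseData classes): instead of allocating a fresh copy per partition, the
handler can adjust the mutable generated objects in place and hand them back.

import java.util.ArrayList;
import java.util.List;

public class MutateInsteadOfCopy {

    // Hypothetical stand-in for an auto-generated, mutable response record.
    static final class PartitionData {
        long highWatermark;
        List<byte[]> records = new ArrayList<>();
    }

    // Copying approach: allocates a new object per partition on every fetch.
    static List<PartitionData> convertByCopy(List<PartitionData> input) {
        List<PartitionData> out = new ArrayList<>(input.size());
        for (PartitionData in : input) {
            PartitionData copy = new PartitionData();
            copy.highWatermark = in.highWatermark;
            copy.records = new ArrayList<>(in.records);
            out.add(copy);
        }
        return out;
    }

    // In-place approach: since the generated data is mutable, adjust the
    // fields directly and return the same objects, avoiding the copies.
    static List<PartitionData> convertInPlace(List<PartitionData> input) {
        for (PartitionData in : input) {
            in.highWatermark = Math.max(in.highWatermark, 0L);
        }
        return input;
    }
}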



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-27 Thread Ryanne Dolan
I guess I don't understand how multiple tags work together to achieve rack
awareness. I realize I could go look at how Elasticsearch works, but ideally
this would be more plain in the KIP.

In particular, I'm not sure how the tag approach is different from concatenating
multiple tags together, e.g. how is cluster=foo, zone=bar different from
rack=foo-bar?
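
For illustration only (the tag names follow the cluster/zone example above; the
helper below is not part of the KIP), one way to see the difference: separate
tags give the assignor independent dimensions it can rank and fall back on,
whereas a concatenated rack value collapses to a single equal/not-equal check.

import java.util.List;
import java.util.Map;

public class TagVsConcatenatedRack {

    // With separate tags, the assignor can see *which* dimensions differ and
    // fall back gracefully: prefer a client in another cluster, but if none
    // exists, still prefer a client in another zone.
    static long differingDimensions(Map<String, String> a, Map<String, String> b) {
        return a.keySet().stream()
                .filter(dim -> !a.get(dim).equals(b.get(dim)))
                .count();
    }

    public static void main(String[] args) {
        Map<String, String> active = Map.of("cluster", "foo", "zone", "bar");
        Map<String, String> candidate1 = Map.of("cluster", "foo", "zone", "baz");
        Map<String, String> candidate2 = Map.of("cluster", "qux", "zone", "baz");

        // candidate2 (2 differing dimensions) is preferred over candidate1 (1),
        // which is still preferred over a client identical to the active (0).
        System.out.println(differingDimensions(active, candidate1)); // 1
        System.out.println(differingDimensions(active, candidate2)); // 2

        // A concatenated rack ("foo-bar" vs "foo-baz") collapses this to one
        // equal/not-equal comparison, losing the partial preference.
        System.out.println("foo-bar".equals("foo-baz")); // false, but no ranking
    }
}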

Ryanne

On Sat, Feb 27, 2021, 5:00 PM Levani Kokhreidze 
wrote:

> Hi Bruno,
>
> Thanks for the feedback. I think it makes sense.
> I’ve updated the KIP [1] and tried to omit implementation details around
> the algorithm.
>
> Please let me know if the latest version looks OK.
>
> Regards,
> Levani
>
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
>
> > On 25. Feb 2021, at 17:59, Bruno Cadonna  wrote:
> >
> > Hi Levani,
> >
> > I discussed your KIP with John the other day and we both think it is a
> really interesting KIP and you did a good job in writing it. However, we
> think that the KIP exposes too many implementation details. That makes
> future changes to the implementation of the distribution algorithm harder
> without a KIP. So, we would like to propose to just describe the config and
> the properties that any implementation of the distribution algorithm should
> have. We did something similar in KIP-441 for the task assignment algorithm
> [1].
> >
> > Specifically, for your KIP, any possible implementation of the
> distribution algorithm should read the tags to be considered for rack
> awareness from the config, and if the cluster allows distributing each
> active task and its replicas to Streams clients with different values for
> each tag, the algorithm will do so. How the implementation behaves if a
> cluster does not allow distributing over all tag values can be left as an
> implementation detail. This would give us flexibility for future changes to
> the distribution algorithm.
> >
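
As a hedged sketch of the property described above (method and parameter names
are illustrative, not the KIP's API), the ideal distribution is only required
when every configured tag has enough distinct values among the clients:

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RackAwarenessFeasibility {

    // Returns true if, for every configured tag, the clients expose at least
    // (numStandbyReplicas + 1) distinct values, i.e. the ideal distribution
    // (active + each standby on a different value for every tag) is possible.
    static boolean idealDistributionPossible(List<String> configuredTags,
                                             List<Map<String, String>> clientTags,
                                             int numStandbyReplicas) {
        for (String tag : configuredTags) {
            Set<String> values = new HashSet<>();
            for (Map<String, String> tags : clientTags) {
                values.add(tags.get(tag));
            }
            if (values.size() < numStandbyReplicas + 1) {
                return false;
            }
        }
        return true;
    }
}
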
> > Since there may be distribution algorithms that do not use the order of
> the tags, it would be better to not mention the order of the tags in the
> config doc. I would propose to omit the config doc from the KIP or
> formulate it really generically.
> >
> > We would also like to rename standby.replicas.awareness to
> task.assignment.rack.awareness or something that does not contain standby
> and/or replica (sorry for requesting again to change this name). That way,
> we might be able to use this config also when we decide to make the active
> task assignment rack aware.
> >
> > I hope all of this makes sense to you.
> >
> > Thank you again for the interesting KIP!
> >
> > Looking forward to your implementation!
> >
> > Best,
> > Bruno
> >
> >
> > [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> >
> >
> > On 22.02.21 17:42, Levani Kokhreidze wrote:
> >> Hi Bruno,
> >> Thanks for the quick reply
> >> 5. Sorry, maybe I am not making it clear.
> >> What you have described is how it should work, yes. As stated in the
> KIP, with the importance semantics in standby.replicas.awareness,
> >> if we have an active task on Node-1 and the first standby task on
> Node-5, the third standby should be on Node-3 or Node-6 (both of them are
> in a different AZ compared to the active and the first standby task).
> >> That flow is described in Partially Preferred Distribution section [1].
> >> Node-4 could have been a valid option IF standby.replicas.awareness
> didn’t have the importance semantics because Kafka Streams could have just
> picked Node-4 in that case.
> >> 7. Yup, will do.
> >> 8. Good question. So far, the assumption I had was that the configuration
> between different Kafka Streams instances is the same. Can we have an extra
> validation check to make sure that is the case?
> >> If not, what you have mentioned in point 5 (always preferring the
> dimension where there are enough KS instances) is very valid.
> >> On the other hand, I thought that allowing users to specify the importance
> of the various dimensions may give them extra flexibility over standby task
> allocation.
> >> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution
> >> Regards,
> >> Levani
> >>> On 22. Feb 2021, at 16:51, Bruno Cadonna  wrote:
> >>>
> >>> Hi Levani,
> >>>
> >>> Thanks for the modifications!
> >>>
> >>> I have some follow up questions/comments:
> >>>
> >>> 5. Something is not clear to me. If the active is on Node-1 and the
> first replica is on Node-5 (different cluster, different zone), why would
> the second replica go to Node-4 that has a different cluster than but the
> same zone as the active instead of Node-6 which has a different zone of
> 

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-27 Thread Levani Kokhreidze
Hi Bruno,

Thanks for the feedback. I think it makes sense.
I’ve updated the KIP [1] and tried to omit implementation details around the 
algorithm.

Please let me know if the latest version looks OK.

Regards,
Levani


[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 25. Feb 2021, at 17:59, Bruno Cadonna  wrote:
> 
> Hi Levani,
> 
> I discussed your KIP with John the other day and we both think it is a really 
> interesting KIP and you did a good job in writing it. However, we think that 
> the KIP exposes too many implementation details. That makes future changes to 
> the implementation of the distribution algorithm harder without a KIP. So, we 
> would like to propose to just describe the config and the properties that any 
> implementation of the distribution algorithm should have. We did something 
> similar in KIP-441 for the task assignment algorithm [1].
> 
> Specifically, for your KIP, any possible implementation of the distribution 
> algorithm should read the tags to be considered for rack awareness from the 
> config, and if the cluster allows distributing each active task and its 
> replicas to Streams clients with different values for each tag, the algorithm 
> will do so. How the implementation behaves if a cluster does not allow 
> distributing over all tag values can be left as an implementation detail. This 
> would give us flexibility for future changes to the distribution algorithm.
> 
> Since there may be distribution algorithms that do not use the order of the 
> tags, it would be better to not mention the order of the tags in the config 
> doc. I would propose to omit the config doc from the KIP or formulate it 
> really generically.
> 
> We would also like to rename standby.replicas.awareness to 
> task.assignment.rack.awareness or something that does not contain standby 
> and/or replica (sorry for requesting again to change this name). That way, we 
> might be able to use this config also when we decide to make the active task 
> assignment rack aware.
> 
> I hope all of this makes sense to you.
> 
> Thank you again for the interesting KIP!
> 
> Looking forward to your implementation!
> 
> Best,
> Bruno
> 
> 
> [1] 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
>  
> 
> 
> On 22.02.21 17:42, Levani Kokhreidze wrote:
>> Hi Bruno,
>> Thanks for the quick reply
>> 5. Sorry, maybe I am not making it clear.
>> What you have described is how it should work, yes. As stated in the KIP, 
>> with the importance semantics in standby.replicas.awareness,
>> if we have an active task on Node-1 and the first standby task on Node-5, 
>> the third standby should be on Node-3 or Node-6 (both of them are in a 
>> different AZ compared to the active and the first standby task).
>> That flow is described in Partially Preferred Distribution section [1].
>> Node-4 could have been a valid option IF standby.replicas.awareness didn’t 
>> have the importance semantics because Kafka Streams could have just picked 
>> Node-4 in that case.
>> 7. Yup, will do.
>> 8. Good question. So far, the assumption I had was that the configuration 
>> between different Kafka Streams instances is the same. Can we have an extra 
>> validation check to make sure that is the case?
>> If not, what you have mentioned in point 5 (always preferring the dimension 
>> where there are enough KS instances) is very valid.
>> On the other hand, I thought that allowing users to specify the importance of 
>> the various dimensions may give them extra flexibility over standby task 
>> allocation.
>> [1] 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution
>> Regards,
>> Levani
>>> On 22. Feb 2021, at 16:51, Bruno Cadonna  wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thanks for the modifications!
>>> 
>>> I have some follow up questions/comments:
>>> 
>>> 5. Something is not clear to me. If the active is on Node-1 and the first 
>>> replica is on Node-5 (different cluster, different zone), why would the 
>>> second replica go to Node-4, which has a different cluster than but the same 
>>> zone as the active, instead of Node-6, which has a different zone than 
>>> Node-1? In general, wouldn't it be better to guarantee under Partially 
>>> Preferred task distribution that active and standby replicas of the same 
>>> task are distributed over the dimension that has at least as many values as 
>>> the number of replicas + 1, and then over the dimensions that have fewer 
>>> values? That would then also be independent of the ordering of the tags.
>>> 
>>> 7. I agree with you. Could you add a sentence or two about this to the KIP?
>>> 
>>> New 

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

2021-02-27 Thread Walker Carlson
Sure thing Boyang,

1) It is in the proposed changes. I expanded on it a bit more now.
2) done
3) and done :)

thanks for the suggestions,
walker

On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen 
wrote:

> Thanks Walker. Some minor comments:
>
> 1. Could you add a reference to the localThreadMetadata method in the KIP?
> 2. Could you format the code block as a Java template, such that
> TaskMetadata.java could be the template title? Also, it would be good to
> add some meta comments about the newly added functions.
> 3. Could you write more details about the rejected alternatives, such as why we
> chose not to expose this as metrics, and why a new method on KStream is not
> favorable? These would be valuable when we look back on our design
> decisions.
>
> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson 
> wrote:
>
> > I understand now. I think that is a valid concern, but I think it is best
> > solved by having an external service verify through Streams. As this KIP
> > is now just adding fields to TaskMetadata to be returned in the
> > threadMetadata, I am going to say that is out of scope.
> >
> > That seems to be the last concern. If there are no others I will put this
> > up for a vote soon.
> >
> > walker
> >
> > On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen  >
> > wrote:
> >
> > > For the 3rd point, yes, what I'm proposing is an edge case. For
> example,
> > > when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing
> logic
> > > causing no one to get 1_1 assigned. Then the health check service will
> only
> > > see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
> paying
> > > attention to 1_1. What I want to expose is a "logical global" view of
> all
> > > the tasks through the stream instance, since each instance gets the
> > > assigned topology and should be able to infer all the exact tasks to be
> > up
> > > and running when the service is healthy.
> > >
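
A rough illustration of the gap being described (the types and helper below are
hypothetical): a health check that only aggregates reported tasks cannot notice
a task that was never assigned, unless it also knows the expected task set
derived from the assigned topology.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TaskCoverageCheck {

    // Expected tasks can, in principle, be derived from the assigned topology
    // (subtopology count x partition count); reported tasks come from polling
    // each instance. Anything left in the difference is silently unassigned.
    static Set<String> unassignedTasks(Set<String> expected,
                                       List<Set<String>> reportedPerInstance) {
        Set<String> missing = new HashSet<>(expected);
        reportedPerInstance.forEach(missing::removeAll);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> expected = Set.of("0_0", "0_1", "1_0", "1_1");
        List<Set<String>> reported = List.of(Set.of("0_0", "0_1"), Set.of("1_0"));
        System.out.println(unassignedTasks(expected, reported)); // [1_1]
    }
}
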
> > > On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson  >
> > > wrote:
> > >
> > > > Thanks for the follow-up, Boyang and Guozhang,
> > > >
> > > > I have updated the KIP to include these ideas.
> > > >
> > > > Guozhang, that is a good idea about using the TaskMetadata. We can get it
> > > > through the ThreadMetadata with a minor change to `localThreadMetadata` in
> > > > KafkaStreams. This means that we will only need to update TaskMetadata and
> > > > add no other APIs.
> > > >
> > > > Boyang, since each TaskMetadata contains the TaskId and TopicPartitions, I
> > > > don't believe mapping either way will be a problem. Also, I think we can do
> > > > something like recording the time the task started idling, and when it stops
> > > > idling we can override it to -1. I think that should clear up the first two
> > > > points.
> > > >
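
A hedged sketch of roughly what an enriched TaskMetadata could carry under this
proposal (field and accessor names are illustrative, not the final API):
committed and end offsets per partition, plus an idling timestamp that is -1
while the task is not idling.

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: a stand-alone value class, not Kafka Streams' actual
// org.apache.kafka.streams.processor.TaskMetadata.
public class TaskMetadataSketch {
    private final String taskId;
    private final Map<TopicPartition, Long> committedOffsets;
    private final Map<TopicPartition, Long> endOffsets;
    private final long timeIdlingStartedMs; // -1 when the task is not idling

    public TaskMetadataSketch(String taskId,
                              Map<TopicPartition, Long> committedOffsets,
                              Map<TopicPartition, Long> endOffsets,
                              long timeIdlingStartedMs) {
        this.taskId = taskId;
        this.committedOffsets = committedOffsets;
        this.endOffsets = endOffsets;
        this.timeIdlingStartedMs = timeIdlingStartedMs;
    }

    public String taskId() { return taskId; }
    public Map<TopicPartition, Long> committedOffsets() { return committedOffsets; }
    public Map<TopicPartition, Long> endOffsets() { return endOffsets; }
    public long timeIdlingStartedMs() { return timeIdlingStartedMs; }
}
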
> > > > As for your third point, I am not sure I 100% understand. The ThreadMetadata
> > > > will contain a set of all tasks assigned to that thread. Any health check
> > > > service will just need to query all clients and aggregate their responses
> > > > to get a complete picture of all tasks, correct?
> > > >
> > > > walker
> > > >
> > > > On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang 
> > > wrote:
> > > >
> > > > > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > > > > consolidate on the existing `TaskMetadata` since we have already
> > > > > accumulated a bunch of such classes, and it's better to keep them small as
> > > > > public APIs. You can see
> > > > > https://issues.apache.org/jira/browse/KAFKA-12370
> > > > > for a reference and a proposal.
> > > > >
> > > > > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> > > reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the updates Walker. Some replies and follow-up
> > questions:
> > > > > >
> > > > > > 1. I agree one task could have multiple partitions, but when we hit a
> > > > > > delay in terms of offset progress, do we have a convenient way to
> > > > > > reverse-map a TopicPartition to the problematic task? In production, I
> > > > > > believe it would be much quicker to identify the problem using task.id
> > > > > > instead of topic partition, especially when it points to an internal
> > > > > > topic. I think having the task id as part of the entry value seems
> > > > > > useful, which means getting something like a Map where TaskProgress
> > > > > > contains both committed offsets & task id.
> > > > > >
> > > > > > 2. The task idling API was still confusing. I don't think we care about
> > > > > > the exact state when making the tasksIdling() query; instead, we care
> > > > > > more about how long one task has been in the idle state at the time of
> > > > > > the call, which reflects whether it is a normal idling period. So I feel
> > > > > > it might be helpful to track that time difference and report it in the
> > > > > > TaskStatus struct.
> > > > > >

[jira] [Created] (KAFKA-12386) Flush unflushed segments during recovery

2021-02-27 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-12386:
---

 Summary: Flush unflushed segments during recovery
 Key: KAFKA-12386
 URL: https://issues.apache.org/jira/browse/KAFKA-12386
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
 Attachments: image-2021-02-27-09-57-50-844.png

We could strengthen the guarantees Kafka provides after a restart by flushing 
segments beyond the recovery point after an unclean shutdown. Credit to 
[~purplefox] for the suggestion.
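
A conceptual sketch of the suggestion, not Kafka's actual log-loading code (the
Segment record and method below are assumptions): after an unclean shutdown,
segments at or beyond the recovery point are re-validated and could then also
be fsync'd before the recovery point is advanced.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class RecoveryFlushSketch {

    // Hypothetical stand-in for a log segment file starting at baseOffset.
    record Segment(long baseOffset, Path file) {}

    // After an unclean shutdown, segments with baseOffset >= recoveryPoint are
    // recovered (re-validated). Flushing them here would let the recovery
    // point advance past data that is now known to be durable.
    static void recoverAndFlush(List<Segment> segments, long recoveryPoint) throws IOException {
        for (Segment segment : segments) {
            if (segment.baseOffset() >= recoveryPoint) {
                // ... recovery/validation of the segment would happen here ...
                try (FileChannel channel = FileChannel.open(segment.file(), StandardOpenOption.WRITE)) {
                    channel.force(true); // fsync data and metadata
                }
            }
        }
    }
}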

Full discussion:

!image-2021-02-27-09-57-50-844.png!

https://github.com/apache/kafka/pull/8812#discussion_r583496197

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12385) Remove FetchResponse#responseData

2021-02-27 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-12385:
--

 Summary: Remove FetchResponse#responseData
 Key: KAFKA-12385
 URL: https://issues.apache.org/jira/browse/KAFKA-12385
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Reference: [https://github.com/apache/kafka/pull/9758#discussion_r584142074]

We can rewrite the related code to avoid using the stale data structure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-712: Shallow Mirroring

2021-02-27 Thread A S
+1 to adding latency metrics.

To add context on why CPU, memory, and GC have a bigger impact than network
in a Mirror for compressed topics without KIP-712: *a failing / unstable
mirror cluster will have lag perpetually spiking, which has a much larger impact
on e2e latencies*. To explain a bit more:

Less data moved:
Compressed topics "usually" should move less data over the network and are
useful to reduce the network cost / footprint of replication. Therefore,
network usage may naturally be less than if this data were uncompressed.
Instead, the CPU usage bottleneck hits first, due to decompression of the data.
Prior to KIP-712, we had never been able to operate a mirror at wire speed.

Stability:
If there is a load spike, a few scenarios can play out:
- more data in a batch, i.e. a larger uncompressed size, i.e. a larger memory
footprint
- a larger number of batches, i.e. a larger memory footprint

In either case, more memory and more CPU cycles are used.
If the GC throughput or heap size is insufficient, this leads to OOMs.

Domino Effect:
Just like any Kafka consumer, if a consumer instance in a consumer group
terminates, it triggers a rebalance. In this case that rebalance happens due
to an OOM. A Mirror instance that fails due to an OOM triggered by traffic
load (explained above) results in a domino effect of more Mirror instances
hitting OOMs, as the load on the even smaller number of running instances
remaining in the group increases. Eventually this leads to a total failure
of the mirror cluster.

Memory Limits & Ineffective Workarounds:
A question that could be asked is: couldn't we configure the Mirror instance in
such a way that this doesn't happen? The answer is that it's expensive and
difficult.
Let's say we are using a 4-core host with X GB of memory and configure the
Mirror to use 4 Streams, and this configuration leads to an OOM; we could
try to reduce the number of Streams to 3 or 2. That's a 25-50% loss in
efficiency, i.e. we may now need 2x the number of nodes (and 2x the cost) without
any guarantee that this configuration will never result in an OOM (since
future traffic characteristics are unpredictable), but it may reduce the
probability of an OOM.

Summary:
Since the root cause is memory usage due to decompression of data in
flight, the ideal way to resolve this was to eliminate the decompression of
data, which isn't a hard requirement for the mirror to operate, since it was
not performing any transformation or repartitioning in our case.
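
A conceptual sketch of that contrast (the CompressedBatch/Codec types below are
hypothetical, not the KIP's proposed consumer/producer changes): a conventional
mirror inflates and re-compresses every batch, while a shallow mirror forwards
the compressed bytes untouched.

import java.util.List;

public class ShallowMirrorSketch {

    // Hypothetical stand-ins for a compressed record batch and its records.
    record CompressedBatch(byte[] compressedBytes) {}
    record Record(byte[] key, byte[] value) {}

    interface Codec {
        List<Record> decompress(CompressedBatch batch);
        CompressedBatch compress(List<Record> records);
    }

    // Conventional path: every batch is inflated into individual records
    // (CPU + heap per record) and then compressed again on the producer side.
    static CompressedBatch mirrorDeep(CompressedBatch in, Codec codec) {
        List<Record> records = codec.decompress(in);
        return codec.compress(records);
    }

    // Shallow path: since the mirror applies no transformation or
    // repartitioning, the compressed bytes can be forwarded as-is.
    static CompressedBatch mirrorShallow(CompressedBatch in) {
        return in;
    }
}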

Thanks,
- Ambud

On Mon, Feb 22, 2021 at 9:20 AM Vahid Hashemian 
wrote:

> As Henry mentions in the KIP, we are seeing a great deal of improvement in
> efficiency by using the mirroring enhancement proposed in this KIP, and we
> believe it would be equally beneficial to everyone who runs Kafka and
> Kafka Mirror at scale.
>
> I'm bumping up this thread in case there is additional feedback or
> comments.
>
> Thanks,
> --Vahid
>
> On Sat, Feb 13, 2021, 13:59 Ryanne Dolan  wrote:
>
> > Glad to hear that latency and thruput aren't negatively affected somehow. I
> > would love to see this KIP move forward.
> >
> > Ryanne
> >
> > On Sat, Feb 13, 2021, 3:00 PM Henry Cai  wrote:
> >
> > > Ryanne,
> > >
> > > Yes, network performance is also important.  In our deployment, we are
> > > bottlenecked on the CPU/memory on the mirror hosts.  We are using c5.2x and
> > > m5.2x nodes in AWS; before the deployment, CPU would peak at 80%, but there
> > > was enough network bandwidth left on those hosts.  Having said that, we
> > > maintain the same network throughput before and after the switch.
> > >
> > > On Fri, Feb 12, 2021 at 12:20 PM Ryanne Dolan 
> > > wrote:
> > >
> > >> Hey Henry, great KIP. The performance improvements are impressive!
> > >> However, often CPU, RAM, and GC are not the metrics most important to a
> > >> replication pipeline -- often the network is mostly saturated anyway. Do
> > >> you know how this change affects latency or thruput? I suspect less GC
> > >> pressure means slightly less p99 latency, but it would be great to see that
> > >> confirmed. I don't think it's necessary that this KIP improves these
> > >> metrics, but I think it's important to show that they at least aren't made
> > >> worse.
> > >>
> > >> I suspect any improvement in MM1 would be magnified in MM2, given
> there
> > >> is a lot more machinery between consumer and producer in MM2.
> > >>
> > >>
> > >> I'd like to do some performance analysis based on these changes.
> Looking
> > >> forward to a PR!
> > >>
> > >> Ryanne
> > >>
> > >> On Wed, Feb 10, 2021, 3:50 PM Henry Cai  wrote:
> > >>
> > >>> On the question "whether shallow mirror is only applied on mirror maker
> > >>> v1", the code change is mostly on the consumer and producer code path; the
> > >>> change to MirrorMaker v1 is very trivial.  We chose to modify the
> > >>> consumer/producer path (instead of creating a new mirror product) so other
> > >>> use cases can use that feature as well.  The