Re: [DISCUSS] KIP-501 Avoid out-of-sync or offline partitions when follower fetch requests are not processed in time

2021-06-26 Thread Dhruvil Shah
Thanks for the KIP, Satish.

I am trying to understand the problem we are looking to solve with this
KIP. When the leader is slow in processing fetch requests from the follower
(due to disk, GC, or other reasons), the primary problem is that it could
impact read and write latency and at times cause unavailability depending
on how long the leader continues to be in this state.

How does solution 1 solve the problem? It seems like it prevents followers
from being removed from the ISR but that by itself would not address the
availability problem, is that right?

- Dhruvil

On Wed, Jun 23, 2021 at 6:12 AM Ryanne Dolan  wrote:

> Satish, we encounter this frequently and consider it a major bug. Your
> solution makes sense to me.
>
> Ryanne
>
> On Tue, Jun 22, 2021, 7:29 PM Satish Duggana 
> wrote:
>
> > Hi,
> > Bumping up the discussion thread on KIP-501 about avoiding out-of-sync or
> > offline partitions when follower fetch requests are not processed in time
> > by the leader replica. This issue occurred several times in multiple
> > production environments (at Uber, Yelp, Twitter, etc.).
> >
> > KIP-501 is located here
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+out-of-sync+or+offline+partitions+when+follower+fetch+requests+are+not+processed+in+time
> > >.
> > You may want to look at the earlier mail discussion thread here
> > <
> >
> https://mail-archives.apache.org/mod_mbox/kafka-dev/202002.mbox/%3Cpony-9f4e96e457398374499ab892281453dcaa7dc679-11722f366b06d9f46bcb5905ff94fd6ab167598e%40dev.kafka.apache.org%3E
> > >,
> > and here
> > <
> >
> https://mail-archives.apache.org/mod_mbox/kafka-dev/202002.mbox/%3CCAM-aUZnJ4z%2B_ztjF6sXSL61M1me0ogWZ1BV6%2BoV45rJMG8EoZA%40mail.gmail.com%3E
> > >
> > .
> >
> > Please take a look, I would like to hear your feedback and suggestions.
> >
> > Thanks,
> > Satish.
> >
>


Re: [ANNOUNCE] New Kafka PMC Member: Konstantine Karantasis

2021-06-21 Thread Dhruvil Shah
Congratulations Konstantine! Well deserved!

On Mon, Jun 21, 2021 at 10:20 AM Boyang Chen 
wrote:

> Congratulations Konstantine!
>
> On Mon, Jun 21, 2021 at 10:16 AM Matthias J. Sax  wrote:
>
> > Congrats!
> >
> > On 6/21/21 12:57 PM, Raymond Ng wrote:
> > > Congrats Konstantine!
> > >
> > > /Ray
> > >
> > > On Mon, Jun 21, 2021 at 9:45 AM Guozhang Wang 
> > wrote:
> > >
> > >> Congratulations Konstantine!
> > >>
> > >> On Mon, Jun 21, 2021 at 9:37 AM Tom Bentley 
> > wrote:
> > >>
> > >>> Congratulations Konstantine!
> > >>>
> > >>> On Mon, Jun 21, 2021 at 5:33 PM David Jacot
> >  > >>>
> > >>> wrote:
> > >>>
> >  Congrats, Konstantine. Well deserved!
> > 
> >  Best,
> >  David
> > 
> >  On Mon, Jun 21, 2021 at 6:14 PM Ramesh Krishnan <
> > >> ramesh.154...@gmail.com
> > 
> >  wrote:
> > 
> > > Congrats Konstantine
> > >
> > > On Mon, 21 Jun 2021 at 8:58 PM, Mickael Maison <
> mimai...@apache.org>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> It's my pleasure to announce that Konstantine Karantasis is now a
> > >> member of the Kafka PMC.
> > >>
> > >> Konstantine has been a Kafka committer since Feb 2020. He has
> > >>> remained
> > >> active in the community since becoming a committer.
> > >>
> > >> Congratulations Konstantine!
> > >>
> > >> Mickael, on behalf of the Apache Kafka PMC
> > >>
> > >
> > 
> > >>>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
>


[jira] [Created] (KAFKA-12520) Producer state is needlessly rebuilt on startup

2021-03-22 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-12520:


 Summary: Producer state is needlessly rebuilt on startup
 Key: KAFKA-12520
 URL: https://issues.apache.org/jira/browse/KAFKA-12520
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


When we find a {{.swap}} file on startup, we typically want to rename it into 
place as the corresponding {{.log}}, {{.index}}, {{.timeindex}}, etc. file, as 
a way to complete any ongoing replace operations. These swap files are usually 
known to have been flushed to disk before the replace operation begins.

One flaw in the current logic is that when we recover these swap files on 
startup, we end up truncating the producer state and rebuilding it from 
scratch. This is unnecessary, as the replace operation does not mutate the 
producer state by itself; it is only meant to replace the {{.log}} file along 
with the corresponding indices.

Because of this unnecessary producer state rebuild operation, we have seen 
multi-hour startup times for clusters that have large compacted topics.
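A minimal sketch of the swap-completion step (illustrative Python, not Kafka's actual log-loading code; `complete_swap_files` is a hypothetical helper): completing the replace is just a rename of already-flushed files, which is why the existing producer state snapshot stays valid and the rebuild could be skipped.

```python
import os

def complete_swap_files(log_dir):
    """Illustrative sketch of the swap-completion step (not Kafka's actual
    LogLoader code): rename every *.swap file into place to finish an
    interrupted replace operation. This only renames already-flushed
    files -- it does not touch record contents -- which is why the
    existing producer state snapshot remains valid."""
    completed = []
    for name in sorted(os.listdir(log_dir)):
        if name.endswith(".swap"):
            target = name[:-len(".swap")]   # e.g. 0000.log.swap -> 0000.log
            os.replace(os.path.join(log_dir, name),
                       os.path.join(log_dir, target))
            completed.append(target)
    return completed
```

Since no record data changes, a recovery path along these lines would have no reason to truncate and rebuild the producer state.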



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Apache Kafka 2.8.0 release

2021-03-02 Thread Dhruvil Shah
Thanks, John. The fix for KAFKA-12254 is now merged into 2.8.

On Tue, Mar 2, 2021 at 11:54 AM John Roesler  wrote:

> Hi Dhruvil,
>
> Thanks for this fix. I agree it would be good to get it in
> for 2.8.0, so I have added it to the fix versions in KAFKA-
> 12254.
>
> Please go ahead and cherry-pick your fix onto the 2.8
> branch.
>
> Thanks!
> -John
>
> On Mon, 2021-03-01 at 09:36 -0800, Dhruvil Shah wrote:
> > Hi John,
> >
> > I would like to bring up
> https://issues.apache.org/jira/browse/KAFKA-12254
> > as a blocker candidate for 2.8.0. While this is not a regression, the
> > issue could lead to data loss in certain cases. The fix is trivial so it
> > may be worth bringing it into 2.8.0. Let me know what you think.
> >
> > - Dhruvil
> >
> > On Mon, Feb 22, 2021 at 7:50 AM John Roesler 
> wrote:
> >
> > > Thanks for the heads-up, Chia-Ping,
> > >
> > > I agree it would be good to include that fix.
> > >
> > > Thanks,
> > > John
> > >
> > > On Mon, 2021-02-22 at 09:48 +, Chia-Ping Tsai wrote:
> > > > hi John,
> > > >
> > > > There is a PR (https://github.com/apache/kafka/pull/10024) fixing
> > > following test error.
> > > >
> > > > 14:00:28 Execution failed for task ':core:test'.
> > > > 14:00:28 > Process 'Gradle Test Executor 24' finished with non-zero
> exit
> > > value 1
> > > > 14:00:28   This problem might be caused by incorrect test process
> > > configuration.
> > > > 14:00:28   Please refer to the test execution section in the User
> Manual
> > > at
> > > >
> > > > This error obstructs us from running integration tests so I'd like to
> > > push it to 2.8 branch after it gets approved.
> > > >
> > > > Best Regards,
> > > > Chia-Ping
> > > >
> > > > On 2021/02/18 16:23:13, "John Roesler"  wrote:
> > > > > Hello again, all.
> > > > >
> > > > > This is a notice that we are now in Code Freeze for the 2.8 branch.
> > > > >
> > > > > From now until the release, only fixes for blockers should be
> merged
> > > to the release branch. Fixes for failing tests are allowed and
> encouraged.
> > > Documentation-only commits are also ok, in case you have forgotten to
> > > update the docs for some features in 2.8.0.
> > > > >
> > > > > Once we have a green build and passing system tests, I will cut the
> > > first RC.
> > > > >
> > > > > Thank you,
> > > > > John
> > > > >
> > > > > On Sun, Feb 7, 2021, at 09:59, John Roesler wrote:
> > > > > > Hello all,
> > > > > >
> > > > > > I have just cut the branch for 2.8 and sent the notification
> > > > > > email to the dev mailing list.
> > > > > >
> > > > > > As a reminder, the next checkpoint toward the 2.8.0 release
> > > > > > is Code Freeze on Feb 17th.
> > > > > >
> > > > > > To ensure a high-quality release, we should now focus our
> > > > > > efforts on stabilizing the 2.8 branch, including resolving
> > > > > > failures, writing new tests, and fixing documentation.
> > > > > >
> > > > > > Thanks as always for your contributions,
> > > > > > John
> > > > > >
> > > > > >
> > > > > > On Wed, 2021-02-03 at 14:18 -0600, John Roesler wrote:
> > > > > > > Hello again, all,
> > > > > > >
> > > > > > > This is a reminder that today is the Feature Freeze
> > > > > > > deadline. To avoid any last-minute crunch or time-zone
> > > > > > > unfairness, I'll cut the branch toward the end of the week.
> > > > > > >
> > > > > > > Please wrap up your features and transition fully into a
> > > > > > > stabilization mode. The next checkpoint is Code Freeze on
> > > > > > > Feb 17th.
> > > > > > >
> > > > > > > Thanks as always for all of your contributions,
> > > > > > > John
> > > > > > >
> > > > > > > On Wed, 2021-01-27 at 12:17 -0600, John Roesler wrote:
> > > > > > > > Hello again, all.

Re: [DISCUSS] Apache Kafka 2.8.0 release

2021-03-01 Thread Dhruvil Shah
Hi John,

I would like to bring up https://issues.apache.org/jira/browse/KAFKA-12254
as a blocker candidate for 2.8.0. While this is not a regression, the
issue could lead to data loss in certain cases. The fix is trivial so it
may be worth bringing it into 2.8.0. Let me know what you think.

- Dhruvil

On Mon, Feb 22, 2021 at 7:50 AM John Roesler  wrote:

> Thanks for the heads-up, Chia-Ping,
>
> I agree it would be good to include that fix.
>
> Thanks,
> John
>
> On Mon, 2021-02-22 at 09:48 +, Chia-Ping Tsai wrote:
> > hi John,
> >
> > There is a PR (https://github.com/apache/kafka/pull/10024) fixing
> following test error.
> >
> > 14:00:28 Execution failed for task ':core:test'.
> > 14:00:28 > Process 'Gradle Test Executor 24' finished with non-zero exit
> value 1
> > 14:00:28   This problem might be caused by incorrect test process
> configuration.
> > 14:00:28   Please refer to the test execution section in the User Manual
> at
> >
> > This error obstructs us from running integration tests so I'd like to
> push it to 2.8 branch after it gets approved.
> >
> > Best Regards,
> > Chia-Ping
> >
> > On 2021/02/18 16:23:13, "John Roesler"  wrote:
> > > Hello again, all.
> > >
> > > This is a notice that we are now in Code Freeze for the 2.8 branch.
> > >
> > > From now until the release, only fixes for blockers should be merged
> to the release branch. Fixes for failing tests are allowed and encouraged.
> Documentation-only commits are also ok, in case you have forgotten to
> update the docs for some features in 2.8.0.
> > >
> > > Once we have a green build and passing system tests, I will cut the
> first RC.
> > >
> > > Thank you,
> > > John
> > >
> > > On Sun, Feb 7, 2021, at 09:59, John Roesler wrote:
> > > > Hello all,
> > > >
> > > > I have just cut the branch for 2.8 and sent the notification
> > > > email to the dev mailing list.
> > > >
> > > > As a reminder, the next checkpoint toward the 2.8.0 release
> > > > is Code Freeze on Feb 17th.
> > > >
> > > > To ensure a high-quality release, we should now focus our
> > > > efforts on stabilizing the 2.8 branch, including resolving
> > > > failures, writing new tests, and fixing documentation.
> > > >
> > > > Thanks as always for your contributions,
> > > > John
> > > >
> > > >
> > > > On Wed, 2021-02-03 at 14:18 -0600, John Roesler wrote:
> > > > > Hello again, all,
> > > > >
> > > > > This is a reminder that today is the Feature Freeze
> > > > > deadline. To avoid any last-minute crunch or time-zone
> > > > > unfairness, I'll cut the branch toward the end of the week.
> > > > >
> > > > > Please wrap up your features and transition fully into a
> > > > > stabilization mode. The next checkpoint is Code Freeze on
> > > > > Feb 17th.
> > > > >
> > > > > Thanks as always for all of your contributions,
> > > > > John
> > > > >
> > > > > On Wed, 2021-01-27 at 12:17 -0600, John Roesler wrote:
> > > > > > Hello again, all.
> > > > > >
> > > > > > This is a reminder that *today* is the KIP freeze for Apache
> > > > > > Kafka 2.8.0.
> > > > > >
> > > > > > The next checkpoint is the Feature Freeze on Feb 3rd.
> > > > > >
> > > > > > When considering any last-minute KIPs today, please be
> > > > > > mindful of the scope, since we have only one week to merge a
> > > > > > stable implementation of the KIP.
> > > > > >
> > > > > > For those whose KIPs have been accepted already, please work
> > > > > > closely with your reviewers so that your features can be
> > > > > > merged in a stable form in before the Feb 3rd cutoff. Also,
> > > > > > don't forget to update the documentation as part of your
> > > > > > feature.
> > > > > >
> > > > > > Finally, as a gentle reminder to all contributors. There
> > > > > > seems to have been a recent increase in test and system test
> > > > > > failures. Please take some time starting now to stabilize
> > > > > > the codebase so we can ensure a high quality and timely
> > > > > > 2.8.0 release!
> > > > > >
> > > > > > Thanks to all of you for your contributions,
> > > > > > John
> > > > > >
> > > > > > On Sat, 2021-01-23 at 18:15 +0300, Ivan Ponomarev wrote:
> > > > > > > Hi John,
> > > > > > >
> > > > > > > KIP-418 is already implemented and reviewed, but I don't see
> it in the
> > > > > > > release plan. Can it be added?
> > > > > > >
> > > > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Ivan
> > > > > > >
> > > > > > > 22.01.2021 21:49, John Roesler пишет:
> > > > > > > > Sure thing, Leah!
> > > > > > > > -John
> > > > > > > > On Thu, Jan 21, 2021, at 07:54, Leah Thomas wrote:
> > > > > > > > > Hi John,
> > > > > > > > >
> > > > > > > > > KIP-659 was just accepted as well, can it be added to the
> release plan?
> > > > > > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > > > > >
> > > > > > > 

[jira] [Created] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-01-29 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-12254:


 Summary: MirrorMaker 2.0 creates destination topic with default 
configs
 Key: KAFKA-12254
 URL: https://issues.apache.org/jira/browse/KAFKA-12254
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


`MirrorSourceConnector` implements the logic for replicating data, 
configurations, and other metadata between the source and destination clusters. 
This includes the tasks below:
 # `refreshTopicPartitions` for syncing topics / partitions from source to 
destination.
 # `syncTopicConfigs` for syncing topic configurations from source to 
destination.

A limitation is that `computeAndCreateTopicPartitions` creates topics with 
default configurations on the destination cluster. A separate async task 
`syncTopicConfigs` is responsible for syncing the topic configs. Before that 
sync happens, topic configurations could be out of sync between the two 
clusters.

In the worst case, this could lead to data loss, e.g. when a compacted topic 
being mirrored between clusters is incorrectly created with the default 
configuration of `cleanup.policy=delete` on the destination before the 
configurations are synced via `syncTopicConfigs`.

Here is an example of the divergence:

Source Topic:

```
Topic: foobar  PartitionCount: 1  ReplicationFactor: 1  Configs: cleanup.policy=compact,segment.bytes=1073741824
```

Destination Topic:

```
Topic: A.foobar  PartitionCount: 1  ReplicationFactor: 1  Configs: segment.bytes=1073741824
```
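A sketch of one way to close this window (hypothetical helper names, not the actual `MirrorSourceConnector` API): describe the source topic's configs first and include them in the topic creation request, so the destination topic never exists with only the broker defaults.

```python
def plan_destination_topic(source_topic, destination_defaults):
    """Hypothetical planning step (not the actual MirrorSourceConnector
    API): build the create-topic request for the mirrored topic with the
    source topic's configs layered over the destination broker defaults,
    instead of creating with defaults and syncing configs later."""
    configs = dict(destination_defaults)
    configs.update(source_topic["configs"])   # source config always wins
    return {
        "name": "A." + source_topic["name"],  # "A." = source-cluster alias prefix
        "partitions": source_topic["partitions"],
        "configs": configs,
    }
```

With creation planned this way, a compacted source topic would carry `cleanup.policy=compact` from the first moment the destination topic exists.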





[jira] [Created] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced

2020-09-23 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-10518:


 Summary: Consumer fetches could be inefficient when lags are 
unbalanced
 Key: KAFKA-10518
 URL: https://issues.apache.org/jira/browse/KAFKA-10518
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


Consumer fetches are inefficient when lags are imbalanced across partitions, 
due to head-of-line blocking and the behavior of blocking for 
`fetch.max.wait.ms` until data is available.

When the consumer receives a fetch response, it prepares the next fetch request 
and sends it out. The caveat is that the subsequent fetch request explicitly 
excludes partitions for which the consumer received data in the previous round. 
This allows the consumer application to drain the data for those partitions 
while the consumer fetches from the other partitions it is subscribed to.

This behavior does not play out well when the lag is unbalanced, because the 
consumer receives data for the partitions it is lagging on and then sends a 
fetch request for partitions that have little or no data. The latter ends up 
blocking for `fetch.max.wait.ms` on the broker before an empty response is sent 
back. This slows down the consumer's overall consumption throughput, since 
`fetch.max.wait.ms` is 500ms by default.
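A back-of-the-envelope model of the slowdown (assumed illustrative numbers, not measured Kafka behavior): if every productive fetch is followed by an empty fetch that blocks for `fetch.max.wait.ms`, the wait dominates the cycle time.

```python
def effective_throughput(records_per_fetch, fetch_rtt_ms, empty_wait_ms):
    """Toy model: one cycle = a fetch that returns data (one RTT) plus a
    fetch for caught-up partitions that blocks for empty_wait_ms before
    returning empty. Returns records consumed per second."""
    cycle_ms = fetch_rtt_ms + (empty_wait_ms + fetch_rtt_ms)
    return records_per_fetch * 1000.0 / cycle_ms

# With 10,000 records per fetch and a 10ms round trip:
#   empty_wait_ms=0   -> 500,000 records/s
#   empty_wait_ms=500 -> ~19,231 records/s (the 500ms default wait dominates)
```

The model is crude, but it shows why the default 500ms wait on empty partitions can cap throughput well below what the lagging partitions could sustain.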





[jira] [Created] (KAFKA-10517) Inefficient consumer processing with fetch sessions

2020-09-23 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-10517:


 Summary: Inefficient consumer processing with fetch sessions
 Key: KAFKA-10517
 URL: https://issues.apache.org/jira/browse/KAFKA-10517
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


With the introduction of fetch sessions, the consumer and the broker share a 
unified view of the partitions being consumed and their current state 
(fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The 
consumer is still expected to consume in a round-robin manner; however, we have 
observed certain cases where it does not.

Because of how we perform memory management on the consumer and implement fetch 
pipelining, we exclude partitions from a FetchRequest when they have not been 
drained by the application. This is done by adding these partitions to the 
`toForget` list in the `FetchRequest`. When partitions are added to the 
`toForget` list, the broker removes them from its session cache. This causes a 
bit of a divergence between the broker's and the client's view of the metadata.

When forgotten partitions are added back to the fetch request after the 
application has drained them, the server immediately adds them back to the 
session cache and returns a response for them, even if there is no 
corresponding data. This incorrectly re-triggers the consumer behavior of 
putting the partition on the `toForget` list, even though no data for the 
partition may have been returned.

We have seen this behavior to cause an imbalance in lags across partitions as 
the consumer no longer obeys the round-robin sequence given that the partitions 
keep shuffling between the `toForget` and `toSend` lists.

At a high level, this is caused by the session caches on the consumer and 
broker being out of sync. It ends up in a state where the partition balance is 
maintained by external factors (such as whether metadata was returned for a 
partition) rather than by the round-robin ordering.
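The ping-pong between `toSend` and `toForget` can be shown with a toy simulation (a deliberate simplification, not the real fetch-session protocol or consumer code): the broker echoes a response entry for every re-added partition, data or not, and the consumer then excludes every responded partition from its next fetch.

```python
def simulate_fetch_rounds(partitions, rounds):
    """Toy simulation of the described divergence: partitions oscillate
    between toSend and toForget, so every other fetch round is empty and
    the round-robin ordering breaks down."""
    to_send, to_forget = set(partitions), set()
    responded_per_round = []
    for _ in range(rounds):
        response = set(to_send)                   # broker echoes re-added partitions
        responded_per_round.append(sorted(response))
        to_send, to_forget = to_forget, response  # consumer forgets all responders
    return responded_per_round
```

In this model every partition flips lists each round, so half the fetches return nothing at all, which matches the observed lag imbalance.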





Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-09-16 Thread Dhruvil Shah
Hi Satish, Harsha,

Thanks for the KIP. Few questions below:

1. Could you describe how retention would work with this KIP and which
threads are responsible for driving this work? I believe there are 3 kinds
of retention processes we are looking at:
  (a) Regular retention for data in tiered storage as per the configured
`retention.ms` / `retention.bytes`.
  (b) Local retention for data in local storage as per the configured
`local.log.retention.ms` / `local.log.retention.bytes`.
  (c) Possibly regular retention for data in local storage, if the tiering
task is lagging or for data that is below the log start offset.

2. When does a segment become eligible to be tiered? Is it as soon as the
segment is rolled and the end offset is less than the last stable offset as
mentioned in the KIP? I wonder if we need to consider other parameters too,
like the high watermark, so that we are guaranteed that what we are tiering
has been committed to the log and accepted by the ISR.

3. The section on "Follower Fetch Scenarios" is useful but is a bit
difficult to parse at the moment. It would be useful to summarize the
changes we need in the ReplicaFetcher.

4. Related to the above, it's a bit unclear how we are planning on
restoring the producer state for a new replica. Could you expand on that?

5. Similarly, it would be worth summarizing the behavior on unclean leader
election. There are several scenarios to consider here: data loss from
local log, data loss from remote log, data loss from metadata topic, etc.
It's worth describing these in detail.

6. It would be useful to add details about how we plan on using RocksDB in
the default implementation of `RemoteLogMetadataManager`.

7. For a READ_COMMITTED FetchRequest, how do we retrieve and return the
aborted transaction metadata?

8. The `LogSegmentData` class assumes that we have a log segment, offset
index, time index, transaction index, producer snapshot and leader epoch
index. How do we deal with cases where we do not have one or more of these?
For example, we may not have a transaction index or producer snapshot for a
particular segment. The former is optional, and the latter is only kept for
up to the 3 latest segments.

Thanks,
Dhruvil

On Mon, Sep 7, 2020 at 6:54 PM Harsha Ch  wrote:

> Hi All,
>
> We are all working through the last meeting feedback. I'll cancel the
> tomorrow 's meeting and we can meanwhile continue our discussion in mailing
> list. We can start the regular meeting from next week onwards.
>
> Thanks,
>
> Harsha
>
> On Fri, Sep 04, 2020 at 8:41 AM, Satish Duggana < satish.dugg...@gmail.com
> > wrote:
>
> >
> >
> >
> > Hi Jun,
> > Thanks for your thorough review and comments. Please find the inline
> > replies below.
> >
> >
> >
> > 600. The topic deletion logic needs more details.
> > 600.1 The KIP mentions "The controller considers the topic partition is
> > deleted only when it determines that there are no log segments for that
> > topic partition by using RLMM". How is this done?
> >
> >
> >
> > It uses RLMM#listSegments() returns all the segments for the given topic
> > partition.
> >
> >
> >
> > 600.2 "If the delete option is enabled then the leader will stop RLM task
> > and stop processing and it sets all the remote log segment metadata of
> > that partition with a delete marker and publishes them to RLMM." We
> > discussed this earlier. When a topic is being deleted, there may not be a
> > leader for the deleted partition.
> >
> >
> >
> > This is a good point. As suggested in the meeting, we will add a separate
> > section for topic/partition deletion lifecycle and this scenario will be
> > addressed.
> >
> >
> >
> > 601. Unclean leader election
> > 601.1 Scenario 1: new empty follower
> > After step 1, the follower restores up to offset 3. So why does it have
> > LE-2 at offset 5?
> >
> >
> >
> > Nice catch. It was showing the leader epoch fetched from the remote
> > storage. It should be shown with the truncated till offset 3. Updated the
> > KIP.
> >
> >
> >
> > 601.2 scenario 5: After Step 3, leader A has inconsistent data between its
> > local and the tiered data. For example, offset 3 has msg 3 LE-0 locally,
> > but msg 5 LE-1 in the remote store. While it's ok for the unclean leader
> > to lose data, it should still return consistent data, whether it's from
> > the local or the remote store.
> >
> >
> >
> > There is no inconsistency here as LE-0 offsets are [0, 4] and LE-2:
> > [5, ]. It will always get the right records for the given offset and
> > leader epoch. In case of remote, RSM is invoked to get the remote log
> > segment that contains the given offset with the leader epoch.
> >
> >
> >
> > 601.4 It seems that retention is based on
> > listRemoteLogSegments(TopicPartition topicPartition, long leaderEpoch).
> > When there is an unclean leader election, it's possible for the new
> leader
> > to not to include certain epochs in its epoch cache. How are remote
> > segments associated with those epochs being cleaned?
> >
> >
> >
> > 

[jira] [Created] (KAFKA-9961) Brokers may be left in an inconsistent state after reassignment

2020-05-05 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9961:
---

 Summary: Brokers may be left in an inconsistent state after 
reassignment
 Key: KAFKA-9961
 URL: https://issues.apache.org/jira/browse/KAFKA-9961
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


When completing a reassignment, the controller sends StopReplicaRequest to 
replicas that are not in the target assignment and removes them from the 
assignment in ZK. We do not have any retry mechanism to ensure that the broker 
is able to process the StopReplicaRequest successfully. Under certain 
circumstances, this could leave brokers in an inconsistent state, where they 
continue being the follower for this partition and end up with an inconsistent 
metadata cache.

We have seen messages like the following being spammed in the broker logs when 
we get into this situation:
{code:java}
While recording the replica LEO, the partition topic-1 hasn't been created.
{code}
This happens because the broker has neither received an updated 
LeaderAndIsrRequest for the new leader nor a StopReplicaRequest from the 
controller when the replica was removed from the assignment.

Note that we would require a restart of the affected broker to fix this 
situation. A controller failover would not fix it as the broker could continue 
being a replica for the partition until it receives a StopReplicaRequest, which 
would never happen in this case.

There seem to be a couple of problems we should address:
 # We need a mechanism to retry replica deletions after partition reassignment 
is complete. The main challenge here is to be able to deal with cases where a 
broker has been decommissioned and may never come back up.
 # We could perhaps consider a mechanism to reconcile replica states across 
brokers, something similar to the solution proposed in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker].
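Problem #1 could look something like the retry loop below (a hypothetical sketch of a controller-side mechanism, not an existing Kafka API); the hard part it makes explicit is deciding when a broker is gone for good:

```python
import time

def retry_stop_replica(send_stop_replica, broker_alive,
                       max_attempts=5, backoff_s=0.0):
    """Hypothetical retry mechanism for replica deletion (no such
    mechanism exists in Kafka today): resend the StopReplicaRequest until
    the broker acknowledges it, and give up cleanly when the broker
    appears permanently decommissioned."""
    for _ in range(max_attempts):
        if not broker_alive():
            return "abandoned"        # decommissioned broker: stop retrying
        if send_stop_replica():
            return "deleted"          # broker acknowledged the deletion
        time.sleep(backoff_s)         # back off before the next attempt
    return "gave_up"                  # surface for operator intervention
```

The `broker_alive` and `send_stop_replica` callbacks are placeholders for controller-side liveness and RPC plumbing that the real design would have to define.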





[jira] [Created] (KAFKA-9956) Authorizer APIs may be invoked more than once for a given request

2020-05-05 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9956:
---

 Summary: Authorizer APIs may be invoked more than once for a given 
request
 Key: KAFKA-9956
 URL: https://issues.apache.org/jira/browse/KAFKA-9956
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


Authorizer#authorize may be invoked more than once in some cases for a given 
request. I noticed this for `DescribeConfigsRequest`, but other requests could 
be affected as well.

The reason for this is the misuse of the Scala `partition` API in code like 
this:
{code:java}
val (authorizedResources, unauthorizedResources) =
  describeConfigsRequest.resources.asScala.partition { resource =>
    resource.`type` match {
      case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
        authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
      case ConfigResource.Type.TOPIC =>
        authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name)
      case rt =>
        throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
    }
  }
{code}
As per Scala docs, the `partition` API could traverse the collection twice, 
depending on the implementation. 
[https://www.scala-lang.org/api/current/scala/collection/Iterable.html#partition(p:A=%3EBoolean):(C,C)]

It is also not a good practice to include side effects as part of the lambda 
passed into `partition`. We should clean up such usages.
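The double-invocation hazard is easy to demonstrate in miniature (illustrative Python standing in for the Scala code; `authorize` below is a stand-in for the real Authorizer call, not its API): a partition implemented as two passes calls the predicate twice per element, while caching the predicate's result restores exactly-once evaluation.

```python
def partition_twice(pred, xs):
    """Mimics a Scala `partition` implementation that is allowed to
    traverse the collection twice: the predicate runs once per element
    per traversal."""
    return [x for x in xs if pred(x)], [x for x in xs if not pred(x)]

def partition_once(pred, xs):
    """Safer shape for a side-effecting predicate: evaluate it exactly
    once per element, then split on the cached boolean."""
    cached = [(x, pred(x)) for x in xs]
    return ([x for x, ok in cached if ok],
            [x for x, ok in cached if not ok])
```

The same shape works in Scala by mapping each resource to its authorization result first and partitioning on that, so `authorize` runs exactly once per resource regardless of how `partition` is implemented.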





Re: [VOTE] KIP-584: Versioning scheme for features

2020-04-21 Thread Dhruvil Shah
Thanks for the KIP! +1 (non-binding)

On Tue, Apr 21, 2020 at 6:09 AM David Jacot  wrote:

> Great KIP, thanks! +1 (non-binding)
>
> On Fri, Apr 17, 2020 at 8:56 PM Guozhang Wang  wrote:
>
> > Thanks for the great KIP Kowshik, +1 (binding).
> >
> > On Fri, Apr 17, 2020 at 11:22 AM Jun Rao  wrote:
> >
> > > Hi, Kowshik,
> > >
> > > Thanks for the KIP. +1
> > >
> > > Jun
> > >
> > > On Thu, Apr 16, 2020 at 11:14 AM Kowshik Prakasam <
> > kpraka...@confluent.io>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to start a vote for KIP-584. The link to the KIP can be
> found
> > > > here:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features
> > > > .
> > > >
> > > > Thanks!
> > > >
> > > >
> > > > Cheers,
> > > > Kowshik
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: [DISCUSS] KIP-584: Versioning scheme for features

2020-04-21 Thread Dhruvil Shah
Hi Kowshik,

Thanks for the KIP, this is exciting!

The KIP includes examples on how operators could use the command line
utility, etc. It would be great to add some high-level details on how the
upgrade workflow changes overall with the addition of feature versions.

- Dhruvil

On Wed, Apr 15, 2020 at 6:29 PM Kowshik Prakasam 
wrote:

> Hi Jun,
>
> Sorry the links were broken in my last response, here are the right links:
>
> 200.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioning
> Scheme For Features-Validations
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-Validations
> >
> 110.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-When
> To Use Versioned Feature Flags?
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-Whentouseversionedfeatureflags
> ?>
>
>
> Cheers,
> Kowshik
>
> On Wed, Apr 15, 2020 at 6:24 PM Kowshik Prakasam 
> wrote:
>
> >
> > Hi Jun,
> >
> > Thanks for the feedback! I have addressed the comments in the KIP.
> >
> > > 200. In the validation section, there is still the text  "*from*
> > > {"max_version_level":
> > > X} *to* {"max_version_level": X’}". It seems that it should say "from X
> > to
> > > Y"?
> >
> > (Kowshik): Done. I have reworded it a bit to make it clearer now in this
> > section:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-Validations
> >
> > > 110. Could we add that we need to document the bumped version of each
> > > feature in the upgrade section of a release?
> >
> > (Kowshik): Great point! Done, I have mentioned it in #3 this section:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-584
> > 
> > %3A+Versioning+scheme+for+features#KIP-584
> > 
> > :Versioningschemeforfeatures-Whentouseversionedfeatureflags?
> >
> >
> > Cheers,
> > Kowshik
> >
> > On Wed, Apr 15, 2020 at 4:00 PM Jun Rao  wrote:
> >
> >> Hi, Kowshik,
> >>
> >> Looks good to me now. Just a couple of minor things below.
> >>
> >> 200. In the validation section, there is still the text  "*from*
> >> {"max_version_level":
> >> X} *to* {"max_version_level": X’}". It seems that it should say "from X
> to
> >> Y"?
> >>
> >> 110. Could we add that we need to document the bumped version of each
> >> feature in the upgrade section of a release?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Wed, Apr 15, 2020 at 1:08 PM Kowshik Prakasam <
> kpraka...@confluent.io>
> >> wrote:
> >>
> >> > Hi Jun,
> >> >
> >> > Thank you for the suggestion! I have updated the KIP, please find my
> >> > response below.
> >> >
> >> > > 200. I guess you are saying only when the allowDowngrade field is
> set,
> >> > the
> >> > > finalized feature version can go backward. Otherwise, it can only go
> >> up.
> >> > > That makes sense. It would be useful to make that clear when
> >> explaining
> >> > > the usage of the allowDowngrade field. In the validation section, we
> >> > have  "
> >> > > /features' from {"max_version_level": X} to {"max_version_level":
> >> X’}",
> >> > it
> >> > > seems that we need to mention Y there.
> >> >
> >> > (Kowshik): Great point! Yes, that is correct. Done, I have updated the
> >> > validations
> >> > section explaining the above. Here is a link to this section:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-Validations
> >> >
> >> >
> >> > Cheers,
> >> > Kowshik
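[Editorial aside: the downgrade rule discussed above — max_version_level may only move backward when allowDowngrade is set, otherwise it can only go up — can be sketched as a small validation. All names here are hypothetical; this is not the KIP's actual controller code.]

```java
// Sketch of the finalized-feature update validation discussed above.
// Hypothetical names; the real logic lives in the KIP-584 controller code.
class FeatureUpdateValidator {
    // Returns true if moving a feature's max_version_level from
    // currentLevel to proposedLevel is permitted.
    static boolean isAllowed(int currentLevel, int proposedLevel, boolean allowDowngrade) {
        if (proposedLevel >= currentLevel) {
            return true;           // upgrades (or no-ops) are always fine
        }
        return allowDowngrade;     // downgrades need the explicit flag
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(2, 3, false)); // upgrade: allowed
        System.out.println(isAllowed(3, 2, false)); // downgrade without flag: rejected
        System.out.println(isAllowed(3, 2, true));  // downgrade with flag: allowed
    }
}
```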
> >> >
> >> >
> >> >
> >> >
> >> > On Wed, Apr 15, 2020 at 11:05 AM Jun Rao  wrote:
> >> >
> >> > > Hi, Kowshik,
> >> > >
> >> > > 200. I guess you are saying only when the allowDowngrade field is
> set,
> >> > the
> >> > > finalized feature version can go backward. Otherwise, it can only go
> >> up.
> >> > > That makes sense. It would be useful to make that clear when
> >> explaining
> >> > > the usage of the allowDowngrade field. In the validation section, we
> >> have
> >> > > "
> >> > > /features' from {"max_version_level": X} to {"max_version_level":
> >> X’}",
> >> > it
> >> > > seems that we need to mention Y there.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > > On Wed, Apr 15, 2020 at 10:44 AM Kowshik Prakasam <
> >> > kpraka...@confluent.io>
> >> > > wrote:
> >> > >
> >> > > > Hi Jun,
> >> > > >
> >> > > > Great question! Please find my response below.
> >> > > >
> >> > > > > 200. My understanding is that if the CLI tool passes the
> >> > > > > '--allow-downgrade' flag when updating a specific feature, then a
> >> > > > > future downgrade is possible. Otherwise, the feature is not
> >> > > > > downgradable. If
> >> 

[jira] [Created] (KAFKA-9772) Transactional offset commit fails with IllegalStateException

2020-03-26 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9772:
---

 Summary: Transactional offset commit fails with 
IllegalStateException
 Key: KAFKA-9772
 URL: https://issues.apache.org/jira/browse/KAFKA-9772
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


java.lang.IllegalStateException: Trying to complete a transactional offset
commit for producerId 7090 and groupId application-id even though the offset
commit record itself hasn't been appended to the log.
    at kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677)
    at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
    at kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674)
    at kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874)
    at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
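[Editorial aside: the exception above is an invariant check — a pending transactional offset commit may only be completed after its commit record was appended to the log. A toy model of that invariant, heavily simplified from GroupMetadata's real bookkeeping (all names hypothetical):]

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the invariant behind the exception above: completing a
// transactional offset commit whose record was never appended is illegal.
class PendingTxnOffsets {
    private final Map<Long, Set<String>> pending = new HashMap<>(); // producerId -> partitions
    private final Set<String> appended = new HashSet<>();           // records written to the log

    void addPending(long producerId, String topicPartition) {
        pending.computeIfAbsent(producerId, id -> new HashSet<>()).add(topicPartition);
    }

    void markAppended(String topicPartition) {
        appended.add(topicPartition);
    }

    // Mirrors completePendingTxnOffsetCommit: throws if the record was never appended.
    void complete(long producerId) {
        for (String tp : pending.getOrDefault(producerId, Set.of())) {
            if (!appended.contains(tp)) {
                throw new IllegalStateException(
                    "Trying to complete a transactional offset commit for producerId "
                    + producerId + " even though the offset commit record itself "
                    + "hasn't been appended to the log.");
            }
        }
        pending.remove(producerId);
    }
}
```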



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-550: Mechanism to Delete Stray Partitions on Broker

2020-01-16 Thread Dhruvil Shah
Hi Colin,

That’s fair though I am unsure if a delay + metric + log message would
really serve our purpose. There would be no action required from the
operator in almost all cases. A signal that is not actionable in 99% of
cases may not be very useful, in my opinion.

Additionally, if we add in a delay, we would need to reason about the
behavior when the same topic is recreated while a stray partition has been
queued for deletion.

I would be in support of adding a configuration to disable stray partition
deletion. This way, if users find abnormal behavior when testing /
upgrading development environments, they could choose to disable the
feature altogether.

Let me know what you think. It would be good to hear what others think as
well.

Thanks,
Dhruvil

On Thu, Jan 16, 2020 at 3:24 AM Colin McCabe  wrote:

> On Wed, Jan 15, 2020, at 03:54, Dhruvil Shah wrote:
> > Hi Colin,
> >
> > We could add a configuration to disable stray partition deletion if
> needed,
> > but I wasn't sure if an operator would really want to disable it. Perhaps
> > if the implementation were buggy, the configuration could be used to
> > disable the feature until a bug fix is made. Is that the kind of use case
> > you were thinking of?
> >
> > I was thinking that there would not be any delay between detection and
> > deletion of stray logs. We would schedule an async task to do the actual
> > deletion though.
>
> Based on my experience in HDFS, immediately deleting data that looks out
> of place can cause severe issues when a bug occurs.  See
> https://issues.apache.org/jira/browse/HDFS-6186 for details.  So I really
> do think there should be a delay, and a metric + log message in the
> meantime to alert the operators to what is about to happen.
>
> best,
> Colin
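[Editorial aside: the delay + metric + log-message idea above can be sketched in a few lines. This is a hypothetical illustration, not Kafka's actual LogManager code — a real implementation would track a deadline per log and run off the broker's scheduler.]

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggestion above: don't delete suspected stray logs
// immediately; queue them, expose a count metric, and log what is pending.
// All names are hypothetical.
class StrayLogDeleter {
    private final long delayMs;
    private final List<String> queue = new ArrayList<>();
    private long queuedAtMs = -1;

    StrayLogDeleter(long delayMs) { this.delayMs = delayMs; }

    void markStray(String logDir, long nowMs) {
        System.out.println("WARN stray log scheduled for deletion in "
            + delayMs + " ms: " + logDir);
        queue.add(logDir);
        queuedAtMs = nowMs;
    }

    // Metric the operator can alert on while deletion is pending.
    int strayLogCount() { return queue.size(); }

    // Called periodically; returns the logs actually deleted this pass.
    List<String> deleteExpired(long nowMs) {
        if (queuedAtMs < 0 || nowMs - queuedAtMs < delayMs) {
            return List.of();
        }
        List<String> deleted = new ArrayList<>(queue);
        queue.clear();
        queuedAtMs = -1;
        return deleted;
    }
}
```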
>
> >
> > Thanks,
> > Dhruvil
> >
> > On Tue, Jan 14, 2020 at 11:04 PM Colin McCabe 
> wrote:
> >
> > > Hi Dhruvil,
> > >
> > > Thanks for the KIP.  I think there should be some way to turn this
> off, in
> > > case that becomes necessary.  I'm also curious how long we intend to
> wait
> > > between detecting the duplication and deleting the extra logs.  The
> KIP
> > > says "scheduled for deletion" but doesn't give a time frame -- is it
> > > assumed to be immediate?
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Jan 14, 2020, at 05:56, Dhruvil Shah wrote:
> > > > If there are no more questions or concerns, I will start a vote
> thread
> > > > tomorrow.
> > > >
> > > > Thanks,
> > > > Dhruvil
> > > >
> > > > On Mon, Jan 13, 2020 at 6:59 PM Dhruvil Shah 
> > > wrote:
> > > >
> > > > > Hi Nikhil,
> > > > >
> > > > > Thanks for looking at the KIP. The kind of race condition you
> mention
> > > is
> > > > > not possible as stray partition detection is done synchronously
> while
> > > > > handling the LeaderAndIsrRequest. In other words, we atomically
> > > evaluate
> > > > > the partitions the broker must host and the extra partitions it is
> > > hosting
> > > > > and schedule deletions based on that.
> > > > >
> > > > > One possible shortcoming of the KIP is that we do not have the
> ability
> > > to
> > > > > detect a stray partition if the topic has been recreated since. We
> will
> > > > > have the ability to disambiguate between different generations of a
> > > > > partition with KIP-516.
> > > > >
> > > > > Thanks,
> > > > > Dhruvil
> > > > >
> > > > > On Sat, Jan 11, 2020 at 11:40 AM Nikhil Bhatia <
> nik...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Thanks Dhruvil, the proposal looks reasonable to me.
> > > > >>
> > > > >> is there a potential of a race between a new topic being assigned
> to
> > > the
> > > > >> same node that is still performing a cleanup of the stray
> partition ?
> > > > >> Topic
> > > > >> ID will definitely solve this issue.
> > > > >>
> > > > >> Thanks
> > > > >> Nikhil
> > > > >>
> > > > >> On 2020/01/06 04:30:20, Dhruvil Shah  wrote:
> > > > >> > Here is the link to the KIP:>
> > > > >> >
> > > > >>
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker
> > > > >> >
> > > > >>
> > > > >> >
> > > > >> > On Mon, Jan 6, 2020 at 9:59 AM Dhruvil Shah  >
> > > > >> wrote:>
> > > > >> >
> > > > >> > > Hi all, I would like to kick off discussion for KIP-550 which
> > > proposes
> > > > >> a>
> > > > >> > > mechanism to detect and delete stray partitions on a broker.
> > > > >> Suggestions>
> > > > >> > > and feedback are welcome.>
> > > > >> > >>
> > > > >> > > - Dhruvil>
> > > > >> > >>
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-550: Mechanism to Delete Stray Partitions on Broker

2020-01-15 Thread Dhruvil Shah
Hi Colin,

We could add a configuration to disable stray partition deletion if needed,
but I wasn't sure if an operator would really want to disable it. Perhaps
if the implementation were buggy, the configuration could be used to
disable the feature until a bug fix is made. Is that the kind of use case
you were thinking of?

I was thinking that there would not be any delay between detection and
deletion of stray logs. We would schedule an async task to do the actual
deletion though.

Thanks,
Dhruvil

On Tue, Jan 14, 2020 at 11:04 PM Colin McCabe  wrote:

> Hi Dhruvil,
>
> Thanks for the KIP.  I think there should be some way to turn this off, in
> case that becomes necessary.  I'm also curious how long we intend to wait
> between detecting the duplication and deleting the extra logs.  The KIP
> says "scheduled for deletion" but doesn't give a time frame -- is it
> assumed to be immediate?
>
> best,
> Colin
>
>
> On Tue, Jan 14, 2020, at 05:56, Dhruvil Shah wrote:
> > If there are no more questions or concerns, I will start a vote thread
> > tomorrow.
> >
> > Thanks,
> > Dhruvil
> >
> > On Mon, Jan 13, 2020 at 6:59 PM Dhruvil Shah 
> wrote:
> >
> > > Hi Nikhil,
> > >
> > > Thanks for looking at the KIP. The kind of race condition you mention
> is
> > > not possible as stray partition detection is done synchronously while
> > > handling the LeaderAndIsrRequest. In other words, we atomically
> evaluate
> > > the partitions the broker must host and the extra partitions it is
> hosting
> > > and schedule deletions based on that.
> > >
> > > One possible shortcoming of the KIP is that we do not have the ability
> to
> > > detect a stray partition if the topic has been recreated since. We will
> > > have the ability to disambiguate between different generations of a
> > > partition with KIP-516.
> > >
> > > Thanks,
> > > Dhruvil
> > >
> > > On Sat, Jan 11, 2020 at 11:40 AM Nikhil Bhatia 
> > > wrote:
> > >
> > >> Thanks Dhruvil, the proposal looks reasonable to me.
> > >>
> > >> is there a potential of a race between a new topic being assigned to
> the
> > >> same node that is still performing a cleanup of the stray partition ?
> > >> Topic
> > >> ID will definitely solve this issue.
> > >>
> > >> Thanks
> > >> Nikhil
> > >>
> > >> On 2020/01/06 04:30:20, Dhruvil Shah  wrote:
> > >> > Here is the link to the KIP:>
> > >> >
> > >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker
> > >> >
> > >>
> > >> >
> > >> > On Mon, Jan 6, 2020 at 9:59 AM Dhruvil Shah 
> > >> wrote:>
> > >> >
> > >> > > Hi all, I would like to kick off discussion for KIP-550 which
> proposes
> > >> a>
> > >> > > mechanism to detect and delete stray partitions on a broker.
> > >> Suggestions>
> > >> > > and feedback are welcome.>
> > >> > >>
> > >> > > - Dhruvil>
> > >> > >>
> > >> >
> > >>
> > >
> >
>


Re: [DISCUSS] KIP-550: Mechanism to Delete Stray Partitions on Broker

2020-01-14 Thread Dhruvil Shah
If there are no more questions or concerns, I will start a vote thread
tomorrow.

Thanks,
Dhruvil

On Mon, Jan 13, 2020 at 6:59 PM Dhruvil Shah  wrote:

> Hi Nikhil,
>
> Thanks for looking at the KIP. The kind of race condition you mention is
> not possible as stray partition detection is done synchronously while
> handling the LeaderAndIsrRequest. In other words, we atomically evaluate
> the partitions the broker must host and the extra partitions it is hosting
> and schedule deletions based on that.
>
> One possible shortcoming of the KIP is that we do not have the ability to
> detect a stray partition if the topic has been recreated since. We will
> have the ability to disambiguate between different generations of a
> partition with KIP-516.
>
> Thanks,
> Dhruvil
>
> On Sat, Jan 11, 2020 at 11:40 AM Nikhil Bhatia 
> wrote:
>
>> Thanks Dhruvil, the proposal looks reasonable to me.
>>
>> is there a potential of a race between a new topic being assigned to the
>> same node that is still performing a cleanup of the stray partition ?
>> Topic
>> ID will definitely solve this issue.
>>
>> Thanks
>> Nikhil
>>
>> On 2020/01/06 04:30:20, Dhruvil Shah  wrote:
>> > Here is the link to the KIP:>
>> >
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker
>> >
>>
>> >
>> > On Mon, Jan 6, 2020 at 9:59 AM Dhruvil Shah 
>> wrote:>
>> >
>> > > Hi all, I would like to kick off discussion for KIP-550 which proposes
>> a>
>> > > mechanism to detect and delete stray partitions on a broker.
>> Suggestions>
>> > > and feedback are welcome.>
>> > >>
>> > > - Dhruvil>
>> > >>
>> >
>>
>


Re: [DISCUSS] KIP-550: Mechanism to Delete Stray Partitions on Broker

2020-01-13 Thread Dhruvil Shah
Hi Nikhil,

Thanks for looking at the KIP. The kind of race condition you mention is
not possible as stray partition detection is done synchronously while
handling the LeaderAndIsrRequest. In other words, we atomically evaluate
the partitions the broker must host and the extra partitions it is hosting
and schedule deletions based on that.
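[Editorial aside: the atomic evaluation described above amounts to a set difference — partitions the broker actually hosts minus partitions the LeaderAndIsrRequest says it should host. A minimal sketch with hypothetical names:]

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of stray-partition detection as described above.
// Hypothetical simplification of the broker's actual logic.
class StrayPartitionDetector {
    static Set<String> findStray(Set<String> hostedPartitions, Set<String> expectedPartitions) {
        Set<String> stray = new HashSet<>(hostedPartitions);
        stray.removeAll(expectedPartitions);   // anything hosted but not expected is stray
        return stray;
    }

    public static void main(String[] args) {
        Set<String> hosted = Set.of("orders-0", "orders-1", "deleted-topic-0");
        Set<String> expected = Set.of("orders-0", "orders-1");
        System.out.println(findStray(hosted, expected)); // [deleted-topic-0]
    }
}
```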

One possible shortcoming of the KIP is that we do not have the ability to
detect a stray partition if the topic has been recreated since. We will
have the ability to disambiguate between different generations of a
partition with KIP-516.

Thanks,
Dhruvil

On Sat, Jan 11, 2020 at 11:40 AM Nikhil Bhatia  wrote:

> Thanks Dhruvil, the proposal looks reasonable to me.
>
> is there a potential of a race between a new topic being assigned to the
> same node that is still performing a cleanup of the stray partition ? Topic
> ID will definitely solve this issue.
>
> Thanks
> Nikhil
>
> On 2020/01/06 04:30:20, Dhruvil Shah  wrote:
> > Here is the link to the KIP:>
> >
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker
> >
>
> >
> > On Mon, Jan 6, 2020 at 9:59 AM Dhruvil Shah  wrote:>
> >
> > > Hi all, I would like to kick off discussion for KIP-550 which proposes
> a>
> > > mechanism to detect and delete stray partitions on a broker.
> Suggestions>
> > > and feedback are welcome.>
> > >>
> > > - Dhruvil>
> > >>
> >
>


Re: [DISCUSS] KIP-550: Mechanism to Delete Stray Partitions on Broker

2020-01-05 Thread Dhruvil Shah
Here is the link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker

On Mon, Jan 6, 2020 at 9:59 AM Dhruvil Shah  wrote:

> Hi all, I would like to kick off discussion for KIP-550 which proposes a
> mechanism to detect and delete stray partitions on a broker. Suggestions
> and feedback are welcome.
>
> - Dhruvil
>


[DISCUSS] KIP-550: Mechanism to Delete Stray Partitions on Broker

2020-01-05 Thread Dhruvil Shah
Hi all, I would like to kick off discussion for KIP-550 which proposes a
mechanism to detect and delete stray partitions on a broker. Suggestions
and feedback are welcome.

- Dhruvil


[jira] [Created] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9307:
---

 Summary: Transaction coordinator could be left in unknown state 
after ZK session timeout
 Key: KAFKA-9307
 URL: https://issues.apache.org/jira/browse/KAFKA-9307
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Dhruvil Shah


We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.

```
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request:
clientId=2, correlationId=1, api=LEADER_AND_ISR, ...
{topic=__transaction_state,partition_states=[{...
{partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__transaction_state
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
    at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
    at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
    at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
    at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
    at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202)
```

 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch. This meant that 
this partition was left in the "loading" state and we thus returned 
COORDINATOR_LOAD_IN_PROGRESS errors.

 * Broker restart fixed this partial in-memory state and we were able to resume 
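[Editorial aside: the "ignored become-follower" step in the sequence above hinges on the leader-epoch check — the epoch gets cached even when the transition only partially completes, so the later retry at the same epoch is skipped as stale. A toy model, with hypothetical names:]

```java
// Toy model of the epoch check behind the "ignored become-follower" step
// above: once an epoch is cached, a LeaderAndIsr request with the same
// epoch is treated as stale and skipped, even if the earlier transition
// only partially completed. Hypothetical, not the broker's actual code.
class PartitionEpochState {
    private int cachedLeaderEpoch = -1;
    boolean transitionCompleted = false;

    // Returns true if the request is processed, false if ignored as stale.
    boolean maybeBecomeFollower(int requestLeaderEpoch, boolean failMidway) {
        if (requestLeaderEpoch <= cachedLeaderEpoch) {
            return false;          // same or older epoch: ignored
        }
        cachedLeaderEpoch = requestLeaderEpoch; // epoch cached up front...
        if (failMidway) {
            return true;           // ...even though the transition didn't finish
        }
        transitionCompleted = true;
        return true;
    }
}
```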

[jira] [Resolved] (KAFKA-8125) Check for topic existence in CreateTopicsRequest prior to creating replica assignment

2019-10-15 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah resolved KAFKA-8125.
-
Resolution: Duplicate

> Check for topic existence in CreateTopicsRequest prior to creating replica 
> assignment
> -
>
> Key: KAFKA-8125
> URL: https://issues.apache.org/jira/browse/KAFKA-8125
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
>Reporter: Lucas Bradstreet
>Assignee: huxihx
>Priority: Minor
>
> Imagine the following pattern to ensure topic creation in an application:
>  # Attempt to create a topic with # partitions P and replication factor R.
>  #  If topic creation fails with TopicExistsException, continue. If topic 
> creation succeeds, continue, the topic now exists.
> This normally works fine. However, if the topic has already been created, but 
> if the number of live brokers < R, then the topic creation will fail an 
> org.apache.kafka.common.errors.InvalidReplicationFactorException, even though 
> the topic already exists.
> This could be avoided if we check whether the topic exists prior to calling 
> AdminUtils.assignReplicasToBrokers.
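>
> [Editorial aside: the application pattern in steps 1–2 above can be sketched against a minimal stand-in interface. `TopicCreator` below is a hypothetical substitute for the AdminClient call; the bug in this report is that a real broker may answer with InvalidReplicationFactorException instead of TopicExistsException when the topic exists but live brokers < R, which this pattern does not handle.]

```java
// Sketch of the pattern described above: treat "topic already exists"
// as success, propagate anything else. Names are hypothetical.
class TopicExistsException extends RuntimeException {}

interface TopicCreator {
    void create(String topic, int partitions, short replicationFactor);
}

class EnsureTopic {
    // Returns true if the topic exists after the call.
    static boolean ensure(TopicCreator admin, String topic, int partitions, short rf) {
        try {
            admin.create(topic, partitions, rf);
            return true;                  // freshly created
        } catch (TopicExistsException e) {
            return true;                  // already there: fine
        }
    }
}
```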
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8962) KafkaAdminClient#describeTopics always goes through the controller

2019-09-30 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-8962:
---

 Summary: KafkaAdminClient#describeTopics always goes through the 
controller
 Key: KAFKA-8962
 URL: https://issues.apache.org/jira/browse/KAFKA-8962
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


KafkaAdminClient#describeTopics makes a MetadataRequest against the controller. 
We should consider routing the request to any broker in the cluster using 
`LeastLoadedNodeProvider` instead, so that we don't overwhelm the controller 
with these requests.
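[Editorial aside: the suggested routing mirrors the idea behind the client's least-loaded-node selection — pick the broker with the fewest in-flight requests rather than always the controller. A simplified, hypothetical sketch, not the client's actual `LeastLoadedNodeProvider` code:]

```java
import java.util.Map;

// Simplified sketch of least-loaded-node selection as suggested above.
class LeastLoaded {
    static String pick(Map<String, Integer> inFlightByBroker) {
        String best = null;
        int bestLoad = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : inFlightByBroker.entrySet()) {
            if (e.getValue() < bestLoad) {   // fewer in-flight requests wins
                best = e.getKey();
                bestLoad = e.getValue();
            }
        }
        return best;
    }
}
```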



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum

2019-09-12 Thread Dhruvil Shah
This is exciting! +1 (non-binding)

- Dhruvil

On Thu, Sep 12, 2019 at 12:29 PM Bill Bejeck  wrote:

> Thanks for the KIP!
>
> +1 (binding)
>
> -Bill
>
> On Thu, Sep 12, 2019 at 3:26 PM Ismael Juma  wrote:
>
> > Thanks for the KIP, +1 (binding).
> >
> > Ismael
> >
> > On Mon, Sep 9, 2019, 8:28 AM Colin McCabe  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start the vote for KIP-500: Replace ZooKeeper with a
> > > Self-Managed Metadata Quorum.
> > >
> > > The DISCUSS thread from the mailing list is here:
> > >
> > >
> >
> https://lists.apache.org/thread.html/cce5313ebe72bde34bf0da3af5a1723db3ee871667b1fd8edf2ee7ab@%3Cdev.kafka.apache.org%3E
> > >
> > > The KIP is here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
> > >
> > > regards,
> > > Colin
> > >
> >
>


Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-07-11 Thread Dhruvil Shah
Hi Justine,

Thanks for the KIP, this is great!

Could you add some more information about what deprecating the broker
configuration means? Would we log a warning in the logs when auto topic
creation is enabled on the broker, for example?

Thanks,
Dhruvil

On Thu, Jul 11, 2019 at 10:28 AM Justine Olshan 
wrote:

> Hello all,
>
> I'd like to start a discussion thread for KIP-487.
> This KIP plans to deprecate the current system of auto-creating topics
> through requests to the metadata and give the producer the ability to
> automatically create topics instead.
>
> More information can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-487%3A+Automatic+Topic+Creation+on+Producer
>
> Thank you,
> Justine Olshan
>


[jira] [Created] (KAFKA-8570) Downconversion could fail when log contains out of order message formats

2019-06-19 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-8570:
---

 Summary: Downconversion could fail when log contains out of order 
message formats
 Key: KAFKA-8570
 URL: https://issues.apache.org/jira/browse/KAFKA-8570
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah


When the log contains out of order message formats (for example a v2 message 
followed by a v1 message), it is possible for down-conversion to fail in 
certain scenarios where batches are compressed and greater than 1 kB in size. 
Down-conversion fails with a stack like the following:

java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.apache.kafka.common.record.FileLogInputStream$FileChannelRecordBatch.writeTo(FileLogInputStream.java:176)
    at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:107)
    at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:242)
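[Editorial aside: the top frame is java.nio.Buffer.limit rejecting a limit beyond the buffer's capacity. That low-level trigger can be reproduced in isolation — this illustrates only the trigger, not the down-conversion size-estimation bug itself:]

```java
import java.nio.ByteBuffer;

// Minimal reproduction of the top stack frame only: Buffer.limit throws
// IllegalArgumentException when the new limit exceeds the buffer's capacity.
// In the bug above this is presumably hit because the estimated size of the
// down-converted batch is smaller than what the batch actually needs.
class BufferLimitDemo {
    static boolean limitPastCapacityThrows(int capacity, int limit) {
        try {
            ByteBuffer.allocate(capacity).limit(limit);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(limitPastCapacityThrows(128, 275));  // limit > capacity: throws
        System.out.println(limitPastCapacityThrows(1024, 275)); // fits: no exception
    }
}
```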



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8359) Reconsider default for leader imbalance percentage

2019-05-13 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-8359:
---

 Summary: Reconsider default for leader imbalance percentage
 Key: KAFKA-8359
 URL: https://issues.apache.org/jira/browse/KAFKA-8359
 Project: Kafka
  Issue Type: Improvement
Reporter: Dhruvil Shah


By default, the leader imbalance ratio is 10%. This means that the controller 
won't trigger preferred leader election for a broker unless the ratio between 
the number of partitions the broker currently leads and the number of 
partitions it is the preferred leader of is off by more than 10%. The problem 
is that when a broker is catching up after a restart, the smallest topics tend 
to catch up first and the largest ones later, so the 10% remaining difference 
may not be proportional to the broker's load. To keep better balance in the 
cluster, we should consider setting `leader.imbalance.per.broker.percentage=0` 
by default so that the preferred leaders are always elected.
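[Editorial aside: the trigger condition can be expressed as a per-broker ratio check. A hypothetical sketch of the controller's decision, not its actual code:]

```java
// Sketch of the per-broker imbalance check discussed above: trigger
// preferred leader election when the fraction of partitions whose
// preferred leader is this broker but whose current leader is not
// exceeds the configured percentage. Names are hypothetical.
class ImbalanceCheck {
    static boolean shouldRebalance(int preferredButNotLeader, int preferredTotal, int thresholdPercent) {
        if (preferredTotal == 0) return false;
        double ratio = 100.0 * preferredButNotLeader / preferredTotal;
        return ratio > thresholdPercent;
    }

    public static void main(String[] args) {
        // Default threshold of 10%: 5 of 100 misplaced leaders is tolerated.
        System.out.println(shouldRebalance(5, 100, 10)); // false
        // With the proposed threshold of 0, any misplaced leader triggers election.
        System.out.println(shouldRebalance(5, 100, 0));  // true
    }
}
```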



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-461 Improve Replica Fetcher behavior at handling partition failure

2019-05-08 Thread Dhruvil Shah
Thanks for the KIP! +1 (non-binding)

On Wed, May 8, 2019 at 10:23 PM Colin McCabe  wrote:

> +1.  Thanks, Aishwarya.
>
> Colin
>
> On Wed, May 8, 2019, at 17:50, Jason Gustafson wrote:
> > +1. Thanks!
> >
> > On Wed, May 8, 2019 at 4:30 PM Aishwarya Gune 
> > wrote:
> >
> > > Hi All!
> > >
> > > I would like to call for a vote on KIP-461 that would improve the
> behavior
> > > of replica fetcher in case of partition failure. The fetcher thread
> would
> > > just stop monitoring the crashed partition instead of terminating.
> > >
> > > Here's a link to the KIP -
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure
> > >
> > > Discussion thread -
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg97559.html
> > >
> > > --
> > > Thank you,
> > > Aishwarya
> > >
> >
>


[jira] [Created] (KAFKA-8322) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2019-05-03 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-8322:
---

 Summary: Flaky test: 
SslTransportLayerTest.testListenerConfigOverride
 Key: KAFKA-8322
 URL: https://issues.apache.org/jira/browse/KAFKA-8322
 Project: Kafka
  Issue Type: Test
  Components: core, unit tests
Reporter: Dhruvil Shah


java.lang.AssertionError: expected: but was:
    at org.junit.Assert.fail(Assert.java:89)
    at org.junit.Assert.failNotEquals(Assert.java:835)
    at org.junit.Assert.assertEquals(Assert.java:120)
    at org.junit.Assert.assertEquals(Assert.java:146)
    at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
    at org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319)

 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4250/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/testListenerConfigOverride/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8185) Controller becomes stale and not able to failover the leadership for the partitions

2019-04-17 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah resolved KAFKA-8185.
-
Resolution: Not A Problem

This is not a typically expected scenario and would only happen when the topic 
znode is deleted directly from ZK.

> Controller becomes stale and not able to failover the leadership for the 
> partitions
> ---
>
> Key: KAFKA-8185
> URL: https://issues.apache.org/jira/browse/KAFKA-8185
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.1.1
>Reporter: Kang H Lee
>Priority: Critical
> Attachments: broker12.zip, broker9.zip, zookeeper.zip
>
>
> Description:
> After broker 9 went offline, all partitions led by it went offline. The 
> controller attempted to move leadership but ran into an exception while doing 
> so:
> {code:java}
> // [2019-03-26 01:23:34,114] ERROR [PartitionStateMachine controllerId=12] 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)
> java.util.NoSuchElementException: key not found: me-test-1
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartitions(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:167)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:116)
> at 
> kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:106)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onReplicasBecomeOffline(KafkaController.scala:437)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onBrokerFailure(KafkaController.scala:405)
> at 
> kafka.controller.KafkaController$BrokerChange$.process(KafkaController.scala:1246)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The controller was unable to move leadership of partitions led by broker 9 as 
> a result. It's worth noting that the controller ran into the same exception 
> when the broker came back up online. The controller thinks `me-test-1` is a 
> new partition and when attempting to transition it to an online partition, it 
> is unable to retrieve its replica assignment from 
> ControllerContext#partitionReplicaAssignment. I need to look through the code 
> to figure out if there's a race condition or situations where we remove the 
> partition from ControllerContext#partitionReplicaAssignment but might still 
> leave it in PartitionStateMachine#partitionState.
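The failure mode described above boils down to a broken invariant between the controller's two maps. A minimal sketch of the idea (field names mirror the Scala code, but this is a hypothetical illustration, not the actual controller logic):

```python
# Hypothetical sketch of the invariant violated in the stack trace above:
# every partition tracked in partitionState should also have an entry in
# partitionReplicaAssignment. A plain map lookup (Scala's `apply`) throws
# NoSuchElementException when it does not; a defensive lookup would let the
# controller log and skip the partition instead of failing the whole batch.
partition_state = {"me-test-1": "NewPartition"}
partition_replica_assignment = {}  # "me-test-1" missing after the suspected race

def replicas_for(partition):
    replicas = partition_replica_assignment.get(partition)
    if replicas is None:
        # A more robust controller would log and skip here
        # rather than dying with a key-not-found error.
        return None
    return replicas

print(replicas_for("me-test-1"))  # None instead of a crash
```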
> They had to force a controller failover to recover from the offline state.
> Sequence of events:
> * Broker 9 was restarted between 2019-03-26 01:22:54,236 and 2019-03-26 
> 01:27:30,967; this was an unclean shutdown.
> * From 2019-03-26 01:27:30,967, broker 9 was rebuilding indexes and could 
> not process data during this time.
> * At 2019-03-26 01:29:36,741, broker 9 started loading replicas.
> * [2019-03-26 01:29:36,202] ERROR [KafkaApi-9] Number of alive brokers '0' 
> does not meet the required r

Re: [VOTE] KIP-434: Dead replica fetcher and log cleaner metrics

2019-03-28 Thread Dhruvil Shah
Thanks for the KIP, Viktor! This is a useful addition. +1 overall.

Minor nits:
> I propose to add three gauge: DeadFetcherThreadCount for the fetcher
threads, log-cleaner-dead-thread-count for the log cleaner.
I think you meant two instead of three.

Also, would it make sense to name these metrics consistently, something like
`log-cleaner-dead-thread-count` and `replica-fetcher-dead-thread-count`?

Thanks,
Dhruvil

On Thu, Mar 28, 2019 at 11:27 AM Viktor Somogyi-Vass <
viktorsomo...@gmail.com> wrote:

> Hi All,
>
> I'd like to start a vote on KIP-434.
> This would basically add metrics to count dead threads in
> ReplicaFetcherManager and LogCleaner, to allow monitoring systems to alert
> based on them.
>
> The KIP link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> The PR: https://github.com/apache/kafka/pull/6514
>
> I'd be happy to receive any votes or additional feedback/reviews too.
>
> Thanks,
> Viktor
>


Re: problems in Kafka unit testing trunk

2018-11-27 Thread Dhruvil Shah
The unit test itself does not seem to use too many files. What is the
output for `ulimit -n` on your system? Running `lsof` might also be helpful
to determine how many open files you have while Kafka is not running.
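Besides `ulimit -n` and `lsof`, the same per-process limit can be read programmatically; a small sketch using Python's standard `resource` module (the 65536 threshold is an assumption, not an official requirement of the test suite):

```python
import resource

# Read the per-process open-file limit that `ulimit -n` reports; the
# "Too many open files" failures above suggest the soft limit is too low
# for the number of sockets and log segments the embedded test cluster opens.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open files: soft limit={soft}, hard limit={hard}")
if soft != resource.RLIM_INFINITY and soft < 65536:
    print("soft limit may be too low for running the Kafka test suite")
```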

- Dhruvil

On Tue, Nov 27, 2018 at 9:20 AM lk gen  wrote:

> When running ./gradlew test
> on a CentOS machine with Gradle and Java set up,
> using the trunk version from today,
>
> there are errors about too many open files, of the form
> "
> kafka.admin.DeleteTopicTest > testDeletingPartiallyDeletedTopic FAILED
> org.apache.kafka.common.KafkaException: java.io.IOException: Too many
> open files
> at
> org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
> at
> org.apache.kafka.common.network.Selector.<init>(Selector.java:212)
> at
> org.apache.kafka.common.network.Selector.<init>(Selector.java:225)
> at
>
> kafka.coordinator.transaction.TransactionMarkerChannelManager$.apply(TransactionMarkerChannelManager.scala:66)
> at
>
> kafka.coordinator.transaction.TransactionCoordinator$.apply(TransactionCoordinator.scala:62)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:279)
> at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
> at
>
> kafka.admin.DeleteTopicTest.$anonfun$createTestTopicAndCluster$2(DeleteTopicTest.scala:372)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
>
> kafka.admin.DeleteTopicTest.createTestTopicAndCluster(DeleteTopicTest.scala:372)
> at
>
> kafka.admin.DeleteTopicTest.createTestTopicAndCluster(DeleteTopicTest.scala:366)
> at
>
> kafka.admin.DeleteTopicTest.testDeletingPartiallyDeletedTopic(DeleteTopicTest.scala:418)
>
> Caused by:
> java.io.IOException: Too many open files
> at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
> at
> sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
> at
> sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
> at
> sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> at java.nio.channels.Selector.open(Selector.java:227)
> at
> org.apache.kafka.common.network.Selector.<init>(Selector.java:158)
> ... 20 more
>
> "
>
> Is the environment I am using for the Gradle tests invalid? Are there
> special settings required?
>


Re: Help on 'Error while writing to checkpoint file' Issue

2018-10-30 Thread Dhruvil Shah
Hi Dasun, seems like the screenshots were not attached. Could you please
open a Jira here: https://issues.apache.org/jira/projects/KAFKA

Thanks,
Dhruvil

On Tue, Oct 30, 2018 at 10:29 PM Dasun Nirmitha 
wrote:

> Hello Guys
> I'm currently testing a Java Kafka producer application coded to retrieve
> a db value from a local mysql db and produce to a single topic. Locally
> I've got a Zookeeper server and a Kafka single broker running.
> My issue is that I need to produce from the Kafka producer each second,
> and that works for around 2 hours until the broker throws an 'Error while
> writing to checkpoint file' and shuts down. Producing with a 1-minute
> interval works with no issues, but unfortunately I need the produce interval
> to be 1 second.
> I have attached screenshots of the Errors thrown from the Broker and my
> application.
> Any help would be really appreciated.
>
> Best Regards
> Dasun
>


Re: [ANNOUNCE] New committer: Colin McCabe

2018-09-25 Thread Dhruvil Shah
Congratulations Colin!

On Tue, Sep 25, 2018 at 6:47 AM Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Congrats Colin !
>
> On Tue., 25 Sep. 2018, 3:33 pm Bill Bejeck,  wrote:
>
> > Congrats Colin!
> >
> > On Tue, Sep 25, 2018 at 8:11 AM Manikumar 
> > wrote:
> >
> > > Congrats Colin!
> > >
> > > On Tue, Sep 25, 2018 at 4:39 PM Mickael Maison <
> mickael.mai...@gmail.com
> > >
> > > wrote:
> > >
> > > > Congratulations Colin!
> > > >
> > > > On Tue, Sep 25, 2018, 11:17 Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Congrats Colin, well deserved! :)
> > > > >
> > > > > On Tue, Sep 25, 2018 at 10:58 AM Stanislav Kozlovski <
> > > > > stanis...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Congrats Colin!
> > > > > >
> > > > > > On Tue, Sep 25, 2018 at 9:51 AM Edoardo Comar  >
> > > > wrote:
> > > > > >
> > > > > > > Congratulations Colin !
> > > > > > > --
> > > > > > >
> > > > > > > Edoardo Comar
> > > > > > >
> > > > > > > IBM Event Streams
> > > > > > > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > From:   Ismael Juma 
> > > > > > > To: Kafka Users , dev <
> > > > > dev@kafka.apache.org>
> > > > > > > Date:   25/09/2018 09:40
> > > > > > > Subject:[ANNOUNCE] New committer: Colin McCabe
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > The PMC for Apache Kafka has invited Colin McCabe as a
> committer
> > > and
> > > > we
> > > > > > > are
> > > > > > > pleased to announce that he has accepted!
> > > > > > >
> > > > > > > Colin has contributed 101 commits and 8 KIPs including
> > significant
> > > > > > > improvements to replication, clients, code quality and
> testing. A
> > > few
> > > > > > > highlights were KIP-97 (Improved Clients Compatibility Policy),
> > > > KIP-117
> > > > > > > (AdminClient), KIP-227 (Incremental FetchRequests to Increase
> > > > Partition
> > > > > > > Scalability), the introduction of findBugs and adding Trogdor
> > > (fault
> > > > > > > injection and benchmarking tool).
> > > > > > >
> > > > > > > In addition, Colin has reviewed 38 pull requests and
> participated
> > > in
> > > > > more
> > > > > > > than 50 KIP discussions.
> > > > > > >
> > > > > > > Thank you for your contributions Colin! Looking forward to many
> > > more.
> > > > > :)
> > > > > > >
> > > > > > > Ismael, for the Apache Kafka PMC
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Unless stated otherwise above:
> > > > > > > IBM United Kingdom Limited - Registered in England and Wales
> with
> > > > > number
> > > > > > > 741598.
> > > > > > > Registered office: PO Box 41, North Harbour, Portsmouth,
> > Hampshire
> > > > PO6
> > > > > > 3AU
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Stanislav
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-09-17 Thread Dhruvil Shah
Thank you for the votes and discussion, everyone. The KIP has passed with 3
binding votes (Ismael, Gwen, Matthias) and 5 non-binding votes (Brandon,
Bill, Manikumar, Colin, Mickael).

- Dhruvil

On Mon, Sep 17, 2018 at 1:59 AM Mickael Maison 
wrote:

> +1 (non-binding)
> Thanks for the KIP!
> On Sun, Sep 16, 2018 at 7:40 PM Matthias J. Sax 
> wrote:
> >
> > +1 (binding)
> >
> > -Matthias
> >
> > On 9/14/18 4:57 PM, Ismael Juma wrote:
> > > Thanks for the KIP, +1 (binding).
> > >
> > > Ismael
> > >
> > > On Fri, Sep 14, 2018 at 4:56 PM Dhruvil Shah 
> wrote:
> > >
> > >> Hi all,
> > >>
> > >> I would like to start a vote on KIP-361.
> > >>
> > >> Link to the KIP:
> > >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation
> > >>
> > >> Thanks,
> > >> Dhruvil
> > >>
> > >
> >
>


[VOTE] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-09-14 Thread Dhruvil Shah
Hi all,

I would like to start a vote on KIP-361.

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation

Thanks,
Dhruvil


Re: [DISCUSS] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-09-13 Thread Dhruvil Shah
Hi all,

I updated the KIP with the discussion from this thread. I left the warning
message and deprecation out for now because we require the configuration to
be set to true (i.e. auto topic creation allowed) when using brokers older
than 0.11.0.

If there is no more feedback, I will start a VOTE thread tomorrow.

Thanks,
Dhruvil

On Thu, Sep 6, 2018 at 9:52 PM Matthias J. Sax 
wrote:

> What is the status of this KIP?
>
> I think you can start a VOTE Dhruvil.
>
>
> -Matthias
>
> On 8/23/18 9:52 AM, Ismael Juma wrote:
> > Yeah, the reason why we want to deprecate the auto create functionality
> is
> > that it happens when a metadata request is done instead of when a write
> > operation happens. So, there's no reason to differentiate between the
> two.
> >
> > Ismael
> >
> > On Thu, Aug 23, 2018 at 8:16 AM Andrew Otto  wrote:
> >
> >> Ah, I just realized that as proposed this is only for the Java consumer
> >> client, correct?  Would it be possible to make this a broker config,
> like
> >> the current one?  Something like:
> >>
> >> auto.create.topics.enable=true # allow both producer and consumer to
> create
> >> auto.create.topics.enable=consumer # allow only consumer to create
> >> auto.create.topics.enable=producer # allow only producer to create
> >> auto.create.topics.enable=false # deny any auto topic creation
> >>
> >> Perhaps the broker doesn’t differentiate between the type of client
> >> connection. If not, I guess this wouldn’t be possible.
> >>
> >>
> >>
> >> On Thu, Aug 23, 2018 at 11:08 AM Andrew Otto 
> wrote:
> >>
> >>> Yup :)
> >>>
> >>> On Thu, Aug 23, 2018 at 11:04 AM Ismael Juma 
> wrote:
> >>>
> >>>> Andrew, one question: you are relying on auto topic creation for the
> >>>> producer and that's why you can't just disable it?
> >>>>
> >>>> On Thu, Aug 23, 2018 at 8:01 AM Ismael Juma 
> wrote:
> >>>>
> >>>>> Thanks for sharing Andrew!
> >>>>>
> >>>>> Ismael
> >>>>>
> >>>>> On Thu, Aug 23, 2018 at 7:57 AM Andrew Otto 
> >> wrote:
> >>>>>
> >>>>>> We recently had a pretty serious Kafka outage
> >>>>>> <
> >>>>>>
> >>>>
> >>
> https://wikitech.wikimedia.org/wiki/Incident_documentation/20180711-kafka-eqiad#Summary
> >>>>>>>
> >>>>>> caused by a bug in one of our consumers that caused it to create new
> >>>>>> topics
> >>>>>> in an infinite loop AKA a topic bomb!  Having consumers restricted
> >> from
> >>>>>> creating topics would have prevented this for us.
> >>>>>>
> >>>>>> On Thu, Aug 23, 2018 at 4:27 AM Ismael Juma 
> >> wrote:
> >>>>>>
> >>>>>>> Generally, I think positive configs (`allow` instead of `suppress`)
> >>>> are
> >>>>>>> easier to understand.
> >>>>>>>
> >>>>>>> Ismael
> >>>>>>>
> >>>>>>> On Wed, Aug 22, 2018 at 11:05 PM Matthias J. Sax <
> >>>> matth...@confluent.io
> >>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the summary!
> >>>>>>>>
> >>>>>>>> We might want to add a diagram/table to the docs when we add this
> >>>>>>>> feature (with whatever config name we choose) to explain how
> >> broker
> >>>>>>>> config `auto.create.topics.enable` and the consumer config work
> >>>>>> together.
> >>>>>>>>
> >>>>>>>> I think both options are equally easy to understand. "allow"
> >> means
> >>>>>>>> follow the broker config, while "suppress" implies ignore the
> >>>> broker
> >>>>>>>> config and don't auto-create.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 8/22/18 10:36 PM, Dhruvil Shah wrote:
> >>>>>>>>> *"suppress" is the opposite of "allow", so

[jira] [Created] (KAFKA-7385) Log compactor crashes when empty headers are retained with idempotent / transaction producers

2018-09-07 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7385:
---

 Summary: Log compactor crashes when empty headers are retained 
with idempotent / transaction producers
 Key: KAFKA-7385
 URL: https://issues.apache.org/jira/browse/KAFKA-7385
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


During log compaction, we retain an empty header if the batch contains the last 
sequence number for a particular producer. When such headers are the only 
messages retained, we do not update state such as `maxOffset` in 
`MemoryRecords#filterTo` causing us to append these into the cleaned segment 
with `largestOffset` = -1. This throws a `LogSegmentOffsetOverflowException` 
for a segment that does not actually have an overflow. When we attempt to split 
the segment, the log cleaner dies.
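A minimal sketch of the bookkeeping gap described above (invented names and a simplified batch model, not the actual MemoryRecords code):

```python
# Each entry models a retained batch as (last_offset, num_retained_records).
# If maxOffset is only advanced for batches that retain records, a cleaned
# segment whose only survivors are empty batches (kept for producer sequence
# state) ends up with maxOffset = -1, which later looks like an offset
# overflow and crashes the cleaner when the segment split is attempted.
def max_offset_buggy(batches):
    max_offset = -1
    for last_offset, retained in batches:
        if retained > 0:  # bug: retained-but-empty batches are skipped
            max_offset = max(max_offset, last_offset)
    return max_offset

def max_offset_fixed(batches):
    max_offset = -1
    for last_offset, _ in batches:  # fix: count every retained batch
        max_offset = max(max_offset, last_offset)
    return max_offset

batches = [(42, 0)]  # one retained-but-empty batch
print(max_offset_buggy(batches))  # -1: triggers the spurious overflow path
print(max_offset_fixed(batches))  # 42
```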



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-08-22 Thread Dhruvil Shah
*"suppress" is the opposite of "allow", so
setting suppress.auto.create.topics=false would mean that we do _not_ allow
auto topic creation; when set to true, the server configuration will
determine whether we allow automatic creation or not.*

Sorry, I meant suppress.auto.create.topics=true above to disallow auto
topic creation.


On Wed, Aug 22, 2018 at 10:34 PM Dhruvil Shah  wrote:

> To be clear, we will allow auto topic creation only when server config
> auto.create.topics.enable=true and consumer config
> allow.auto.create.topics=true; when either is false, we would not create
> the topic if it does not exist.
>
> "suppress" is the opposite of "allow", so
> setting suppress.auto.create.topics=false would mean that we do _not_ allow
> auto topic creation; when set to true, the server configuration will
> determine whether we allow automatic creation or not.
>
> I think "allow" is easier to understand but I am open to suggestions.
>
> - Dhruvil
>
> On Wed, Aug 22, 2018 at 6:53 PM Brandon Kirchner <
> brandon.kirch...@gmail.com> wrote:
>
>> “allow=false” seems a bit more intuitive to me than “suppress=false”
>>
>> Brandon
>>
>> > On Aug 22, 2018, at 8:48 PM, Ted Yu  wrote:
>> >
>> > We may also consider :
>> >
>> > "suppress.auto.topic.creation"
>> >
>> > or
>> >
>> > "allow.auto.topic.creation"
>> >
>> > w.r.t. suppress or allow, I don't have a strong opinion either way. It's
>> > just a matter of choosing the proper default value.
>> >
>> > Cheers
>> >
>> >> On Wed, Aug 22, 2018 at 6:00 PM Dhruvil Shah 
>> wrote:
>> >>
>> >> Hi Matthias,
>> >>
>> >> Do you mean something like "suppress.auto.create.topic"? I am leaning
>> a bit
>> >> towards "allow.auto.create.topics" but I don't have a strong preference
>> >> either. Let's wait to hear if anyone else has an opinion on this.
>> >>
>> >> Thanks,
>> >> Dhruvil
>> >>
>> >> On Tue, Aug 21, 2018 at 5:28 PM Matthias J. Sax > >
>> >> wrote:
>> >>
>> >>> Thanks for the KIP Dhruvil!
>> >>>
>> >>> I agree with Jason's comment. An alternative might be to use
>> "suppress"
>> >>> which would invert the logic of "allow". Not sure which one is more
>> >>> intuitive and I am fine with both (no personal preference). Just
>> wanted
>> >>> to mention it as an alternative.
>> >>>
>> >>> Don't have any further comments/question so far.
>> >>>
>> >>>
>> >>> -Matthias
>> >>>
>> >>>
>> >>>
>> >>>> On 8/21/18 4:42 PM, Jason Gustafson wrote:
>> >>>> Hey Dhruvil,
>> >>>>
>> >>>> I would suggest using the verb "allow" rather than "enable". The
>> >> consumer
>> >>>> cannot enable auto topic creation because it is configured on the
>> >> broker.
>> >>>> All it can do is prevent it from happening if it is enabled.
>> >>>>
>> >>>> -Jason
>> >>>>
>> >>>> On Tue, Aug 21, 2018 at 3:56 PM, Dhruvil Shah 
>> >>> wrote:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> I would like to start discussion on KIP-361 that proposes we add a
>> >>> consumer
>> >>>>> configuration to disable auto topic creation.
>> >>>>>
>> >>>>> Link to the KIP:
>> >>>>>
>> >>>
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+
>> >>>>> Configuration+to+Disable+Auto+Topic+Creation
>> >>>>>
>> >>>>> Suggestions and feedback are welcome!
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Dhruvil
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >>
>>
>


Re: [DISCUSS] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-08-22 Thread Dhruvil Shah
To be clear, we will allow auto topic creation only when server config
auto.create.topics.enable=true and consumer config
allow.auto.create.topics=true; when either is false, we would not create
the topic if it does not exist.

"suppress" is the opposite of "allow", so
setting suppress.auto.create.topics=false would mean that we do _not_ allow
auto topic creation; when set to true, the server configuration will
determine whether we allow automatic creation or not.
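The interplay between the broker and consumer settings under the two proposed names can be sketched as a small truth-table model (a hypothetical illustration of the semantics discussed above, not client code):

```python
def auto_created_with_allow(broker_enable: bool, allow: bool) -> bool:
    # "allow" semantics: the topic is created only when BOTH the broker
    # config (auto.create.topics.enable) and the consumer config permit it.
    return broker_enable and allow

def auto_created_with_suppress(broker_enable: bool, suppress: bool) -> bool:
    # "suppress" semantics: suppress=True disallows creation outright;
    # suppress=False defers to the broker configuration.
    return broker_enable and not suppress

# The two namings are equivalent with the flag inverted.
for broker in (True, False):
    for flag in (True, False):
        assert auto_created_with_allow(broker, flag) == \
               auto_created_with_suppress(broker, not flag)
```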

I think "allow" is easier to understand but I am open to suggestions.

- Dhruvil

On Wed, Aug 22, 2018 at 6:53 PM Brandon Kirchner 
wrote:

> “allow=false” seems a bit more intuitive to me than “suppress=false”
>
> Brandon
>
> > On Aug 22, 2018, at 8:48 PM, Ted Yu  wrote:
> >
> > We may also consider :
> >
> > "suppress.auto.topic.creation"
> >
> > or
> >
> > "allow.auto.topic.creation"
> >
> > w.r.t. suppress or allow, I don't have a strong opinion either way. It's just a
> > matter of choosing the proper default value.
> >
> > Cheers
> >
> >> On Wed, Aug 22, 2018 at 6:00 PM Dhruvil Shah 
> wrote:
> >>
> >> Hi Matthias,
> >>
> >> Do you mean something like "suppress.auto.create.topic"? I am leaning a
> bit
> >> towards "allow.auto.create.topics" but I don't have a strong preference
> >> either. Let's wait to hear if anyone else has an opinion on this.
> >>
> >> Thanks,
> >> Dhruvil
> >>
> >> On Tue, Aug 21, 2018 at 5:28 PM Matthias J. Sax 
> >> wrote:
> >>
> >>> Thanks for the KIP Dhruvil!
> >>>
> >>> I agree with Jason's comment. An alternative might be to use "suppress"
> >>> which would invert the logic of "allow". Not sure which one is more
> >>> intuitive and I am fine with both (no personal preference). Just wanted
> >>> to mention it as an alternative.
> >>>
> >>> Don't have any further comments/question so far.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>>> On 8/21/18 4:42 PM, Jason Gustafson wrote:
> >>>> Hey Dhruvil,
> >>>>
> >>>> I would suggest using the verb "allow" rather than "enable". The
> >> consumer
> >>>> cannot enable auto topic creation because it is configured on the
> >> broker.
> >>>> All it can do is prevent it from happening if it is enabled.
> >>>>
> >>>> -Jason
> >>>>
> >>>> On Tue, Aug 21, 2018 at 3:56 PM, Dhruvil Shah 
> >>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I would like to start discussion on KIP-361 that proposes we add a
> >>> consumer
> >>>>> configuration to disable auto topic creation.
> >>>>>
> >>>>> Link to the KIP:
> >>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+
> >>>>> Configuration+to+Disable+Auto+Topic+Creation
> >>>>>
> >>>>> Suggestions and feedback are welcome!
> >>>>>
> >>>>> Thanks,
> >>>>> Dhruvil
> >>>>>
> >>>>
> >>>
> >>>
> >>
>


Re: [DISCUSS] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-08-22 Thread Dhruvil Shah
Hi Ismael,

Thanks for the comments. Replies below.

1. We could throw an InvalidConfigurationException at run-time either when
building the MetadataRequest or when an ApiVersions response is received.
Because this is a configuration problem, I think users would likely see the
exception on the first call to poll. Does this sound reasonable?

2. I think we could log a warning when KafkaConsumer is instantiated with
the default configuration. Is there a plan to deprecate the server
configuration? Is so, we could employ the same strategy in terms of when to
switch the default value and then eventually remove the config altogether.

- Dhruvil

On Tue, Aug 21, 2018 at 7:36 PM Ismael Juma  wrote:

> Thanks for the KIP. A few questions/comments:
>
> 1. It seems hard to reason about if we just disregard the config for older
> brokers. Maybe we should throw an error if the brokers don't support it and
> let users explicitly change the config if they want to.
>
> 2. We probably want to switch the default and eventually remove this config
> in a future version. What's the path to making that happen? One option
> would be to warn if people rely on the default as a first step (or warn
> every time it's used).
>
> Ismael
>
> On 21 Aug 2018 3:56 pm, "Dhruvil Shah"  wrote:
>
> Hi,
>
> I would like to start discussion on KIP-361 that proposes we add a consumer
> configuration to disable auto topic creation.
>
> Link to the KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation
>
> Suggestions and feedback are welcome!
>
> Thanks,
> Dhruvil
>


Re: [DISCUSS] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-08-22 Thread Dhruvil Shah
Hi Matthias,

Do you mean something like "suppress.auto.create.topic"? I am leaning a bit
towards "allow.auto.create.topics" but I don't have a strong preference
either. Let's wait to hear if anyone else has an opinion on this.

Thanks,
Dhruvil

On Tue, Aug 21, 2018 at 5:28 PM Matthias J. Sax 
wrote:

> Thanks for the KIP Dhruvil!
>
> I agree with Jason's comment. An alternative might be to use "suppress"
> which would invert the logic of "allow". Not sure which one is more
> intuitive and I am fine with both (no personal preference). Just wanted
> to mention it as an alternative.
>
> Don't have any further comments/question so far.
>
>
> -Matthias
>
>
>
> On 8/21/18 4:42 PM, Jason Gustafson wrote:
> > Hey Dhruvil,
> >
> > I would suggest using the verb "allow" rather than "enable". The consumer
> > cannot enable auto topic creation because it is configured on the broker.
> > All it can do is prevent it from happening if it is enabled.
> >
> > -Jason
> >
> > On Tue, Aug 21, 2018 at 3:56 PM, Dhruvil Shah 
> wrote:
> >
> >> Hi,
> >>
> >> I would like to start discussion on KIP-361 that proposes we add a
> consumer
> >> configuration to disable auto topic creation.
> >>
> >> Link to the KIP:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+
> >> Configuration+to+Disable+Auto+Topic+Creation
> >>
> >> Suggestions and feedback are welcome!
> >>
> >> Thanks,
> >> Dhruvil
> >>
> >
>
>


Re: [ANNOUNCE] New Kafka PMC member: Dong Lin

2018-08-21 Thread Dhruvil Shah
Congratulations, Dong!

On Tue, Aug 21, 2018 at 4:38 PM Jason Gustafson  wrote:

> Congrats!
>
> On Tue, Aug 21, 2018 at 10:03 AM, Ray Chiang  wrote:
>
> > Congrats Dong!
> >
> > -Ray
> >
> >
> > On 8/21/18 9:33 AM, Becket Qin wrote:
> >
> >> Congrats, Dong!
> >>
> >> On Aug 21, 2018, at 11:03 PM, Eno Thereska 
> >>> wrote:
> >>>
> >>> Congrats Dong!
> >>>
> >>> Eno
> >>>
> >>> On Tue, Aug 21, 2018 at 7:05 AM, Ted Yu  wrote:
> >>>
> >>> Congratulation Dong!
> 
>  On Tue, Aug 21, 2018 at 1:59 AM Viktor Somogyi-Vass <
>  viktorsomo...@gmail.com>
>  wrote:
> 
>  Congrats Dong! :)
> >
> > On Tue, Aug 21, 2018 at 10:09 AM James Cheng 
> >
>  wrote:
> 
> > Congrats Dong!
> >>
> >> -James
> >>
> >> On Aug 20, 2018, at 3:54 AM, Ismael Juma  wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> Dong Lin became a committer in March 2018. Since then, he has
> >>>
> >> remained
> 
> > active in the community and contributed a number of patches, reviewed
> >>> several pull requests and participated in numerous KIP
> discussions. I
> >>>
> >> am
> >
> >> happy to announce that Dong is now a member of the
> >>> Apache Kafka PMC.
> >>>
> >>> Congratulation Dong! Looking forward to your future contributions.
> >>>
> >>> Ismael, on behalf of the Apache Kafka PMC
> >>>
> >>
> >>
> >
>


[DISCUSS] KIP-361: Add Consumer Configuration to Disable Auto Topic Creation

2018-08-21 Thread Dhruvil Shah
Hi,

I would like to start discussion on KIP-361 that proposes we add a consumer
configuration to disable auto topic creation.

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation

Suggestions and feedback are welcome!

Thanks,
Dhruvil


[jira] [Created] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer

2018-08-21 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7320:
---

 Summary: Provide ability to disable auto topic creation in 
KafkaConsumer
 Key: KAFKA-7320
 URL: https://issues.apache.org/jira/browse/KAFKA-7320
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah


Consumers should have a configuration to control whether subscribing to 
non-existent topics should automatically create the topic or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-346 - Improve LogCleaner behavior on error

2018-08-13 Thread Dhruvil Shah
Thanks for the KIP, Stanislav! +1 (non-binding)

- Dhruvil

On Mon, Aug 13, 2018 at 9:39 AM Colin McCabe  wrote:

> +1 (non-binding)
>
> best,
> Colin
>
> On Tue, Aug 7, 2018, at 04:19, Stanislav Kozlovski wrote:
> > Hey everybody,
> > I'm starting a vote on KIP-346
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error
> >
> >
> > --
> > Best,
> > Stanislav
>


Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-07-25 Thread Dhruvil Shah
For the cleaner thread specifically, I do not think respawning will help at
all because we are more than likely to run into the same issue again which
would end up crashing the cleaner. Retrying makes sense for transient
errors or when you believe some part of the system could have healed
itself, both of which I think are not true for the log cleaner.

On Wed, Jul 25, 2018 at 11:08 AM Ron Dagostino  wrote:

> << infinite loop which consumes resources and fires off continuous log
> messages.
> Hi Colin.  In case it could be relevant, one way to mitigate this effect is
> to implement a backoff mechanism (if a second respawn is to occur then wait
> for 1 minute before doing it; then if a third respawn is to occur wait for
> 2 minutes before doing it; then 4 minutes, 8 minutes, etc. up to some max
> wait time).
>
> I have no opinion on whether respawn is appropriate or not in this context,
> but a mitigation like the increasing backoff described above may be
> relevant in weighing the pros and cons.
>
> Ron
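The increasing backoff Ron describes can be sketched as a pure helper (hypothetical name and defaults, matching the 1/2/4/8-minute example above):

```python
def backoff_before_respawn(attempt: int, base_s: float = 60.0,
                           cap_s: float = 480.0) -> float:
    """Wait time before the Nth respawn: 1 min, 2 min, 4 min, ...,
    doubling up to a maximum wait (here 8 minutes)."""
    return min(base_s * (2 ** (attempt - 1)), cap_s)

print([backoff_before_respawn(n) for n in range(1, 6)])
# [60.0, 120.0, 240.0, 480.0, 480.0]
```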
>
> On Wed, Jul 25, 2018 at 1:26 PM Colin McCabe  wrote:
>
> > On Mon, Jul 23, 2018, at 23:20, James Cheng wrote:
> > > Hi Stanislav! Thanks for this KIP!
> > >
> > > I agree that it would be good if the LogCleaner were more tolerant of
> > > errors. Currently, as you said, once it dies, it stays dead.
> > >
> > > Things are better now than they used to be. We have the metric
> > >   kafka.log:type=LogCleanerManager,name=time-since-last-run-ms
> > > which we can use to tell us if the threads are dead. And as of 1.1.0,
> we
> > > have KIP-226, which allows you to restart the log cleaner thread,
> > > without requiring a broker restart.
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
> > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
> >
> >
> > > I've only read about this, I haven't personally tried it.
> >
> > Thanks for pointing this out, James!  Stanislav, we should probably add a
> > sentence or two mentioning the KIP-226 changes somewhere in the KIP.
> Maybe
> > in the intro section?
> >
> > I think it's clear that requiring the users to manually restart the log
> > cleaner is not a very good solution.  But it's good to know that it's a
> > possibility on some older releases.
> >
> > >
> > > Some comments:
> > > * I like the idea of having the log cleaner continue to clean as many
> > > partitions as it can, skipping over the problematic ones if possible.
> > >
> > > * If the log cleaner thread dies, I think it should automatically be
> > > revived. Your KIP attempts to do that by catching exceptions during
> > > execution, but I think we should go all the way and make sure that a
> new
> > > one gets created, if the thread ever dies.
> >
> > This is inconsistent with the way the rest of Kafka works.  We don't
> > automatically re-create other threads in the broker if they terminate.
> In
> > general, if there is a serious bug in the code, respawning threads is
> > likely to make things worse, by putting you in an infinite loop which
> > consumes resources and fires off continuous log messages.
> >
> > >
> > > * It might be worth trying to re-clean the uncleanable partitions. I've
> > > seen cases where an uncleanable partition later became cleanable. I
> > > unfortunately don't remember how that happened, but I remember being
> > > surprised when I discovered it. It might have been something like a
> > > follower was uncleanable but after a leader election happened, the log
> > > truncated and it was then cleanable again. I'm not sure.
> >
> > James, I disagree.  We had this behavior in the Hadoop Distributed File
> > System (HDFS) and it was a constant source of user problems.
> >
> > What would happen is disks would just go bad over time.  The DataNode
> > would notice this and take them offline.  But then, due to some
> > "optimistic" code, the DataNode would periodically try to re-add them to
> > the system.  Then one of two things would happen: the disk would just
> fail
> > immediately again, or it would appear to work and then fail after a short
> > amount of time.
> >
> > The way the disk failed was normally having an I/O request take a really
> > long time and time out.  So a bunch of request handler threads would
> > basically slam into a brick wall when they tried to access the bad disk,
> > slowing the DataNode to a crawl.  It was even worse in the second
> scenario,
> > if the disk appeared to work for a while, but then failed.  Any data that
> > had been written on that DataNode to that disk would be lost, and we
> would
> > need to re-replicate it.
> >
> > Disks aren't biological systems-- they don't heal over time.  Once
> they're
> > bad, they stay bad.  The log cleaner needs to be robust against cases
> where
> > the disk really is failing, and really is returning bad data or timing
> out.
> >
> > >
> > > * For your metrics, can you spell out the full metric in JMX-style
> > > 

Re: [DISCUSS] KIP-289: Improve the default group id behavior in KafkaConsumer

2018-07-20 Thread Dhruvil Shah
If we are looking into deprecating the empty group id, would it also make
sense to have the same character restriction for it as that for topic
names? We have stricter validation for topic names but none for group id
and transaction id. I think we should (eventually) make character
restriction the same across all entities. We may not necessarily want to do
this as part of the proposed KIP but I wanted to get an opinion on it
anyway.

On Fri, Jul 20, 2018 at 10:22 AM Rajini Sivaram 
wrote:

> +1 to deprecate use of "" as group.id since it is odd to have a resource
> name that you cannot set ACLs for. Agree, we have to support older clients
> though.
>
> Thanks,
>
> Rajini
>
> On Fri, Jul 20, 2018 at 5:25 PM, Jason Gustafson 
> wrote:
>
> > Hi Vahid,
> >
> > Sorry for getting to this so late. I think there are two things here:
> >
> > 1. The use of "" as a groupId has always been a dubious practice at best.
> > We definitely ought to deprecate its use in the client. Perhaps in the
> next
> > major release, we can remove support completely. However, since older
> > clients depend on it, we may have to continue letting the broker support
> it
> > to some extent. Perhaps we just need to bump the OffsetCommit request API
> > and only accept the offset commit for older versions. You probably have
> to
> > do this anyway if you want to introduce the new error code since old
> > clients will not expect it.
> >
> > 2. There should be a way for the consumer to indicate that it has no
> group
> > id and will not commit offsets. This is an explicit instruction that the
> > consumer should not bother with coordinator lookup and such. We currently
> > have some brittle logic in place to let users avoid the coordinator
> lookup,
> > but it is a bit error-prone. I was hoping that we could change the
> default
> > value of group.id to be null so that the user had to take an explicit
> > action to opt into coordinator management (groups or offsets). However,
> it
> > is true that some users may be unknowingly depending on offset storage if
> > they are using both the default group.id and the default
> > enable.auto.commit. Perhaps one option is to disable enable.auto.commit
> > automatically if no group.id is specified? I am not sure if there are
> any
> > drawbacks, but my feeling is that implicit use of the empty group.id to
> > commit offsets is more likely to be causing users unexpected problems
> than
> > actually providing a useful capability.
> >
> > Thoughts?
> >
> > Thanks,
> > Jason
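Jason's suggestion above — a null default `group.id` with auto-commit disabled unless the user opts in — might look like the following config-resolution sketch. The names mirror the consumer configs, but this is not the actual KafkaConsumer code:

```java
public class GroupIdDefaults {
    // Resolves the effective enable.auto.commit value given the configured
    // group.id (null = unset) and the user's explicit override (null = unset).
    static boolean resolveAutoCommit(String groupId, Boolean enableAutoCommitOverride) {
        if (groupId == null) {
            // No group id: offset commits are unavailable, so auto-commit is
            // forced off; an explicit enable.auto.commit=true should fail fast
            // instead of silently committing under an empty group id.
            if (Boolean.TRUE.equals(enableAutoCommitOverride))
                throw new IllegalArgumentException("enable.auto.commit requires group.id");
            return false;
        }
        // With a group id, keep the historical default of true.
        return enableAutoCommitOverride == null || enableAutoCommitOverride;
    }

    public static void main(String[] args) {
        if (resolveAutoCommit(null, null)) throw new AssertionError();
        if (!resolveAutoCommit("my-group", null)) throw new AssertionError();
        boolean threw = false;
        try { resolveAutoCommit(null, true); } catch (IllegalArgumentException e) { threw = true; }
        if (!threw) throw new AssertionError();
        System.out.println("ok");
    }
}
```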
> >
> >
> >
> >
> > On Mon, May 28, 2018 at 9:50 AM, Vahid S Hashemian <
> > vahidhashem...@us.ibm.com> wrote:
> >
> > > Hi Viktor,
> > >
> > > Thanks for sharing your opinion.
> > > So you're in favor of disallowing the empty ("") group id altogether
> > (even
> > > for fetching).
> > > Given that ideally no one should be using the empty group id (at least
> in
> > > a production setting) I think the impact would be minimal in either
> case.
> > >
> > > But as you said, let's hear what others think and I'd be happy to
> modify
> > > the KIP if needed.
> > >
> > > Regards.
> > > --Vahid
> > >
> > >
> > >
> > >
> > > From:   Viktor Somogyi 
> > > To: dev 
> > > Date:   05/28/2018 05:18 AM
> > > Subject:Re: [DISCUSS] KIP-289: Improve the default group id
> > > behavior in KafkaConsumer
> > >
> > >
> > >
> > > Hi Vahid,
> > >
> > > (with the argument that using the default group id for offset commit
> > > should not be the user's intention in practice).
> > >
> > > Yea, so in my opinion too this use case doesn't seem too practical. Also
> > > I think breaking the offset commit is, from this perspective, no smaller
> > > a change than breaking fetch and offset fetch. If we suppose that someone
> > > uses the default group id and we break the offset commit, then that might
> > > be harder to detect than breaking the whole thing altogether. (If we
> > > think about an upgrade situation.)
> > > So since we think it is not a practical use case, I think it would be
> > > better to break it altogether, but of course that's just my 2 cents :).
> > > Let's gather others' input as well.
> > >
> > > Cheers,
> > > Viktor
> > >
> > > On Fri, May 25, 2018 at 5:43 PM, Vahid S Hashemian <
> > > vahidhashem...@us.ibm.com> wrote:
> > >
> > > > Hi Viktor,
> > > >
> > > > Thanks for reviewing the KIP.
> > > >
> > > > Yes, to minimize the backward compatibility impact, there would be no
> > > harm
> > > > in letting a stand-alone consumer consume messages under a "" group
> id
> > > (as
> > > > long as there is no offset commit).
> > > > It would have to knowingly seek to an offset or rely on the
> > > > auto.offset.reset config for the starting offset.
> > > > This way the existing functionality would be preserved for the most
> > part
> > > > (with the argument that using the default group id for offset commit
> > > > should not be the user's intention in practice).
> > > >
> > > > Does it seem reasonable?
> > > >
> > > > Thanks.
> > > > --Vahid
> 

[jira] [Created] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name

2018-07-19 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7185:
---

 Summary: getMatchingAcls throws StringIndexOutOfBoundsException 
for empty resource name
 Key: KAFKA-7185
 URL: https://issues.apache.org/jira/browse/KAFKA-7185
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah


KIP-290 introduced a way to match ACLs based on prefix. Certain resource names, 
such as the group id, can be empty strings. When an empty string is passed 
into `getMatchingAcls`, it throws a `StringIndexOutOfBoundsException` 
because of the following logic:

```scala
val prefixed = aclCache.range(
  Resource(resourceType, resourceName, PatternType.PREFIXED),
  Resource(resourceType, resourceName.substring(0, Math.min(1, resourceName.length)), PatternType.PREFIXED)
)
  .filterKeys(resource => resourceName.startsWith(resource.name))
  .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
  .toSet
```

This is a regression introduced in 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7184) Kafka is going down with issue ERROR Failed to clean up log for __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException (kafka.server.LogDirFailureChannel)

2018-07-19 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah resolved KAFKA-7184.
-
Resolution: Not A Problem

> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
> --
>
> Key: KAFKA-7184
> URL: https://issues.apache.org/jira/browse/KAFKA-7184
> Project: Kafka
>  Issue Type: Test
>  Components: admin, log
>Affects Versions: 1.1.0
>Reporter: Sandeep Muddamsetty
>Priority: Blocker
> Attachments: log-cleaner.log, server.log.2018-07-18-15
>
>
> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel).
> We are seeing this error very frequently, every 168 hours (7 days), which is 
> the default log retention value in the Kafka configuration. After modifying 
> it to 240 hours, the same thing happens again after 240 hours. I have gone 
> through some Google groups and articles and observed that this usually 
> happens on Windows systems, but here I am facing the issue on a Linux 
> system. Below are my configuration details.
> kafka_2.11-1.0.0
> OS: OEL 7.1





Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-06-27 Thread Dhruvil Shah
+1 to what Jason said. We need a better long-term strategy for dealing with
corrupted log and index data, but the sanity checks we have do not
guarantee much in this regard.

For now, we could do away with these index sanity checks in my opinion. We
could handle the missing index case at startup. I think we could have
missing index files only when users are upgrading from a version that did
not have a particular type of index to a version that does, or if the
operator physically deleted these files. Because these are rare scenarios,
having to recreate a missing index should typically not affect normal
startup time.

- Dhruvil

On Wed, Jun 27, 2018 at 8:47 AM Jason Gustafson  wrote:

> Hey Dong,
>
>
> So the main concern with the above approach is that, if for any reason the
> > index files of an inactive segment are deleted or corrupted, the broker
> > will halt if there is only one log directory. This is different from the
> > existing behavior where the broker will rebuild the index for this
> > inactive segment before it can accept any request from consumers. Though
> > we don't provide guarantees for segments already flushed to disk, this
> > still seems like a change in behavior for users. Maybe we don't have to
> > worry about this if we decide it is very rare, e.g. it happens only when
> > there is a disk error or a human error.
>
>
> I think we should probably still handle the case when an index file is
> missing during startup? But considering how weak the sanity check is, it
> seems fine to skip it.  Also, could we just make this change without a KIP?
> Adding a config to enable a wimpy sanity check seems unnecessary.
>
> One scenario that does come up with users is actual segment corruption,
> which is only detected by consumers that are validating CRCs. To fix it, we
> have to manually delete the segments and force re-replication. It would be
> helpful to have a config to enable deep checking on startup for particular
> topics or partitions. This could also just be a separate tool though
> ("kafka-fsck" or something).
>
> Thinking longer term, I think we need a more systematic approach to dealing
> with corruption, not just in index files, but in the segments as well. It
> might be nice, for example, if the consumer had a way to hint the broker
> that a particular offset is corrupt. The leader might then demote itself,
> for example, and try to repair. Lots to think through though.
>
> -Jason
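The "kafka-fsck"-style deep check Jason describes boils down to recomputing each batch's checksum and comparing it to the stored one. A minimal sketch of that principle, using the JDK's plain CRC32 over opaque payload bytes (real Kafka batches store a CRC32C over a defined portion of the batch, which this deliberately does not model):

```java
import java.util.zip.CRC32;

public class CrcCheckSketch {
    // Compute the checksum of a batch payload.
    static long crc(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // A batch is valid if its recomputed checksum matches the stored one.
    static boolean validate(byte[] payload, long storedCrc) {
        return crc(payload) == storedCrc;
    }

    public static void main(String[] args) {
        byte[] good = "record batch bytes".getBytes();
        long stored = crc(good);
        if (!validate(good, stored)) throw new AssertionError("clean batch flagged corrupt");

        byte[] corrupted = good.clone();
        corrupted[3] ^= 0x1; // simulate a flipped bit on disk
        if (validate(corrupted, stored)) throw new AssertionError("corruption not detected");
        System.out.println("corruption detected in batch payload");
    }
}
```

A real tool would iterate segment files batch by batch and report the first offset whose stored CRC does not match.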
>
>
>
>
> On Wed, Jun 27, 2018 at 12:29 AM, Dong Lin  wrote:
>
> > So the main concern with the above approach is that, if for any reason
> > the index files of an inactive segment are deleted or corrupted, the
> > broker will halt if there is only one log directory. This is different
> > from the existing behavior where the broker will rebuild the index for
> > this inactive segment before it can accept any request from consumers.
> > Though we don't provide guarantees for segments already flushed to disk,
> > this still seems like a change in behavior for users. Maybe we don't
> > have to worry about this if we decide it is very rare, e.g. it happens
> > only when there is a disk error or a human error.
> >
> >
> >
> > On Wed, Jun 27, 2018 at 12:04 AM, Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > Thanks for the comment!
> > >
> > > Your comment reminded me to read through Jay's comments and my reply
> > > again. It seems that I probably have not captured idea of Jay's comment
> > > that says sanity check is not part of any formal guarantee we provide.
> I
> > > probably should have thought about this comment more. Let me reply to
> > both
> > > yours and Jay's comment and see if I can understand you better.
> > >
> > > Here are some clarifications:
> > > - The KIP does not intend to optimize recovery. It aims to optimize the
> > > sanity check when there is a clean shutdown.
> > > - The sanity check only reads the last entry of the index rather than
> > > the full index.
> > > - We have already done a data-driven investigation, though it was not
> > > done using hprof or strace. The resulting rolling bounce time is
> > > acceptable now. If it appears to be an issue, e.g. with more data, then
> > > we may need to revisit this with a more data-driven investigation.
> > >
> > > I agree with the following comments:
> > > - We should optimize the default behavior instead of adding a new
> config.
> > > - sanity check of the segments before recovery offset is not part of
> any
> > > formal guarantee and thus we probably can just skip it.
> > >
> > > So we are all leaning towards skipping the sanity check of all segments
> > > before the recovery offset. This solution would be pretty
> straightforward
> > > to understand and implement. And I am sure it will give us all the
> > benefits
> > > that this KIP intends to achieve. Here is only one question to double
> > check:
> > >
> > > If consumer fetches from an inactive segment, broker will just use the
> > > index of that inactive segment. 

[jira] [Created] (KAFKA-7076) Broker startup could be inefficient when using old message format

2018-06-19 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7076:
---

 Summary: Broker startup could be inefficient when using old 
message format
 Key: KAFKA-7076
 URL: https://issues.apache.org/jira/browse/KAFKA-7076
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah


During broker startup, we call `Log#recoverSegment` when we find corrupted 
indexes, for segments beyond the last check-pointed recovery point, and for any 
".swap" segments created by the log cleaner. One of the things 
`Log#recoverSegment` does is build up the producer state, starting from any 
previous snapshot file that is available. For logs using message formats older 
than V2, we could skip building up this producer state, which would essentially 
speed up recovery.
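The proposed optimization amounts to a format-version check before rebuilding producer state during segment recovery. A rough sketch — the magic values follow Kafka's record-format numbering, but the method name is illustrative, not the actual `Log` API:

```java
public class RecoverySketch {
    static final byte MAGIC_V0 = 0;
    static final byte MAGIC_V1 = 1;
    static final byte MAGIC_V2 = 2;

    // Producer state (producer IDs, sequence numbers, transaction markers) only
    // exists in the V2 message format, so rebuilding it during recovery of
    // V0/V1 logs is wasted work and can be skipped.
    static boolean shouldRebuildProducerState(byte recordFormatMagic) {
        return recordFormatMagic >= MAGIC_V2;
    }

    public static void main(String[] args) {
        if (shouldRebuildProducerState(MAGIC_V0)) throw new AssertionError();
        if (shouldRebuildProducerState(MAGIC_V1)) throw new AssertionError();
        if (!shouldRebuildProducerState(MAGIC_V2)) throw new AssertionError();
        System.out.println("ok");
    }
}
```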





[jira] [Resolved] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log

2018-06-19 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah resolved KAFKA-6881.
-
Resolution: Not A Bug

Closing this JIRA because /tmp was being used as the log directory.

> Kafka 1.1 Broker version crashes when deleting log
> --
>
> Key: KAFKA-6881
> URL: https://issues.apache.org/jira/browse/KAFKA-6881
> Project: Kafka
>  Issue Type: Bug
> Environment: Linux
>Reporter: K B Parthasarathy
>Priority: Critical
>
> Hello
> We have been running Kafka version 1.1 on Linux for the past 3 weeks. Today 
> Kafka crashed. When we checked the server.log file, the following log was found:
> [2018-05-07 16:53:06,721] ERROR Failed to clean up log for 
> __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
>  java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
>  at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Log.replaceSegments(Log.scala:1648)
>  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>  at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>  at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Suppressed: java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log -> 
> /tmp/kafka-logs/__consumer_offsets-24/.log.deleted
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
>  ... 16 more
>  [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
>  [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs 
> (kafka.log.LogManager)
>  [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in 
> /tmp/kafka-logs have failed (kafka.log.LogManager)
>  
> Please let me know what may be the issue
>  
> Partha





[jira] [Created] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7045:
---

 Summary: Consumer may not be able to consume all messages when 
down-conversion is required
 Key: KAFKA-7045
 URL: https://issues.apache.org/jira/browse/KAFKA-7045
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core
Affects Versions: 1.0.1, 1.1.0, 0.11.0.2, 1.0.0, 0.11.0.1, 0.11.0.0, 2.0.0
Reporter: Dhruvil Shah
 Fix For: 2.1.0


When down-conversion is required, the consumer may fail to consume messages 
under certain conditions. A couple of such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
at offset 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself, which would cause the consumer to throw an exception 
because it does not understand the stored message format.


 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.
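The compacted-topic case (#1) above can be simulated with plain lists standing in for batches. The point is only that a fetch offset past the last surviving record leaves zero records after down-conversion, which is the situation that triggers the `FileRecords` fallback:

```java
import java.util.*;

public class DownConvertSketch {
    // Each inner list is a "batch"; down-conversion keeps only records with
    // offset >= fetchOffset, mirroring how earlier records in the first batch
    // are dropped.
    static List<Long> downConvert(List<List<Long>> batches, long fetchOffset) {
        List<Long> out = new ArrayList<>();
        for (List<Long> batch : batches)
            for (long offset : batch)
                if (offset >= fetchOffset)
                    out.add(offset);
        return out;
    }

    public static void main(String[] args) {
        // Segment #1 after the cleaner removed offsets 7 and 8: [0,1,2] [3,4,5] [6]
        List<List<Long>> segment1 = Arrays.asList(
                Arrays.asList(0L, 1L, 2L),
                Arrays.asList(3L, 4L, 5L),
                Arrays.asList(6L));
        // A fetch at offset 7 starts reading from the batch containing offset 6,
        // but every surviving record is below the fetch offset, so nothing remains.
        List<Long> converted = downConvert(segment1, 7L);
        if (!converted.isEmpty()) throw new AssertionError(converted);
        System.out.println("records after down-conversion: " + converted.size());
    }
}
```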

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:
```java
public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords<MemoryRecords> convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}
```





[jira] [Created] (KAFKA-7030) Add configuration to disable message down-conversion

2018-06-09 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7030:
---

 Summary: Add configuration to disable message down-conversion
 Key: KAFKA-7030
 URL: https://issues.apache.org/jira/browse/KAFKA-7030
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah


Add configuration to disable message down-conversion as described in 
[KIP-283|https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion].





Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position

2018-06-05 Thread Dhruvil Shah
I agree that using `default.timeout.ms` could cause confusion since we
already have other timeout configurations in the consumer.

+1 for using `default.block.ms`.

Thanks,
Dhruvil

On Tue, Jun 5, 2018 at 11:48 AM, Bill Bejeck  wrote:

> Hi Jason,
>
> At first, I thought the same name between the producer and the consumer was
> ideal.
>
> But your comment makes me realize consistent names with different semantics
> is even more confusing.
>
> I'm +1 for not using `max.block.ms`.  I like Guozhang's suggestion of `
> default.block.ms` for the name.
>
> Thanks,
> Bill
>
> On Tue, Jun 5, 2018 at 1:33 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > Yeah I agree that "max.block.ms" makes people thinking of the producer's
> > config with the same name, but their semantics are different.
> >
> > On the other hand, I'm a bit concerned with the reusing of the term
> > `timeout` as we already have `session.timeout.ms` and `
> request.timeout.ms`
> > in ConsumerConfig.. How about using the name `default.api.block.ms` or
> > simply `default.block.ms`?
> >
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jun 5, 2018 at 8:57 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey All,
> > >
> > > One more minor follow-up. As I was reviewing the change mentioned
> above,
> > I
> > > felt the name `max.block.ms` was a little bit misleading since it only
> > > applies to methods which do not have an explicit timeout. A clearer
> name
> > > given its usage might be `default.timeout.ms`. It is the default
> timeout
> > > for any blocking API which does not have a timeout. I'm leaning toward
> > > using this name since the current one seems likely to cause confusion.
> > Any
> > > thoughts?
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Thu, May 31, 2018 at 6:09 PM, Dong Lin  wrote:
> > >
> > > > Thanks for the KIP! I am in favor of the option 1.
> > > >
> > > > +1 as well.
> > > >
> > > > On Thu, May 31, 2018 at 6:00 PM, Jason Gustafson  >
> > > > wrote:
> > > >
> > > > > Thanks everyone for the feedback. I've updated the KIP and added
> > > > > KAFKA-6979.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Wed, May 30, 2018 at 3:50 PM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Thanks Jason. I'm in favor of option 1 as well.
> > > > > >
> > > > > > On Wed, May 30, 2018 at 1:37 PM, Bill Bejeck 
> > > > wrote:
> > > > > >
> > > > > > > For what it's worth I'm +1 on Option 1 and the default value
> for
> > > the
> > > > > > > timeout.
> > > > > > >
> > > > > > > In addition to reasons outlined above by Jason, I think it will
> > > help
> > > > to
> > > > > > > reason about consumer behavior (with respect to blocking)
> having
> > > the
> > > > > > > configuration and default value aligned with the producer.
> > > > > > >
> > > > > > > -Bill
> > > > > > >
> > > > > > > On Wed, May 30, 2018 at 3:43 PM, Ismael Juma <
> ism...@juma.me.uk>
> > > > > wrote:
> > > > > > >
> > > > > > > > Sounds good to me,
> > > > > > > >
> > > > > > > > On Wed, May 30, 2018 at 12:40 PM Jason Gustafson <
> > > > ja...@confluent.io
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Perhaps one minute? That is the default used by the
> producer.
> > > > > > > > >
> > > > > > > > > -Jason
> > > > > > > > >
> > > > > > > > > On Wed, May 30, 2018 at 9:50 AM, Ismael Juma <
> > > ism...@juma.me.uk>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Option 1 sounds good to me provided that we can come up
> > with
> > > a
> > > > > good
> > > > > > > > > > default. What would you suggest?
> > > > > > > > > >
> > > > > > > > > > Ismael
> > > > > > > > > >
> > > > > > > > > > On Wed, May 30, 2018 at 9:41 AM Jason Gustafson <
> > > > > > ja...@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Everyone,
> > > > > > > > > > >
> > > > > > > > > > > There remains some inconsistency in the timeout
> behavior
> > of
> > > > the
> > > > > > > > > consumer
> > > > > > > > > > > APIs which do not accept a timeout. Some of them block
> > > > forever
> > > > > > > (e.g.
> > > > > > > > > > > position()) and some of them use request.timeout.ms
> > (e.g.
> > > > > > > > > > > parititonsFor()).
> > > > > > > > > > > I think we'd probably all agree that blocking forever
> is
> > > not
> > > > > > useful
> > > > > > > > > > > behavior and using request.timeout.ms has always been
> a
> > > hack
> > > > > > since
> > > > > > > > it
> > > > > > > > > > > controls a separate concern. I think there are
> basically
> > > two
> > > > > > > options
> > > > > > > > to
> > > > > > > > > > > address this:
> > > > > > > > > > >
> > > > > > > > > > > 1. We can add max.block.ms to match the producer and
> use
> > > it
> > > > as
> > > > > > the
> > > > > > > > > > default
> > > > > > > > > > > timeout when a timeout is not explicitly provided. This
> > > will
> > > > > fix
> > > > > > > the
> > > > > > > > > > > indefinite blocking behavior and avoid conflating
> > > > > > > request.timeout.ms
> > > > > > > > .
> > > > > > > > > > 

[jira] [Created] (KAFKA-6950) Add mechanism to delay response to failed client authentication

2018-05-25 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-6950:
---

 Summary: Add mechanism to delay response to failed client 
authentication
 Key: KAFKA-6950
 URL: https://issues.apache.org/jira/browse/KAFKA-6950
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah
 Fix For: 2.0.0


This Jira is for tracking the implementation for 
[KIP-306|https://cwiki.apache.org/confluence/display/KAFKA/KIP-306%3A+Configuration+for+Delaying+Response+to+Failed+Client+Authentication].





Re: [VOTE] KIP-306: Configuration for Delaying Response to Failed Client Authentication

2018-05-22 Thread Dhruvil Shah
Thank you, everyone. I have updated the KIP to address Jason's comment.

I am also closing this vote with 4 binding and 3 non-binding votes, and no
objections.

Thanks,
Dhruvil

On Tue, May 22, 2018 at 1:55 AM, Rajini Sivaram <rajinisiva...@gmail.com>
wrote:

> Hi Dhruvil, Thanks for the KIP!
>
> +1 (binding)
>
> Regards,
>
> Rajini
>
> On Tue, May 22, 2018 at 1:18 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > +1. Just one nit: could we use an INT type for the config? I can't
> imagine
> > that not being enough.
> >
> > -Jason
> >
> > On Mon, May 21, 2018 at 3:59 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Thanks for the KIP, +1 (binding).
> > >
> > > Ismael
> > >
> > > On Mon, May 21, 2018 at 7:52 AM Dhruvil Shah <dhru...@confluent.io>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I would like to start a vote on KIP-306 which proposes to add a
> > > > configuration to delay responses to failed authentication.
> > > >
> > > > Link to the KIP:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 306%3A+Configuration+for+Delaying+Response+to+Failed+
> > Client+Authentication
> > > >
> > > > Thanks,
> > > > Dhruvil
> > > >
> > >
> >
>


[jira] [Created] (KAFKA-6927) Broker uses significant amount of memory during down-conversion

2018-05-21 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-6927:
---

 Summary: Broker uses significant amount of memory during 
down-conversion
 Key: KAFKA-6927
 URL: https://issues.apache.org/jira/browse/KAFKA-6927
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah
 Fix For: 2.0.0


The Kafka broker can consume a significant amount of memory when down-conversion 
is required. We have seen scenarios where this causes out-of-memory errors. This 
issue and the proposed fix are described in detail in KIP-283 - 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion]

 





[VOTE] KIP-306: Configuration for Delaying Response to Failed Client Authentication

2018-05-21 Thread Dhruvil Shah
Hi,

I would like to start a vote on KIP-306 which proposes to add a
configuration to delay responses to failed authentication.

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-306%3A+Configuration+for+Delaying+Response+to+Failed+Client+Authentication

Thanks,
Dhruvil


[DISCUSS] KIP-306: Configuration for Delaying Response to Failed Client Authentication

2018-05-19 Thread Dhruvil Shah
Hi,

I created a KIP that proposes we add a broker configuration to delay
responses to failed client authentication. This will help prevent DoS-like
situations because of a misconfigured application trying to connect with
incorrect or stale credentials over and over again.
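The mechanism can be sketched with a scheduled executor: on authentication failure, the error response is scheduled after the configured delay instead of being sent immediately, which throttles a misconfigured client's retry loop. This is a conceptual sketch, not the broker's actual network-layer code; `connection.failed.authentication.delay.ms` is the configuration name KIP-306 proposes:

```java
import java.util.concurrent.*;

public class AuthDelaySketch {
    // Successful authentications respond immediately; failures are delayed by
    // failDelayMs before the error is returned to the client.
    static Future<String> respond(ScheduledExecutorService timer, boolean authenticated, long failDelayMs) {
        if (authenticated) {
            CompletableFuture<String> done = new CompletableFuture<>();
            done.complete("OK");
            return done;
        }
        return timer.schedule(() -> "AUTHENTICATION_FAILED", failDelayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        String result = respond(timer, false, 100).get();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        if (!result.equals("AUTHENTICATION_FAILED")) throw new AssertionError(result);
        if (elapsedMs < 100) throw new AssertionError("responded too early: " + elapsedMs + "ms");
        timer.shutdown();
        System.out.println("delayed failure response after ~" + elapsedMs + "ms");
    }
}
```

Because the delay is applied only on the failure path, well-behaved clients are unaffected.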

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-306%3A+Configuration+for+Delaying+Response+to+Failed+Client+Authentication

Because this is a fairly short and straightforward KIP, I will start a vote
tomorrow if there are no major objections.

Suggestions and feedback are welcome!

Thanks,
Dhruvil


Re: [VOTE] KIP-283: Efficient Memory Usage for Down-Conversion

2018-05-16 Thread Dhruvil Shah
Thanks, Ismael. I added the "Testing Strategy" section to the KIP outlining
the findings.

I am also closing this vote with 3 binding and 1 non-binding +1s and no
objections.

Thanks everyone for your review and feedback.

- Dhruvil

On Tue, May 15, 2018 at 11:04 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks for the KIP Dhruvil, this is a welcome improvement! My understanding
> is that you have done some work to validate that the change has the desired
> effect, it would be good to include that information in the "Testing
> Strategy" section.
>
> +1 (binding)
>
> Ismael
>
> On Wed, May 2, 2018 at 9:27 AM Dhruvil Shah <dhru...@confluent.io> wrote:
>
> > Hi all,
> >
> > I would like to start the vote on KIP-283: Efficient Memory Usage for
> > Down-Conversion.
> >
> > For reference, the link to the KIP is here:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 283%3A+Efficient+Memory+Usage+for+Down-Conversion
> >
> > and the discussion thread is here:
> > https://www.mail-archive.com/dev@kafka.apache.org/msg86799.html
> >
> > Thanks,
> > Dhruvil
> >
>


Re: [VOTE] KIP-283: Efficient Memory Usage for Down-Conversion

2018-05-09 Thread Dhruvil Shah
Thanks for the feedback, Jason and Ismael. I renamed the config to
"message.downconversion.enable".

Also, as an update, I found a potential problem with one of the suggestions
the KIP made, specifically about the case where the size of messages after
down-conversion is greater than the size before, and so we are not able to
send all the messages. Both the old and new consumers expect to receive at
least one full batch of messages for each partition, and throw a
`RecordTooLargeException` if that is not the case. Because of this, I made
a small change to the KIP to make sure we are able to send at least one
full message batch for each partition. Because this is more of an internal
implementation specific change and does not affect user-visible
functionality in any way, I went ahead and updated the KIP with this logic
(under the "Ensuring Consumer Progress" section).
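A rough sketch of that selection logic (a hypothetical helper, not the actual broker code): given the sizes of the down-converted batches for a partition, the first batch is always included, even when it alone exceeds the size limit, so the consumer can always make progress:

```python
def select_batches(batch_sizes, max_bytes):
    """Pick which down-converted batches to send for a partition.
    The first full batch is always included, even if it exceeds
    max_bytes on its own, so the consumer never stalls with a
    RecordTooLargeException."""
    selected, total = [], 0
    for i, size in enumerate(batch_sizes):
        # Only batches after the first are subject to the size limit.
        if i > 0 and total + size > max_bytes:
            break
        selected.append(size)
        total += size
    return selected
```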

Thanks,
Dhruvil

On Wed, May 9, 2018 at 9:09 AM, Ismael Juma <mli...@juma.me.uk> wrote:

> Maybe it should be message instead of record, to be consistent with
> message.format.version.
>
> On Wed, 9 May 2018, 09:04 Jason Gustafson, <ja...@confluent.io> wrote:
>
> > Hi Dhruvil,
> >
> > Thanks for the KIP. +1 from me. Just a minor nitpick on the name of the
> new
> > config. I would suggest "record.downconversion.enable". The "record"
> prefix
> > emphasizes what is being down-converted and similar existing configs use
> > "enable" rather than "enabled."
> >
> > -Jason
> >
> > On Wed, May 2, 2018 at 9:35 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > > +1
> > >
> > > On Wed, May 2, 2018 at 9:27 AM, Dhruvil Shah <dhru...@confluent.io>
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to start the vote on KIP-283: Efficient Memory Usage for
> > > > Down-Conversion.
> > > >
> > > > For reference, the link to the KIP is here:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 283%3A+Efficient+Memory+Usage+for+Down-Conversion
> > > >
> > > > and the discussion thread is here:
> > > > https://www.mail-archive.com/dev@kafka.apache.org/msg86799.html
> > > >
> > > > Thanks,
> > > > Dhruvil
> > > >
> > >
> >
>


[VOTE] KIP-283: Efficient Memory Usage for Down-Conversion

2018-05-02 Thread Dhruvil Shah
Hi all,

I would like to start the vote on KIP-283: Efficient Memory Usage for
Down-Conversion.

For reference, the link to the KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion

and the discussion thread is here:
https://www.mail-archive.com/dev@kafka.apache.org/msg86799.html

Thanks,
Dhruvil


Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

2018-04-30 Thread Dhruvil Shah
Hi all,

I have updated the KIP to reflect changes from the discussion in this
thread. In particular, the KIP now recommends adding a configuration that
allows disabling any kind of message down-conversion for client
`FetchRequest`, instead of having to specify the minimum compatibility.

I also added a note about Jun's idea of down-converting only the first
message batch of the first topic-partition. I think this is a bit tricky to
implement, and given that the maximum fetch size for a topic-partition is
capped at 1MB by default, it should probably be acceptable to take the cost
of down-converting the entire partition upfront. Jun, let me know what you
think.

Any other suggestions / feedback are welcome.

Thanks,
Dhruvil

On Tue, Apr 17, 2018 at 4:21 PM, Dhruvil Shah <dhru...@confluent.io> wrote:

> Hi Jun,
>
> Yes, that is true. Ideally, we should be able to down-convert only the
> first message batch in the request handling thread and delay everything
> else till the network thread. I have not thought through all the details of
> how we could do this but at first glance this seems tricky to implement,
> given that `FetchResponse.PartitionData` for the first partition will be a
> combination of `MemoryRecords` and `LazyDownConvertedRecords`. I will think
> about this a bit more to see if I can come up with a clean abstraction, and
> will also add it to the KIP.
>
> Thanks,
> Dhruvil
>
> On Mon, Apr 16, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dhruvil,
>>
>> Thanks for the KIP. Looks good to me overall. Just one comment below.
>>
>> "To prevent this from happening, we will not delay down-conversion of the
>> first partition in the response. We will down-convert all messages of the
>> first partition in the I/O thread (like we do today), and only delay
>> down-conversion for subsequent partitions." It seems that we can further
>> optimize this by only down-converting the first message set in the first
>> partition in the request handling threads?
>>
>> Jun
>>
>>
>> On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dhru...@confluent.io>
>> wrote:
>>
>> > Hi,
>> >
>> > I created a KIP to help mitigate out of memory issues during
>> > down-conversion. The KIP proposes introducing a configuration that can
>> > prevent down-conversions altogether, and also describes a design for
>> > efficient memory usage for down-conversion.
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 283%3A+Efficient+Memory+Usage+for+Down-Conversion
>> >
>> > Suggestions and feedback are welcome!
>> >
>> > Thanks,
>> > Dhruvil
>> >
>>
>
>


Re: [VOTE] KIP-281: ConsumerPerformance: Increase Polling Loop Timeout and Make It Reachable by the End User

2018-04-20 Thread Dhruvil Shah
+1 from me as well. I was going to file a JIRA for this exact same problem.
:-)

On Thu, Apr 19, 2018 at 10:45 PM, Alex Dunayevsky 
wrote:

> +1
>
>
> 4 votes total:
>
>   1 binding vote (Jason Gustafson)
>
>   3 non-binding votes (Moshe Blumberg, Ted Yu, Alex Dunayevsky)
>
>
> Can we consider the voting closed?
>
>
> Thank you everyone!
>
> Alex Dunayevsky
>
>
> > Tue, 17 Apr 2018 23:28:35 GMT, Jason Gustafson 
> wrote:
>
> > +1 (binding)
> >
> > On Tue, Apr 17, 2018 at 9:04 AM, Moshe Blumberg <mos...@hotmail.co.uk> wrote:
>
> > +1
> >
> >
> > 
> > From: Ted Yu 
> > Sent: 16 April 2018 22:43
> > To: dev@kafka.apache.org
> > Subject: Re: [VOTE] KIP-281: ConsumerPerformance: Increase Polling Loop
> > Timeout and Make It Reachable by the End User
> > >
> > > +1
> > >
> > > On Mon, Apr 16, 2018 at 2:25 PM, Alex Dunayevsky <
> a.dunayev...@gmail.com>
> > wrote:
> > >
> > > > Hello friends,
> > > >
> > > > Let's start the vote for KIP-281: ConsumerPerformance: Increase
> Polling
> > > > Loop Timeout and Make It Reachable by the End User:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 281%3A+ConsumerPerformance%3A+Increase+Polling+Loop+Timeout+
> > > > and+Make+It+Reachable+by+the+End+User
> > > >
> > > > Thank you,
> > > > Alexander Dunayevsky
> > >
> >
>


Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

2018-04-17 Thread Dhruvil Shah
Hi Jun,

Yes, that is true. Ideally, we should be able to down-convert only the
first message batch in the request handling thread and delay everything
else till the network thread. I have not thought through all the details of
how we could do this but at first glance this seems tricky to implement,
given that `FetchResponse.PartitionData` for the first partition will be a
combination of `MemoryRecords` and `LazyDownConvertedRecords`. I will think
about this a bit more to see if I can come up with a clean abstraction, and
will also add it to the KIP.

Thanks,
Dhruvil

On Mon, Apr 16, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dhruvil,
>
> Thanks for the KIP. Looks good to me overall. Just one comment below.
>
> "To prevent this from happening, we will not delay down-conversion of the
> first partition in the response. We will down-convert all messages of the
> first partition in the I/O thread (like we do today), and only delay
> down-conversion for subsequent partitions." It seems that we can further
> optimize this by only down-converting the first message set in the first
> partition in the request handling threads?
>
> Jun
>
>
> On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dhru...@confluent.io> wrote:
>
> > Hi,
> >
> > I created a KIP to help mitigate out of memory issues during
> > down-conversion. The KIP proposes introducing a configuration that can
> > prevent down-conversions altogether, and also describes a design for
> > efficient memory usage for down-conversion.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 283%3A+Efficient+Memory+Usage+for+Down-Conversion
> >
> > Suggestions and feedback are welcome!
> >
> > Thanks,
> > Dhruvil
> >
>


Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

2018-04-11 Thread Dhruvil Shah
Hi Jason,

1. The motivation for adding the configuration was that even though we are
improving memory usage, there is still overhead involved with
down-conversion - the broker still has to do extra work which some users
might want to eliminate completely. The proposal also moves the
down-conversion and related file I/O to the network thread which could
block the thread till the read is complete, though we try to minimize the
impact by reading and down-converting only a small batch of messages at a
time.
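The shape of that deferral can be sketched like this (an illustrative stand-in, not the real `LazyDownConvertedRecords`; `convert` here represents per-chunk down-conversion):

```python
class LazyDownConvertedRecords:
    """Sketch of deferring down-conversion to write time: raw data is
    converted chunk by chunk only when the network thread writes the
    response to the socket, instead of all at once in the I/O thread."""

    def __init__(self, raw_chunks, convert):
        self._raw_chunks = raw_chunks  # undecoded data, e.g. file slices
        self._convert = convert        # per-chunk down-conversion function

    def write_to(self, channel):
        """Convert and write one chunk at a time; only the chunk
        currently being written is held in memory."""
        written = 0
        for chunk in self._raw_chunks:
            converted = self._convert(chunk)
            channel.append(converted)
            written += len(converted)
        return written
```

The memory high-water mark is then bounded by the chunk size rather than the full partition data, at the cost of doing the conversion work on the network thread.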

I agree that the configuration should be added only if we see a high value
for it. One way to determine whether we really need the configuration would
be to run some performance tests to see what kind of an impact the proposal
has, in terms of its effects on memory consumption, ability to handle
concurrent requests, etc.

2. One advantage of being able to specify the version number could be that
users can turn off certain down-conversions that are deemed more expensive
than others. For example, converting from X --> Y could be cheap but X -->
Z could be much more expensive. I am not sure if this has any practical
significance, given how Kafka message formats work today though.

3. I will look into this a bit more and get back to you.

4. Makes sense, will update the KIP with that.

Thanks,
Dhruvil

On Wed, Apr 11, 2018 at 3:54 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Dhruvil,
>
> Thanks for the KIP. Looks good overall. I have a few questions about the
> new configs:
>
> 1. I'm mainly wondering how necessary the configs are given the
> improvements in this KIP to reduce memory pressure from down-conversion.
> The reason I ask is that we'll be stuck with this config for a long time,
> so we should make sure we really need it. Ideally the improvements here are
> enough to make the memory problem a non-issue, but if we're not convinced
> they achieve that, the configs may have some value.
> 2. It seems like the intent is to use these configs to disable
> down-conversion specifically. Would it make sense to let the name be more
> specific (e.g. `log.disable.downconversion`)? Or do you think there are
> other benefits of this config outside of this use case?
> 3. Does this config apply to replica fetchers? I think it would be
> reasonable to disable down-conversion for replica fetchers in all cases
> since it should not be needed anyway and can cause weird log divergence
> edge cases. I opened https://issues.apache.org/jira/browse/KAFKA-6392
> about
> this some time ago. Would it be reasonable to include this as part of this
> KIP?
> 4. You mention in the KIP that we would use the invalid request error code
> if the version is disallowed. Wouldn't it make more sense to use
> unsupported version?
>
> Thanks,
> Jason
>
>
> On Wed, Apr 11, 2018 at 6:38 AM, Rajini Sivaram <rajinisiva...@gmail.com>
> wrote:
>
> > Hi Dhruvil,
> >
> > Thanks for the KIP. This is a great improvement to reduce OOMs in brokers
> > during down-conversion.
> >
> > Just a couple of minor questions:
> >
> > The goals state: "*Provide appropriate configuration parameters to manage
> > maximum memory usage during down-conversion on the broker.*"
> > Which config parameters are these referring to?
> >
> > What exactly is a chunk going to be - will it be all the records for a
> > partition (which could be quite large?) or one message batch? The KIP
> talks
> > about pre-allocated fixed size buffers, but your last note suggests that
> > you would use temporary buffers created for each partition. Do we need to
> > consider using a memory pool for these or do we think that the buffers
> will
> > be small enough to cope with lots of connections with downconversions?
> This
> > will be a clear improvement over what we have now in any case, but just
> > checking anyway.
> >
> > Regards,
> >
> > Rajini
> >
> > On Sat, Apr 7, 2018 at 12:29 AM, Dhruvil Shah <dhru...@confluent.io>
> > wrote:
> >
> > > Hi Ted,
> > >
> > > Thanks for the comments.
> > >
> > >
> > >
> > > >> bq. we can perform down-conversion when Records.writeTo is called.
> > > >> Wouldn't this delay the network thread (though maybe the duration is short)?
> > >
> > > Yes, this is noted in the Cons section. I think we have a precedent for
> > > this in the `SSLTransportLayer` implementation, so trying to follow a
> > > similar model here.
> > >
> > > >> Can you expand on the structure of LazyDownConvertedRecords in more detail?
> > >
> > > I added the basic structure to the KIP.
>

Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

2018-04-11 Thread Dhruvil Shah
Hi Rajini,

Thanks for the comments.

>> Which config parameters are these referring to?

This refers to a proposal that was later rejected. I have removed this goal
from the KIP as it is no longer valid.

>> What exactly is a chunk going to be

I have updated the KIP to remove references to the fixed buffers. I started
out thinking we would have fix-sized buffers with a memory pool but it
seems we could achieve what we want without the added complexity.

My current proposal is that we read a set of message batches into memory
till we reach a pre-configured size threshold, say 16kB (the pre-configured
size might need to be larger if the first message batch is larger than
that). We down-convert these batches and hold them in a temporary buffer
till we are able to send out all the messages, and then repeat the same
process for subsequent batches.

Because we down-convert a maximum of 16kB worth of messages at any given
point in time, the memory utilization should be much more deterministic,
i.e. we have a maximum of about 16kB memory allocated for each in-flight
`FetchResponse` that requires down-conversion.
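As a toy illustration of the chunking (threshold and batch sizes are examples only, not broker code): batches are grouped so each group stays near the threshold, and a batch larger than the threshold forms a group of its own:

```python
CHUNK_SIZE_BYTES = 16 * 1024  # pre-configured threshold from the proposal

def chunk_batches(batch_sizes):
    """Group message batches into chunks of roughly CHUNK_SIZE_BYTES.
    Each chunk is read, down-converted, and written out before the next
    is touched, so at most about one chunk is held per in-flight
    response. An oversized batch becomes a chunk by itself."""
    chunks, current, current_size = [], [], 0
    for size in batch_sizes:
        if current and current_size + size > CHUNK_SIZE_BYTES:
            chunks.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        chunks.append(current)
    return chunks
```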

Let me know if this makes sense.

Thanks,
Dhruvil

On Wed, Apr 11, 2018 at 6:38 AM, Rajini Sivaram <rajinisiva...@gmail.com>
wrote:

> Hi Dhruvil,
>
> Thanks for the KIP. This is a great improvement to reduce OOMs in brokers
> during down-conversion.
>
> Just a couple of minor questions:
>
> The goals state: "*Provide appropriate configuration parameters to manage
> maximum memory usage during down-conversion on the broker.*"
> Which config parameters are these referring to?
>
> What exactly is a chunk going to be - will it be all the records for a
> partition (which could be quite large?) or one message batch? The KIP talks
> about pre-allocated fixed size buffers, but your last note suggests that
> you would use temporary buffers created for each partition. Do we need to
> consider using a memory pool for these or do we think that the buffers will
> be small enough to cope with lots of connections with downconversions? This
> will be a clear improvement over what we have now in any case, but just
> checking anyway.
>
> Regards,
>
> Rajini
>
> On Sat, Apr 7, 2018 at 12:29 AM, Dhruvil Shah <dhru...@confluent.io>
> wrote:
>
> > Hi Ted,
> >
> > Thanks for the comments.
> >
> >
> >
> > >> bq. we can perform down-conversion when Records.writeTo is called.
> > >> Wouldn't this delay the network thread (though maybe the duration is short)?
> >
> > Yes, this is noted in the Cons section. I think we have a precedent for
> > this in the `SSLTransportLayer` implementation, so trying to follow a
> > similar model here.
> >
> > >> Can you expand on the structure of LazyDownConvertedRecords in more detail?
> >
> > I added the basic structure to the KIP.
> >
> > >> bq. even if it exceeds fetch.max.bytes
> > >> I did a brief search but didn't see the above config. Did you mean message.max.bytes?
> >
> > Yes, thanks for the correction.
> >
> > >> After the buffers grow, is there a way to trim them down if subsequent
> > >> down-conversion doesn't need that much memory?
> > The easiest way probably is to allocate and use a new buffer for each
> > topic-partition. I think we would not require any trimming down if we do
> > this. The buffer will be available for garbage collection as soon as we
> are
> > done serializing and writing all messages to the socket for the
> particular
> > topic-partition.
> >
> > Thanks,
> > Dhruvil
> >
> >
> > On Fri, Apr 6, 2018 at 3:23 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > > bq. we can perform down-conversion when Records.writeTo is called.
> > >
> > > Wouldn't this delay the network thread (though maybe the duration is
> > short)
> > > ?
> > >
> > > Can you expand on the structure of LazyDownConvertedRecords in more
> > detail
> > > ?
> > >
> > > bq. even if it exceeds fetch.max.bytes
> > >
> > > I did a brief search but didn't see the above config. Did you mean
> > > message.max.bytes
> > > ?
> > >
> > > bq. with possibility to grow if the allocation
> > >
> > > After the buffers grow, is there a way to trim them down if subsequent
> > > down-conversion doesn't need that much memory ?
> > >
> > > Thanks
> > >
> > >
> > > On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dhru...@confluent.io>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I created a KIP to help mitigate out of memory issues during
> > > > down-conversion. The KIP proposes introducing a configuration that
> can
> > > > prevent down-conversions altogether, and also describes a design for
> > > > efficient memory usage for down-conversion.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 283%3A+Efficient+Memory+Usage+for+Down-Conversion
> > > >
> > > > Suggestions and feedback are welcome!
> > > >
> > > > Thanks,
> > > > Dhruvil
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

2018-04-06 Thread Dhruvil Shah
Hi Ted,

Thanks for the comments.



>> bq. we can perform down-conversion when Records.writeTo is called.
>> Wouldn't this delay the network thread (though maybe the duration is short)?

Yes, this is noted in the Cons section. I think we have a precedent for
this in the `SSLTransportLayer` implementation, so trying to follow a
similar model here.

>> Can you expand on the structure of LazyDownConvertedRecords in more detail?

I added the basic structure to the KIP.

>> bq. even if it exceeds fetch.max.bytes
>> I did a brief search but didn't see the above config. Did you mean message.max.bytes?

Yes, thanks for the correction.

>> After the buffers grow, is there a way to trim them down if subsequent
>> down-conversion doesn't need that much memory?

The easiest way probably is to allocate and use a new buffer for each
topic-partition. I think we would not require any trimming down if we do
this. The buffer will be available for garbage collection as soon as we are
done serializing and writing all messages to the socket for the particular
topic-partition.

Thanks,
Dhruvil


On Fri, Apr 6, 2018 at 3:23 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. we can perform down-conversion when Records.writeTo is called.
>
> Wouldn't this delay the network thread (though maybe the duration is short)
> ?
>
> Can you expand on the structure of LazyDownConvertedRecords in more detail
> ?
>
> bq. even if it exceeds fetch.max.bytes
>
> I did a brief search but didn't see the above config. Did you mean
> message.max.bytes
> ?
>
> bq. with possibility to grow if the allocation
>
> After the buffers grow, is there a way to trim them down if subsequent
> down-conversion doesn't need that much memory ?
>
> Thanks
>
>
> On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dhru...@confluent.io> wrote:
>
> > Hi,
> >
> > I created a KIP to help mitigate out of memory issues during
> > down-conversion. The KIP proposes introducing a configuration that can
> > prevent down-conversions altogether, and also describes a design for
> > efficient memory usage for down-conversion.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 283%3A+Efficient+Memory+Usage+for+Down-Conversion
> >
> > Suggestions and feedback are welcome!
> >
> > Thanks,
> > Dhruvil
> >
>


Re: [DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

2018-04-06 Thread Dhruvil Shah
I fixed the diagrams - let me know if you are still having trouble seeing
them.

Thanks,
Dhruvil

On Fri, Apr 6, 2018 at 3:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> The two embedded diagrams seem broken.
>
> Can you double check ?
>
> Thanks
>
> On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dhru...@confluent.io> wrote:
>
> > Hi,
> >
> > I created a KIP to help mitigate out of memory issues during
> > down-conversion. The KIP proposes introducing a configuration that can
> > prevent down-conversions altogether, and also describes a design for
> > efficient memory usage for down-conversion.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 283%3A+Efficient+Memory+Usage+for+Down-Conversion
> >
> > Suggestions and feedback are welcome!
> >
> > Thanks,
> > Dhruvil
> >
>


[DISCUSS] KIP-283: Efficient Memory Usage for Down-Conversion

2018-04-06 Thread Dhruvil Shah
Hi,

I created a KIP to help mitigate out of memory issues during
down-conversion. The KIP proposes introducing a configuration that can
prevent down-conversions altogether, and also describes a design for
efficient memory usage for down-conversion.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion

Suggestions and feedback are welcome!

Thanks,
Dhruvil