[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

2020-08-01 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r462738230



##
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##
@@ -85,11 +89,12 @@ public ApiKeys apiKey() {
 public RequestHeader makeHeader(short version) {
 short requestApiKey = requestBuilder.apiKey().id;
 return new RequestHeader(
-new RequestHeaderData().
-setRequestApiKey(requestApiKey).
-setRequestApiVersion(version).
-setClientId(clientId).
-setCorrelationId(correlationId),
+new RequestHeaderData()

Review comment:
   You commented on the previous PR about the style here. The reasoning is 
that this style, with the dot leading each line, is more common in our codebase 
than putting the period at the end of the line.
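   For reference, the leading-dot style being adopted reads like this 
(reconstructed from the removed lines above; only the chained expression is 
shown, not the full patch):

    new RequestHeaderData()
        .setRequestApiKey(requestApiKey)
        .setRequestApiVersion(version)
        .setClientId(clientId)
        .setCorrelationId(correlationId)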

##
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##
@@ -151,6 +147,20 @@ public void testDnsLookupFailure() {
 assertFalse(client.ready(new Node(1234, "badhost", 1234), time.milliseconds()));
 }
 
+@Test
+public void testIncludeInitialPrincipalNameAndClientIdInHeader() {

Review comment:
   This is the new test; the rest of the changes in this file are just side 
cleanups.

##
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##
@@ -64,23 +64,11 @@ public String value() {
 
 public static class Builder extends AbstractRequest.Builder<AlterConfigsRequest> {
 
-        private final AlterConfigsRequestData data = new AlterConfigsRequestData();
+        private final AlterConfigsRequestData data;
 
-        public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {

Review comment:
   Moved to `AlterConfigsUtil`

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2207,27 +2203,6 @@ void handleFailure(Throwable throwable) {
 return futures;
 }
 
-    private IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection<ConfigResource> resources,

Review comment:
   Moved to `AlterConfigsUtil`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2020-08-01 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169439#comment-17169439
 ] 

Tom Lee commented on KAFKA-10337:
-

Opened [https://github.com/apache/kafka/pull/9111] to address this particular 
edge case.

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Priority: Major
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())), the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] thomaslee opened a new pull request #9111: KAFKA-10337: await async commits in commitSync even if no offsets given

2020-08-01 Thread GitBox


thomaslee opened a new pull request #9111:
URL: https://github.com/apache/kafka/pull/9111


   The contract for `commitSync()` guarantees that the callbacks for all
   prior async commits will be invoked before it (successfully?) returns.
   Prior to this change, the contract could be violated if an empty offsets
   map was passed to `commitSync()`.
   
   Also added a unit test in `ConsumerCoordinatorTest` exercising this 
particular path.
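   
   A minimal, self-contained sketch of the idea (illustrative names only, not
   the actual ConsumerCoordinator internals; the asynchronous completion is
   collapsed into a simple callback drain for brevity):

    import java.util.Map;
    import java.util.concurrent.ConcurrentLinkedQueue;

    final class CommitBarrierSketch {
        // Callbacks enqueued as async commits complete (hypothetical stand-in
        // for the coordinator's completed-commit queue).
        private final ConcurrentLinkedQueue<Runnable> completedCallbacks =
            new ConcurrentLinkedQueue<>();

        void commitAsync(Runnable callback) {
            completedCallbacks.add(callback);
        }

        boolean commitSync(Map<String, Long> offsets) {
            drainCallbacks();          // must run before any early return
            if (offsets.isEmpty())
                return true;           // previously returned without draining: the bug
            // ... send the synchronous OffsetCommitRequest here ...
            drainCallbacks();
            return true;
        }

        private void drainCallbacks() {
            Runnable cb;
            while ((cb = completedCallbacks.poll()) != null)
                cb.run();
        }
    }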
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2020-08-01 Thread Tom Lee (Jira)
Tom Lee created KAFKA-10337:
---

 Summary: Wait for pending async commits in commitSync() even if no 
offsets are specified
 Key: KAFKA-10337
 URL: https://issues.apache.org/jira/browse/KAFKA-10337
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Tom Lee


The JavaDoc for commitSync() states the following:
{quote}Note that asynchronous offset commits sent previously with the
{@link #commitAsync(OffsetCommitCallback)}
 (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
{quote}
But should we happen to call the method with an empty offset map
(i.e. commitSync(Collections.emptyMap())), the callbacks for any incomplete 
async commits will not be invoked because of an early return in 
ConsumerCoordinator.commitOffsetsSync() when the input map is empty.

If users are doing manual offset commits and relying on commitSync as a barrier 
for in-flight async commits prior to a rebalance, this could be an important 
(though somewhat implementation-dependent) detail.
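
For reference, a minimal sketch of the affected usage pattern (assumes an
already-configured consumer; error handling elided):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    final class CommitSyncBarrierExample {
        static void commitBarrier(KafkaConsumer<String, String> consumer) {
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null)
                    System.err.println("async commit failed: " + exception);
            });
            // Per the JavaDoc contract, the callback above should run before
            // this call returns, but the early return in commitOffsetsSync()
            // skips it when the offset map is empty.
            consumer.commitSync(Collections.emptyMap());
        }
    }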



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on pull request #9070: MINOR: speed up release script

2020-08-01 Thread GitBox


rhauch commented on pull request #9070:
URL: https://github.com/apache/kafka/pull/9070#issuecomment-667606488


   Agreed! This significantly sped up putting together 2.6.0RC2.
   
   On Sat, Aug 1, 2020 at 5:35 PM Ismael Juma  wrote:
   
   > Nice improvement!
   >
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10336) Rolling upgrade with Suppression may throw exceptions

2020-08-01 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169432#comment-17169432
 ] 

John Roesler commented on KAFKA-10336:
--

Updated the title and description to correct the role of standbys. 

> Rolling upgrade with Suppression may throw exceptions
> -
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: bug, user-experience
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you use Suppress with changelogging enabled, you may experience exceptions 
> leading to threads shutting down on the OLD instances during a rolling 
> upgrade. No corruption is expected, and when the rolling upgrade completes, 
> all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> A similar condition can arise without standby replicas. During the rolling 
> bounce it is possible that tasks may be shuffled between nodes. As the rolling 
> bounce progresses, a task may be moved from a new-versioned instance to an 
> old-versioned one. If the new-versioned instance had processed some data, the 
> old-versioned one would have to restore from the changelog and would 
> encounter the same issue I described for standbys. 
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) changelogged suppression buffers. Changing any of those 
> variables will prevent the issue from occurring. I would NOT recommend 
> disabling changelogging (C), and (B) is probably off the table, since the 
> application logic presumably depends on it. Therefore, your practical choice 
> is to do a full-cluster bounce (A). Disabling standby replicas will decrease 
> the probability of exceptions, but it’s no guarantee. Personally, I think (A) 
> is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades in the system test. 
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling 

[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression may throw exceptions

2020-08-01 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10336:
-
Description: 
Tl;dr:

If you use Suppress with changelogging enabled, you may experience exceptions 
leading to threads shutting down on the OLD instances during a rolling upgrade. 
No corruption is expected, and when the rolling upgrade completes, all threads 
will be running and processing correctly.

Details:

The Suppression changelog has had to change its internal data format several 
times to fix bugs. The binary schema of the changelog values is determined by a 
version header on the records, and new versions are able to decode all old 
versions' formats.

The suppression changelog decoder is also configured to throw an exception if 
it encounters a version number that it doesn't recognize, causing the thread to 
stop processing and shut down.
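
As an illustrative sketch of that decode-time version check (hypothetical 
names, not the actual Streams serde):

    import java.nio.ByteBuffer;

    final class ChangelogVersionCheckSketch {
        static final byte CURRENT_VERSION = 3; // illustrative value

        static ByteBuffer decode(ByteBuffer value) {
            byte version = value.get(); // version header written with the record
            if (version < 1 || version > CURRENT_VERSION) {
                // An old-versioned reader sees a future format here and fails
                // fast rather than misinterpreting the bytes; this is the
                // exception that shuts the thread down during a rolling upgrade.
                throw new IllegalStateException(
                    "Unrecognized changelog value version: " + version);
            }
            return value.slice(); // dispatch to a version-specific decoder in reality
        }
    }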

When standbys are configured, there is one so-called "active" worker writing 
into the suppression buffer and sending the same messages into the changelog, 
while another "standby" worker reads those messages, decodes them, and 
maintains a hot-standby replica of the suppression buffer.

If the standby worker is running an older version of Streams than the active 
worker, what can happen today is that the active worker may write changelog 
messages with a higher version number than the standby worker can understand. 
When the standby worker receives one of these messages, it will throw the 
exception and shut down its thread.

A similar condition can arise without standby replicas. During the rolling 
bounce it is possible that tasks may be shuffled between nodes. As the rolling 
bounce progresses, a task may be moved from a new-versioned instance to an 
old-versioned one. If the new-versioned instance had processed some data, the 
old-versioned one would have to restore from the changelog and would encounter 
the same issue I described for standbys. 

Note, although the exceptions are undesired, at least this behavior protects 
the integrity of the application and prevents data corruption or loss.

Workaround:

Several workarounds are possible:

This only affects clusters that do all of (A) rolling bounce, (B) suppression, 
(C) changelogged suppression buffers. Changing any of those variables will 
prevent the issue from occurring. I would NOT recommend disabling changelogging 
(C), and (B) is probably off the table, since the application logic presumably 
depends on it. Therefore, your practical choice is to do a full-cluster bounce 
(A). Disabling standby replicas will decrease the probability of exceptions, 
but it’s no guarantee. Personally, I think (A) is the best option.

Also note, although the exceptions and threads shutting down are not ideal, 
they would only afflict the old-versioned nodes. I.e., the nodes you intend to 
replace anyway. So another "workaround" is simply to ignore the exceptions and 
proceed with the rolling bounce. As the old-versioned nodes are replaced with 
new-versioned nodes, the new nodes will again be able to decode their peers' 
changelog messages and be able to maintain the hot-standby replicas of the 
suppression buffers.

Detection:

Although I really should have anticipated this condition, I first detected it 
while expanding our system test coverage as part of KAFKA-10173. I added a 
rolling upgrade test with an application that uses both suppression and standby 
replicas, and observed that the rolling upgrades would occasionally cause the 
old nodes to crash. Accordingly, in KAFKA-10173, I disabled the rolling-upgrade 
configuration and only do full-cluster upgrades. Resolving _this_ ticket will 
allow us to re-enable rolling upgrades in the system test. 

Proposed solution:

Part 1:

Since Streams can decode both current and past versions, but not future 
versions, we need to implement a mechanism to prevent new-versioned nodes from 
writing new-versioned messages, which would appear as future-versioned messages 
to the old-versioned nodes.

We have an UPGRADE_FROM configuration that we could leverage to accomplish 
this. In that case, when upgrading from 2.3 to 2.4, you would set UPGRADE_FROM 
to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) nodes would 
continue writing messages in the old (2.3) format. Thus, the still-running old 
nodes will still be able to read them.

Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
because all the members are running 2.4 at this point and can decode these 
messages, even if they are still configured to write with version 2.3.

After the second rolling bounce, the whole cluster is both running 2.4 and 
writing with the 2.4 format.
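
In config terms, the two bounces would look roughly like this (a sketch using 
the existing Streams config; version strings are illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final class TwoBounceUpgradeSketch {
        static Properties firstBounceConfig() {
            Properties props = new Properties();
            // Bounce 1: run the new (2.4) code but keep writing the old (2.3)
            // format so still-running old nodes can decode the changelog.
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
            return props;
        }

        static Properties secondBounceConfig() {
            // Bounce 2: drop UPGRADE_FROM; all members now run 2.4, so writers
            // may switch to the 2.4 format.
            return new Properties();
        }
    }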

Part 2:

Managing two rolling bounces can be difficult, so it is also desirable to 
implement a mechanism for automatically negotiating the schema version.

[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression may throw exceptions

2020-08-01 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10336:
-
Description: 
Tl;dr:

If you use Suppress with changelogging enabled, you may experience exceptions 
leading to threads shutting down on the OLD instances during a rolling upgrade. 
No corruption is expected, and when the rolling upgrade completes, all threads 
will be running and processing correctly.

Details:

The Suppression changelog has had to change its internal data format several 
times to fix bugs. The binary schema of the changelog values is determined by a 
version header on the records, and new versions are able to decode all old 
versions' formats.

The suppression changelog decoder is also configured to throw an exception if 
it encounters a version number that it doesn't recognize, causing the thread to 
stop processing and shut down.

When standbys are configured, there is one so-called "active" worker writing 
into the suppression buffer and sending the same messages into the changelog, 
while another "standby" worker reads those messages, decodes them, and 
maintains a hot-standby replica of the suppression buffer.

If the standby worker is running an older version of Streams than the active 
worker, what can happen today is that the active worker may write changelog 
messages with a higher version number than the standby worker can understand. 
When the standby worker receives one of these messages, it will throw the 
exception and shut down its thread.

A similar condition can arise without standby replicas. During the rolling 
bounce it is possible that tasks may be shuffled between nodes. As the rolling 
bounce progresses, a task may be moved from a new-versioned instance to an 
old-versioned one. If the new-versioned instance had processed some data, the 
old-versioned one would have to restore from the changelog and would encounter 
the same issue I described for standbys. 

Note, although the exceptions are undesired, at least this behavior protects 
the integrity of the application and prevents data corruption or loss.

Workaround:

Several workarounds are possible:

This only affects clusters that do all of (A) rolling bounce, (B) suppression, 
(C) changelogged suppression buffers. Changing any of those variables will 
prevent the issue from occurring. I would NOT recommend disabling changelogging 
(C), and (B) is probably off the table, since the application logic presumably 
depends on it. Therefore, your practical choice is to do a full-cluster bounce 
(A). Disabling standby replicas will decrease the probability of exceptions, 
but it’s no guarantee. Personally, I think (A) is the best option.

Also note, although the exceptions and threads shutting down are not ideal, 
they would only afflict the old-versioned nodes. I.e., the nodes you intend to 
replace anyway. So another "workaround" is simply to ignore the exceptions and 
proceed with the rolling bounce. As the old-versioned nodes are replaced with 
new-versioned nodes, the new nodes will again be able to decode their peers' 
changelog messages and be able to maintain the hot-standby replicas of the 
suppression buffers.

Detection:

Although I really should have anticipated this condition, I first detected it 
while expanding our system test coverage as part of KAFKA-10173. I added a 
rolling upgrade test with an application that uses both suppression and standby 
replicas, and observed that the rolling upgrades would occasionally cause the 
old nodes to crash. Accordingly, in KAFKA-10173, I disabled the rolling-upgrade 
configuration and only do full-cluster upgrades. Resolving _this_ ticket will 
allow us to re-enable rolling upgrades.

Proposed solution:

Part 1:

Since Streams can decode both current and past versions, but not future 
versions, we need to implement a mechanism to prevent new-versioned nodes from 
writing new-versioned messages, which would appear as future-versioned messages 
to the old-versioned nodes.

We have an UPGRADE_FROM configuration that we could leverage to accomplish 
this. In that case, when upgrading from 2.3 to 2.4, you would set UPGRADE_FROM 
to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) nodes would 
continue writing messages in the old (2.3) format. Thus, the still-running old 
nodes will still be able to read them.

Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
because all the members are running 2.4 at this point and can decode these 
messages, even if they are still configured to write with version 2.3.

After the second rolling bounce, the whole cluster is both running 2.4 and 
writing with the 2.4 format.

Part 2:

Managing two rolling bounces can be difficult, so it is also desirable to 
implement a mechanism for automatically negotiating the schema version 

[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression may throw exceptions

2020-08-01 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10336:
-
Summary: Rolling upgrade with Suppression may throw exceptions  (was: 
Rolling upgrade with Suppression AND Standbys may throw exceptions)

> Rolling upgrade with Suppression may throw exceptions
> -
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: bug, user-experience
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) 
> nodes would continue writing messages in the old (2.3) format. Thus, the 
> still-running old nodes will still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
> Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
> because all the members are running 2.4 at this point and can decode these 
> messages, even if they are still configured to write with version 2.3.

[GitHub] [kafka] ijuma commented on a change in pull request #5374: MINOR: update release.py

2020-08-01 Thread GitBox


ijuma commented on a change in pull request #5374:
URL: https://github.com/apache/kafka/pull/5374#discussion_r464007379



##
File path: release.py
##
@@ -479,16 +479,16 @@ def select_gpg_key():
 for root, dirs, files in os.walk(artifacts_dir):
     assert root.startswith(artifacts_dir)
 
-    for file in files:
-        local_path = os.path.join(root, file)
-        remote_path = os.path.join("public_html", kafka_output_dir, root[len(artifacts_dir)+1:], file)
-        sftp_cmds += "\nput %s %s" % (local_path, remote_path)
-
     for dir in dirs:
         sftp_mkdir(os.path.join("public_html", kafka_output_dir, root[len(artifacts_dir)+1:], dir))
 
-    if sftp_cmds:
-        cmd("Uploading artifacts in %s to your Apache home directory" % root, "sftp -b - %s...@home.apache.org" % apache_id, stdin=sftp_cmds)
+    for file in files:
+        local_path = os.path.join(root, file)
+        remote_path = os.path.join("public_html", kafka_output_dir, root[len(artifacts_dir)+1:], file)
+        sftp_cmds = """
+put %s %s
+""" % (local_path, remote_path)
+        cmd("Uploading artifacts in %s to your Apache home directory" % root, "sftp -b - %s...@home.apache.org" % apache_id, stdin=sftp_cmds)

Review comment:
   https://github.com/apache/kafka/pull/9070 apparently reduces the time from 
2 hours to 5 minutes. Running sftp once per file seems pretty slow, even for 
something that is not used frequently. Not sure whether it became slower over 
time or whether it's something specific to John's connection, though. :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9070: MINOR: speed up release script

2020-08-01 Thread GitBox


ijuma commented on pull request #9070:
URL: https://github.com/apache/kafka/pull/9070#issuecomment-667595437


   Nice improvement!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9110: MINOR: Ensure a reason is logged for every segment deletion

2020-08-01 Thread GitBox


ijuma commented on a change in pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#discussion_r463980962



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File,
   * @param segments The log segments to schedule for deletion
   * @param asyncDelete Whether the segment files should be deleted asynchronously
   */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+                                      asyncDelete: Boolean,
+                                      reason: SegmentDeletionReason): Unit = {
     if (segments.nonEmpty) {
       lock synchronized {
         // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
         // removing the deleted segment, we should force materialization of the iterator here, so that results of the
         // iteration remain valid and deterministic.
         val toDelete = segments.toList
         toDelete.foreach { segment =>
+          info(s"${reason.reasonString(this, segment)}")

Review comment:
   Can we think about cases where this could be an issue? Say delete 
records is used, causing a large number of segments to be deleted. Could that 
trigger excessive logging?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #9110: MINOR: Ensure a reason is logged for every segment deletion

2020-08-01 Thread GitBox


dhruvilshah3 commented on a change in pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#discussion_r463975069



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File,
   * @param segments The log segments to schedule for deletion
   * @param asyncDelete Whether the segment files should be deleted asynchronously
   */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+                                      asyncDelete: Boolean,
+                                      reason: SegmentDeletionReason): Unit = {
     if (segments.nonEmpty) {
       lock synchronized {
         // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
         // removing the deleted segment, we should force materialization of the iterator here, so that results of the
         // iteration remain valid and deterministic.
         val toDelete = segments.toList
         toDelete.foreach { segment =>
+          info(s"${reason.reasonString(this, segment)}")

Review comment:
   While verbose, I think having this granularity for each segment is useful: 
it allows us to easily reason about why a particular segment was deleted. Note 
that we switched from a single log message per batch to one per segment in 
https://github.com/apache/kafka/pull/8850.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10303) kafka producer says connect failed in cluster mode

2020-08-01 Thread Yogesh BG (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169288#comment-17169288
 ] 

Yogesh BG commented on KAFKA-10303:
---

Hi, this issue is resolved now. We had a problem with node IP assignment, which 
is what caused it. Thanks; we can close this issue.

> kafka producer says connect failed in cluster mode
> --
>
> Key: KAFKA-10303
> URL: https://issues.apache.org/jira/browse/KAFKA-10303
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yogesh BG
>Priority: Major
>
> Hi
>  
> I am using kafka broker version 2.3.0
> We have two setups: standalone (one node) and a 3-node cluster.
> We pump a lot of data: ~25 MB/s, ~80K messages per second.
> It all works well in one-node mode,
> but in the cluster, the producer starts throwing "connect failed" (librdkafka);
> after some time it is able to connect again and resumes sending traffic.
> What could be the issue? Some of the configurations are:
>  
> replica.fetch.max.bytes=10485760
> num.network.threads=12
> num.replica.fetchers=6
> queued.max.requests=5
> # The number of threads doing disk I/O
> num.io.threads=12
> replica.socket.receive.buffer.bytes=1
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463936098



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature
+ *version level deprecation. This is how it works: in order to deprecate 
feature version levels,
+ *in this map the default minimum version level of a feature can be set to 
a new value that's
+ *higher than 1 (let's call this latest_min_version_level). In doing so, 
the feature version levels
+ *in the closed range: [1, latest_min_version_level - 1] get deprecated by 
the controller logic
+ *that applies this map to persistent finalized feature state in ZK (this 
mutation happens
+ *during controller election and during finalized feature updates via the
+ *ApiKeys.UPDATE_FINALIZED_FEATURES api). This will automatically mean 
external clients of Kafka
+ *would need to stop using the finalized min version levels that have been 
deprecated.
+ *
+ * This class also provides APIs to check for incompatibilities between the 
features supported by
+ * the Broker and finalized features. This class is immutable in production. 
+ * It provides a few APIs to mutate state only for the purpose of testing.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: 
Features[SupportedVersionRange],
+  @volatile var defaultFeatureMinVersionLevels: 
Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit 
= {
+require(
+  BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, 
defaultFeatureMinVersionLevels))
+supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return  the default minimum version level for the feature if it's defined;
+   *  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): 
Unit = {
+require(
+  BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, 
newMinVersionLevels))
+defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP 
config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(
+  supportedFeatures.features.asScala.map {
+case(name, versionRange) => (
+  name, new FinalizedVersionRange(defaultMinVersionLevel(name), 
versionRange.max))
+  }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a 
provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *   [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   * supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @returnThe subset of input features which are incompatible. 
If 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463935718



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+// NOTE: Below we set the finalized min version level to be the default 
minimum version
+// level. If the finalized feature already exists, then, this can cause 
deprecation of all
+// version levels in the closed range:
+// [existingVersionRange.min(), defaultMinVersionLevel - 1].
+val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  val singleFinalizedFeature =
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(
+new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+}
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing 
FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange or error, as 
described above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {

Review comment:
   A value < 1 is indicative of a deletion request (a kind of downgrade 
request).
   It is for convenience of generating a special error message, that we handle 
the case here explicitly: `...less than 1 for feature...`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463935718



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+// NOTE: Below we set the finalized min version level to be the default 
minimum version
+// level. If the finalized feature already exists, then, this can cause 
deprecation of all
+// version levels in the closed range:
+// [existingVersionRange.min(), defaultMinVersionLevel - 1].
+val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  val singleFinalizedFeature =
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(
+new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+}
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing 
FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange or error, as 
described above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {

Review comment:
   A value < 1 is indicative of a deletion request (not purely a downgrade 
request).
   It is for convenience of generating a special error message, that we handle 
the case here explicitly: `...less than 1 for feature...`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463937184



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.{assertThrows, intercept}
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateDefaultMinVersionLevelsInAllBrokers(newMinVersionLevels: 
Map[String, Short]): Unit = {
+servers.foreach(s => {
+  s.brokerFeatures.setDefaultMinVersionLevels(newMinVersionLevels)
+})
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def checkFeatures(client: Admin, expectedNode: 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463937032



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -109,7 +109,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 brokerTopicStats: BrokerTopicStats,
 val clusterId: String,
 time: Time,
-val tokenManager: DelegationTokenManager) extends Logging {
+val tokenManager: DelegationTokenManager,
+val brokerFeatures: BrokerFeatures,

Review comment:
   The `FinalizedFeatureCache.getSupportedFeatures` API is not the right 
fit for the cache's public interface (it is quite unrelated to the other public 
APIs of the cache). I'd rather not pollute the public API there, just for the 
sake of convenience.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463936412



##
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##
@@ -82,18 +108,54 @@ object FinalizedFeatureCache extends Logging {
 " The existing cache contents are %s").format(latest, 
oldFeatureAndEpoch)
   throw new FeatureCacheUpdateException(errorMsg)
 } else {
-  val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+  val incompatibleFeatures = 
brokerFeatures.incompatibleFeatures(latest.features)
   if (!incompatibleFeatures.empty) {
 val errorMsg = ("FinalizedFeatureCache update failed since feature 
compatibility" +
   " checks failed! Supported %s has incompatibilities with the latest 
%s."
-  ).format(SupportedFeatures.get, latest)
+  ).format(brokerFeatures.supportedFeatures, latest)
 throw new FeatureCacheUpdateException(errorMsg)
   } else {
-val logMsg = "Updated cache from existing finalized %s to latest 
finalized %s".format(
+val logMsg = "Updated cache from existing %s to latest %s".format(
   oldFeatureAndEpoch, latest)
-featuresAndEpoch = Some(latest)
+synchronized {
+  featuresAndEpoch = Some(latest)
+  notifyAll()
+}
 info(logMsg)
   }
 }
   }
+
+  /**
+   * Causes the current thread to wait no more than timeoutMs for the 
specified condition to be met.
+   * It is guaranteed that the provided condition will always be invoked only 
from within a
+   * synchronized block.
+   *
+   * @param waitCondition   the condition to be waited upon:
+   * - if the condition returns true, then, the wait 
will stop.
+   * - if the condition returns false, it means the 
wait must continue until
+   *   timeout.
+   *
+   * @param timeoutMs   the timeout (in milliseconds)
+   *
+   * @throws TimeoutException if the condition is not met within timeoutMs.
+   */
+  private def waitUntilConditionOrThrow(waitCondition: () => Boolean, 
timeoutMs: Long): Unit = {
+if(timeoutMs < 0L) {
+  throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but 
$timeoutMs was provided.")
+}
+val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1_000_000)

Review comment:
   Since the app depends on monotonically increasing elapsed-time values, 
`System.nanoTime()` is preferred. 
   `System.currentTimeMillis()` can jump due to users changing the time 
settings, internet (NTP) time sync, leap-second adjustments, etc.
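   
   For illustration, here is the kind of monotonic wait loop this enables (a 
minimal sketch, not the PR's code; the helper name is made up):
   
   ```scala
   private def waitUntilOrThrow(condition: () => Boolean, timeoutMs: Long): Unit = synchronized {
     val deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L
     while (!condition()) {
       val remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L
       if (remainingMs <= 0)
         throw new java.util.concurrent.TimeoutException(s"Condition not met within $timeoutMs ms")
       wait(remainingMs) // releases the monitor; the condition is re-checked on wakeup
     }
   }
   ```
   
   Because the deadline is derived from `System.nanoTime()`, a wall-clock jump 
cannot shorten or extend the wait.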





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463936098



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature
+ *version level deprecation. This is how it works: in order to deprecate 
feature version levels,
+ *in this map the default minimum version level of a feature can be set to 
a new value that's
+ *higher than 1 (let's call this latest_min_version_level). In doing so, 
the feature version levels
+ *in the closed range: [1, latest_min_version_level - 1] get deprecated by 
the controller logic
+ *that applies this map to persistent finalized feature state in ZK (this 
mutation happens
+ *during controller election and during finalized feature updates via the
+ *ApiKeys.UPDATE_FINALIZED_FEATURES api). This will automatically mean 
external clients of Kafka
+ *would need to stop using the finalized min version levels that have been 
deprecated.
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features. This class is immutable in production. It provides a few
+ * APIs to mutate state, only for the purpose of testing.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: 
Features[SupportedVersionRange],
+  @volatile var defaultFeatureMinVersionLevels: 
Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit 
= {
+require(
+  BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, 
defaultFeatureMinVersionLevels))
+supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return  the default minimum version level for the feature, if it is defined;
+   *          otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): 
Unit = {
+require(
+  BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, 
newMinVersionLevels))
+defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP 
config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(
+  supportedFeatures.features.asScala.map {
+case(name, versionRange) => (
+  name, new FinalizedVersionRange(defaultMinVersionLevel(name), 
versionRange.max))
+  }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a 
provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *   [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   * supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @returnThe subset of input features which are incompatible. 
If 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463936048



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+// NOTE: Below we set the finalized min version level to be the default 
minimum version
+// level. If the finalized feature already exists, then, this can cause 
deprecation of all
+// version levels in the closed range:
+// [existingVersionRange.min(), defaultMinVersionLevel - 1].
+val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  val singleFinalizedFeature =
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(
+new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+}
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing 
FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange or error, as 
described above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+// Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+Right(new ApiError(Errors.INVALID_REQUEST,
+   s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+   s" than 1 for feature: '${update.feature}' without 
setting the" +
+   " allowDowngrade flag to true in the request."))
+  } else {
+existingVersionRange.map(existing =>
+  if (update.maxVersionLevel == existing.max) {
+// Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+Right(new 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463935718



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+// NOTE: Below we set the finalized min version level to be the default 
minimum version
+// level. If the finalized feature already exists, then, this can cause 
deprecation of all
+// version levels in the closed range:
+// [existingVersionRange.min(), defaultMinVersionLevel - 1].
+val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  val singleFinalizedFeature =
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(
+new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+}
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing 
FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange or error, as 
described above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {

Review comment:
   We handle it here explicitly in order to return the special error message: 
`...less than 1 for feature...`.
   A value < 1 is indicative of a deletion request (not purely a downgrade 
request).
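   
   To make the split concrete, a tiny classification sketch (illustrative only, 
not code from this PR; it assumes `isDeleteRequest` is keyed off 
`maxVersionLevel < 1` together with the `allowDowngrade` flag, matching the 
branches above):
   
   ```scala
   // Hypothetical helper mirroring the validation branches above.
   def classify(maxVersionLevel: Short, allowDowngrade: Boolean): String =
     if (maxVersionLevel < 1 && allowDowngrade) "deletion request"
     else if (maxVersionLevel < 1) "invalid: deletion requires allowDowngrade=true"
     else "finalize request (upgrade, or downgrade when allowDowngrade=true)"
   ```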





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463934948



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1395,7 +1596,7 @@ class KafkaController(val config: KafkaConfig,
 if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
   val oldMetadata = oldMetadataOpt.get
   val newMetadata = newMetadataOpt.get
-  if (newMetadata.endPoints != oldMetadata.endPoints) {
+  if (newMetadata.endPoints != oldMetadata.endPoints || 
!oldMetadata.features.equals(newMetadata.features)) {

Review comment:
   I do not understand the concern.
   Which code path could possibly introduce a `null` features attribute in the 
`Broker` object? It is impossible.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463934710



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of it’s own 
supported features in its
+   * own BrokerIdZnode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in ZK in the cluster-wide 
common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the one and 
only entity modifying
+   * the information about finalized features and their version levels.
+   *
+   * This method sets up the FeatureZNode with enabled status. This status 
means the feature
+   * versioning system (KIP-584) is enabled, and, the finalized features 
stored in the FeatureZNode
+   * are active. This status should be written by the controller to the 
FeatureZNode only when the
+   * broker IBP config is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *Broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the Broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. The reason to do this is that enabling all the possible 
features immediately after
+   *an upgrade could be harmful to the cluster.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, then it
+   *will react by creating a FeatureZNode with disabled status and 
empty finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features 

[GitHub] [kafka] kowshik commented on a change in pull request #9110: MINOR: Ensure a reason is logged for every segment deletion

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#discussion_r463914087



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File,
    * @param segments The log segments to schedule for deletion
    * @param asyncDelete Whether the segment files should be deleted asynchronously
    */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+                                      asyncDelete: Boolean,
+                                      reason: SegmentDeletionReason): Unit = {
     if (segments.nonEmpty) {
       lock synchronized {
         // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
         // removing the deleted segment, we should force materialization of the iterator here, so that results of the
         // iteration remain valid and deterministic.
         val toDelete = segments.toList
         toDelete.foreach { segment =>
+          info(s"${reason.reasonString(this, segment)}")

Review comment:
   If we passed the deletion reason further down, into the 
`deleteSegmentFiles` method, it seems we could print the reason string just once 
for a batch of segments being deleted. And within the reason string, we can 
provide the reason for deleting the batch:
   
   
https://github.com/confluentinc/ce-kafka/blob/master/core/src/main/scala/kafka/log/Log.scala#L2519
   
https://github.com/confluentinc/ce-kafka/blob/master/core/src/main/scala/kafka/log/Log.scala#L2526
   
   ex: `info(s"Deleting segments due to $reason: ${segments.mkString(",")}")`
   
   where `$reason` provides something like `retention time 120ms breach`.
   
   The drawback is that we sometimes cannot print segment-specific information, 
since the message is at the batch level. But segment-level deletion logging 
could bloat our server logs anyway, so it may be better to batch the logging 
instead.
   
   What are your thoughts?
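   
   For instance, a rough sketch of the batched variant (illustrative only; it 
assumes the reason can be rendered without a per-segment argument, and 
`deleteSegment` stands in for whatever per-segment helper ends up being used):
   
   ```scala
   // Hypothetical: one info line per deleted batch instead of one per segment.
   val toDelete = segments.toList
   info(s"Deleting segments due to $reason: ${toDelete.mkString(", ")}")
   toDelete.foreach(segment => deleteSegment(segment, asyncDelete))
   ```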





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-08-01 Thread John Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169052#comment-17169052
 ] 

John Thomas edited comment on KAFKA-10186 at 8/1/20, 7:08 AM:
--

[~ableegoldman] My understanding after reading the description is that, if we 
abort a transaction with any non-flushed data, we want to throw a different 
exception, since we know it's non-fatal.

Going through the code I see that, in Sender#maybeSendAndPollTransactionalRequest: 
transactionManager.hasAbortableError() -> this is fatal.

transactionManager.isAborting() -> this is something we know has been aborted, 
and is recoverable.

You are suggesting we should put in a new exception message to differentiate? 
Or a whole new Exception?

 
{code:java}
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
    if (accumulator.hasIncomplete()) {
        RuntimeException exception = transactionManager.lastError();
        if (exception == null) {
            exception = new KafkaException("Failing batch since transaction was aborted");
        }
        accumulator.abortUndrainedBatches(exception);
    }
}{code}
PS: #newbie!
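
For concreteness, a sketch of what "a whole new Exception" could look like 
(hypothetical name and placement, Scala for brevity; not something decided on 
this ticket):

{code:scala}
// Hypothetical sketch: a dedicated exception type so callers can tell an
// intentional abort apart from a fatal producer error.
class TransactionAbortedException(message: String)
  extends org.apache.kafka.common.KafkaException(message)
{code}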

 


was (Author: johnthotekat):
[~ableegoldman] For my understanding: if we abort a transaction with any 
non-flushed data, we want to throw a different exception, since we know it's 
non-fatal?

In Sender#maybeSendAndPollTransactionalRequest: 
transactionManager.hasAbortableError() -> this is fatal.

transactionManager.isAborting() -> this is something we know has been aborted, 
and is recoverable.

You are suggesting we should put in a new exception message? Or a whole new 
exception class?

 
{code:java}
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
    if (accumulator.hasIncomplete()) {
        RuntimeException exception = transactionManager.lastError();
        if (exception == null) {
            exception = new KafkaException("Failing batch since transaction was aborted");
        }
        accumulator.abortUndrainedBatches(exception);
    }
}{code}
PS: #newbie!

 

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-08-01 Thread John Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169052#comment-17169052
 ] 

John Thomas edited comment on KAFKA-10186 at 8/1/20, 7:02 AM:
--

[~ableegoldman] For my understanding: if we abort a transaction with any 
non-flushed data, we want to throw a different exception, since we know it's 
non-fatal?

In Sender#maybeSendAndPollTransactionalRequest: 
transactionManager.hasAbortableError() -> this is fatal.

transactionManager.isAborting() -> this is something we know has been aborted, 
and is recoverable.

You are suggesting we should put in a new exception message? Or a whole new 
exception class?

 
{code:java}
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
    if (accumulator.hasIncomplete()) {
        RuntimeException exception = transactionManager.lastError();
        if (exception == null) {
            exception = new KafkaException("Failing batch since transaction was aborted");
        }
        accumulator.abortUndrainedBatches(exception);
    }
}{code}
PS: #newbie!

 


was (Author: johnthotekat):
[~ableegoldman] If we abort a transaction with any non-flushed data, we want to 
throw a different exception, since we know it's non-fatal?

If my understanding is correct, in Sender#maybeSendAndPollTransactionalRequest: 
transactionManager.hasAbortableError() -> this is fatal.

transactionManager.isAborting() -> this is something we know has been aborted, 
and is recoverable.

 
{code:java}
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
    if (accumulator.hasIncomplete()) {
        RuntimeException exception = transactionManager.lastError();
        if (exception == null) {
            exception = new KafkaException("Failing batch since transaction was aborted");
        }
        accumulator.abortUndrainedBatches(exception);
    }
}{code}
PS: #newbie!

 

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-08-01 Thread GitBox


dongjinleekr commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-667483663


   @tombentley Congratulations! :congratulations: @omkreddy @mimaison Thanks 
again for the detailed review, as usual! :smiley: 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-08-01 Thread GitBox


chia7712 commented on a change in pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#discussion_r463930430



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -228,6 +228,13 @@ public Password getPassword(String key) {
 return copy;
 }
 
+    public Map<String, Object> originals(Map<String, Object> configOverrides) {

Review comment:
   Will copy that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-08-01 Thread GitBox


chia7712 commented on pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#issuecomment-667482560


   > Btw, I don't think this is an improvement rather than a bug, as we don't 
have any guarantee to see client id in serdes before.
   
   There is another reason: we do pass the generated client id to the metric 
reporter. It seems to me all plugins should see consistent props.
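   
   To illustrate what "consistent props" means here, a rough usage sketch 
(hypothetical names; it only assumes the `originals(Map)` overload added in 
this PR):
   
   ```scala
   // Resolve the (possibly generated) client.id once, then hand the same view
   // of the config to every pluggable component.
   val overrides = java.util.Collections.singletonMap[String, AnyRef]("client.id", resolvedClientId)
   val props = config.originals(overrides)
   keySerializer.configure(props, true) // serializers now see the generated client.id
   metricReporter.configure(props)      // just like metric reporters already do
   ```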



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org