[GitHub] [kafka] glasser edited a comment on pull request #11020: KAFKA-12937; mm2 can start from the ending of a topic

2021-07-13 Thread GitBox


glasser edited a comment on pull request #11020:
URL: https://github.com/apache/kafka/pull/11020#issuecomment-879597354


   Not sure why I was tagged — did you mean somebody else?






[GitHub] [kafka] glasser commented on pull request #11020: KAFKA-12937; mm2 can start from the ending of a topic

2021-07-13 Thread GitBox


glasser commented on pull request #11020:
URL: https://github.com/apache/kafka/pull/11020#issuecomment-879597354


   Not sure why I was tagged?






[GitHub] [kafka] tombentley merged pull request #11006: KAFKA-13049: Name the threads used for log recovery

2021-07-13 Thread GitBox


tombentley merged pull request #11006:
URL: https://github.com/apache/kafka/pull/11006


   






[GitHub] [kafka] mjsax commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-13 Thread GitBox


mjsax commented on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-879596929


   > There could be data loss, because locally the windowed state store would 
store records for a longer period of time than the changelog topic does. If a 
Kafka Streams client is restarted with wiped-out state, it might restore fewer 
records into the state store than the store had before the restart. In other 
words, if the Kafka Streams state store were not restarted, it would have more 
records than after the restart. This can happen because records that are still 
within the larger retention time of the windowed store (i.e., the segments) may 
be outside the shorter retention time of the changelog topic, and hence may 
have already been removed from the changelog topic before restoration starts.
   
   Yes, but the point is that if the _guaranteed_ retention time is T and T is 
applied to the changelog, the fact that T+X is applied to the state store does 
not mean we _lose_ data in this case: we only ever guaranteed to hold data up 
to T, and that guarantee is met. In the end, the changelog topic is the source 
of truth, not the state store.
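   
   To make the argument concrete, a minimal sketch (hypothetical topology; 
changelog sizing details glossed over) of a guaranteed retention T with a 
longer local store retention T + X:
   
   ```java
   import java.time.Duration;
   
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Materialized;
   import org.apache.kafka.streams.kstream.TimeWindows;
   import org.apache.kafka.streams.state.WindowStore;
   
   public class RetentionSketch {
       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();
   
           // Window size 5 minutes with the old-API default grace discussed above
           // (24h minus window size), so the guaranteed retention T is 24h.
           final TimeWindows windows = TimeWindows.of(Duration.ofMinutes(5))
                   .grace(Duration.ofHours(24).minusMinutes(5));
   
           builder.<String, String>stream("input")
                  .groupByKey()
                  .windowedBy(windows)
                  // Local store retention of T + X (25h here) is allowed: keeping
                  // data longer locally cannot violate a guarantee that only
                  // promises T.
                  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts")
                          .withRetention(Duration.ofHours(25)));
       }
   }
   ```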






[GitHub] [kafka] tombentley commented on pull request #11006: KAFKA-13049: Name the threads used for log recovery

2021-07-13 Thread GitBox


tombentley commented on pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#issuecomment-879596800


   Test failures are unrelated. 






[GitHub] [kafka] kkonstantine commented on a change in pull request #10563: KAFKA-12487: Add support for cooperative consumer protocol with sink connectors

2021-07-13 Thread GitBox


kkonstantine commented on a change in pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#discussion_r669274974



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -367,42 +369,53 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
 }
 
 private void commitOffsets(long now, boolean closing) {
+commitOffsets(now, closing, consumer.assignment());
+}
+
+private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
 if (workerErrantRecordReporter != null) {
-log.trace("Awaiting all reported errors to be completed");
-workerErrantRecordReporter.awaitAllFutures();
-log.trace("Completed all reported errors");
+log.trace("Awaiting reported errors for {} to be completed", 
topicPartitions);

Review comment:
   I wonder if we want to print the actual list of partitions here, which 
might be long. And do it twice. 
   I see the same pattern is applied elsewhere. I understand the value of 
explicit listing. 
   

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -412,32 +425,36 @@ private void commitOffsets(long now, boolean closing) {
 return;
 }
 
-final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
+Collection<TopicPartition> allAssignedTopicPartitions = consumer.assignment();
+final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>(lastCommittedOffsets);
  for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
 final TopicPartition partition = taskProvidedOffsetEntry.getKey();
 final OffsetAndMetadata taskProvidedOffset = 
taskProvidedOffsetEntry.getValue();
-if (commitableOffsets.containsKey(partition)) {
+if (committableOffsets.containsKey(partition)) {
 long taskOffset = taskProvidedOffset.offset();
-long currentOffset = currentOffsets.get(partition).offset();
+long currentOffset = offsetsToCommit.get(partition).offset();
 if (taskOffset <= currentOffset) {
-commitableOffsets.put(partition, taskProvidedOffset);
+committableOffsets.put(partition, taskProvidedOffset);
 } else {
 log.warn("{} Ignoring invalid task provided offset {}/{} 
-- not yet consumed, taskOffset={} currentOffset={}",
-this, partition, taskProvidedOffset, taskOffset, 
currentOffset);
+this, partition, taskProvidedOffset, taskOffset, 
currentOffset);
 }
-} else {
+} else if (!allAssignedTopicPartitions.contains(partition)) {
 log.warn("{} Ignoring invalid task provided offset {}/{} -- 
partition not assigned, assignment={}",
-this, partition, taskProvidedOffset, 
consumer.assignment());
+this, partition, taskProvidedOffset, 
allAssignedTopicPartitions);
+} else {
+log.debug("{} Ignoring task provided offset {}/{} -- topic 
partition not requested, requested={}",

Review comment:
   what's a requested topic partition?
   Also, above we mention just `partition`

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -710,22 +758,35 @@ else if (!context.pausedPartitions().isEmpty())
 
 @Override
 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+onPartitionsRemoved(partitions, false);
+}
+
+@Override
+public void onPartitionsLost(Collection<TopicPartition> partitions) {
+onPartitionsRemoved(partitions, true);
+}
+
+private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean lost) {
 if (taskStopped) {
 log.trace("Skipping partition revocation callback as task has 
already been stopped");
 return;
 }
-log.debug("{} Partitions revoked", WorkerSinkTask.this);
+log.debug("{} Partitions {}: {}", WorkerSinkTask.this, lost ? 
"lost" : "revoked", partitions);
+
+if (partitions.isEmpty())
+return;
+
 try {
-closePartitions();
-sinkTaskMetricsGroup.clearOffsets();
+closePartitions(partitions, lost);
+sinkTaskMetricsGroup.clearOffsets(partitions);
 } catch (RuntimeException e) {
 // The consumer swallows exceptions raised in the rebalance 
listener, so we need to store
 // exceptions and rethrow when poll() returns.
 rebalanceException = e;
 }
 
-// Make sure we don't have any leftover data since offsets will be 
reset to committed positions
-

[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

2021-07-13 Thread GitBox


mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r669293544



##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -108,8 +144,10 @@
 
 Attention
-Stream-time is only advanced if all input 
partitions over all input topics have new data (with newer timestamps) 
available.
-If at least one partition does not have any new data 
available, stream-time will not be advanced and thus punctuate() will not be triggered if 
PunctuationType.STREAM_TIME was specified.
+Stream-time is only advanced when Streams 
processes records.
+  If there are no records to process, or if Streams is waiting 
for new records
+  due to the Task Idling
+  configuration, then the stream time will not advance and 
punctuate() will 
not be triggered if PunctuationType.STREAM_TIME was specified.

Review comment:
   I guess we use `stream-time` as a noun throughout the documentation... 
But anyway. Not a big deal.








[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

2021-07-13 Thread GitBox


mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r669292416



##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -86,12 +86,48 @@
  close() method. Note that Kafka Streams may re-use a single
  Processor object by calling init() on it again after close().
-When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-  (1) If #forward() is called within #process() the output record inherits 
the input record timestamp.
-  (2) If #forward() is called within punctuate() the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-  Note, that #forward() also allows to change the default behavior 
by passing a custom timestamp for the output record.
-Specifically, ProcessorContext#schedule() accepts a user Punctuator callback 
interface, which triggers its punctuate()
-API method periodically based on the PunctuationType. The PunctuationType 
determines what notion of time is used
+  
+The Processor interface takes two sets of generic 
parameters:
+KIn, VIn, KOut, 
VOut. These define the input and output types
+that the processor implementation can handle. KIn and
+VIn 
define the key and value types that will be passed
+to process().
+Likewise, KOut and VOut
+define the forwarded key and value types that ProcessorContext#forward()
+will accept. If your processor does not forward any records at all 
(or if it only forwards
+null keys or values),
+a best practice is to set the output generic type argument to
+Void.
+If it needs to forward multiple types that don't share a common 
superclass, you will
+have to set the output generic type argument to Object.
+  
+  
+Both the Processor#process()
+and the ProcessorContext#forward()
+methods handle records in the form of the Record<K, V>
+data class. This class gives you access to the key components of a 
Kafka record:
+the key, value, timestamp and headers. When forwarding records, 
you can use the
+constructor to create a new Record
+from scratch, or you can use the convenience builder methods to 
replace one of the
+Record's properties
+and copy over the rest. For example,
+inputRecord.withValue(newValue)
+would copy the key, timestamp, and headers from
+inputRecord while
+setting the output record's value to newValue.
+Note that this does not mutate inputRecord,
+but instead creates a shallow copy. Beware that this is only a 
shallow copy, so if you
+plan to mutate the key, value, or headers elsewhere in the 
program, you will want to
+create a deep copy of those fields yourself.
+  
+
+  In addition to handling incoming records via
+  Processor#process(),
+  you have the option to schedule periodic invocation (called 
"punctuation")
+  in your processor's init()

Review comment:
   Don't have a strong opinion -- but it's actually quite useful -- for 
example, you could register a punctuation for a specific input record (that you 
buffer in a state store) to emit it later on.
   
   Atm it sounds like you can only register a punctuation within `init()`, 
which seems wrong.
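   
   A minimal sketch of registering a punctuation from within `process()` 
rather than `init()`, against the new `org.apache.kafka.streams.processor.api` 
interfaces (class and field names here are hypothetical; in practice the 
buffered record would live in a state store):
   
   ```java
   import java.time.Duration;
   
   import org.apache.kafka.streams.processor.Cancellable;
   import org.apache.kafka.streams.processor.PunctuationType;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   
   public class DelayedEmitProcessor implements Processor<String, String, String, String> {
       private ProcessorContext<String, String> context;
       private Record<String, String> buffered;
       private Cancellable scheduled;
   
       @Override
       public void init(final ProcessorContext<String, String> context) {
           this.context = context;   // nothing scheduled here
       }
   
       @Override
       public void process(final Record<String, String> record) {
           buffered = record;
           // Register the punctuation for this record, from within process().
           if (scheduled == null) {
               scheduled = context.schedule(Duration.ofSeconds(30),
                       PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   context.forward(buffered.withTimestamp(timestamp));
                   scheduled.cancel();   // one-shot: cancel after emitting
                   scheduled = null;
               });
           }
       }
   }
   ```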








[jira] [Updated] (KAFKA-13082) Replace EasyMock with Mockito for ProcessorContextTest

2021-07-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13082:

Component/s: unit tests
 streams

> Replace EasyMock with Mockito for ProcessorContextTest
> --
>
> Key: KAFKA-13082
> URL: https://issues.apache.org/jira/browse/KAFKA-13082
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Chun-Hao Tang
>Assignee: Chun-Hao Tang
>Priority: Major
>






[GitHub] [kafka] C0urante opened a new pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

2021-07-13 Thread GitBox


C0urante opened a new pull request #11046:
URL: https://github.com/apache/kafka/pull/11046


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12980)
   
   This is useful for reads to the end of a topic that contains aborted 
transactions. If an aborted transaction sits at the end of the topic, the 
consumer can now be expected to return from `poll` once it advances past that 
aborted transaction, and users can query the consumer's latest `position` for 
the relevant topic partitions to see whether it has made it past the end of 
the topic (or rather, past what the end of the topic was when the read-to-end 
attempt began).
   
   For a concrete example of this logic, see the 
[`KafkaBasedLog::readToLogEnd`](https://github.com/apache/kafka/blob/5e5d5bff3bdaf807338ec9adeac982f8a5c98fbd/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L322-L345)
 method that Connect employs to refresh its view of internal topics.
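   
   For illustration, a minimal sketch of that read-to-end pattern (simplified; 
`readToEnd` and the handler are hypothetical, not Connect's actual code):
   
   ```java
   import java.time.Duration;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.TopicPartition;
   
   public class ReadToEndSketch {
       // Reads until the consumer's position reaches the end offsets captured up
       // front. With this change, poll() also returns when the position only
       // advanced past an aborted transaction, so the check below observes that.
       static void readToEnd(final Consumer<byte[], byte[]> consumer,
                             final Set<TopicPartition> partitions) {
           final Map<TopicPartition, Long> endOffsets =
                   new HashMap<>(consumer.endOffsets(partitions));
           while (!endOffsets.isEmpty()) {
               for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                   // handle(record);  // hypothetical record handler
               }
               // Drop partitions whose position has caught up to the end offset.
               endOffsets.entrySet().removeIf(entry ->
                       consumer.position(entry.getKey()) >= entry.getValue());
           }
       }
   }
   ```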
   
   No new unit tests are added, but many existing unit tests are modified to 
ensure that aborted transactions are detected and reported correctly by 
`Fetcher::collectFetch`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] dengziming edited a comment on pull request #11044: MINOR: Fully specify CompletableFuture type

2021-07-13 Thread GitBox


dengziming edited a comment on pull request #11044:
URL: https://github.com/apache/kafka/pull/11044#issuecomment-879579000


   This seems the same as #11043.






[GitHub] [kafka] dengziming commented on pull request #11044: MINOR: Fully specify CompletableFuture type

2021-07-13 Thread GitBox


dengziming commented on pull request #11044:
URL: https://github.com/apache/kafka/pull/11044#issuecomment-879579000


   This seems the same as #11044.






[jira] [Commented] (KAFKA-13037) "Thread state is already PENDING_SHUTDOWN" log spam

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380295#comment-17380295
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13037:


Thanks for the fix – btw, I added you as a contributor so you should be able to 
self-assign tickets from now on. 

> "Thread state is already PENDING_SHUTDOWN" log spam
> ---
>
> Key: KAFKA-13037
> URL: https://issues.apache.org/jira/browse/KAFKA-13037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: John Gray
>Assignee: John Gray
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> KAFKA-12462 introduced a 
> [change|https://github.com/apache/kafka/commit/4fe4cdc4a61cbac8e070a8b5514403235194015b#diff-76f629d0df8bd30b2593cbcf4a2dc80de3167ebf55ef8b5558e6e6285a057496R722]
>  that increased this "Thread state is already {}" logger to info instead of 
> debug. We are running into a problem with our streams apps when they hit an 
> unrecoverable exception that shuts down the streams thread, we get this log 
> printed about 50,000 times per second per thread. I am guessing it is once 
> per record we have queued up when the exception happens? We have temporarily 
> raised the StreamThread logger to WARN instead of INFO to avoid the spam, but 
> we do miss the other good logs we get on INFO in that class. Could this log 
> be reverted back to debug? Thank you! 





[jira] [Assigned] (KAFKA-13037) "Thread state is already PENDING_SHUTDOWN" log spam

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-13037:
--

Assignee: John Gray

> "Thread state is already PENDING_SHUTDOWN" log spam
> ---
>
> Key: KAFKA-13037
> URL: https://issues.apache.org/jira/browse/KAFKA-13037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: John Gray
>Assignee: John Gray
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> KAFKA-12462 introduced a 
> [change|https://github.com/apache/kafka/commit/4fe4cdc4a61cbac8e070a8b5514403235194015b#diff-76f629d0df8bd30b2593cbcf4a2dc80de3167ebf55ef8b5558e6e6285a057496R722]
>  that increased this "Thread state is already {}" logger to info instead of 
> debug. We are running into a problem with our streams apps when they hit an 
> unrecoverable exception that shuts down the streams thread, we get this log 
> printed about 50,000 times per second per thread. I am guessing it is once 
> per record we have queued up when the exception happens? We have temporarily 
> raised the StreamThread logger to WARN instead of INFO to avoid the spam, but 
> we do miss the other good logs we get on INFO in that class. Could this log 
> be reverted back to debug? Thank you! 





[GitHub] [kafka] ableegoldman commented on pull request #10993: KAFKA-13037: "Thread state is already PENDING_SHUTDOWN" log spam

2021-07-13 Thread GitBox


ableegoldman commented on pull request #10993:
URL: https://github.com/apache/kafka/pull/10993#issuecomment-879569865


   Merged to trunk and cherrypicked to 2.8 and 3.0






[GitHub] [kafka] ableegoldman merged pull request #10993: KAFKA-13037: "Thread state is already PENDING_SHUTDOWN" log spam

2021-07-13 Thread GitBox


ableegoldman merged pull request #10993:
URL: https://github.com/apache/kafka/pull/10993


   






[GitHub] [kafka] tang7526 opened a new pull request #11045: KAFKA-13082: Replace EasyMock with Mockito for ProcessorContextTest

2021-07-13 Thread GitBox


tang7526 opened a new pull request #11045:
URL: https://github.com/apache/kafka/pull/11045


   https://issues.apache.org/jira/browse/KAFKA-13082
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Updated] (KAFKA-13082) Replace EasyMock with Mockito for ProcessorContextTest

2021-07-13 Thread Chun-Hao Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chun-Hao Tang updated KAFKA-13082:
--
Summary: Replace EasyMock with Mockito for ProcessorContextTest  (was: 
Replace EasyMock for ProcessorContextTest)

> Replace EasyMock with Mockito for ProcessorContextTest
> --
>
> Key: KAFKA-13082
> URL: https://issues.apache.org/jira/browse/KAFKA-13082
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chun-Hao Tang
>Assignee: Chun-Hao Tang
>Priority: Major
>






[jira] [Created] (KAFKA-13082) Replace EasyMock for ProcessorContextTest

2021-07-13 Thread Chun-Hao Tang (Jira)
Chun-Hao Tang created KAFKA-13082:
-

 Summary: Replace EasyMock for ProcessorContextTest
 Key: KAFKA-13082
 URL: https://issues.apache.org/jira/browse/KAFKA-13082
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chun-Hao Tang
Assignee: Chun-Hao Tang








[GitHub] [kafka] jsancio opened a new pull request #11044: MINOR: Fully specify CompletableFuture type

2021-07-13 Thread GitBox


jsancio opened a new pull request #11044:
URL: https://github.com/apache/kafka/pull/11044


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] cmccabe opened a new pull request #11043: MINOR: fix Scala 2.12 compile failure in ControllerApisTest

2021-07-13 Thread GitBox


cmccabe opened a new pull request #11043:
URL: https://github.com/apache/kafka/pull/11043


   






[jira] [Updated] (KAFKA-13064) refactor ListConsumerGroupOffsetsHandler

2021-07-13 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13064:
--
Priority: Blocker  (was: Major)

> refactor ListConsumerGroupOffsetsHandler
> 
>
> Key: KAFKA-13064
> URL: https://issues.apache.org/jira/browse/KAFKA-13064
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
>






[jira] [Updated] (KAFKA-13064) refactor ListConsumerGroupOffsetsHandler

2021-07-13 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13064:
--
Fix Version/s: 3.0.0

> refactor ListConsumerGroupOffsetsHandler
> 
>
> Key: KAFKA-13064
> URL: https://issues.apache.org/jira/browse/KAFKA-13064
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
>






[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669252416



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() {
 }
 }
 
-public synchronized Pattern earliestResetTopicsPattern() {
-return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
+public boolean hasOffsetResetOverrides() {
+return !(earliestResetTopics.isEmpty() && 
earliestResetPatterns.isEmpty()
+&& latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
 }
 
-public synchronized Pattern latestResetTopicsPattern() {
-return resetTopicsPattern(latestResetTopics, latestResetPatterns);
-}
-
-private Pattern resetTopicsPattern(final Set<String> resetTopics,
-   final Set<Pattern> resetPatterns) {
-final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics);
-
-return buildPattern(topics, resetPatterns);
-}
-
-private static Pattern buildPattern(final Collection<String> sourceTopics,
-final Collection<Pattern> sourcePatterns) {
-final StringBuilder builder = new StringBuilder();
-
-for (final String topic : sourceTopics) {
-builder.append(topic).append("|");
-}
-
-for (final Pattern sourcePattern : sourcePatterns) {
-builder.append(sourcePattern.pattern()).append("|");
-}
-
-if (builder.length() > 0) {
-builder.setLength(builder.length() - 1);
-return Pattern.compile(builder.toString());
+public OffsetResetStrategy offsetResetStrategy(final String topic) {
+if 
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+earliestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+return EARLIEST;
+} else if 
(maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
+latestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+return LATEST;
+} else if 
(maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic)

Review comment:
   The `NONE` case means we do have this topic in this 
InternalTopologyBuilder (as opposed to that of a different NamedTopology) but 
it hasn't set the offset reset strategy to EARLIEST or LATEST. If we fail the 
first two `if` conditions above, then all that's left is to verify whether or 
not we have this topic at all -- which is going to be true if we find it in 
either the source topic set or pattern.
   
   Maybe you were wondering about the `|| !hasNamedTopology()` part? Basically 
if we don't have any NamedTopologies then there is only one 
InternalTopologyBuilder, so all topics should belong to it








[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669250045



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map> pro
 return Collections.unmodifiableMap(globalStateStores);
 }
 
-public Set<String> allStateStoreName() {
+public Set<String> allStateStoreNames() {
 Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
  final Set<String> allNames = new HashSet<>(stateFactories.keySet());
 allNames.addAll(globalStateStores.keySet());
 return Collections.unmodifiableSet(allNames);
 }
 
+public boolean hasStore(final String name) {
+return stateFactories.containsKey(name) || 
globalStateStores.containsKey(name);
+}
+
+public boolean hasPersistentStores() {

Review comment:
   Previously we would get a handle on the actual topology and then it 
would have to iterate through all the stores to check each one for persistence. 
But while you can now add and remove individual named topologies, you still 
can't change a topology or the stores in it while the app is running, so we may 
as well just keep track of whether we found any persistent stores or not as we 
go along, rather than iterate over all of them later. Also, this way we can 
keep and access this metadata easily through the 
TopologyMetadata/InternalTopologyBuilder, rather than ever having to go access 
the ProcessorTopology directly at all
   
   That said, I'm not _too_ attached to this way of doing things, so if you 
have concerns I can go back to something like how it was before. Just lmk what 
you think
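   
   A rough sketch of that bookkeeping (hypothetical names, not the actual 
`InternalTopologyBuilder` code; building the store just to inspect 
`persistent()` is a simplification):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.streams.state.StoreBuilder;
   
   class StoreBookkeepingSketch {
       private final Map<String, StoreBuilder<?>> stateFactories = new HashMap<>();
       private boolean hasPersistentStores = false;
   
       // Track persistence once at registration time; stores cannot change while
       // the app is running, so the flag stays valid and we never need to iterate
       // over the built ProcessorTopology later.
       public void addStateStore(final StoreBuilder<?> storeBuilder) {
           stateFactories.put(storeBuilder.name(), storeBuilder);
           hasPersistentStores |= storeBuilder.build().persistent();
       }
   
       public boolean hasPersistentStores() {
           return hasPersistentStores;
       }
   }
   ```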








[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669233192



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
 }
 }
 
+public void setTopologyName(final String namedTopology) {

Review comment:
   I tried to, but just couldn't make it work. It has to do with Java and 
subclassing quirks like constructing the parent before the child. It seems to 
be pretty much impossible to set things up so that everything is `final` -- if 
we set the `topologyName` in the NamedTopology constructor, then it's not 
accessible (ie always null) when we call the `InternalTopologyBuilder`'s 
constructor since that occurs during the parent `Topology`'s construction.
   
   It's definitely annoying, but at least we should be able to clean things up 
once we go through a KIP and don't need to subclass like this.
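   
   A self-contained illustration of the quirk (hypothetical classes, not the 
actual Streams types): the parent constructor runs, and can call overridden 
methods, before the child's fields are assigned.
   
   ```java
   public class InitOrderDemo {
       static class Topology {
           Topology() {
               register();   // runs before NamedTopology's constructor body
           }
           void register() { }
       }
   
       static class NamedTopology extends Topology {
           private final String topologyName;
   
           NamedTopology(final String topologyName) {
               // The implicit super() has already called register() at this point.
               this.topologyName = topologyName;
           }
   
           @Override
           void register() {
               // The field is not yet assigned here, so this prints null.
               System.out.println("name during construction: " + topologyName);
           }
       }
   
       public static void main(final String[] args) {
           new NamedTopology("my-topology");   // prints "name during construction: null"
       }
   }
   ```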








[GitHub] [kafka] kkonstantine commented on pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-07-13 Thread GitBox


kkonstantine commented on pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#issuecomment-879532122


   The idea that the `DelegatingClassLoader` did not have to be parallel 
capable originated from the fact that it doesn't load classes directly. It 
delegates loading either to the appropriate PluginClassLoader directly via 
composition, or to the parent by calling `super.loadClass`. 
   
   The latter is the key point of why we need to make the 
`DelegatingClassLoader` parallel capable as well, even though it doesn't load 
classes itself. Because inheritance is used (via a call to `super.loadClass`) 
rather than composition (via a hypothetical call to `parent.loadClass`, which 
is not possible because `parent` is a private member of the abstract base 
class `ClassLoader`), when `getClassLoadingLock` is called in `super.loadClass` 
it sees that the derived class (here an instance of `DelegatingClassLoader`) 
is not parallel capable, and therefore does not apply fine-grained locking 
during classloading even though the parent classloader is what actually loads 
the classes. 
   
   Based on the above, I added a last commit to mark the 
`DelegatingClassLoader` as parallel capable. 
   
   I've tested both classloader types being parallel capable in a variety of 
scenarios with multiple connectors, SMTs, and converters, and the deadlock did 
not reproduce. Of course, reproducing the issue is difficult without the 
specifics of the original jar layout. The possibility of a deadlock is still 
not zero, but it is probably no worse than with the current code. The only 
plugin type that depends on other plugins being loaded while it loads its own 
classes is the connector plugin, and there are no inter-connector dependencies 
(a connector requiring another connector's classes to be loaded while loading 
its own). With that in mind, a deadlock should be even less likely now. In the 
future we could consider introducing deadlock-recovery methods to get out of 
this type of situation if necessary. 
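   
   A minimal sketch of the idiom (hypothetical class, not Connect's actual 
`DelegatingClassLoader`): the static `registerAsParallelCapable()` call is 
what lets `getClassLoadingLock` hand out per-class-name locks instead of 
locking the whole loader instance.
   
   ```java
   public class DelegatingLoaderSketch extends ClassLoader {
       static {
           // Without this, getClassLoadingLock(name) inside super.loadClass()
           // falls back to locking the DelegatingLoaderSketch instance itself.
           ClassLoader.registerAsParallelCapable();
       }
   
       public DelegatingLoaderSketch(final ClassLoader parent) {
           super(parent);
       }
   
       @Override
       protected Class<?> loadClass(final String name, final boolean resolve)
               throws ClassNotFoundException {
           synchronized (getClassLoadingLock(name)) {   // fine-grained, per-name lock
               // A real delegating loader would route plugin classes to the
               // matching PluginClassLoader here (omitted); everything else
               // goes to the parent via inheritance.
               return super.loadClass(name, resolve);
           }
       }
   }
   ```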






[jira] [Commented] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-07-13 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380255#comment-17380255
 ] 

Luke Chen commented on KAFKA-13081:
---

> I would prefer to just apply the minimal required fixes and leave 2.8 as a 
> "safe" branch

Agree!

> Port sticky assignor fixes (KAFKA-12984) back to 2.8
> 
>
> Key: KAFKA-13081
> URL: https://issues.apache.org/jira/browse/KAFKA-13081
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 2.8.1
>
>
> We should make sure that fix #1 and #2 of 
> [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
> sticky assignor, since it's pretty much impossible to smoothly cherrypick 
> that commit from 3.0 to 2.8 due to all the recent improvements and 
> refactoring in the AbstractStickyAssignor. Either we can just extract and 
> apply those two fixes to 2.8 directly, or go back and port all the commits 
> that made this cherrypick difficult over to 2.8 as well. If we do so then 
> cherrypicking the original commit should be easy.
> See [this 
> comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]





[GitHub] [kafka] ableegoldman edited a comment on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


ableegoldman edited a comment on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-879525127










[jira] [Updated] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13081:
---
Component/s: consumer

> Port sticky assignor fixes (KAFKA-12984) back to 2.8
> 
>
> Key: KAFKA-13081
> URL: https://issues.apache.org/jira/browse/KAFKA-13081
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 2.8.1
>
>
> We should make sure that fix #1 and #2 of 
> [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
> sticky assignor, since it's pretty much impossible to smoothly cherrypick 
> that commit from 3.0 to 2.8 due to all the recent improvements and 
> refactoring in the AbstractStickyAssignor. Either we can just extract and 
> apply those two fixes to 2.8 directly, or go back and port all the commits 
> that made this cherrypick difficult over to 2.8 as well. If we do so then 
> cherrypicking the original commit should be easy.
> See [this 
> comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]





[jira] [Commented] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380251#comment-17380251
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13081:


Personally I would probably advocate for just applying the two fixes to 2.8 
rather than porting over the full set of improvements and refactorings. While 
the improvements are great to have, they're not strictly necessary and are 
relatively complex, so porting them back carries some risk of 
as-yet-undiscovered bugs that were introduced during the large-scale 
refactoring of this assignor. Given that it's hard to be 100% confident in the 
correctness of such a complicated algorithm, regardless of how good the test 
coverage is, I would prefer to just apply the minimal required fixes and leave 
2.8 as a "safe" branch. That way we can fall back to it in case of unexpected 
issues in the 3.0 assignor, and recommend it to users who want to downgrade to 
the last stable version with a good "cooperative-sticky"/"sticky" assignor, 
which they can continue to use until the new assignor is stabilized.

Just my 2 cents

> Port sticky assignor fixes (KAFKA-12984) back to 2.8
> 
>
> Key: KAFKA-13081
> URL: https://issues.apache.org/jira/browse/KAFKA-13081
> Project: Kafka
>  Issue Type: Bug
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 2.8.1
>
>
> We should make sure that fix #1 and #2 of 
> [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
> sticky assignor, since it's pretty much impossible to smoothly cherrypick 
> that commit from 3.0 to 2.8 due to all the recent improvements and 
> refactoring in the AbstractStickyAssignor. Either we can just extract and 
> apply those two fixes to 2.8 directly, or go back and port all the commits 
> that made this cherrypick difficult over to 2.8 as well. If we do so then 
> cherrypicking the original commit should be easy.
> See [this 
> comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]





[GitHub] [kafka] ableegoldman commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


ableegoldman commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-879525127


   @showuon  filed https://issues.apache.org/jira/browse/KAFKA-13081 just fyi






[GitHub] [kafka] showuon commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


showuon commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-879523852


   Let me handle the porting to v2.8. :)






[jira] [Assigned] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-07-13 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-13081:
-

Assignee: Luke Chen

> Port sticky assignor fixes (KAFKA-12984) back to 2.8
> 
>
> Key: KAFKA-13081
> URL: https://issues.apache.org/jira/browse/KAFKA-13081
> Project: Kafka
>  Issue Type: Bug
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 2.8.1
>
>
> We should make sure that fix #1 and #2 of 
> [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
> sticky assignor, since it's pretty much impossible to smoothly cherrypick 
> that commit from 3.0 to 2.8 due to all the recent improvements and 
> refactoring in the AbstractStickyAssignor. Either we can just extract and 
> apply those two fixes to 2.8 directly, or go back and port all the commits 
> that made this cherrypick difficult over to 2.8 as well. If we do so then 
> cherrypicking the original commit should be easy.
> See [this 
> comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]





[jira] [Updated] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13081:
---
Description: 
We should make sure that fix #1 and #2 of 
[#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
sticky assignor, since it's pretty much impossible to smoothly cherrypick that 
commit from 3.0 to 2.8 due to all the recent improvements and refactoring in 
the AbstractStickyAssignor. Either we can just extract and apply those two 
fixes to 2.8 directly, or go back and port all the commits that made this 
cherrypick difficult over to 2.8 as well. If we do so then cherrypicking the 
original commit should be easy.

See [this 
comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]

  was:We should make sure that fix #1 and #2 of 
[#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
sticky assignor, since it's pretty much impossible to smoothly cherrypick that 
commit from 3.0 to 2.8 due to all the recent improvements and refactoring in 
the AbstractStickyAssignor. Either we can just extract and apply those two 
fixes to 2.8 directly, or go back and port all the commits that made this 
cherrypick difficult over to 2.8 as well. If we do so then cherrypicking the 
original commit should be easy


> Port sticky assignor fixes (KAFKA-12984) back to 2.8
> 
>
> Key: KAFKA-13081
> URL: https://issues.apache.org/jira/browse/KAFKA-13081
> Project: Kafka
>  Issue Type: Bug
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.8.1
>
>
> We should make sure that fix #1 and #2 of 
> [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
> sticky assignor, since it's pretty much impossible to smoothly cherrypick 
> that commit from 3.0 to 2.8 due to all the recent improvements and 
> refactoring in the AbstractStickyAssignor. Either we can just extract and 
> apply those two fixes to 2.8 directly, or go back and port all the commits 
> that made this cherrypick difficult over to 2.8 as well. If we do so then 
> cherrypicking the original commit should be easy.
> See [this 
> comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]





[jira] [Created] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13081:
--

 Summary: Port sticky assignor fixes (KAFKA-12984) back to 2.8
 Key: KAFKA-13081
 URL: https://issues.apache.org/jira/browse/KAFKA-13081
 Project: Kafka
  Issue Type: Bug
Reporter: A. Sophie Blee-Goldman
 Fix For: 2.8.1


We should make sure that fix #1 and #2 of 
[#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
sticky assignor, since it's pretty much impossible to smoothly cherrypick that 
commit from 3.0 to 2.8 due to all the recent improvements and refactoring in 
the AbstractStickyAssignor. Either we can just extract and apply those two 
fixes to 2.8 directly, or go back and port all the commits that made this 
cherrypick difficult over to 2.8 as well. If we do so then cherrypicking the 
original commit should be easy





[GitHub] [kafka] ableegoldman commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


ableegoldman commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-879521196


   Merged to trunk and cherrypicked back to 3.0 (cc @kkonstantine)
   
   Unfortunately it's not really possible to cherrypick this back to 2.8, since 
there have been so many changes to the assignor since then. However, the main 
conflicts are due to fix #3, which isn't actually necessary there, as the bug 
it fixes was only present in 3.0. So we just need to extract fixes #1 and #2 
and apply those to the 2.8 assignor (along with the tests, most if not all of 
which are still relevant in 2.8).
   
   We should split this out into a separate ticket though, so we can close the 
current one and unblock the 3.0 release. @showuon If you have time, would you 
be interested in picking this up? An alternative is to port the improvements 
you made to the assignor back to 2.8 and then cherrypick this fix as usual, 
which should be relatively smooth since I doubt much else has touched the 
assignor recently. I'll leave it up to you to decide which route to go, if you 
want to take this one. Luckily there's no rush, since the 2.8.1 release isn't 
right around the corner like the 3.0 release is, so we can take our time.






[jira] [Updated] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12984:
---
Fix Version/s: (was: 2.8.1)

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)
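> 
> A rough sketch of fix (a) as described (hypothetical helper, not the actual 
> assignor code): when two members both claim a partition in "ownedPartitions", 
> neither claim can be trusted, so both are invalidated before assignment 
> proceeds.
> 
> {{{
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> 
> import org.apache.kafka.common.TopicPartition;
> 
> public class OwnedPartitionsSketch {
>     // Counts owners per partition and strips any partition claimed by more
>     // than one member (the lists must be mutable), instead of throwing
>     // IllegalStateException on the invalid input.
>     static void invalidateDoublyOwned(final Map<String, List<TopicPartition>> ownedByMember) {
>         final Map<TopicPartition, Integer> ownerCounts = new HashMap<>();
>         for (final List<TopicPartition> owned : ownedByMember.values()) {
>             for (final TopicPartition tp : owned) {
>                 ownerCounts.merge(tp, 1, Integer::sum);
>             }
>         }
>         for (final List<TopicPartition> owned : ownedByMember.values()) {
>             owned.removeIf(tp -> ownerCounts.get(tp) > 1);
>         }
>     }
> }
> }}}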





[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669215608



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function<Integer, Boolean> isAcceptableLeader;
+private final Supplier<Boolean> uncleanElectionOk;
+private List<Integer> targetIsr;
+private List<Integer> targetReplicas;
+private List<Integer> targetRemoving;
+private List<Integer> targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function<Integer, Boolean> isAcceptableLeader,
+  Supplier<Boolean> uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
   I also added a unit test for `electLeaders`








[GitHub] [kafka] ableegoldman merged pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


ableegoldman merged pull request #10985:
URL: https://github.com/apache/kafka/pull/10985


   






[GitHub] [kafka] ableegoldman commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


ableegoldman commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-879510987


   Some unrelated flaky test failures: 
`RaftEventSimulationTest.canRecoverAfterAllNodesKilled` and 
`ConsumerBounceTest.testCloseDuringRebalance()` (known to be flaky, has been 
failing often lately)






[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669211161



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map<String, StateStore> globalStateStores = new HashMap<>();
+final Set<String> allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   @guozhangwang  WDYT? If the user has started up Streams with several 
named topologies, but a subset of them are completely empty, should this be 
considered user error and cause Streams to shutdown or should we just roll with 
it as long as at least one topology is non-empty?
   
   Take a look at the current state and lmk what you think




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669211161



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map<String, StateStore> globalStateStores = new HashMap<>();
+final Set<String> allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   @guozhangwang  WDYT? If the user has started up Streams with several 
named topologies, but a subset of them are completely empty, should this be 
considered user error and cause Streams to shutdown or should we just roll with 
it as long as at least one topology is non-empty?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669210507



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map<String, StateStore> globalStateStores = new HashMap<>();
+final Set<String> allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   Sure, but if `builders.isEmpty` then we would enter the `if` block above 
and return before reaching this section of the code. But I think maybe you 
meant that in `hasNoNonGlobalTopology`, we should actually return true only if 
_all_ builders have no non-global topology, not if that's true for _any one_ of 
them? There's some argument to be made for how to handle the case where some 
named topologies are legit, while others are empty, but I would still advocate 
for throwing an exception when _any_ topology is empty since this is not a 
valid configuration. In which case, the current code is correct, but the 
comment is not. I'll fix the misleading comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669208386



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List<Integer> replicas;
+private final List<Integer> isr;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+} else if (i == registration.isr.length - 1 && isr.isEmpty()) {
+// This is a special case where taking out all the "adding" 
replicas is
+// not possible. The reason it is not possible is that doing 
so would
+// create an empty ISR, which is not allowed.
+//
+// In this case, we leave in one of the adding replicas 
permanently.

Review comment:
   One weird side effect of doing this is that it can change the 
replication factor for a partition to be different from the original one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12554) Split Log layer into Log and LocalLog

2021-07-13 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12554.
-
Fix Version/s: 3.1.0
   Resolution: Fixed

merged the PR to trunk.

> Split Log layer into Log and LocalLog
> -
>
> Key: KAFKA-12554
> URL: https://issues.apache.org/jira/browse/KAFKA-12554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
> Fix For: 3.1.0
>
>
> Split Log layer into Log and LocalLog based on the proposal described in this 
> document: 
> [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #10280: KAFKA-12554: Refactor Log layer

2021-07-13 Thread GitBox


junrao merged pull request #10280:
URL: https://github.com/apache/kafka/pull/10280


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669203100



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+private final List<Integer> replicas;
+private final List<Integer> isr;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+} else if (i == registration.isr.length - 1 && isr.isEmpty()) {
+// This is a special case where taking out all the "adding" 
replicas is
+// not possible. The reason it is not possible is that doing 
so would
+// create an empty ISR, which is not allowed.
+//
+// In this case, we leave in one of the adding replicas 
permanently.

Review comment:
   It's not possible to revert to the original replicas in all cases, 
though. Specifically, if none of the original replicas are in the ISR, you 
can't do such a revert without data loss. If the old controller is doing that 
unconditionally, it seems like a bug.
   
   I suppose we could also fail the revert operation, but that doesn't seem 
like it would be a useful behavior from the user's point of view




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669201808



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function<Integer, Boolean> isAcceptableLeader;
+private final Supplier<Boolean> uncleanElectionOk;
+private List<Integer> targetIsr;
+private List<Integer> targetReplicas;
+private List<Integer> targetRemoving;
+private List<Integer> targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function<Integer, Boolean> isAcceptableLeader,
+  Supplier<Boolean> uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) {

Review comment:
   Good catch. This is supposed to be set when we're handling the 
`electLeaders` RPC, since in that case you want to move to the preferred leader 
if it's not on that leader already. Should be fixed now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669197330



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map<String, StateStore> globalStateStores = new HashMap<>();
+final Set<String> allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {

Review comment:
   Also, if a user has a global-only topology, then it's absolutely valid for them to explicitly configure the app to have no StreamThreads. In fact, if we detect that case we actually override the configured num.stream.threads to 0 for them anyway
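   For context, a global-only app might be configured like the following sketch (property values hypothetical):
   
       Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-only-app");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       // Explicitly requesting zero stream threads is valid when the topology
       // consists only of global state stores served by the global thread.
       props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);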




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669196718



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+log.error("Topology with no input topics will create no stream 
threads and no global thread.");
+throw new TopologyException("Topology has no stream threads and no 
global threads, " +
+"must subscribe to at least one 
source topic or global table.");
+}
+
+// Lastly we check for an empty non-global topology and override the 
threads to zero if set otherwise
+if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+log.info("Overriding number of StreamThreads to zero for 
global-only topology");
+return 0;
+}
+
+return configuredNumStreamThreads;
+}
+
+public boolean hasNamedTopologies() {
+// This includes the case of starting up with no named topologies at 
all
+return !builders.containsKey(UNNAMED_TOPOLOGY);
+}
+
+public boolean hasGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+}
+
+public boolean hasNoNonGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+}
+
+public boolean hasPersistentStores() {
+// If the app is using named topologies, there may not be any 
persistent state when it first starts up
+// but a new NamedTopology may introduce it later, so we must return 
true
+if (hasNamedTopologies()) {
+return true;
+}
+return 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669196246



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map<String, StateStore> globalStateStores = new HashMap<>();
+final Set<String> allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+log.error("Topology with no input topics will create no stream 
threads and no global thread.");
+throw new TopologyException("Topology has no stream threads and no 
global threads, " +
+"must subscribe to at least one 
source topic or global table.");
+}
+
+// Lastly we check for an empty non-global topology and override the 
threads to zero if set otherwise
+if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+log.info("Overriding number of StreamThreads to zero for 
global-only topology");
+return 0;
+}
+
+return configuredNumStreamThreads;
+}
+
+public boolean hasNamedTopologies() {
+// This includes the case of starting up with no named topologies at 
all
+return !builders.containsKey(UNNAMED_TOPOLOGY);
+}
+
+public boolean hasGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+}
+
+public boolean hasNoNonGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+}
+
+public boolean hasPersistentStores() {
+// If the app is using named topologies, there may not be any 
persistent state when it first starts up
+// but a new NamedTopology may introduce it later, so we must return 
true
+if (hasNamedTopologies()) {
+return true;
+}
+return 

[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer

2021-07-13 Thread GitBox


kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-878641126


   @junrao Thanks for the review. I ran load tests on the changes from this PR; there weren't any new regressions (i.e. latency regressions or errors) that I noticed, except for one issue that looks unrelated to this PR, and it's described in this jira: https://issues.apache.org/jira/browse/KAFKA-13070.
   
   The load test was run on a 6-broker cluster with 250GB SSD disks:
* Produce/consume on a test topic with 2000 partitions (~1000+ replicas per broker).
* Per topic # of producers = 6.
* Produce ingress per broker = ~20.5MBps.
* Per topic # of consumers = 6.
* # of consumer groups = 3.
* Test duration: ~1h.
   
   Mid-way through the test, I rolled the cluster under load to check how the 
cluster behaved. Overall things looked OK.
   
   There weren't any additional tests that I was planning to do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669193117



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -364,6 +371,10 @@ public synchronized final StreamsConfig getStreamsConfig() 
{
 return config;
 }
 
+public String namedTopology() {

Review comment:
   You mean to rename this to `topologyName()`? Ack




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669192999



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
 }
 }
 
+public void setTopologyName(final String namedTopology) {
+Objects.requireNonNull(namedTopology, "named topology can't be null");
+if (this.namedTopology != null) {
+log.error("Tried to reset the namedTopology to {} but it was 
already set to {}", namedTopology, this.namedTopology);
+throw new IllegalStateException("NamedTopology has already been 
set to " + this.namedTopology);
+}
+this.namedTopology = namedTopology;
+}
+
 // public for testing only
-public synchronized final InternalTopologyBuilder setApplicationId(final 
String applicationId) {

Review comment:
   I looked around and can't imagine why we would ever need it. 
`setApplicationId` should only ever be called once, from a single 
location/thread




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669191369



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int 
numStreamThreads) {
 if (numStreamThreads == 0) {
 return totalCacheSize;
 }
-return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+return totalCacheSize / (numStreamThreads + 
(topologyMetadata.hasGlobalTopology() ? 1 : 0));

Review comment:
   We're just trying to get the basic NamedTopology feature up and running 
for the time being, so we'll loop back around on any "missing" features 
eventually. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669190141



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int 
numStreamThreads) {
 if (numStreamThreads == 0) {
 return totalCacheSize;
 }
-return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+return totalCacheSize / (numStreamThreads + 
(topologyMetadata.hasGlobalTopology() ? 1 : 0));

Review comment:
   At the moment we just don't allow global stores with named topologies. 
There is a list of not-yet-supported features that are currently incompatible 
with them in the javadocs of KafkaStreamsNamedTopologyWrapper




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669188996



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -799,41 +795,41 @@ public KafkaStreams(final Topology topology,
 public KafkaStreams(final Topology topology,
 final StreamsConfig config,
 final Time time) {
-this(topology.internalTopologyBuilder, config, new 
DefaultKafkaClientSupplier(), time);
+this(new TopologyMetadata(topology.internalTopologyBuilder, config), 
config, new DefaultKafkaClientSupplier(), time);
 }
 
-private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
- final StreamsConfig config,
- final KafkaClientSupplier clientSupplier) throws 
StreamsException {
-this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
-}
-
-private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+private KafkaStreams(final Topology topology,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier,
  final Time time) throws StreamsException {
+this(new TopologyMetadata(topology.internalTopologyBuilder, config), 
config, clientSupplier, time);
+}
+
+protected KafkaStreams(final TopologyMetadata topologyMetadata,

Review comment:
   Just this one does, as it's called from the child class 
KafkaStreamsNamedTopologyWrapper. I'll change the one below back to private 
though




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11032: KAFKA-13073: Inconsistent MockLog implementation

2021-07-13 Thread GitBox


cmccabe merged pull request #11032:
URL: https://github.com/apache/kafka/pull/11032


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr opened a new pull request #11042: KAFKA-12886: Enable request forwarding to the controller by default for zookeeper mutation protocols

2021-07-13 Thread GitBox


dielhennr opened a new pull request #11042:
URL: https://github.com/apache/kafka/pull/11042


   Enabled request forwarding by default and addressed test failures.
   
   The following tests are still flaky:
   
   * DynamicBrokerReconfigurationTest > testTrustStoreAlter
   * SaslSslAdminIntegrationTest > testAclDelete
   * SaslSslAdminIntegrationTest > testLegacyAclOpsNeverAffectOrReturnPrefixed
   * AlterUserScramCredentialsRequestTest > testAlterNotController
   
   * SaslPlainSslEndToEndAuthorizationTest
   * PlaintextEndToEndAuthorizationTest
   * GroupEndToEndAuthorizationTest
   * SslEndToEndAuthorizationTest
   > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
   
   https://issues.apache.org/jira/browse/KAFKA-12886


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11031: KAFKA-13067 Add internal config to lower the metadata log segment size

2021-07-13 Thread GitBox


cmccabe merged pull request #11031:
URL: https://github.com/apache/kafka/pull/11031


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11040: KAFKA-13078: Closing FileRawSnapshotWriter too early

2021-07-13 Thread GitBox


cmccabe merged pull request #11040:
URL: https://github.com/apache/kafka/pull/11040


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11041: KAFKA-13080: Direct fetch snapshot request to kraft

2021-07-13 Thread GitBox


cmccabe merged pull request #11041:
URL: https://github.com/apache/kafka/pull/11041


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669155613



##
File path: metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java
##
@@ -88,9 +90,34 @@ public void testCopyWithout() {
 assertArrayEquals(new int[] {4, 1}, Replicas.copyWithout(new int[] {4, 
2, 2, 1}, 2));
 }
 
+@Test
+public void testCopyWithout2() {
+assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {}, new 
int[] {}));
+assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {1}, 
new int[] {1}));
+assertArrayEquals(new int[] {1, 3},
+Replicas.copyWithout(new int[] {1, 2, 3}, new int[]{2, 4}));
+assertArrayEquals(new int[] {4},
+Replicas.copyWithout(new int[] {4, 2, 2, 1}, new int[]{2, 1}));
+}
+
 @Test
 public void testCopyWith() {
 assertArrayEquals(new int[] {-1}, Replicas.copyWith(new int[] {}, -1));
 assertArrayEquals(new int[] {1, 2, 3, 4}, Replicas.copyWith(new int[] 
{1, 2, 3}, 4));
 }
+
+@Test
+public void testToSet() {
+assertEquals(Collections.emptySet(), Replicas.toSet(new int[] {}));
+assertEquals(new HashSet<>(Arrays.asList(3, 1, 5)),

Review comment:
   Could we add a test where the input has duplicates?
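   For illustration, such a duplicate-input case might look like the following sketch (hypothetical test, not part of the PR):
   
       @Test
       public void testToSetWithDuplicates() {
           // Duplicates in the input array should collapse to a single set element.
           assertEquals(new HashSet<>(Arrays.asList(1, 3)),
               Replicas.toSet(new int[] {3, 1, 3, 1}));
       }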

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##
@@ -763,17 +747,227 @@ public void testValidateBadManualPartitionAssignments() 
throws Exception {
 OptionalInt.of(3))).getMessage());
 }
 
+private final static ListPartitionReassignmentsResponseData 
NONE_REASSIGNING =
+new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+
 @Test
-public void testBestLeader() {
-assertEquals(2, ReplicationControlManager.bestLeader(
-new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
-assertEquals(3, ReplicationControlManager.bestLeader(
-new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
-assertEquals(4, ReplicationControlManager.bestLeader(
-new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
-assertEquals(-1, ReplicationControlManager.bestLeader(
-new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
-assertEquals(4, ReplicationControlManager.bestLeader(
-new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
+public void testReassignPartitions() throws Exception {
+ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+ReplicationControlManager replication = ctx.replicationControl;
+ctx.registerBrokers(0, 1, 2, 3);
+ctx.unfenceBrokers(0, 1, 2, 3);
+Uuid fooId = ctx.createTestTopic("foo", new int[][] {
+new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
+ctx.createTestTopic("bar", new int[][] {
+new int[] {1, 2, 3}}).topicId();
+assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null));
+ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+replication.alterPartitionReassignments(
+new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+new ReassignablePartition().setPartitionIndex(0).
+setReplicas(Arrays.asList(3, 2, 1)),
+new ReassignablePartition().setPartitionIndex(1).
+setReplicas(Arrays.asList(0, 2, 1)),
+new ReassignablePartition().setPartitionIndex(2).
+setReplicas(Arrays.asList(0, 2, 1,
+new ReassignableTopic().setName("bar";
+assertEquals(new AlterPartitionReassignmentsResponseData().
+setErrorMessage(null).setResponses(Arrays.asList(
+new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+new 
ReassignablePartitionResponse().setPartitionIndex(0).
+setErrorMessage(null),
+new 
ReassignablePartitionResponse().setPartitionIndex(1).
+setErrorMessage(null),
+new 
ReassignablePartitionResponse().setPartitionIndex(2).
+setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+setErrorMessage("Unable to find partition 
foo:2."))),
+new ReassignableTopicResponse().
+setName("bar"))),
+alterResult.response());
+ctx.replay(alterResult.records());
+ListPartitionReassignmentsResponseData currentReassigning =
+new ListPartitionReassignmentsResponseData().setErrorMessage(null).

[GitHub] [kafka] guozhangwang commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


guozhangwang commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-879458156


   LGTM!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


guozhangwang commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r669163465



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected members
 // with more than the minQuota partitions, so keep "maxQuota" 
of the owned partitions, and revoke the rest of the partitions
 numMembersAssignedOverMinQuota++;
+if (numMembersAssignedOverMinQuota == 
expectedNumMembersAssignedOverMinQuota) {
+potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
   SGTM.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r669157613



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected members
 // with more than the minQuota partitions, so keep "maxQuota" 
of the owned partitions, and revoke the rest of the partitions
 numMembersAssignedOverMinQuota++;
+if (numMembersAssignedOverMinQuota == 
expectedNumMembersAssignedOverMinQuota) {
+potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
   Thanks @showuon and @guozhangwang , I think that all makes sense. One of 
my primary motivations was to keep all data structures at all times consistent 
with what they represent so they could always be relied upon to be used at any 
point. For that reason I also prefer to keep things as is, and clear the 
`potentiallyUnfilledMembersAtMinQuota` (now renamed to 
`unfilledMembersWithExactlyMinQuotaPartitions`) as soon as we have filled the 
last member who may be above `minQuota`, at which point all of the members at 
exactly `minQuota` are no longer considered "unfilled".
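
For illustration, a minimal, self-contained sketch of that clearing rule (member names and counts are invented; only the bookkeeping mirrors the PR):

{code:java}
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model, not the real assignor: once the expected number of members
// assigned more than minQuota is reached, no member sitting at exactly
// minQuota can still be "unfilled", so the tracking set is cleared to stay
// consistent with what it represents.
public class QuotaBookkeepingSketch {
    public static void main(String[] args) {
        final int minQuota = 2, maxQuota = 3;                 // 7 partitions, 3 members
        final int expectedNumMembersAssignedOverMinQuota = 1; // 7 % 3
        int numMembersAssignedOverMinQuota = 0;

        Set<String> unfilledMembersWithExactlyMinQuotaPartitions = new LinkedHashSet<>();
        String[] members = {"consumer-b", "consumer-c", "consumer-a"};
        int[] owned = {2, 2, 3};

        for (int i = 0; i < members.length; i++) {
            if (owned[i] >= maxQuota) {
                numMembersAssignedOverMinQuota++;
                if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
                    unfilledMembersWithExactlyMinQuotaPartitions.clear();
                }
            } else if (owned[i] == minQuota) {
                unfilledMembersWithExactlyMinQuotaPartitions.add(members[i]);
            }
        }
        // consumer-a took the only slot above minQuota, so nothing is unfilled
        System.out.println(unfilledMembersWithExactlyMinQuotaPartitions); // []
    }
}
{code}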




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380212#comment-17380212
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-13010 at 7/13/21, 10:39 PM:
---

Failed again 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10985/6/testReport/junit/org.apache.kafka.streams.integration/TaskMetadataIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldReportCorrectCommittedOffsetInformation/]

FWIW I didn't see the above log message about that subscribed topic not being 
assigned to any members. The logs were truncated so it's possible that it 
actually was there, but I don't think that's the case since AFAICT the 
truncated logs are mostly from kafka/zookeeper. The relevant logs from the 
rebalance seem to be present.


was (Author: ableegoldman):
Failed again 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10985/6/testReport/junit/org.apache.kafka.streams.integration/TaskMetadataIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldReportCorrectCommittedOffsetInformation/

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
> ---
>
> Key: KAFKA-13010
> URL: https://issues.apache.org/jira/browse/KAFKA-13010
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380212#comment-17380212
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13010:


Failed again 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10985/6/testReport/junit/org.apache.kafka.streams.integration/TaskMetadataIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldReportCorrectCommittedOffsetInformation/

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
> ---
>
> Key: KAFKA-13010
> URL: https://issues.apache.org/jira/browse/KAFKA-13010
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio opened a new pull request #11041: KAFKA-13080: Direct fetch snapshot request to kraft

2021-07-13 Thread GitBox


jsancio opened a new pull request #11041:
URL: https://github.com/apache/kafka/pull/11041


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-13 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r668323818



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionChangeBuilder handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+private final Function<Integer, Boolean> isAcceptableLeader;
+private final Supplier<Boolean> uncleanElectionOk;
+private List<Integer> targetIsr;
+private List<Integer> targetReplicas;
+private List<Integer> targetRemoving;
+private List<Integer> targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+  Function<Integer, Boolean> isAcceptableLeader,
+  Supplier<Boolean> uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
   Do we need this since it's only used in tests?

##
File path: 
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
##
@@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() {
 setIsNew(false).toString(),
 b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), 
false).toString());
 }
+
+@Test
+public void testMergePartitionChangeRecordWithReassignmentData() {
+PartitionRegistration partition0 = new PartitionRegistration(new int[] 
{1, 2, 3},
+new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200);
+PartitionRegistration partition1 = partition0.merge(new 
PartitionChangeRecord().
+setRemovingReplicas(Collections.singletonList(3)).
+setAddingReplicas(Collections.singletonList(4)).
+setReplicas(Arrays.asList(1, 2, 3, 4)));
+assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4},
+new int[] 

[jira] [Created] (KAFKA-13080) Fetch snapshot requests are not directed to kraft in controller

2021-07-13 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13080:
--

 Summary: Fetch snapshot requests are not directed to kraft in 
controller
 Key: KAFKA-13080
 URL: https://issues.apache.org/jira/browse/KAFKA-13080
 Project: Kafka
  Issue Type: Bug
  Components: controller, kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


KRaft followers and observers are seeing the following error
{code:java}
[2021-07-13 18:15:47,289] ERROR [RaftManager nodeId=2] Unexpected error 
UNKNOWN_SERVER_ERROR in FETCH_SNAPSHOT response: 
InboundResponse(correlationId=29862, 
data=FetchSnapshotResponseData(throttleTimeMs=0, errorCode=-1, topics=[]), 
sourceId=3001) (org.apache.kafka.raft.KafkaRaftClient) {code}
This is because ControllerApis is not directing FetchSnapshot requests to the 
raft manager.
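
A rough sketch of the fix direction (the real ControllerApis is Scala and dispatches on ApiKeys; every name below is hypothetical):

{code:java}
// FETCH_SNAPSHOT must be forwarded to the Raft manager like the other raft
// RPCs, instead of falling through to a default path that surfaces as
// UNKNOWN_SERVER_ERROR at the client.
public class ControllerRoutingSketch {
    enum ApiKeys { FETCH, VOTE, BEGIN_QUORUM_EPOCH, FETCH_SNAPSHOT, CREATE_TOPICS }

    interface RaftManager { void handleRequest(Object request); }

    private final RaftManager raftManager;

    ControllerRoutingSketch(RaftManager raftManager) {
        this.raftManager = raftManager;
    }

    void handle(ApiKeys apiKey, Object request) {
        switch (apiKey) {
            case FETCH:
            case VOTE:
            case BEGIN_QUORUM_EPOCH:
            case FETCH_SNAPSHOT: // the case that was missing
                raftManager.handleRequest(request);
                break;
            default:
                handleWithControllerLogic(request);
        }
    }

    private void handleWithControllerLogic(Object request) {
        // non-raft controller APIs (topic creation, configs, ...) go here
    }
}
{code}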



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-07-13 Thread GitBox


cadonna commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-879401529


   Cherry-picked to 2.8


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-2588) ReplicaManager partitionCount metric should actually be replicaCount

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-2588:
--

Assignee: (was: Grant Henke)

> ReplicaManager partitionCount metric should actually be replicaCount
> 
>
> Key: KAFKA-2588
> URL: https://issues.apache.org/jira/browse/KAFKA-2588
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Grant Henke
>Priority: Major
>  Labels: needs-kip
>
> The metrics "partitionCount" in the ReplicaManager actually represents the 
> count of replicas. 
> As an example if I have a cluster with 1 topic with 1 partitions and a 
> replication factor of 3. The metric (aggregated) would show a value of 3. 
> There is a metric called "LeaderCount" that actually represents the 
> "partitionCount". In my example above the metric (aggregated) would show a 
> value of 1. 
> We do need to consider compatibility of consuming systems. I think the most 
> simple change would be to:
> - Adjust the "partitionCount" metric to be the same value as "LeaderCount"
> - Add a "replicaCount" metric which contains the values "partitionCount" does 
> today
> - Leave "LeaderCount" in for compatibility
> Documentation will need to be updated as well. 
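
A minimal sketch of the proposed layout, using the ticket's example (1 topic, 1 partition, replication factor 3, aggregated across the cluster); the plain map stands in for the broker's actual Yammer gauges:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplicaMetricsSketch {
    public static void main(String[] args) {
        int leaderCount = 1;  // partitions led, i.e. the true partition count
        int replicaCount = 3; // every hosted replica, leader or follower

        Map<String, Integer> gauges = new LinkedHashMap<>();
        gauges.put("LeaderCount", leaderCount);    // unchanged, for compatibility
        gauges.put("PartitionCount", leaderCount); // adjusted to match LeaderCount
        gauges.put("ReplicaCount", replicaCount);  // new, holds today's PartitionCount value
        System.out.println(gauges); // {LeaderCount=1, PartitionCount=1, ReplicaCount=3}
    }
}
{code}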



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-1694) KIP-4: Command line and centralized operations

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-1694:
--

Assignee: (was: Grant Henke)

> KIP-4: Command line and centralized operations
> --
>
> Key: KAFKA-1694
> URL: https://issues.apache.org/jira/browse/KAFKA-1694
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Joe Stein
>Priority: Critical
> Attachments: KAFKA-1694.patch, KAFKA-1694_2014-12-24_21:21:51.patch, 
> KAFKA-1694_2015-01-12_15:28:41.patch, KAFKA-1694_2015-01-12_18:54:48.patch, 
> KAFKA-1694_2015-01-13_19:30:11.patch, KAFKA-1694_2015-01-14_15:42:12.patch, 
> KAFKA-1694_2015-01-14_18:07:39.patch, KAFKA-1694_2015-03-12_13:04:37.patch, 
> KAFKA-1772_1802_1775_1774_v2.patch
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-2620) Introduce Scalariform

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-2620:
--

Assignee: (was: Grant Henke)

> Introduce Scalariform
> -
>
> Key: KAFKA-2620
> URL: https://issues.apache.org/jira/browse/KAFKA-2620
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Reporter: Grant Henke
>Priority: Major
>
> Many of our reviews include nit comments related to Scala style. Adding 
> [Scalariform|https://github.com/daniel-trinh/scalariform] allows us to 
> reformat the code based on configurable standards at build time, ensuring 
> uniform readability and a short review/commit cycle. 
> I expect this will have some discussion around the rules we would like to 
> include, and if we actually want to adopt this. I will submit a sample patch 
> to start the discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-2787) Refactor gradle build

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-2787:
--

Assignee: (was: Grant Henke)

> Refactor gradle build
> -
>
> Key: KAFKA-2787
> URL: https://issues.apache.org/jira/browse/KAFKA-2787
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Grant Henke
>Priority: Minor
>
> The build files are quite large with a lot of duplication and overlap. This 
> could lead to mistakes, reduce readability and functionality, and hinder 
> future changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13075) Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13075.

Fix Version/s: 3.1.0
   Resolution: Fixed

> Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest
> -
>
> Key: KAFKA-13075
> URL: https://issues.apache.org/jira/browse/KAFKA-13075
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Chun-Hao Tang
>Priority: Major
>  Labels: newbie, newbie++
> Fix For: 3.1.0
>
>
> Looks like we have two different test classes covering pretty much the same 
> thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original 
> test class for RocksDBStore, but someone later added RocksDBStoreTest, most 
> likely because they didn't notice the RocksDBKeyValueStoreTest which didn't 
> follow the usual naming scheme for test classes. 
> We should consolidate these two into a single file, ideally retaining the 
> RocksDBStoreTest name since that conforms to the test naming pattern used 
> throughout Streams (and so this same thing doesn't happen again). It should 
> also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest 
> currently does so we continue to get the benefit of all the tests in there as 
> well



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #11034: KAFKA-13075: Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest

2021-07-13 Thread GitBox


ableegoldman commented on pull request #11034:
URL: https://github.com/apache/kafka/pull/11034#issuecomment-879383997


   Merged to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #11034: KAFKA-13075: Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest

2021-07-13 Thread GitBox


ableegoldman merged pull request #11034:
URL: https://github.com/apache/kafka/pull/11034


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13075) Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13075:
---
Summary: Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest  (was: 
Consolidate RocksDBStore and RocksDBKeyValueStoreTest)

> Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest
> -
>
> Key: KAFKA-13075
> URL: https://issues.apache.org/jira/browse/KAFKA-13075
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Chun-Hao Tang
>Priority: Major
>  Labels: newbie, newbie++
>
> Looks like we have two different test classes covering pretty much the same 
> thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original 
> test class for RocksDBStore, but someone later added RocksDBStoreTest, most 
> likely because they didn't notice the RocksDBKeyValueStoreTest which didn't 
> follow the usual naming scheme for test classes. 
> We should consolidate these two into a single file, ideally retaining the 
> RocksDBStoreTest name since that conforms to the test naming pattern used 
> throughout Streams (and so this same thing doesn't happen again). It should 
> also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest 
> currently does so we continue to get the benefit of all the tests in there as 
> well



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13079) Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-07-13 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-13079:
--

Assignee: Justine Olshan

> Forgotten Topics in Fetch Requests may incorrectly use topic IDs
> 
>
> Key: KAFKA-13079
> URL: https://issues.apache.org/jira/browse/KAFKA-13079
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> In the new code for Fetch, we only check if the topics contained in the 
> session have IDs to decide whether to send a version < 13 (topic names) or 
> version 13+ (topic IDs) request. However, if we have an empty session that 
> previously did not use IDs, we will try to send a request to forget the 
> topics. Since all topics in the session (none) were not missing topic ids, we 
> will send a version 13 request. This request will have the Zero UUID and fail.
> The result is that we close the session and mark any partitions in it as 
> errored, but the message is confusing and the request is not correct. We 
> should somehow also track forgotten topics when deciding what version to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13079) Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-07-13 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-13079:
--

 Summary: Forgotten Topics in Fetch Requests may incorrectly use 
topic IDs
 Key: KAFKA-13079
 URL: https://issues.apache.org/jira/browse/KAFKA-13079
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.0
Reporter: Justine Olshan


In the new code for Fetch, we only check if the topics contained in the session 
have IDs to decide whether to send a version < 13 (topic names) or version 13+ 
(topic IDs) request. However, if we have an empty session that previously did 
not use IDs, we will try to send a request to forget the topics. Since all 
topics in the session (none) were not missing topic ids, we will send a version 
13 request. This request will have the Zero UUID and fail.

The result is that we close the session and mark any partitions in it as 
errored, but the message is confusing and the request is not correct. We should 
somehow also track forgotten topics when deciding what version to use.
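
A toy sketch of the version-selection rule the last paragraph suggests (all names are hypothetical; the real logic lives in the client's fetch session handling):

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Forgotten topics must also be able to supply IDs before a v13+ (topic-ID
// based) fetch may be sent; an empty session forgetting ID-less topics has to
// fall back to a name-based version instead of sending the zero UUID.
public class FetchVersionSketch {
    static short chooseFetchVersion(Map<String, UUID> sessionTopicIds,
                                    Map<String, UUID> forgottenTopicIds) {
        boolean sessionHasIds =
            sessionTopicIds.values().stream().allMatch(id -> id != null);
        boolean forgottenHaveIds =
            forgottenTopicIds.values().stream().allMatch(id -> id != null);
        return (short) (sessionHasIds && forgottenHaveIds ? 13 : 12);
    }

    public static void main(String[] args) {
        Map<String, UUID> emptySession = Collections.emptyMap();
        Map<String, UUID> forgotten = new HashMap<>();
        forgotten.put("legacy-topic", null); // topic known only by name
        System.out.println(chooseFetchVersion(emptySession, forgotten)); // 12
    }
}
{code}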



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2021-07-13 Thread Moses Nakamura (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380148#comment-17380148
 ] 

Moses Nakamura commented on KAFKA-3539:
---

So I know what I want to do and I've been trying to put together a prototype in 
advance of updating the KIP document, but even relatively modest changes seem 
to break a lot of tests.  [~ewencp] are you an expert in KafkaProducer?  Would 
you mind taking some time to chat and give me advice on how to proceed?

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Oleg Zhurakousky
>Priority: Critical
>  Labels: needs-discussion, needs-kip
>
> You can get more details from the us...@kafka.apache.org by searching on the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that method that returns Future must never block, since it 
> essentially violates the Future contract as it was specifically designed to 
> return immediately passing control back to the user to check for completion, 
> cancel etc.
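
For context, send() can block on a metadata fetch or a full record buffer for up to max.block.ms before its Future is even returned. A common caller-side workaround, sketched here (a sketch, not a fix to the producer itself), is to move the call onto a separate thread:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// submit() returns immediately, so any blocking inside send() (metadata wait,
// full accumulator) happens on the executor thread, not the caller's thread.
public class NonBlockingSendSketch {
    private final ExecutorService sender = Executors.newSingleThreadExecutor();

    public Future<Future<RecordMetadata>> sendAsync(Producer<String, String> producer,
                                                    ProducerRecord<String, String> record) {
        return sender.submit(() -> producer.send(record));
    }
}
{code}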



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #11009: MINOR: update doc for default assignor change

2021-07-13 Thread GitBox


ableegoldman commented on pull request #11009:
URL: https://github.com/apache/kafka/pull/11009#issuecomment-879367860


   Merged to trunk and cherrypicked to 3.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-3603) Define HashCode and Equals methods for Schema, Field and Type

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3603:
--

Assignee: (was: Grant Henke)

> Define HashCode and Equals methods for Schema, Field and Type
> -
>
> Key: KAFKA-3603
> URL: https://issues.apache.org/jira/browse/KAFKA-3603
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Priority: Major
>
> We should consider implementing HashCode and Equals methods for Schema, Field 
> and Type.
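
A generic sketch of the requested methods for one of the classes (illustrative only; the real Field has more members than shown):

{code:java}
import java.util.Objects;

final class Field {
    final String name;
    final int index;

    Field(String name, int index) {
        this.name = name;
        this.index = index;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Field)) return false;
        Field other = (Field) o;
        // value equality over all members, mirroring hashCode below
        return index == other.index && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, index);
    }
}
{code}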



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3405) Deduplicate and break out release tasks

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3405:
--

Assignee: (was: Grant Henke)

> Deduplicate and break out release tasks
> ---
>
> Key: KAFKA-3405
> URL: https://issues.apache.org/jira/browse/KAFKA-3405
> Project: Kafka
>  Issue Type: Sub-task
>  Components: build
>Reporter: Grant Henke
>Priority: Minor
>
> Tasks like copyDependantLibs are repeated throughout the build. Other tasks 
> like releaseTarGz should be moved out of the core module. 
> While refactoring this code other optimizations like ensuring sources and 
> javadoc jars are not included in the classpath should be done as well.
> If it makes sense, moving the release tasks to a separate gradle file is 
> preferred.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3437) We don't need sitedocs package for every scala version

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3437:
--

Assignee: (was: Grant Henke)

> We don't need sitedocs package for every scala version
> --
>
> Key: KAFKA-3437
> URL: https://issues.apache.org/jira/browse/KAFKA-3437
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Gwen Shapira
>Priority: Minor
>
> When running "./gradlew releaseTarGzAll" it generates a binary tarball for 
> every scala version we support (good!) and also a sitedoc tarball for every 
> scala version we support (useless).
> Will be nice if we have a way to generate just one sitedoc tarball for our 
> release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3509) Provide an Authorizer interface using the Java client enumerator classes

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3509:
--

Assignee: (was: Grant Henke)

> Provide an Authorizer interface using the Java client enumerator classes
> 
>
> Key: KAFKA-3509
> URL: https://issues.apache.org/jira/browse/KAFKA-3509
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Priority: Major
>
> Provide an Authorizer interface using the new Java classes used by the ACL 
> requests/responses added as a part of KAFKA-3266. Deprecate the old one to 
> encourage transition.
> This may require a small KIP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3360) Add a protocol page/section to the official Kafka documentation

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3360:
--

Assignee: (was: Grant Henke)

> Add a protocol page/section to the official Kafka documentation
> ---
>
> Key: KAFKA-3360
> URL: https://issues.apache.org/jira/browse/KAFKA-3360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Priority: Major
>
> This is an umbrella jira to track adding a protocol page/section to the 
> official Kafka documentation. It lays out subtasks for initial content and 
> follow up improvements and fixes. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3203) Add UnknownCodecException and UnknownMagicByteException to error mapping

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3203:
--

Assignee: (was: Grant Henke)

> Add UnknownCodecException and UnknownMagicByteException to error mapping
> 
>
> Key: KAFKA-3203
> URL: https://issues.apache.org/jira/browse/KAFKA-3203
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Priority: Major
>
> Currently most of the exceptions to the user have an error code. While 
> UnknownCodecException and UnknownMagicByteException can also be thrown to the 
> client, the broker does not have an error mapping for them, so clients will 
> only receive UnknownServerException, which is vague.
> We should create those two exceptions in client package and add them to error 
> mapping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3031) Refactor KafkaApis to be optimal for o.a.k.c requests

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3031:
--

Assignee: (was: Grant Henke)

> Refactor KafkaApis to be optimal for o.a.k.c requests
> -
>
> Key: KAFKA-3031
> URL: https://issues.apache.org/jira/browse/KAFKA-3031
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Priority: Major
>
> The migration changes were meant to be a minimal transition. Once most of the 
> migration is complete though, we should be able to refactor KafkaApis to be 
> more efficient, reliable and readable based on the new Requests/Responses.
> An important part to handle here is the refactoring of how authorization 
> errors are handled generically as discussed in 
> https://github.com/apache/kafka/pull/196 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3507) Define standard exceptions for the Authorizer interface

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3507:
--

Assignee: (was: Grant Henke)

> Define standard exceptions for the Authorizer interface
> ---
>
> Key: KAFKA-3507
> URL: https://issues.apache.org/jira/browse/KAFKA-3507
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Priority: Major
>
> The Authorizer does not define any standard exceptions that can be used by an 
> implementer. This means that any exception thrown on the broker, as a part of 
> KAFKA-3266, can only be passed back to the client as an UnknownException(-1) 
> making error handling difficult. A set of standard exceptions covering most 
> foreseeable exceptions should be defined as a part of the interface and used 
> in the default SimpleAclAuthorizer. 
> This work will require a small KIP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3249) Consider implementing a blocking admin requests after the initial KIP-4 work is done

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3249:
--

Assignee: (was: Grant Henke)

> Consider implementing a blocking admin requests after the initial KIP-4 work 
> is done
> 
>
> Key: KAFKA-3249
> URL: https://issues.apache.org/jira/browse/KAFKA-3249
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Priority: Major
>
> We have decided to make all requests/responses asynchronous in KIP-4 but see 
> value in adding a blocking option for admin requests in the future. This 
> ticket is to track that future discussion/design/work.
> See the discussion here for further context: 
> https://github.com/apache/kafka/pull/626



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3604) Improve error messages when null is used with a non-nullable Type

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3604:
--

Assignee: (was: Grant Henke)

> Improve error messages when null is used with a non-nullable Type
> -
>
> Key: KAFKA-3604
> URL: https://issues.apache.org/jira/browse/KAFKA-3604
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Grant Henke
>Priority: Major
>
> Currently when a null is passed to a non-nullable type an unclear message is 
> provided in the exception. We should indicate that the issue was caused by a 
> null value. 
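
An illustrative sketch of the requested improvement (the helper and exception type are hypothetical):

{code:java}
public class NullableCheckSketch {
    static void validateNotNull(String fieldName, Object value, boolean nullable) {
        if (value == null && !nullable) {
            // name the field and say explicitly that the value was null
            throw new IllegalArgumentException(
                "Field '" + fieldName + "' is not nullable but a null value was provided");
        }
    }

    public static void main(String[] args) {
        validateNotNull("client_id", null, false); // throws with a clear message
    }
}
{code}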



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3610) Improve TimeoutException message when a RecordBatch expires

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3610:
--

Assignee: (was: Grant Henke)

> Improve TimeoutException message when a RecordBatch expires
> ---
>
> Key: KAFKA-3610
> URL: https://issues.apache.org/jira/browse/KAFKA-3610
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Priority: Major
>
> Currently when a batch expires in _RecordBatch.maybeExpire_ a Timeout 
> exception is thrown with the message "Batch Expired". Providing some 
> explanation and advice on configuration options to avoid or handle the 
> exception would help users. 
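
As an illustration only (wording invented here, not shipped text), a more actionable message might look like:

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class BatchExpiryMessageSketch {
    // names the batch, how long it waited, and a config knob to tune,
    // instead of the bare "Batch Expired"
    static TimeoutException batchExpired(TopicPartition tp, int recordCount, long elapsedMs) {
        return new TimeoutException(String.format(
            "Expiring %d record(s) for %s: %d ms has passed since batch creation. " +
            "Consider increasing request.timeout.ms or checking broker availability.",
            recordCount, tp, elapsedMs));
    }
}
{code}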



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-2410) Implement "Auto Topic Creation" client side and remove support from Broker side

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-2410:
--

Assignee: (was: Grant Henke)

> Implement "Auto Topic Creation" client side and remove support from Broker 
> side
> ---
>
> Key: KAFKA-2410
> URL: https://issues.apache.org/jira/browse/KAFKA-2410
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.8.2.1
>Reporter: Grant Henke
>Priority: Major
>
> Auto topic creation on the broker has caused pain in the past; and today it 
> still causes unusual error handling requirements on the client side, added 
> complexity in the broker, mixed responsibility of the TopicMetadataRequest, 
> and limits configuration of the option to be cluster wide. In the future 
> having it broker side will also make features such as authorization very 
> difficult. 
> There have been discussions in the past of implementing this feature client 
> side. 
> [example|https://issues.apache.org/jira/browse/KAFKA-689?focusedCommentId=13548746=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13548746]
> This Jira is to track that discussion and implementation once the necessary 
> protocol support exists: KAFKA-2229



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-2953) Kafka documentation is really wide

2021-07-13 Thread Grant Henke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-2953:
--

Assignee: (was: Grant Henke)

> Kafka documentation is really wide
> --
>
> Key: KAFKA-2953
> URL: https://issues.apache.org/jira/browse/KAFKA-2953
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Google Chrome Version 47.0.2526.73 (64-bit)
>Reporter: Jens Rantil
>Priority: Trivial
>
> The page at http://kafka.apache.org/documentation.html is extremely wide, 
> which is mostly annoying.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #11009: MINOR: update doc for default assignor change

2021-07-13 Thread GitBox


ableegoldman merged pull request #11009:
URL: https://github.com/apache/kafka/pull/11009


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-13 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669069127



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -391,10 +393,15 @@ else if (metadata.error() == 
Errors.TOPIC_AUTHORIZATION_FAILED)
 int newEpoch = partitionMetadata.leaderEpoch.get();
 // If the received leader epoch is at least the same as the 
previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch || 
changedTopicId) {
+if (currentEpoch == null || newEpoch >= currentEpoch) {
 log.debug("Updating last seen epoch for partition {} from {} 
to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been 
recreated. " +

Review comment:
   Hmm I don't actually have access to both the topic IDs in this method. I 
can just do the old one and/or pass in the parameter for the new one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



