[GitHub] [kafka] kkonstantine commented on pull request #9431: KAFKA-10426: Deadlock on session key update.

2020-10-21 Thread GitBox


kkonstantine commented on pull request #9431:
URL: https://github.com/apache/kafka/pull/9431#issuecomment-714248893


   Thanks for checking @xakassi 
   Merged all the way to 2.4. I updated the fix versions and closed the jira. 







[jira] [Updated] (KAFKA-10426) Deadlock in KafkaConfigBackingStore

2020-10-21 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10426:
---
Fix Version/s: 2.6.1
   2.5.2
   2.7.0
   2.4.2

> Deadlock in KafkaConfigBackingStore
> ---
>
> Key: KAFKA-10426
> URL: https://issues.apache.org/jira/browse/KAFKA-10426
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1, 2.6.0
>Reporter: Goltseva Taisiia
>Assignee: Goltseva Taisiia
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 2.4.2, 2.7.0, 2.5.2, 2.6.1
>
>
> Hi, guys!
> We faced the following deadlock:
>  
> {code:java}
> KafkaBasedLog Work Thread - _streaming_service_config
> priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId 
> (decimal):2384 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
> - waiting to lock <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
> - locked <0xd8c3be40> (a java.lang.Object)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
> at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
> CustomDistributedHerder-connect-1
> priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId 
> (decimal):2362 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
> - waiting to lock <0xd8c3be40> (a java.lang.Object)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
> - locked <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
> DistributedHerder entered the synchronized method 
> updateConfigsWithIncrementalCooperative() and called 
> configBackingStore.snapshot(), which takes a lock on an internal object 
> in the KafkaConfigBackingStore class.
>  
> Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized 
> block on that same internal object, received a SESSION_KEY record and called 
> updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
>  
> As far as I can see, the problem is here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]
>  
> As I understand it, this call should be performed outside the synchronized block:
> {code:java}
> if (started)
>     updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
>  
> I'm going to make a PR.
>  
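
To make the proposed fix concrete, here is a minimal sketch (names follow the Jira text above; this is not the exact patch). The new key is captured under the store's internal lock, but the herder callback is invoked only after the synchronized block exits, so the two locks are never taken in inverted order:

{code:java}
// Sketch only: assumes the store's internal `lock` object, a `sessionKey`
// field, and the `updateListener` named in the report.
SessionKey updatedKey = null;
synchronized (lock) {
    sessionKey = newSessionKey;   // update state under the store lock
    if (started)
        updatedKey = sessionKey;  // remember the key, defer the callback
}
if (updatedKey != null) {
    // No store lock is held here, so onSessionKeyUpdate() can take the
    // DistributedHerder lock without reproducing the deadlock above.
    updateListener.onSessionKeyUpdate(updatedKey);
}
{code}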





[jira] [Comment Edited] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2020-10-21 Thread Mayur Patki (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218213#comment-17218213
 ] 

Mayur Patki edited comment on KAFKA-4669 at 10/22/20, 5:08 AM:
---

Hi Team, [~rsivaram],

Encountering this issue on consumers as well - is there a workaround for this? 

We are using the spring-kafka consumer:

spring-kafka version - 2.2.14.RELEASE
kafka client version - kafka-client - 2.0.1
kafka cluster running on 1.1.1

Consumer exception - java.lang.IllegalStateException: Correlation 
id for response (400801) does not match request (400737), request header: 
RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-7, 
correlationId=400737)
 at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:853)
 at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:638)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:757)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:519)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1247)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
 at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:742)
 at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:748)



> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug exists in the 0.10 versions as well.
> A real case: first, a correlation-id-mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR 
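
The quoted log is truncated in the archive, but the hang mechanism can be sketched schematically (pseudocode, not the actual NetworkClient source; `parseResponse` and the callback shape here are assumptions):

{code:java}
// Schematic: completeNext() has already removed the request from the
// in-flight queue. If parsing then throws (e.g. a correlation id
// mismatch), the callback never runs, the RecordBatch is never marked
// done, and a flush() waiting on that batch blocks forever.
ClientRequest request = inFlightRequests.completeNext(source);
Struct body = parseResponse(receive.payload(), request.header()); // may throw
request.callback().onComplete(body); // skipped on error -> batch never completed
{code}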

[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


vvcephei commented on pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#issuecomment-714211840


   Just fixed the build; re-running the tests now.







[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


vvcephei commented on pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#issuecomment-714211742


   Thanks, @mjsax !
   
   I agree with creating a ticket for the follow-on work; I'll do that tomorrow.







[GitHub] [kafka] vvcephei commented on a change in pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


vvcephei commented on a change in pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#discussion_r509869342



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##
@@ -73,7 +73,6 @@ public void setup() {
 // setup test driver
 final Properties props = new Properties();
 props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"maxAggregation");

Review comment:
   Yeah, you're looking at a state where I had partially applied the 
suggestion to drop the unnecessary configs. Bootstrap Server is always 
unnecessary with TTD, but applicationId sometimes winds up being necessary to 
keep tests from colliding on the state directory. Chasing these down is what 
got me hung up the first time around, so I think I'll just leave it as-is for 
now.
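
   For context, a common way around the state-directory collisions mentioned above is to give each test its own temp directory (a sketch, not part of this PR; `TestUtils.tempDirectory()` is the helper from Kafka's test utilities):
   
   ```java
   final Properties props = new Properties();
   props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
   // Assumed workaround: a unique state dir per test prevents collisions
   // even when tests share an application.id.
   props.setProperty(StreamsConfig.STATE_DIR_CONFIG,
       TestUtils.tempDirectory().getAbsolutePath());
   final TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
   ```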









[jira] [Commented] (KAFKA-10603) Re-design KStream.process() and K*.transform*() operations

2020-10-21 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218723#comment-17218723
 ] 

Sagar Rao commented on KAFKA-10603:
---

Hey [~vvcephei], I know this task requires an understanding of the new changes 
in the KIP. Just curious - is it something that I can pick up?

> Re-design KStream.process() and K*.transform*() operations
> --
>
> Key: KAFKA-10603
> URL: https://issues.apache.org/jira/browse/KAFKA-10603
> Project: Kafka
>  Issue Type: New Feature
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> After the implementation of KIP-478, we have the ability to reconsider all 
> these APIs, and maybe just replace them with
> {code:java}
> // KStream
> KStream<KOut, VOut> process(ProcessorSupplier<K, V, KOut, VOut>) 
> // KTable
> KTable<KOut, VOut> process(ProcessorSupplier<K, V, KOut, VOut>){code}
>  
> but it needs more thought and a KIP for sure.
>  
> This ticket probably supersedes 
> https://issues.apache.org/jira/browse/KAFKA-8396
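
For reference, KIP-478's typed processor API has roughly this shape (per the KIP; an illustration only, not the redesign this ticket asks for), which is what would let a single process() subsume the transform*() variants:

{code:java}
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

// Maps <String, Long> input records to <String, Double> output records;
// the four type parameters are what the old transform*() methods could
// not express in a single process() call.
ProcessorSupplier<String, Long, String, Double> supplier = () ->
    new Processor<String, Long, String, Double>() {
        private ProcessorContext<String, Double> context;

        @Override
        public void init(final ProcessorContext<String, Double> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, Long> record) {
            context.forward(record.withValue(record.value() / 2.0));
        }
    };
{code}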





[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-21 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509857105



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() 
throws Exception {
 public void testFollowerGracefulShutdown() throws Exception {
 int otherNodeId = 1;
 int epoch = 5;
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-KafkaRaftClient client = buildClient(voters);
-assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {
+assertDoesNotThrow(() -> {
+
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
+});
+})
+.build(voters);
 
-client.poll();
+assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), context.quorumStateStore.readElectionState());
+
+context.client.poll();
 
 int shutdownTimeoutMs = 5000;
-CompletableFuture<Void> shutdownFuture = 
client.shutdown(shutdownTimeoutMs);
-assertTrue(client.isRunning());
+CompletableFuture<Void> shutdownFuture = 
context.client.shutdown(shutdownTimeoutMs);
+assertTrue(context.client.isRunning());
 assertFalse(shutdownFuture.isDone());
 
-client.poll();
-assertFalse(client.isRunning());
+context.client.poll();
+assertFalse(context.client.isRunning());
 assertTrue(shutdownFuture.isDone());
 assertNull(shutdownFuture.get());
 }
 
 @Test
 public void testGracefulShutdownSingleMemberQuorum() throws IOException {
-KafkaRaftClient client = buildClient(Collections.singleton(localId));
+RaftClientTestContext context = 
RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+
 assertEquals(ElectionState.withElectedLeader(
-1, localId, Collections.singleton(localId)), 
quorumStateStore.readElectionState());
-client.poll();
-assertEquals(0, channel.drainSendQueue().size());
+1, LOCAL_ID, Collections.singleton(LOCAL_ID)), 
context.quorumStateStore.readElectionState());
+context.client.poll();
+assertEquals(0, context.channel.drainSendQueue().size());
 int shutdownTimeoutMs = 5000;
-client.shutdown(shutdownTimeoutMs);
-assertTrue(client.isRunning());
-client.poll();
-assertFalse(client.isRunning());
+context.client.shutdown(shutdownTimeoutMs);
+assertTrue(context.client.isRunning());
+context.client.poll();
+assertFalse(context.client.isRunning());
 }
 
 @Test
 public void testFollowerReplication() throws Exception {
 int otherNodeId = 1;
 int epoch = 5;
-Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-KafkaRaftClient client = buildClient(voters);
-assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {
+assertDoesNotThrow(() -> {
+
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
+});
+})
+.build(voters);
+
+assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), context.quorumStateStore.readElectionState());
 
-pollUntilSend(client);
+context.pollUntilSend();
 
-int fetchQuorumCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
+int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 
0L, 0);
 Records records = MemoryRecords.withRecords(0L, CompressionType.NONE,
 3, new SimpleRecord("a".getBytes()), new 
SimpleRecord("b".getBytes()));
 FetchResponseData response = fetchResponse(epoch, otherNodeId, 
records, 0L, Errors.NONE);
-deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
+context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, 
response);
 
-client.poll();
-assertEquals(2L, log.endOffset().offset);
-assertEquals(2L, log.lastFlushedOffset());
+context.client.poll();
+assertEquals(2L, 

[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-21 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509857027



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() 
throws Exception {
 public void testFollowerGracefulShutdown() throws Exception {
 int otherNodeId = 1;
 int epoch = 5;
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-KafkaRaftClient client = buildClient(voters);
-assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {

Review comment:
   Thanks for the suggestion. I implemented this.









[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-21 Thread GitBox


feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714183165


   @vvcephei Thanks for the help. FYI, I also tried to build locally with 
apache/trunk merged, and it succeeded.







[jira] [Commented] (KAFKA-8124) Beginning offset is after the ending offset for topic partition

2020-10-21 Thread roamer_wu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218694#comment-17218694
 ] 

roamer_wu commented on KAFKA-8124:
--

+1 - seeing this with Spark 2.4 and Kafka 2.1.0

> Beginning offset is after the ending offset for topic partition
> ---
>
> Key: KAFKA-8124
> URL: https://issues.apache.org/jira/browse/KAFKA-8124
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
> Environment: OS : Rhel 7
> server : VM
>Reporter: suseendramani
>Priority: Major
>
>  
> We are getting this issue in production, and the Spark consumer is dying 
> because of an offset issue.
> We observed the following error in Kafka Broker ( that has problems)
> --
> [2019-03-18 14:40:14,100] WARN Unable to reconnect to ZooKeeper service, 
> session 0x1692e9ff4410004 has expired (org.apache.zookeeper.ClientCnxn)
>  [2019-03-18 14:40:14,100] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1692e9ff4410004 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> ---
> Error from other broker when talking to the problematic broker.
>  [2019-03-18 14:40:14,107] INFO [ReplicaFetcher replicaId=3, leaderId=5, 
> fetcherId=0] Error sending fetch request (sessionId=2127346653, 
> epoch=27048427) to
>  node 5: java.nio.channels.ClosedSelectorException. 
> (org.apache.kafka.clients.FetchSessionHandler)
>  
> 
>  
> All topics were having replication factor of 3 and this issue happens when 
> one of the broker was having issues. We are using SCRAM authentication 
> (SHA-256) and SSL.
>  
> The Spark job died with the following error:
> ERROR 2019-03-18 07:40:57,178 7924 org.apache.spark.executor.Executor 
> [Executor task launch worker for task 16] Exception in task 27.0 in stage 0.0 
> (TID 16)
>  java.lang.AssertionError: assertion failed: Beginning offset 115204574 is 
> after the ending offset 115204516 for topic  partition 37. You 
> either provided an invalid fromOffset, or the Kafka topic has been damaged
>  at scala.Predef$.assert(Predef.scala:170)
>  at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:175)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  
> ---
>  
> please let me know if you need more details.





[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-21 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509840834



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-private static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
-
-private final int localId = 0;
-private final int electionTimeoutMs = 1;
-private final int electionBackoffMaxMs = 100;
-private final int fetchTimeoutMs = 5;   // fetch timeout is usually 
larger than election timeout
-private final int retryBackoffMs = 50;
-private final int requestTimeoutMs = 5000;
-private final int fetchMaxWaitMs = 0;
-
-private final MockTime time = new MockTime();
-private final MockLog log = new MockLog(METADATA_PARTITION);
-private final MockNetworkChannel channel = new MockNetworkChannel();
-private final Random random = Mockito.spy(new Random(1));
-private final QuorumStateStore quorumStateStore = new 
MockQuorumStateStore();
-
-@AfterEach
-public void cleanUp() throws IOException {
-quorumStateStore.clear();
-}
-
-private InetSocketAddress mockAddress(int id) {
-return new InetSocketAddress("localhost", 9990 + id);
-}
-
-private KafkaRaftClient buildClient(Set<Integer> voters) throws 
IOException {
-return buildClient(voters, new Metrics(time));
-}
-
-private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) 
throws IOException {
-LogContext logContext = new LogContext();
-QuorumState quorum = new QuorumState(localId, voters, 
electionTimeoutMs, fetchTimeoutMs,
-quorumStateStore, time, logContext, random);
-
-Map<Integer, InetSocketAddress> voterAddresses = 
voters.stream().collect(Collectors.toMap(
-Function.identity(),
-this::mockAddress
-));
-
-KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, 
time, metrics,
-new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), 
voterAddresses,
-electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, 
fetchMaxWaitMs, logContext, random);
-
-client.initialize();
-
-return client;
-}
-
 @Test
 public void testInitializeSingleMemberQuorum() throws IOException {
-buildClient(Collections.singleton(localId));
-assertEquals(ElectionState.withElectedLeader(1, localId, 
Collections.singleton(localId)),
-quorumStateStore.readElectionState());
+RaftClientTestContext context = 
RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+assertEquals(
+ElectionState.withElectedLeader(1, LOCAL_ID, 
Collections.singleton(LOCAL_ID)),
+context.quorumStateStore.readElectionState()
+);
 }
 
 @Test
 public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() 
throws Exception {
 // Start off as leader. We should still bump the epoch after 
initialization
 
 int initialEpoch = 2;
-Set<Integer> voters = Collections.singleton(localId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch,
 localId, voters));
-
-KafkaRaftClient client = buildClient(voters);
-assertEquals(1L, log.endOffset().offset);
-assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch 
+ 1),
-client.currentLeaderAndEpoch());
-assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
localId, voters),
-quorumStateStore.readElectionState());
+Set<Integer> voters = Collections.singleton(LOCAL_ID);
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {
+assertDoesNotThrow(() -> {
+quorumStateStore.writeElectionState(
+ElectionState.withElectedLeader(initialEpoch, 
LOCAL_ID, voters)
+);
+});
+})
+.build(voters);
+
+assertEquals(1L, context.log.endOffset().offset);
+assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch 
+ 1),
+context.client.currentLeaderAndEpoch());
+assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
LOCAL_ID, voters),
+context.quorumStateStore.readElectionState());
 }
 
 @Test
 public void testInitializeAsLeaderFromStateStore() throws Exception {
-Set<Integer> voters = Utils.mkSet(localId, 1);
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1);
 int epoch = 2;
 
-Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-

[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-21 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509840265



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-private static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
-
-private final int localId = 0;
-private final int electionTimeoutMs = 1;
-private final int electionBackoffMaxMs = 100;
-private final int fetchTimeoutMs = 5;   // fetch timeout is usually 
larger than election timeout
-private final int retryBackoffMs = 50;
-private final int requestTimeoutMs = 5000;
-private final int fetchMaxWaitMs = 0;
-
-private final MockTime time = new MockTime();
-private final MockLog log = new MockLog(METADATA_PARTITION);
-private final MockNetworkChannel channel = new MockNetworkChannel();
-private final Random random = Mockito.spy(new Random(1));
-private final QuorumStateStore quorumStateStore = new 
MockQuorumStateStore();
-
-@AfterEach
-public void cleanUp() throws IOException {
-quorumStateStore.clear();
-}
-
-private InetSocketAddress mockAddress(int id) {
-return new InetSocketAddress("localhost", 9990 + id);
-}
-
-private KafkaRaftClient buildClient(Set<Integer> voters) throws 
IOException {
-return buildClient(voters, new Metrics(time));
-}
-
-private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) 
throws IOException {
-LogContext logContext = new LogContext();
-QuorumState quorum = new QuorumState(localId, voters, 
electionTimeoutMs, fetchTimeoutMs,
-quorumStateStore, time, logContext, random);
-
-Map<Integer, InetSocketAddress> voterAddresses = 
voters.stream().collect(Collectors.toMap(
-Function.identity(),
-this::mockAddress
-));
-
-KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, 
time, metrics,
-new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), 
voterAddresses,
-electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, 
fetchMaxWaitMs, logContext, random);
-
-client.initialize();
-
-return client;
-}
-
 @Test
 public void testInitializeSingleMemberQuorum() throws IOException {
-buildClient(Collections.singleton(localId));
-assertEquals(ElectionState.withElectedLeader(1, localId, 
Collections.singleton(localId)),
-quorumStateStore.readElectionState());
+RaftClientTestContext context = 
RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+assertEquals(
+ElectionState.withElectedLeader(1, LOCAL_ID, 
Collections.singleton(LOCAL_ID)),
+context.quorumStateStore.readElectionState()
+);
 }
 
 @Test
 public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() 
throws Exception {
 // Start off as leader. We should still bump the epoch after 
initialization
 
 int initialEpoch = 2;
-Set<Integer> voters = Collections.singleton(localId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch,
 localId, voters));
-
-KafkaRaftClient client = buildClient(voters);
-assertEquals(1L, log.endOffset().offset);
-assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch 
+ 1),
-client.currentLeaderAndEpoch());
-assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
localId, voters),
-quorumStateStore.readElectionState());
+Set<Integer> voters = Collections.singleton(LOCAL_ID);
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {
+assertDoesNotThrow(() -> {
+quorumStateStore.writeElectionState(
+ElectionState.withElectedLeader(initialEpoch, 
LOCAL_ID, voters)
+);
+});
+})
+.build(voters);
+
+assertEquals(1L, context.log.endOffset().offset);
+assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch 
+ 1),
+context.client.currentLeaderAndEpoch());
+assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
LOCAL_ID, voters),
+context.quorumStateStore.readElectionState());
 }
 
 @Test
 public void testInitializeAsLeaderFromStateStore() throws Exception {
-Set<Integer> voters = Utils.mkSet(localId, 1);
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1);
 int epoch = 2;
 
-Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-

[jira] [Assigned] (KAFKA-10053) Update Document for new feature that Allow HTTP Response Headers to be Configured for Kafka Connect

2020-10-21 Thread Joel Hamill (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Hamill reassigned KAFKA-10053:
---

Assignee: Joel Hamill

> Update Document for new feature that Allow HTTP Response Headers to be 
> Configured for Kafka Connect
> ---
>
> Key: KAFKA-10053
> URL: https://issues.apache.org/jira/browse/KAFKA-10053
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeff Huang
>Assignee: Joel Hamill
>Priority: Major
>
> We need to update the AK documentation for this new feature; the details are 
> in the following KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect]





[jira] [Assigned] (KAFKA-10053) Update Document for new feature that Allow HTTP Response Headers to be Configured for Kafka Connect

2020-10-21 Thread Joel Hamill (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Hamill reassigned KAFKA-10053:
---

Assignee: (was: Joel Hamill)

> Update Document for new feature that Allow HTTP Response Headers to be 
> Configured for Kafka Connect
> ---
>
> Key: KAFKA-10053
> URL: https://issues.apache.org/jira/browse/KAFKA-10053
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeff Huang
>Priority: Major
>
> We need to update the AK documentation for this new feature; the details are 
> in the following KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect]





[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-21 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509787211



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1536,67 +1522,70 @@ public void 
testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception
 int otherNodeId = 2;
 int epoch = 5;
 Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
-KafkaRaftClient client = buildClient(voters);
-discoverLeaderAsObserver(client, voters, leaderId, epoch);
 
-pollUntilSend(client);
-RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest();
+RaftClientTestContext context = RaftClientTestContext.build(voters);
+
+context.discoverLeaderAsObserver(voters, leaderId, epoch);
+
+context.pollUntilSend();
+RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
 assertEquals(leaderId, fetchRequest1.destinationId());
-assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 
0);
 
-time.sleep(requestTimeoutMs);
-pollUntilSend(client);
+context.time.sleep(REQUEST_TIMEOUT_MS);
+context.pollUntilSend();
 
 // We should retry the Fetch against the other voter since the original
 // voter connection will be backing off.
-RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest();
+RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
 assertNotEquals(leaderId, fetchRequest2.destinationId());
 assertTrue(voters.contains(fetchRequest2.destinationId()));
-assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 
0);
 
-deliverResponse(fetchRequest2.correlationId, 
fetchRequest2.destinationId(),
+context.deliverResponse(fetchRequest2.correlationId, 
fetchRequest2.destinationId(),
 fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
Errors.FENCED_LEADER_EPOCH));
-client.poll();
+context.client.poll();
 
-assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), 
quorumStateStore.readElectionState());
+assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), 
context.quorumStateStore.readElectionState());
 }
 
 @Test
 public void testLeaderGracefulShutdown() throws Exception {
 int otherNodeId = 1;
-Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 int epoch = 1;
-KafkaRaftClient client = initializeAsLeader(voters, epoch);
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(voters, epoch);
 
 // Now shutdown
 int shutdownTimeoutMs = 5000;
-CompletableFuture<Void> shutdownFuture = 
client.shutdown(shutdownTimeoutMs);
+CompletableFuture<Void> shutdownFuture = 
context.client.shutdown(shutdownTimeoutMs);
 
 // We should still be running until we have had a chance to send 
EndQuorumEpoch
-assertTrue(client.isShuttingDown());
-assertTrue(client.isRunning());
+assertTrue(context.client.isShuttingDown());

Review comment:
   Yeah. Let me play around with this.









[GitHub] [kafka] mjsax commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


mjsax commented on pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#issuecomment-713939417


   Checkstyle failed:
   ```
   [ant:checkstyle] [ERROR] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-9477@2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java:44:8:
 Unused import - java.util.Properties. [UnusedImports]
   ```
   
   No objections to merge as-is -- for the KIP, it's too late anyway for 2.7. 
Let us just create a ticket -- seems like a new "newbie" task to add the new 
constructor and finish the cleanup of our tests.







[GitHub] [kafka] mjsax commented on a change in pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


mjsax commented on a change in pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#discussion_r509783380



##
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##
@@ -340,7 +340,6 @@ public void shouldThrowOnUnassignedStateStoreAccess() {
 final String badNodeName = "badGuy";
 
 final Properties config = new Properties();
-config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
 config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");

Review comment:
   can be removed? (seems on many other tests, too)









[GitHub] [kafka] mjsax commented on a change in pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


mjsax commented on a change in pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#discussion_r509783306



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##
@@ -73,7 +73,6 @@ public void setup() {
 // setup test driver
 final Properties props = new Properties();
 props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"maxAggregation");

Review comment:
   can be removed?









[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509760723



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() 
throws Exception {
 public void testFollowerGracefulShutdown() throws Exception {
 int otherNodeId = 1;
 int epoch = 5;
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-KafkaRaftClient client = buildClient(voters);
-assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {

Review comment:
   I think nearly every call to `updateQuorumStateStore` is just writing an 
initial state. Seems like we can introduce a more direct option to the builder.
   
   By the way, one of the annoyances is needing to provide `voters` through the 
initial state and through `build` below. Since we always need `voters`, maybe 
we can provide it in the builder constructor. That would allow us to add 
helpers to construct the state. For example, we could turn this into:
   
   ```java
   new RaftClientTestContext.Builder(voters)
 .initializeAsFollower(epoch, otherNodeId)
 .build()
   ```
   
   Similarly, we could probably do state assertions in the test context as well 
and save the need to always pass through `voters` (e.g. we could have 
`context.assertFollower(epoch, leaderId)` instead of the cumbersome 
`assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), 
context.quorumStateStore.readElectionState())`).
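
   A sketch of the kind of assertion helper suggested above (hypothetical name; assumes the context keeps `voters` and the `quorumStateStore`):
   
   ```java
   // Inside RaftClientTestContext:
   void assertElectedLeader(int epoch, int leaderId) throws IOException {
       assertEquals(
           ElectionState.withElectedLeader(epoch, leaderId, voters),
           quorumStateStore.readElectionState());
   }
   ```
   
   A call site would then shrink to `context.assertElectedLeader(epoch, otherNodeId)`.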

##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() 
throws Exception {
 public void testFollowerGracefulShutdown() throws Exception {
 int otherNodeId = 1;
 int epoch = 5;
+Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-KafkaRaftClient client = buildClient(voters);
-assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {
+assertDoesNotThrow(() -> {
+
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
+});
+})
+.build(voters);
 
-client.poll();
+assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), context.quorumStateStore.readElectionState());
+
+context.client.poll();
 
 int shutdownTimeoutMs = 5000;
-CompletableFuture<Void> shutdownFuture = 
client.shutdown(shutdownTimeoutMs);
-assertTrue(client.isRunning());
+CompletableFuture<Void> shutdownFuture = 
context.client.shutdown(shutdownTimeoutMs);
+assertTrue(context.client.isRunning());

[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


vvcephei commented on pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#issuecomment-713906551


   Hey @mjsax and @abbccdda ,
   
   Sorry for this, but I've had to open a new PR instead of just updating 
https://github.com/apache/kafka/pull/9052 in place.
   
   I just took the current state of that other PR and rebased it on trunk (and 
resolved the conflicts). I'd like to get this quality-of-life improvement into 
2.7, but I didn't have time to take care of your suggestions for follow-on 
work. For clarity, I have migrated some, but not all, internal usages of 
TopologyTestDriver not to need the appId/bootstrap. I also didn't do a 
follow-on kip to offer new constructors that don't take `Properties` at all.
   
   Are you still ok with merging this as-is?
   
   (cc @bbejeck )







[GitHub] [kafka] vvcephei closed pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


vvcephei closed pull request #9052:
URL: https://github.com/apache/kafka/pull/9052


   







[GitHub] [kafka] vvcephei commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


vvcephei commented on pull request #9052:
URL: https://github.com/apache/kafka/pull/9052#issuecomment-713905061


   Closing in favor of https://github.com/apache/kafka/pull/9477, since I've 
recycled my fork since opening this PR and can no longer update the PR.







[GitHub] [kafka] vvcephei opened a new pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-21 Thread GitBox


vvcephei opened a new pull request #9477:
URL: https://github.com/apache/kafka/pull/9477


   TopologyTestDriver comes with a paper cut that it passes through a
   config requirement that application.id and bootstrap.servers must be
   configured. But these configs are not required in the context of
   TopologyTestDriver specifically. This change relaxes the requirement.
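   
   Concretely, after this change a test can build a driver without inventing dummy values; a sketch (assuming no sibling tests collide on the default state directory, per the discussion elsewhere in this PR):
   
   ```java
   // Before: application.id and bootstrap.servers had to be set to dummies.
   // After: an empty Properties is enough for TopologyTestDriver.
   final Properties props = new Properties();
   try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
       // pipe input records, read output records ...
   }
   ```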
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509740150



##
File path: core/src/main/scala/kafka/common/RecordValidationException.scala
##
@@ -23,5 +23,6 @@ import 
org.apache.kafka.common.requests.ProduceResponse.RecordError
 import scala.collection.Seq
 
 class RecordValidationException(val invalidException: ApiException,
-val recordErrors: Seq[RecordError]) extends 
RuntimeException {
+val recordErrors: Seq[RecordError])
+  extends RuntimeException(invalidException) {

Review comment:
   Yeah, doesn't need to be done here. I just noticed the trace was missing 
when debugging a failure.
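
   The one-line diff relies on standard Java exception chaining (general Java behavior, not Kafka-specific):
   
   ```java
   // Passing the cause to RuntimeException preserves the original trace:
   RuntimeException cause = new IllegalStateException("validation failed");
   RuntimeException wrapped = new RuntimeException(cause);
   wrapped.printStackTrace(); // includes "Caused by: java.lang.IllegalStateException: ..."
   ```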









[GitHub] [kafka] jsancio opened a new pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-21 Thread GitBox


jsancio opened a new pull request #9476:
URL: https://github.com/apache/kafka/pull/9476


   There is a lot of functionality in KafkaRaftClientTest that is useful for 
writing other tests. Refactor that functionality into another class that can be 
reused in other tests.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509732236



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -216,11 +216,9 @@ class KafkaNetworkChannel(time: Time,
 endpoints.put(id, node)
   }
 
-  def postInboundRequest(header: RequestHeader,
- request: AbstractRequest,
- onResponseReceived: ResponseHandler): Unit = {
+  def postInboundRequest(request: AbstractRequest, onResponseReceived: 
ResponseHandler): Unit = {

Review comment:
   This was actually a bug. The implementation was treating `correlationId` 
as unique across all connections, which of course was wrong. My fix was just to 
overwrite the `correlationId` from the header with one that could be unique, 
but obviously this loses traceability through the Raft layer. If it's ok with 
you, I'd like to address this problem more generally in a follow-up. 
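
   To illustrate the bug described above (names here are assumptions, not the actual KafkaNetworkChannel code): correlation ids are only unique per connection, so keying pending callbacks by correlationId alone lets requests from different nodes collide. Keying by (nodeId, correlationId) keeps each response matched to its request:
   
   ```java
   private final Map<Integer, Map<Integer, ResponseHandler>> pendingByNode = new HashMap<>();
   
   void trackRequest(int nodeId, int correlationId, ResponseHandler handler) {
       // One correlation-id space per connection, not a single global map.
       pendingByNode.computeIfAbsent(nodeId, id -> new HashMap<>())
                    .put(correlationId, handler);
   }
   
   ResponseHandler completeRequest(int nodeId, int correlationId) {
       Map<Integer, ResponseHandler> pending = pendingByNode.get(nodeId);
       return pending == null ? null : pending.remove(correlationId);
   }
   ```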









[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509726250



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.protocol.DataOutputStreamWritable;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BatchBuilder<T> {

Review comment:
   I am not sure I follow. Are you suggesting making `BatchBuilder` a 
nested class? I think what it's doing is complex enough that I wanted a 
separate class that could be tested in isolation.









[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509723127



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * TODO: Also flush after minimum size limit is reached?
+ */
+public class BatchAccumulator<T> implements Closeable {
+private final int epoch;
+private final Time time;
+private final Timer lingerTimer;
+private final int lingerMs;
+private final int maxBatchSize;
+private final CompressionType compressionType;
+private final MemoryPool memoryPool;
+private final ReentrantLock lock;
+private final RecordSerde<T> serde;
+
+private long nextOffset;
+private BatchBuilder<T> currentBatch;
+private List<CompletedBatch<T>> completed;
+
+public BatchAccumulator(
+int epoch,
+long baseOffset,
+int lingerMs,
+int maxBatchSize,
+MemoryPool memoryPool,
+Time time,
+CompressionType compressionType,
+RecordSerde<T> serde
+) {
+this.epoch = epoch;
+this.lingerMs = lingerMs;
+this.maxBatchSize = maxBatchSize;
+this.memoryPool = memoryPool;
+this.time = time;
+this.lingerTimer = time.timer(lingerMs);
+this.compressionType = compressionType;
+this.serde = serde;
+this.nextOffset = baseOffset;
+this.completed = new ArrayList<>();
+this.lock = new ReentrantLock();
+}
+
+/**
+ * Append a list of records into an atomic batch. We guarantee all records
+ * are included in the same underlying record batch so that either all of
+ * the records become committed or none of them do.
+ *
+ * @param epoch the expected leader epoch
+ * @param records the list of records to include in a batch
+ * @return the offset of the last message or {@link Long#MAX_VALUE} if the epoch
+ * does not match
+ */
+public Long append(int epoch, List<T> records) {
+if (epoch != this.epoch) {
+// If the epoch does not match, then the state machine probably
+// has not gotten the notification about the latest epoch change.
+// In this case, ignore the append and return a large offset value
+// which will never be committed.
+return Long.MAX_VALUE;
+}
+
+Object serdeContext = serde.newWriteContext();
+int batchSize = 0;
+for (T record : records) {
+batchSize += serde.recordSize(record, serdeContext);
+}
+
+if (batchSize > maxBatchSize) {
+throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
+", which exceeds the maximum allowed batch size of " + maxBatchSize);
+}
+
+lock.lock();
+try {
+BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
+if (batch == null) {
+return null;
+}
+
+if (isEmpty()) {
+lingerTimer.update();
+lingerTimer.reset(lingerMs);
+}
+
+for (T record : records) {
+batch.appendRecord(record, serdeContext);
+nextOffset += 1;
+}
+
+return nextOffset - 1;
+} finally {
+lock.unlock();
+}
+}
+
+private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
+if (currentBatch == null) {
+startNewBatch();
+} else if (!currentBatch.hasRoomFor(batchSize)) {
+

[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509719785



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1605,100 +1708,18 @@ public void poll() throws IOException {
 }
 }
 
-private void failPendingAppends(KafkaException exception) {
-for (UnwrittenAppend unwrittenAppend : unwrittenAppends) {
-unwrittenAppend.fail(exception);
-}
-unwrittenAppends.clear();
-}
-
-private void pollPendingAppends(LeaderState state, long currentTimeMs) {
-int numAppends = 0;
-int maxNumAppends = unwrittenAppends.size();
-
-while (!unwrittenAppends.isEmpty() && numAppends < maxNumAppends) {
-final UnwrittenAppend unwrittenAppend = unwrittenAppends.poll();
-
-if (unwrittenAppend.future.isDone())
-continue;
-
-if (unwrittenAppend.isTimedOut(currentTimeMs)) {
-unwrittenAppend.fail(new TimeoutException("Request timeout " + unwrittenAppend.requestTimeoutMs
-+ " expired before the records could be appended to the log"));
-} else {
-int epoch = quorum.epoch();
-LogAppendInfo info = appendAsLeader(unwrittenAppend.records);
-OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
-long numRecords = info.lastOffset - info.firstOffset + 1;
-logger.debug("Completed write of {} records at {}", 
numRecords, offsetAndEpoch);
-
-if (unwrittenAppend.ackMode == AckMode.LEADER) {
-unwrittenAppend.complete(offsetAndEpoch);
-} else if (unwrittenAppend.ackMode == AckMode.QUORUM) {
-CompletableFuture<Long> future = appendPurgatory.await(
-LogOffset.awaitCommitted(offsetAndEpoch.offset),
-unwrittenAppend.requestTimeoutMs);
-
-future.whenComplete((completionTimeMs, exception) -> {
-if (exception != null) {
-logger.error("Failed to commit append at {} due to 
{}", offsetAndEpoch, exception);
-
-unwrittenAppend.fail(exception);
-} else {
-long elapsedTime = Math.max(0, completionTimeMs - currentTimeMs);
-double elapsedTimePerRecord = (double) elapsedTime / numRecords;
-kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs);
-unwrittenAppend.complete(offsetAndEpoch);
-
-logger.debug("Completed commit of {} records at 
{}", numRecords, offsetAndEpoch);
-}
-});
-}
-}
-
-numAppends++;
-}
-
-if (numAppends > 0) {
-flushLeaderLog(state, currentTimeMs);
-}
-}
-
-/**
- * Append a set of records to the log. Successful completion of the future indicates a success of
- * the append, with the uncommitted base offset and epoch.
- *
- * @param records The records to write to the log
- * @param ackMode The commit mode for the appended records
- * @param timeoutMs The maximum time to wait for the append operation to complete (including
- *  any time needed for replication)
- * @return The uncommitted base offset and epoch of the appended records
- */
 @Override
-public CompletableFuture<OffsetAndEpoch> append(
-Records records,
-AckMode ackMode,
-long timeoutMs
-) {
-if (records.sizeInBytes() == 0)
-throw new IllegalArgumentException("Attempt to append empty record set");
-
-if (shutdown.get() != null)
-throw new IllegalStateException("Cannot append records while we 
are shutting down");
-
-if (quorum.isObserver())
-throw new IllegalStateException("Illegal attempt to write to an 
observer");
-
-CompletableFuture<OffsetAndEpoch> future = new CompletableFuture<>();
-UnwrittenAppend unwrittenAppend = new UnwrittenAppend(
-records, time.milliseconds(), timeoutMs, ackMode, future);
+public Long scheduleAppend(int epoch, List<T> records) {
+BatchAccumulator<T> accumulator = this.accumulator;
+if (accumulator == null) {
+return Long.MAX_VALUE;

Review comment:
   Yeah, see my comment above about the handling of `Long.MAX_VALUE`. This is an attempt to reduce the error handling in the state machine. The model that we are working toward here is the following:
   
   1) the state machine gets notified that the node has become leader in some epoch
   2) the state machine can schedule appends with this epoch and it will get back the expected append offset
   3) the state machine observes committed records through `handleCommit` and can match them against the offsets returned from `scheduleAppend`
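   
   A sketch of the caller side under this model (names borrowed from the TestRaftServer changes in this PR; treat it as an illustration rather than the final API):
   
   ```scala
   val leaderAndEpoch = client.currentLeaderAndEpoch()
   val offset = client.scheduleAppend(leaderAndEpoch.epoch, records)
   if (offset == null) {
     // the accumulator could not accept the batch right now; back off and retry
     time.sleep(10)
   } else if (offset == Long.MaxValue) {
     // stale epoch: this append can never be committed, so any state keyed to
     // this offset is discarded once the epoch change is observed
   } else {
     pendingAppends.offer(PendingAppend(leaderAndEpoch.epoch, offset, time.milliseconds()))
   }
   ```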

[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509714949



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1443,15 +1485,79 @@ private void pollShutdown(GracefulShutdown shutdown) throws IOException {
 }
 }
 
+private void appendBatch(
+LeaderState state,
+BatchAccumulator.CompletedBatch<T> batch,
+long appendTimeMs
+) {
+try {
+List<T> records = batch.records;
+int epoch = state.epoch();
+
+LogAppendInfo info = appendAsLeader(batch.data);
+OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
+CompletableFuture<Long> future = appendPurgatory.await(
+LogOffset.awaitCommitted(offsetAndEpoch.offset),
+Integer.MAX_VALUE
+);
+
+future.whenComplete((commitTimeMs, exception) -> {
+int numRecords = batch.records.size();
+if (exception != null) {
+logger.debug("Failed to commit {} records at {}", 
numRecords, offsetAndEpoch, exception);
+} else {
+long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
+double elapsedTimePerRecord = (double) elapsedTime / numRecords;
+kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
+logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch);
+listener.handleCommit(epoch, info.lastOffset, records);
+}
+});
+} finally {
+batch.release();
+}
+}
+
+private long maybeAppendBatches(
+LeaderState state,
+long currentTimeMs
+) {
+long timeUntilFlush = accumulator.timeUntilFlush(currentTimeMs);
+if (timeUntilFlush <= 0) {
+List<BatchAccumulator.CompletedBatch<T>> batches = accumulator.flush();
+Iterator<BatchAccumulator.CompletedBatch<T>> iterator = batches.iterator();
+
+try {
+while (iterator.hasNext()) {
+BatchAccumulator.CompletedBatch<T> batch = iterator.next();
+appendBatch(state, batch, currentTimeMs);
+}
+flushLeaderLog(state, currentTimeMs);

Review comment:
   Yes, I agree with you. Of course it is ok if unflushed data gets 
replicated. The main thing we need to protect is incrementing the high 
watermark.
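   
   In other words (a sketch of the invariant, with assumed method names):
   
   ```scala
   // Followers may fetch records that are not yet flushed on the leader; that is
   // acceptable. What must hold is that the leader only advances the high
   // watermark over records that have already been flushed to disk.
   val info = log.appendAsLeader(records)      // may be replicated before the flush
   log.flush()                                 // durability point on the leader
   maybeUpdateHighWatermark(info.lastOffset)   // strictly after the flush
   ```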





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509711302



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
 )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+client: KafkaRaftClient[Array[Byte]],
+time: Time,
+brokerId: Int,
+recordsPerSec: Int,
+recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+private val stats = new WriteStats(time, printIntervalMs = 5000)
+private val payload = new Array[Byte](recordSize)
+private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+private var isLeader = false
+private var throttler: ThroughputThrottler = _
+private var recordCount = 0
+
+override def doWork(): Unit = {
+  if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+if (isLeader) {
+  pendingAppends.clear()

Review comment:
   You are right that the appends may still be committed, but in this 
patch, the `handleCommit` API is only invoked for appends within the current 
epoch. I thought it seemed simpler for now to just reset state at the start of 
the epoch. We can be more clever in the future once `handleCommit` is extended 
to account for replication.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-21 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r509670268



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
 info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+   handler: RequestChannel.Request => Unit): Unit = {
+if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+  sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+} else if (request.envelopeContext.isDefined &&
+  (!request.context.fromPrivilegedListener ||
+  !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+) {
+  // If the designated forwarding request is not coming from a privileged listener, or
+  // it fails CLUSTER_ACTION permission, we would fail the authorization.
+  sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception())
+} else if (request.envelopeContext.isDefined && !controller.isActive) {
+  sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+} else if (!controller.isActive && couldDoRedirection(request)) {
+  redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
+} else {
+  // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported,
+  // therefore requests are handled directly.
+  handler(request)
+}
+  }
+
+  private def couldDoRedirection(request: RequestChannel.Request): Boolean =

Review comment:
   Sounds good.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-21 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r509645115



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
 val context = new RequestContext(header, connectionId, channel.socketAddress,
   channel.principal, listenerName, securityProtocol,
   channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-val req = new RequestChannel.Request(processor = id, context = context,
-  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+val principalSerde = Option(channel.principalSerde.orElse(null))

Review comment:
   I had a try, but it seems Java's `Optional` doesn't have an `asScala` conversion out of the box.
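   
   If adding a dependency is acceptable, the conversion does exist elsewhere; a sketch assuming scala-java8-compat (or the Scala 2.13 standard library):
   
   ```scala
   // Scala 2.12 with the scala-java8-compat library:
   import scala.compat.java8.OptionConverters._
   val principalSerde: Option[KafkaPrincipalSerde] = channel.principalSerde.asScala
   
   // Scala 2.13+, standard library equivalent:
   // import scala.jdk.OptionConverters._
   // val principalSerde = channel.principalSerde.toScala
   ```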





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan removed a comment on pull request #9472: MINOR: Add Jenkinsfile to 2.3

2020-10-21 Thread GitBox


jolshan removed a comment on pull request #9472:
URL: https://github.com/apache/kafka/pull/9472#issuecomment-713842825


   Woo hoo! Tests all pass. LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3

2020-10-21 Thread GitBox


jolshan commented on pull request #9472:
URL: https://github.com/apache/kafka/pull/9472#issuecomment-713842825


   Woo hoo! Tests all pass. LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509636357



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -19,37 +19,44 @@ package kafka.tools
 
 import java.io.File
 import java.nio.file.Files
-import java.util.concurrent.CountDownLatch
-import java.util.{Properties, Random}
+import java.util
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Collections, OptionalInt, Random}
 
-import joptsimple.OptionParser
+import com.yammer.metrics.core.MetricName
+import joptsimple.OptionException
 import kafka.log.{Log, LogConfig, LogManager}
 import kafka.network.SocketServer
 import kafka.raft.{KafkaFuturePurgatory, KafkaMetadataLog, KafkaNetworkChannel}
 import kafka.security.CredentialProvider
 import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaRequestHandlerPool, KafkaServer, LogDirFailureChannel}
+import kafka.tools.TestRaftServer.{ByteArraySerde, PendingAppend, ThroughputThrottler, WriteStats}

Review comment:
   Moved the import inside `TestRaftServer`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509633527



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
 )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+client: KafkaRaftClient[Array[Byte]],
+time: Time,
+brokerId: Int,
+recordsPerSec: Int,
+recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+private val stats = new WriteStats(time, printIntervalMs = 5000)
+private val payload = new Array[Byte](recordSize)
+private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+private var isLeader = false
+private var throttler: ThroughputThrottler = _
+private var recordCount = 0
+
+override def doWork(): Unit = {
+  if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+if (isLeader) {
+  pendingAppends.clear()
+  throttler = new ThroughputThrottler(time, recordsPerSec)
+  recordCount = 0
+}
+  }
+
+  if (isLeader) {
+recordCount += 1
+
+val startTimeMs = time.milliseconds()
+val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) {
+  time.milliseconds()
+} else {
+  startTimeMs
+}
+
+val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload))
+if (offset == null || offset == Long.MaxValue) {
+  time.sleep(10)
+} else {
+  pendingAppends.offer(PendingAppend(latestLeaderAndEpoch.epoch, offset, sendTimeMs))
+}
+  } else {
+time.sleep(500)
+  }
+}
+
+override def handleCommit(epoch: Int, lastOffset: Long, records: util.List[Array[Byte]]): Unit = {
+  var offset = lastOffset - records.size() + 1
+  val currentTimeMs = time.milliseconds()
+
+  for (record <- records.asScala) {
+val pendingAppend = pendingAppends.poll()
+if (pendingAppend.epoch != epoch || pendingAppend.offset != offset) {
+  warn(s"Expected next commit at offset ${pendingAppend.offset}, " +

Review comment:
   That's true. In a follow-up, the `handleCommit` API will be expanded a 
bit to cover appends through replication as well, but for now, I think we can 
raise an error.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509620795



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
 )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+client: KafkaRaftClient[Array[Byte]],
+time: Time,
+brokerId: Int,
+recordsPerSec: Int,
+recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+private val stats = new WriteStats(time, printIntervalMs = 5000)
+private val payload = new Array[Byte](recordSize)
+private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+private var isLeader = false
+private var throttler: ThroughputThrottler = _
+private var recordCount = 0
+
+override def doWork(): Unit = {
+  if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+if (isLeader) {
+  pendingAppends.clear()
+  throttler = new ThroughputThrottler(time, recordsPerSec)
+  recordCount = 0
+}
+  }
+
+  if (isLeader) {
+recordCount += 1
+
+val startTimeMs = time.milliseconds()
+val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) {
+  time.milliseconds()
+} else {
+  startTimeMs
+}
+
+val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload))
+if (offset == null || offset == Long.MaxValue) {

Review comment:
   I will try to document this better, but `Long.MaxValue` is how we 
decided to handle the case where the epoch in `scheduleAppend` does not match 
the current epoch. This can happen because the raft epoch is updated 
asynchronously and there is no way to ensure the state machine has seen the 
latest value. The expectation is that the state machine will update its 
uncommitted state with an offset that can never become committed. After 
it observes the epoch change, this uncommitted state will be discarded.
   
   Note that although I added the explicit check here, it is not technically 
necessary. Let me consider removing it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509615681



##
File path: core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
##
@@ -56,73 +47,8 @@ class TestRaftRequestHandler(
  | ApiKeys.END_QUORUM_EPOCH
  | ApiKeys.FETCH =>
   val requestBody = request.body[AbstractRequest]
-  networkChannel.postInboundRequest(
-request.header,
-requestBody,
-response => sendResponse(request, Some(response)))
-
-case ApiKeys.API_VERSIONS =>

Review comment:
   I felt it was too difficult to approximate a controller workload using 
one or more producers because we can only handle one request 
at a time. So I created a separate workload generator which executes on the 
leader, and I removed all of this somewhat hacky handling logic that allowed us 
to use a producer.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509603110



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/DataOutputWritable.java
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataOutputWritable implements Writable {

Review comment:
   Fair enough. We have no current need for `DataOutputWritable`, so I will 
remove this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1

2020-10-21 Thread GitBox


hachikuji merged pull request #9406:
URL: https://github.com/apache/kafka/pull/9406


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1

2020-10-21 Thread GitBox


hachikuji commented on pull request #9406:
URL: https://github.com/apache/kafka/pull/9406#issuecomment-713807844


   Merging to trunk and 2.7.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10625) Add Union to Connect Schema

2020-10-21 Thread Ryan (Jira)
Ryan created KAFKA-10625:


 Summary: Add Union to Connect Schema
 Key: KAFKA-10625
 URL: https://issues.apache.org/jira/browse/KAFKA-10625
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: Ryan


There is currently no Union type for the Kafka Connect Schema/SchemaBuilder.
When using Kafka Connect to produce messages intended to be converted to Avro, a
converter-specific workaround must be employed to generate a union.

 

https://stackoverflow.com/questions/64468907/does-kafka-connect-support-unions
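
One common workaround is to model the union as a struct with one optional field per member and populate exactly one of them. A sketch (the field and schema names here are illustrative, and whether a converter maps this back to an Avro union depends entirely on the converter):

{code:scala}
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}

// Struct-as-union: exactly one of the optional fields is set.
val unionLike: Schema = SchemaBuilder.struct()
  .name("io.example.StringOrInt") // hypothetical name
  .field("string_value", Schema.OPTIONAL_STRING_SCHEMA)
  .field("int_value", Schema.OPTIONAL_INT32_SCHEMA)
  .optional()
  .build()

val value = new Struct(unionLike).put("string_value", "hello")
{code}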



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10139) [Easy] Add operational guide for failure recovery

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10139:
-
Summary: [Easy] Add operational guide for failure recovery  (was: Add 
operational guide for failure recovery)

> [Easy] Add operational guide for failure recovery
> -
>
> Key: KAFKA-10139
> URL: https://issues.apache.org/jira/browse/KAFKA-10139
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Boyang Chen
>Priority: Major
>
> In the first released version, we should include an operational guide covering the 
> feature versioning failure cases, such as:
> 1. broker crash due to violation of feature versioning
> 2. ZK data corruption (rare)
> We need to ensure this work gets reflected in the AK documentation after the 
> implementation and thorough testing are done.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10622) [Medium] Implement support for feature version deprecation

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10622:
-
Summary: [Medium] Implement support for feature version deprecation  (was: 
Implement support for feature version deprecation)

> [Medium] Implement support for feature version deprecation
> --
>
> Key: KAFKA-10622
> URL: https://issues.apache.org/jira/browse/KAFKA-10622
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Minor
>
> This Jira tracks the implementation of feature version deprecation support 
> for KIP-584.
> The feature version deprecation is future work 
> ([link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Featureversiondeprecation]).
>  We didn’t find a need to implement it immediately as part of AK 2.7 release 
> for KIP-584. The reason is that we don’t have features defined yet as part of 
> AK 2.7 release and it’ll be a long time (years) before we start to deprecate 
> feature versions. So there is no immediate need to implement the support.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10621) [Easy] Implement advanced CLI tool for feature versioning system

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10621:
-
Summary: [Easy] Implement advanced CLI tool for feature versioning system  
(was: Implement advanced CLI tool for feature versioning system)

> [Easy] Implement advanced CLI tool for feature versioning system
> 
>
> Key: KAFKA-10621
> URL: https://issues.apache.org/jira/browse/KAFKA-10621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Minor
>
> Implement advanced CLI tool capabilities for the feature versioning system 
> providing the facilities as explained in [this 
> section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]
>  of KIP-584. The implementation needs to be done in 
> [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
>  class. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10624:
-
Summary: [Easy] FeatureZNodeStatus should use sealed trait instead of 
Enumeration  (was: FeatureZNodeStatus should use sealed trait instead of 
Enumeration)

> [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
> 
>
> Key: KAFKA-10624
> URL: https://issues.apache.org/jira/browse/KAFKA-10624
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Minor
>
> In Scala, we prefer sealed traits over Enumeration since the former gives you 
> exhaustiveness checking. With Scala Enumeration, you don't get a warning if 
> you add a new value that is not handled in a given pattern match.
> This Jira tracks refactoring the enum 
> [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801]
>  to a sealed trait. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10623) [Easy] Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10623:
-
Summary: [Easy] Refactor code to avoid discovery conflicts for 
classes:{Supported|Finalized}VersionRange  (was: Refactor code to avoid 
discovery conflicts for classes:{Supported|Finalized}VersionRange)

> [Easy] Refactor code to avoid discovery conflicts for 
> classes:{Supported|Finalized}VersionRange
> ---
>
> Key: KAFKA-10623
> URL: https://issues.apache.org/jira/browse/KAFKA-10623
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Minor
>
> This Jira suggests changing few existing class names to avoid class discovery 
> conflicts. Particularly the following classes:
> {code:java}
> org.apache.kafka.clients.admin.{Supported|Finalized}VersionRange{code}
> conflict with
>  
>  
> {code:java}
> org.apache.kafka.common.feature.{Supported|Finalized}VersionRange{code}
> The former is internal facing, while the latter is external facing (since it 
> is used in the Admin#describeFeatures API). So, the internal facing classes 
> can be renamed suitably. Possible alternative naming suggestions:
>  
>  
> {code:java}
> org.apache.kafka.clients.admin.{Supported|Finalized}Versions
> {code}
> {code:java}
> org.apache.kafka.clients.admin.Broker{Supported|Finalized}Versions
> {code}
> {code:java}
> org.apache.kafka.clients.admin.Broker{Supported|Finalized}VersionRange{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10624) FeatureZNodeStatus should use sealed trait instead of Enumeration

2020-10-21 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-10624:


 Summary: FeatureZNodeStatus should use sealed trait instead of 
Enumeration
 Key: KAFKA-10624
 URL: https://issues.apache.org/jira/browse/KAFKA-10624
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kowshik Prakasam


In Scala, we prefer sealed traits over Enumeration since the former gives you 
exhaustiveness checking. With Scala Enumeration, you don't get a warning if you 
add a new value that is not handled in a given pattern match.

This Jira tracks refactoring the enum 
[FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801]
 to a sealed trait. 
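
A minimal sketch of the difference (the status values below are illustrative):

{code:scala}
// Enumeration: adding a new value later produces no compiler warning
// in existing pattern matches.
object FeatureZNodeStatusEnum extends Enumeration {
  val Enabled, Disabled = Value
}

// Sealed trait: the compiler emits a "match may not be exhaustive"
// warning for any pattern match that misses a case.
sealed trait FeatureZNodeStatus
case object Enabled extends FeatureZNodeStatus
case object Disabled extends FeatureZNodeStatus

def describe(status: FeatureZNodeStatus): String = status match {
  case Enabled  => "enabled"
  case Disabled => "disabled" // removing this case triggers the warning
}
{code}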



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r509540617



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
 @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
 val session = Session(context.principal, context.clientAddress)
+
 private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
 def header: RequestHeader = context.header
 def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+def buildResponse(abstractResponse: AbstractResponse,
+  error: Errors): Send =
+  if (envelopeContext.isDefined) {

Review comment:
   nit: use `match`
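   
   i.e., something like this (a sketch; both branch bodies stand in for whatever the envelope and non-envelope paths currently do):
   
   ```scala
   def buildResponse(abstractResponse: AbstractResponse, error: Errors): Send =
     envelopeContext match {
       case Some(envelope) =>
         buildEnvelopeResponse(envelope, abstractResponse, error) // hypothetical helper
       case None =>
         buildDirectResponse(abstractResponse)                    // hypothetical helper
     }
   ```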

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
 val context = new RequestContext(header, connectionId, channel.socketAddress,
   channel.principal, listenerName, securityProtocol,
   channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-val req = new RequestChannel.Request(processor = id, context = context,
-  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+val principalSerde = Option(channel.principalSerde.orElse(null))
+val req =
+if (header.apiKey == ApiKeys.ENVELOPE) {

Review comment:
   nit: this is misaligned. It might be better to pull the body here into a 
separate method (e.g. `parseEnvelopeRequest`)

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
 info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+   handler: RequestChannel.Request => Unit): Unit = {
+if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+  sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+} else if (request.envelopeContext.isDefined &&
+  (!request.context.fromPrivilegedListener ||
+  !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+) {
+  // If the designated forwarding request is not coming from a privileged listener, or
+  // it fails CLUSTER_ACTION permission, we would fail the authorization.
+  sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception())

Review comment:
   As mentioned above, you can see the rest of the cases in this class 
where we check CLUSTER_ACTION and they all return 
`CLUSTER_AUTHORIZATION_FAILURE`.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
 case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
 case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
+case ApiKeys.CREATE_TOPICS => maybeForward(request, handleCreateTopicsRequest)

Review comment:
   We should have a check at the beginning of `handle` to restrict the 
"forwardable" APIs. 

##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
 @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
 val session = Session(context.principal, context.clientAddress)
+
 private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
 def header: RequestHeader = context.header
 def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+def buildResponse(abstractResponse: AbstractResponse,
+  error: Errors): Send =

Review comment:
   nit: add braces to all of these methods. Even though they are not 
required, braces make it easier to see the scope


[jira] [Created] (KAFKA-10623) Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange

2020-10-21 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-10623:


 Summary: Refactor code to avoid discovery conflicts for 
classes:{Supported|Finalized}VersionRange
 Key: KAFKA-10623
 URL: https://issues.apache.org/jira/browse/KAFKA-10623
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kowshik Prakasam


This Jira suggests changing a few existing class names to avoid class discovery 
conflicts. Particularly the following classes:
{code:java}
org.apache.kafka.clients.admin.{Supported|Finalized}VersionRange{code}
conflict with
{code:java}
org.apache.kafka.common.feature.{Supported|Finalized}VersionRange{code}
The former is internal facing, while the latter is external facing (since it is 
used in the Admin#describeFeatures API). So, the internal facing classes can be 
renamed suitably. Possible alternative naming suggestions:
{code:java}
org.apache.kafka.clients.admin.{Supported|Finalized}Versions
{code}
{code:java}
org.apache.kafka.clients.admin.Broker{Supported|Finalized}Versions
{code}
{code:java}
org.apache.kafka.clients.admin.Broker{Supported|Finalized}VersionRange{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-10-21 Thread GitBox


mjsax commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-713781998


   @soarez Sorry, but you will need to rebase your PR to get 
https://github.com/apache/kafka/commit/2db67db8e1329cb2e047322cff81d97ff98b4328 
-- otherwise, Jenkins will fail...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10622) Implement support for feature version deprecation

2020-10-21 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-10622:


 Summary: Implement support for feature version deprecation
 Key: KAFKA-10622
 URL: https://issues.apache.org/jira/browse/KAFKA-10622
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kowshik Prakasam


This Jira tracks the implementation of feature version deprecation support for 
KIP-584.

The feature version deprecation is future work 
([link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Featureversiondeprecation]).
 We didn’t find a need to implement it immediately as part of AK 2.7 release 
for KIP-584. The reason is that we don’t have features defined yet as part of 
AK 2.7 release and it’ll be a long time (years) before we start to deprecate 
feature versions. So there is no immediate need to implement the support.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10621:
-
Description: Implement advanced CLI tool capabilities for the feature 
versioning system providing the facilities as explained in [this 
section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]
 of KIP-584. The implementation needs to be done in 
[FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
 class.   (was: Implement advanced CLI tool capabilities for the feature 
versioning system providing the facilities as explained in [[this 
section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]]
 of KIP-584. The implementation needs to be done in 
[FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
 class. )

> Implement advanced CLI tool for feature versioning system
> -
>
> Key: KAFKA-10621
> URL: https://issues.apache.org/jira/browse/KAFKA-10621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Minor
>
> Implement advanced CLI tool capabilities for the feature versioning system 
> providing the facilities as explained in [this 
> section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]
>  of KIP-584. The implementation needs to be done in 
> [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
>  class. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10621:
-
Description: Implement advanced CLI tool capabilities for the feature 
versioning system providing the facilities as explained in [this 
section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]]
 of KIP-584. The implementation needs to be done in 
[FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
 class.   (was: Implement advanced CLI tool capabilities for the feature 
versioning system providing the facilities as explained in this section of 
KIP-584: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]
 . The implementation needs to be done in 
[FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
 class. )

> Implement advanced CLI tool for feature versioning system
> -
>
> Key: KAFKA-10621
> URL: https://issues.apache.org/jira/browse/KAFKA-10621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Minor
>
> Implement advanced CLI tool capabilities for the feature versioning system 
> providing the facilities as explained in [this 
> section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]]
>  of KIP-584. The implementation needs to be done in 
> [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
>  class. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system

2020-10-21 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10621:
-
Description: Implement advanced CLI tool capabilities for the feature 
versioning system providing the facilities as explained in [[this 
section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]]
 of KIP-584. The implementation needs to be done in 
[FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
 class.   (was: Implement advanced CLI tool capabilities for the feature 
versioning system providing the facilities as explained in [this 
section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]]
 of KIP-584. The implementation needs to be done in 
[FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
 class. )

> Implement advanced CLI tool for feature versioning system
> -
>
> Key: KAFKA-10621
> URL: https://issues.apache.org/jira/browse/KAFKA-10621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Priority: Minor
>
> Implement advanced CLI tool capabilities for the feature versioning system 
> providing the facilities as explained in [[this 
> section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]]
>  of KIP-584. The implementation needs to be done in 
> [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
>  class. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10621) Implement advanced CLI tool for feature versioning system

2020-10-21 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-10621:


 Summary: Implement advanced CLI tool for feature versioning system
 Key: KAFKA-10621
 URL: https://issues.apache.org/jira/browse/KAFKA-10621
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kowshik Prakasam


Implement advanced CLI tool capabilities for the feature versioning system 
providing the facilities as explained in this section of KIP-584: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]
 . The implementation needs to be done in 
[FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala]
 class. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-21 Thread GitBox


mimaison commented on pull request #9468:
URL: https://github.com/apache/kafka/pull/9468#issuecomment-713772503


   @bbejeck Yes, we can include this in 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-21 Thread GitBox


vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-713766035


   By the way, it was true that trunk was broken. Fixed by: 
https://github.com/apache/kafka/commit/2db67db8e1329cb2e047322cff81d97ff98b4328



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9475: MINOR: Add Jenkinsfile to 2.1

2020-10-21 Thread GitBox


vvcephei commented on pull request #9475:
URL: https://github.com/apache/kafka/pull/9475#issuecomment-713759548


   See also #9471 and #9472 and #9474 
   
   @ijuma @jolshan @mumrah , are any of you able to review this as well?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei edited a comment on pull request #9475: MINOR: Add Jenkinsfile to 2.1

2020-10-21 Thread GitBox


vvcephei edited a comment on pull request #9475:
URL: https://github.com/apache/kafka/pull/9475#issuecomment-713759548


   This is the last one.
   
   See also #9471 and #9472 and #9474 
   
   @ijuma @jolshan @mumrah , are any of you able to review this as well?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9475: MINOR: Add Jenkinsfile to 2.1

2020-10-21 Thread GitBox


vvcephei opened a new pull request #9475:
URL: https://github.com/apache/kafka/pull/9475


   Add a Jenkinsfile for the 2.1 branch so PRs can be built
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9474: MINOR: Add Jenkinsfile to 2.2

2020-10-21 Thread GitBox


vvcephei commented on pull request #9474:
URL: https://github.com/apache/kafka/pull/9474#issuecomment-713758455


   See also #9471 and #9472 
   
   @ijuma @jolshan @mumrah , are any of you able to review this as well?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9381) Javadocs + Scaladocs not published on maven central

2020-10-21 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218458#comment-17218458
 ] 

Bill Bejeck edited comment on KAFKA-9381 at 10/21/20, 5:59 PM:
---

Since this is a long-standing issue, I'm going to remove the blocker tag. I'm 
taking a look at getting this fixed in this release, so I have picked up the 
ticket.


was (Author: bbejeck):
Since this is a long-standing issue, I'm going to remove the blocker tag. 

> Javadocs + Scaladocs not published on maven central
> ---
>
> Key: KAFKA-9381
> URL: https://issues.apache.org/jira/browse/KAFKA-9381
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Julien Jean Paul Sirocchi
>Assignee: Bill Bejeck
>Priority: Critical
> Fix For: 2.8.0
>
>
> As per title, empty (aside from MANIFEST, LICENCE and NOTICE) 
> javadocs/scaladocs jars on central for any version (Kafka or Scala), e.g.
> [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3

2020-10-21 Thread GitBox


jolshan commented on pull request #9472:
URL: https://github.com/apache/kafka/pull/9472#issuecomment-713754418


   The versions look correct to me. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9474: MINOR: Add Jenkinsfile to 2.2

2020-10-21 Thread GitBox


vvcephei opened a new pull request #9474:
URL: https://github.com/apache/kafka/pull/9474


   Add a Jenkinsfile for the 2.2 branch so PRs can be built
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3

2020-10-21 Thread GitBox


vvcephei commented on pull request #9472:
URL: https://github.com/apache/kafka/pull/9472#issuecomment-713749192


   See also #9471 
   
   @ijuma @jolshan @mumrah , are any of you able to review this as well?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r509482332



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
     val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
       case Some(offsetTruncationState) =>
         val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
+        // Resetting `lastFetchedEpoch` since we are truncating and don't expect diverging epoch in the next fetch

Review comment:
   This is a little unclear to me. I guess it is safe to reset 
`lastFetchedEpoch` as long as we reinitialize it after the next leader change. 
On the other hand, it seems safer to always retain the value.

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +454,42 @@ abstract class AbstractFetcherThread(name: String,
 warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty) {
+      if (currentState == null) {
+        return PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+          state = Fetching, initialFetchState.lastFetchedEpoch)
+      }
+      // If we are in `Fetching` state, we can continue to fetch regardless of the current leader epoch and truncate
+      // if necessary based on diverging epochs returned by the leader. If we are currently in Truncating state,
+      // fall through and handle based on the current epoch.
+      if (currentState.state == Fetching) {
+        return currentState

Review comment:
   Is it not possible that the `InitialFetchState` has a bump to the 
current leader epoch? We will still need the latest epoch in order to continue 
fetching.

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -341,11 +352,18 @@ abstract class AbstractFetcherThread(name: String,
           // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
           if (validBytes > 0 && partitionStates.contains(topicPartition)) {
             // Update partitionStates only if there is no exception during processPartitionData
-            val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
+            val newFetchState = PartitionFetchState(nextOffset, Some(lag),
+              currentFetchState.currentLeaderEpoch, state = Fetching,
+              Some(currentFetchState.currentLeaderEpoch))

Review comment:
   This doesn't seem right. The last fetched epoch is supposed to represent 
the epoch of the last fetched batch. The fetcher could be fetching the data 
from an older epoch here.
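
   To make the point concrete: the epoch of the last fetched batch can be read off the fetched records themselves rather than assumed from the fetch state. The helper below is a hypothetical illustration of that idea, not code from the PR:

    import java.util.Optional;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;

    class EpochUtil {
        // Epoch of the last batch in the fetched data; empty if nothing was fetched.
        static Optional<Integer> lastFetchedEpoch(MemoryRecords records) {
            Integer epoch = null;
            for (RecordBatch batch : records.batches())
                epoch = batch.partitionLeaderEpoch(); // stamped on each batch by the leader that appended it
            return Optional.ofNullable(epoch);
        }
    }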

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
     val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
       case Some(offsetTruncationState) =>
         val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating

Review comment:
   Do we need to adjust this? I think we want to remain in the `Fetching` 
state if truncation detection is through `Fetch`.

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -629,7 +680,9 @@ abstract class AbstractFetcherThread(name: String,
 
   val initialLag = leaderEndOffset - offsetToFetch
   fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
-      PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
+      // We don't expect diverging epochs from the next fetch request, so resetting `lastFetchedEpoch`

Review comment:
   Again it seems safe to keep `lastFetchedEpoch` in sync with the local 
log. If we have done a full truncation above, then `lastFetchedEpoch` will be 
`None`, but otherwise it seems like we should set it.


[jira] [Commented] (KAFKA-9381) Javadocs + Scaladocs not published on maven central

2020-10-21 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218458#comment-17218458
 ] 

Bill Bejeck commented on KAFKA-9381:


Since this is a long-standing issue, I'm going to remove the blocker tag. 

> Javadocs + Scaladocs not published on maven central
> ---
>
> Key: KAFKA-9381
> URL: https://issues.apache.org/jira/browse/KAFKA-9381
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Julien Jean Paul Sirocchi
>Assignee: Bill Bejeck
>Priority: Blocker
> Fix For: 2.7.0
>
>
> As per title, the javadocs/scaladocs jars on central are empty (aside from 
> MANIFEST, LICENCE and NOTICE) for any version (Kafka or Scala), e.g.
> [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9381) Javadocs + Scaladocs not published on maven central

2020-10-21 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9381:
---
Priority: Critical  (was: Blocker)

> Javadocs + Scaladocs not published on maven central
> ---
>
> Key: KAFKA-9381
> URL: https://issues.apache.org/jira/browse/KAFKA-9381
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Julien Jean Paul Sirocchi
>Assignee: Bill Bejeck
>Priority: Critical
> Fix For: 2.7.0
>
>
> As per title, the javadocs/scaladocs jars on central are empty (aside from 
> MANIFEST, LICENCE and NOTICE) for any version (Kafka or Scala), e.g.
> [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9381) Javadocs + Scaladocs not published on maven central

2020-10-21 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9381:
---
Fix Version/s: (was: 2.7.0)
   2.8.0

> Javadocs + Scaladocs not published on maven central
> ---
>
> Key: KAFKA-9381
> URL: https://issues.apache.org/jira/browse/KAFKA-9381
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Julien Jean Paul Sirocchi
>Assignee: Bill Bejeck
>Priority: Critical
> Fix For: 2.8.0
>
>
> As per title, the javadocs/scaladocs jars on central are empty (aside from 
> MANIFEST, LICENCE and NOTICE) for any version (Kafka or Scala), e.g.
> [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9381) Javadocs + Scaladocs not published on maven central

2020-10-21 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-9381:
--

Assignee: Bill Bejeck  (was: Randall Hauch)

> Javadocs + Scaladocs not published on maven central
> ---
>
> Key: KAFKA-9381
> URL: https://issues.apache.org/jira/browse/KAFKA-9381
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Julien Jean Paul Sirocchi
>Assignee: Bill Bejeck
>Priority: Blocker
> Fix For: 2.7.0
>
>
> As per title, the javadocs/scaladocs jars on central are empty (aside from 
> MANIFEST, LICENCE and NOTICE) for any version (Kafka or Scala), e.g.
> [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1

2020-10-21 Thread GitBox


dajac commented on a change in pull request #9406:
URL: https://github.com/apache/kafka/pull/9406#discussion_r509492493



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -444,10 +444,25 @@ private boolean maybeSendAndPollTransactionalRequest() {
         AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
         Node targetNode = null;
         try {
-            targetNode = awaitNodeReady(nextRequestHandler.coordinatorType());
-            if (targetNode == null) {
+            FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
+            targetNode = coordinatorType != null ?
+                transactionManager.coordinator(coordinatorType) :
+                client.leastLoadedNode(time.milliseconds());
+            if (targetNode != null) {
+                if (!awaitNodeReady(targetNode, coordinatorType)) {
+                    log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);
+                    maybeFindCoordinatorAndRetry(nextRequestHandler);
+                    return true;
+                }
+            } else if (coordinatorType != null) {
+                log.trace("Coordinator not known for {}, will retry {} after finding coordinator.", coordinatorType, requestBuilder.apiKey());
                 maybeFindCoordinatorAndRetry(nextRequestHandler);
                 return true;
+            } else {
+                log.trace("No nodes available to send requests, will poll and retry until a node is ready.");
+                transactionManager.retry(nextRequestHandler);
+                client.poll(retryBackoffMs, time.milliseconds());
+                return true;

Review comment:
   @rajinisivaram Yeah, I do agree. Polling seems sufficient in this case. 
Thanks for the clarification. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9910) Implement new transaction timed out error

2020-10-21 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218450#comment-17218450
 ] 

Bill Bejeck commented on KAFKA-9910:


This is a sub-task for KIP-588, which is not going in 2.7, so I'm going to move 
the fix version to 2.8.0 as part of the 2.7.0 release process.

> Implement new transaction timed out error
> -
>
> Key: KAFKA-9910
> URL: https://issues.apache.org/jira/browse/KAFKA-9910
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9910) Implement new transaction timed out error

2020-10-21 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9910:
---
Fix Version/s: (was: 2.7.0)
   2.8.0

> Implement new transaction timed out error
> -
>
> Key: KAFKA-9910
> URL: https://issues.apache.org/jira/browse/KAFKA-9910
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
> Fix For: 2.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts

2020-10-21 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9803:
---
Fix Version/s: (was: 2.7.0)
   2.8.0

> Allow producers to recover gracefully from transaction timeouts
> ---
>
> Key: KAFKA-9803
> URL: https://issues.apache.org/jira/browse/KAFKA-9803
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer , streams
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.8.0
>
>
> Transaction timeouts are detected by the transaction coordinator. When the 
> coordinator detects a timeout, it bumps the producer epoch and aborts the 
> transaction. The epoch bump is necessary in order to prevent the current 
> producer from being able to begin writing to a new transaction which was not 
> started through the coordinator.  
> Transactions may also be aborted if a new producer with the same 
> `transactional.id` starts up. Similarly this results in an epoch bump. 
> Currently the coordinator does not distinguish these two cases. Both will end 
> up as a `ProducerFencedException`, which means the producer needs to shut 
> itself down. 
> We can improve this with the new APIs from KIP-360. When the coordinator 
> times out a transaction, it can remember that fact and allow the existing 
> producer to claim the bumped epoch and continue. Roughly the logic would work 
> like this:
> 1. When a transaction times out, set lastProducerEpoch to the current epoch 
> and do the normal bump.
> 2. Any transactional requests from the old epoch result in a new 
> TRANSACTION_TIMED_OUT error code, which is propagated to the application.
> 3. The producer recovers by sending InitProducerId with the current epoch. 
> The coordinator returns the bumped epoch.
> One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH 
> from Produce requests. Partition leaders will not generally know if a bumped 
> epoch was the result of a timed out transaction or a fenced producer. 
> Possibly the producer can treat these errors as abortable when they come from 
> Produce responses. In that case, the user would try to abort the transaction 
> and then we can see if it was due to a timeout or otherwise.
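
The recovery flow in the description above might look roughly like this on the coordinator side (a minimal sketch; `TxnState` and its members are illustrative assumptions, not the actual TransactionCoordinator code):

    // Hypothetical coordinator-side bookkeeping for steps 1-3 above.
    class TxnState {
        short producerEpoch;
        short lastProducerEpoch = -1;

        // Step 1: on timeout, remember the pre-bump epoch, then do the normal bump.
        void onTransactionTimeout() {
            lastProducerEpoch = producerEpoch;
            producerEpoch++;
            // Step 2 happens elsewhere: requests carrying lastProducerEpoch
            // now receive the new TRANSACTION_TIMED_OUT error.
        }

        // Step 3: the timed-out producer re-sends InitProducerId with the epoch it
        // still holds and is handed the bumped epoch instead of being fenced.
        short handleInitProducerId(short requestEpoch) {
            if (requestEpoch == producerEpoch || requestEpoch == lastProducerEpoch)
                return producerEpoch;
            throw new IllegalStateException("producer fenced"); // stand-in for the fenced-producer error
        }
    }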



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts

2020-10-21 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218446#comment-17218446
 ] 

Bill Bejeck commented on KAFKA-9803:


As discussed with [~bchen225242] offline, looks like KIP-588 won't make 2.7.  
As part of the 2.7.0 release process, I'm going to move the fix version to 2.8.0

> Allow producers to recover gracefully from transaction timeouts
> ---
>
> Key: KAFKA-9803
> URL: https://issues.apache.org/jira/browse/KAFKA-9803
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer , streams
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.7.0
>
>
> Transaction timeouts are detected by the transaction coordinator. When the 
> coordinator detects a timeout, it bumps the producer epoch and aborts the 
> transaction. The epoch bump is necessary in order to prevent the current 
> producer from being able to begin writing to a new transaction which was not 
> started through the coordinator.  
> Transactions may also be aborted if a new producer with the same 
> `transactional.id` starts up. Similarly this results in an epoch bump. 
> Currently the coordinator does not distinguish these two cases. Both will end 
> up as a `ProducerFencedException`, which means the producer needs to shut 
> itself down. 
> We can improve this with the new APIs from KIP-360. When the coordinator 
> times out a transaction, it can remember that fact and allow the existing 
> producer to claim the bumped epoch and continue. Roughly the logic would work 
> like this:
> 1. When a transaction times out, set lastProducerEpoch to the current epoch 
> and do the normal bump.
> 2. Any transactional requests from the old epoch result in a new 
> TRANSACTION_TIMED_OUT error code, which is propagated to the application.
> 3. The producer recovers by sending InitProducerId with the current epoch. 
> The coordinator returns the bumped epoch.
> One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH 
> from Produce requests. Partition leaders will not generally know if a bumped 
> epoch was the result of a timed out transaction or a fenced producer. 
> Possibly the producer can treat these errors as abortable when they come from 
> Produce responses. In that case, the user would try to abort the transaction 
> and then we can see if it was due to a timeout or otherwise.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan opened a new pull request #9473: KAFKA-10545: Create topic IDs in ZooKeeper and Controller

2020-10-21 Thread GitBox


jolshan opened a new pull request #9473:
URL: https://github.com/apache/kafka/pull/9473


   Topic IDs must be created for all new topics and all existing topics that do 
not yet have a topic ID. In ZooKeeper, the ID is written to the TopicZNode, and 
in the controller, it is stored in a map. 
   
   This is a preliminary change before the second part of KAFKA-10545, which 
will propagate these IDs to brokers.
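
   For intuition, the controller-side bookkeeping described above amounts to something like the sketch below (`TopicIdRegistry` and its method are illustrative assumptions, not the PR's actual classes; the PR also persists the ID in the TopicZNode):

       import java.util.Map;
       import java.util.UUID;
       import java.util.concurrent.ConcurrentHashMap;

       // Hypothetical sketch: assign an ID to each new topic, and lazily to any
       // existing topic that does not yet have one.
       class TopicIdRegistry {
           private final Map<String, UUID> topicIds = new ConcurrentHashMap<>();

           UUID getOrCreate(String topicName) {
               return topicIds.computeIfAbsent(topicName, t -> UUID.randomUUID());
           }
       }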
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-21 Thread GitBox


bbejeck commented on pull request #9468:
URL: https://github.com/apache/kafka/pull/9468#issuecomment-713730656


   Hi @mimaison , do you think we can get this into 2.6.1?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-10-21 Thread GitBox


mjsax commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-713726509


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy

2020-10-21 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218420#comment-17218420
 ] 

Tom Bentley commented on KAFKA-10579:
-

Looking at the code it seems to me this might be a lack of thread safety in 
Reflections, and so might be addressed by 
https://github.com/ronmamo/reflections/issues/281 once a version with that fix 
in it is released.
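
For context, `java.util.concurrent.ConcurrentHashMap` rejects null keys, so any racy code path that ends up calling `get(null)` fails exactly where the quoted stack trace below does (a standalone illustration of the failure mode, not Reflections code):

    import java.util.concurrent.ConcurrentHashMap;

    public class ChmNullKeyDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
            // Throws NullPointerException from inside ConcurrentHashMap.get,
            // matching the top frame of the trace below.
            map.get(null);
        }
    }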

> Flaky test 
> connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
> 
>
> Key: KAFKA-10579
> URL: https://issues.apache.org/jira/browse/KAFKA-10579
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
>  
> java.lang.NullPointerException
>   at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
>   at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
>   at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
>   at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
>   at org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
>   at org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)
> https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9406:
URL: https://github.com/apache/kafka/pull/9406#discussion_r509456514



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##
@@ -2667,4 +2760,43 @@ private void assertFutureFailure(Future<?> future, Class<? extends Exception> ex
         }
     }
 
+    private void createMockClientWithMaxFlightOneMetadataPending() {
+        client = new MockClient(time, metadata) {

Review comment:
   Wonder if we should consider adding max inflight behavior directly to 
`MockClient`. Seems like a notable difference from `NetworkClient`.

##
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -444,10 +444,25 @@ private boolean maybeSendAndPollTransactionalRequest() {
         AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
         Node targetNode = null;
         try {
-            targetNode = awaitNodeReady(nextRequestHandler.coordinatorType());
-            if (targetNode == null) {
+            FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
+            targetNode = coordinatorType != null ?
+                transactionManager.coordinator(coordinatorType) :
+                client.leastLoadedNode(time.milliseconds());
+            if (targetNode != null) {
+                if (!awaitNodeReady(targetNode, coordinatorType)) {
+                    log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);
+                    maybeFindCoordinatorAndRetry(nextRequestHandler);
+                    return true;
+                }
+            } else if (coordinatorType != null) {
+                log.trace("Coordinator not known for {}, will retry {} after finding coordinator.", coordinatorType, requestBuilder.apiKey());
                 maybeFindCoordinatorAndRetry(nextRequestHandler);
                 return true;
+            } else {
+                log.trace("No nodes available to send requests, will poll and retry until a node is ready.");
+                transactionManager.retry(nextRequestHandler);
+                client.poll(retryBackoffMs, time.milliseconds());
+                return true;

Review comment:
   I agree polling seems sufficient. We will still have an opportunity to 
refresh metadata if the current connection fails for some reason.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-21 Thread GitBox


jolshan commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-713724055


   Looks to me like the right configurations have been added. Might want to get one 
more pair of eyes and make sure the tests run.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9472: MINOR: Add Jenkinsfile to 2.3

2020-10-21 Thread GitBox


vvcephei opened a new pull request #9472:
URL: https://github.com/apache/kafka/pull/9472


   Add a Jenkinsfile for the 2.3 branch so PRs can be built
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-21 Thread GitBox


vvcephei commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-713719243


   @ijuma  @jolshan @mumrah , are any of you able to review this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-21 Thread GitBox


vvcephei opened a new pull request #9471:
URL: https://github.com/apache/kafka/pull/9471


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

2020-10-21 Thread GitBox


ableegoldman commented on pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#issuecomment-713709721


   Cherry-picked to 2.7 (had to fix up a test in StreamsPartitionAssignorTest 
that was relying on the new `ReferenceContainer` which is only in trunk)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10618) Add UUID class, use in protocols

2020-10-21 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-10618.

Resolution: Fixed

[https://github.com/apache/kafka/pull/9454/files] was merged.

> Add UUID class, use in protocols
> 
>
> Key: KAFKA-10618
> URL: https://issues.apache.org/jira/browse/KAFKA-10618
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Before implementing topic IDs, a public UUID class must be created and used 
> in protocols



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

2020-10-21 Thread GitBox


ableegoldman commented on pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#issuecomment-713685350


   Merged to trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

2020-10-21 Thread GitBox


ableegoldman merged pull request #9446:
URL: https://github.com/apache/kafka/pull/9446


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-21 Thread GitBox


mimaison commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-713678526


   @vvcephei Yes, feel free to merge in 2.6 once it's ready



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-10-21 Thread GitBox


vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-713664978


   Just for reference, I don't see those failing tests on trunk 
(67bc4f08feb50ac135a4d8e1d469747102aad3a6).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-21 Thread GitBox


vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-713663316


   FYI, I just got a successful build of this locally with master merged in.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2020-10-21 Thread GitBox


tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-713663078


   @guozhangwang, @hachikuji I was thinking... solving it as I have now 
(keeping track of the epoch) doesn't address another potential problem case 
where the tenure as leader is very short and the background task to load the 
state runs after the background task to unload the state in the same epoch. 
Obviously in this case the load should not be done (I guess it could result in 
a broker not returning NOT_COORDINATOR when it should, based on incorrect state 
of `ownedPartitions`). We could track a high watermark coordinator epoch and 
guard the load with a check (as well as the unload). 
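
A possible shape for that guard (names and logic are purely illustrative, not the actual group coordinator code):

    // Track the highest coordinator epoch seen so far and reject a load task
    // that runs behind the unload task for the same (or a newer) epoch.
    class CoordinatorEpochGuard {
        private int highestSeenEpoch = -1;

        synchronized boolean shouldUnload(int epoch) {
            if (epoch < highestSeenEpoch) return false; // stale task from an older epoch
            highestSeenEpoch = epoch;
            return true;
        }

        synchronized boolean shouldLoad(int epoch) {
            if (epoch <= highestSeenEpoch) return false; // a task for this or a newer epoch already ran
            highestSeenEpoch = epoch;
            return true;
        }
    }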



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-21 Thread GitBox


vvcephei commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-713659242


   ... I'm not sure how to get Jenkins to test this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-10-21 Thread GitBox


vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-713658414


   Hmm, actually I see those same 20 tests fail for me locally. Can you take a 
look, @dongjinleekr ?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] joshuagrisham commented on pull request #9470: Add recursive support to Connect Cast and ReplaceField transforms, and support for casting complex types to either a native or JSON stri

2020-10-21 Thread GitBox


joshuagrisham commented on pull request #9470:
URL: https://github.com/apache/kafka/pull/9470#issuecomment-713658739


   Hi @xvrl, I pulled the latest trunk before I applied my changes (it should 
be included as part of the PR here), but I just wanted to mention that even after 
a fresh clone of trunk I saw some kind of issue in `ReplaceField` here: 
https://github.com/apache/kafka/commit/eab61cad2c418786ab5e58aa1267f689d82a61d1#diff-00d03285691e372793d34d1f5090ddf4bdf27cadcd568de11e9921ab49444885R92-R95
   
   It is complaining about some of the types in the new 
`translateDeprecatedConfigs` method from here: 
https://github.com/apache/kafka/blob/f46d4f4fce341326c06c0aa8b2d0d64982573658/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java#L51
   
   Not sure if this is something you were already aware of, or if there is 
really an issue here, but I thought I'd mention it!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-21 Thread GitBox


vvcephei commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-713658911







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



