[jira] [Comment Edited] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-23 Thread Akshay Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163248#comment-17163248
 ] 

Akshay Sharma edited comment on KAFKA-10284 at 7/23/20, 6:24 AM:
-

Hi [~bchen225242]/[~guozhang],

I raised a bug for this same issue 6 days ago; please take a look:

https://issues.apache.org/jira/browse/KAFKA-10285

 

Analysis:

When I restarted the consumer without restarting the broker, I saw the following logs:
 ```
 [2020-07-16 13:56:17,189] INFO [GroupCoordinator 1001]: Preparing to rebalance 
group 0 in state PreparingRebalance with old generation 0 
(__consumer_offsets-48) (reason: Adding new member 1-159490144 with group 
instanceid Some(1)) (kafka.coordinator.group.GroupCoordinator)
 [2020-07-16 13:56:17,236] INFO [GroupCoordinator 1001]: Stabilized group 0 
generation 1 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
 [2020-07-16 13:56:17,282] INFO [GroupCoordinator 1001]: Assignment received 
from leader for group 0 for generation 1 
(kafka.coordinator.group.GroupCoordinator)


 [2020-07-16 13:59:33,613] INFO [GroupCoordinator 1001]: Static member Some(1) 
with unknown member id rejoins, assigning new member id 1-159490335, while 
old member 1-159490144 will be removed. 
(kafka.coordinator.group.GroupCoordinator)
 [2020-07-16 13:59:33,635] INFO [GroupCoordinator 1001]: Static member joins 
during Stable stage will not trigger rebalance. 
(kafka.coordinator.group.GroupCoordinator)
 ```
 When I restarted the broker, it expected a different member.id (the consumer's old member.id):
 ```
 2020-07-16 14:04:04,953] ERROR given member.id 1-159490335 is identified 
as a known static member 1,but not matching the expected member.id 
1-159490144 (kafka.coordinator.group.GroupMetadata)
 ```
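
For context: a static member is simply a consumer configured with `group.instance.id`. A minimal sketch of a consumer matching the scenario above (group "0" and instance id "1" as in the logs; broker address and deserializers are placeholders, not from the report):
```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");                       // group "0" as in the logs above
        // Static membership: on restart the consumer rejoins with the same group.instance.id,
        // which triggers the member.id replacement shown in the coordinator logs.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual; restarting this process without restarting the
            // broker follows the static-member rejoin path described above.
        }
    }
}
```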

 

 


was (Author: akshaysh):
Hi [~bchen225242]/[~guozhang],

I raised a bug for this same issue; please take a look:

https://issues.apache.org/jira/browse/KAFKA-10285

 

Analysis:

When I restarted the consumer without restarting the broker, I saw the following logs:
```
[2020-07-16 13:56:17,189] INFO [GroupCoordinator 1001]: Preparing to rebalance 
group 0 in state PreparingRebalance with old generation 0 
(__consumer_offsets-48) (reason: Adding new member 1-159490144 with group 
instanceid Some(1)) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,236] INFO [GroupCoordinator 1001]: Stabilized group 0 
generation 1 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,282] INFO [GroupCoordinator 1001]: Assignment received 
from leader for group 0 for generation 1 
(kafka.coordinator.group.GroupCoordinator)


[2020-07-16 13:59:33,613] INFO [GroupCoordinator 1001]: Static member Some(1) 
with unknown member id rejoins, assigning new member id 1-159490335, while 
old member 1-159490144 will be removed. 
(kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,635] INFO [GroupCoordinator 1001]: Static member joins 
during Stable stage will not trigger rebalance. 
(kafka.coordinator.group.GroupCoordinator)
```
When I restarted the broker, it expected a different member.id (the consumer's old member.id):
```
2020-07-16 14:04:04,953] ERROR given member.id 1-159490335 is identified as 
a known static member 1,but not matching the expected member.id 1-159490144 
(kafka.coordinator.group.GroupMetadata)
```

 

 

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> When a known static member rejoins, we update its corresponding member.id 
> without triggering a new rebalance. This avoids unnecessary rebalances for 
> static membership, and also serves as fencing if someone still uses the old 
> member.id.
> The bug is that we don't actually persist the membership update, so if no 
> subsequent rebalance gets triggered, the new member.id information is lost 
> during group coordinator immigration, bringing back the zombie member 
> identity.
> Bug-find credit goes to [~hachikuji].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-23 Thread Akshay Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163248#comment-17163248
 ] 

Akshay Sharma commented on KAFKA-10284:
---

Hi [~bchen225242]/[~guozhang],

I raised a bug for this same issue; please take a look:

https://issues.apache.org/jira/browse/KAFKA-10285

 

Analysis:

When I restarted the consumer without restarting the broker, I saw the following logs:
```
[2020-07-16 13:56:17,189] INFO [GroupCoordinator 1001]: Preparing to rebalance 
group 0 in state PreparingRebalance with old generation 0 
(__consumer_offsets-48) (reason: Adding new member 1-159490144 with group 
instanceid Some(1)) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,236] INFO [GroupCoordinator 1001]: Stabilized group 0 
generation 1 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,282] INFO [GroupCoordinator 1001]: Assignment received 
from leader for group 0 for generation 1 
(kafka.coordinator.group.GroupCoordinator)


[2020-07-16 13:59:33,613] INFO [GroupCoordinator 1001]: Static member Some(1) 
with unknown member id rejoins, assigning new member id 1-159490335, while 
old member 1-159490144 will be removed. 
(kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,635] INFO [GroupCoordinator 1001]: Static member joins 
during Stable stage will not trigger rebalance. 
(kafka.coordinator.group.GroupCoordinator)
```
When I restarted the broker, it expected a different member.id (the consumer's old member.id):
```
2020-07-16 14:04:04,953] ERROR given member.id 1-159490335 is identified as 
a known static member 1,but not matching the expected member.id 1-159490144 
(kafka.coordinator.group.GroupMetadata)
```

 

 

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> When a known static member rejoins, we update its corresponding member.id 
> without triggering a new rebalance. This avoids unnecessary rebalances for 
> static membership, and also serves as fencing if someone still uses the old 
> member.id.
> The bug is that we don't actually persist the membership update, so if no 
> subsequent rebalance gets triggered, the new member.id information is lost 
> during group coordinator immigration, bringing back the zombie member 
> identity.
> Bug-find credit goes to [~hachikuji].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] huxihx commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-23 Thread GitBox


huxihx commented on pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#issuecomment-662835750


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask

2020-07-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163394#comment-17163394
 ] 

Igor Soarez commented on KAFKA-10205:
-

I'm happy to take this one on.

 

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Priority: Minor
>  Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: nullError caught 
> during partition assignment, will abort the current process and re-throw at 
> the end of rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10205) NullPointerException in StreamTask

2020-07-23 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-10205:
---

Assignee: Igor Soarez

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Assignee: Igor Soarez
>Priority: Minor
>  Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: nullError caught 
> during partition assignment, will abort the current process and re-throw at 
> the end of rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bob-barrett commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-23 Thread GitBox


bob-barrett commented on pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#issuecomment-662841978


   While addressing the feedback, I realized I had missed the case when 
replacing a current log with a future log as part of altering a partition's log 
dir. We also deferred metrics removal until the file deletion in that case. 
That has been fixed now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-23 Thread GitBox


stanislavkozlovski commented on pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#issuecomment-662860332


   > add preempt(): Unit method for all ControllerEvent so that all events (and 
future events) must implement it
   > for events that have callbacks, move the preemption from individual 
methods to preempt()
   > add preemption for ApiPartitionReassignment and ListPartitionReassignments
   
   Great idea



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-07-23 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-662919173


   @feyman2016 @huxihx , since you have experience in this test before, could 
you review this small PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-23 Thread GitBox


dajac commented on a change in pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#discussion_r459252113



##
File path: 
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##
@@ -775,6 +775,24 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 verifyMarkPartitionsForTruncation()
   }
 
+  @Test
+  def testDefaultValueRestoredAfterDeleteDynamicConfig(): Unit = {
+val newProps = new Properties
+newProps.put(KafkaConfig.LogRetentionTimeMillisProp, "10")
+newProps.put(KafkaConfig.LogFlushIntervalMsProp, "1")
+TestUtils.incrementalAlterConfigs(servers, adminClients.head, newProps, 
perBrokerConfig = false).all.get

Review comment:
   That makes sense, thanks.
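
   For readers following along: with the bootstrap-server based tooling, `--delete-config` roughly corresponds to an incremental alter with a DELETE op. A rough, hypothetical Admin-client sketch of deleting the dynamic `log.retention.ms` default (broker address is a placeholder; the PR itself changes the broker-side handling, not this client call):
```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DeleteDynamicConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // An empty resource name targets the cluster-wide default; use a broker id for per-broker config.
            ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp deleteOp = new AlterConfigOp(
                new ConfigEntry("log.retention.ms", ""), // value is ignored for DELETE
                AlterConfigOp.OpType.DELETE);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(brokerDefaults, Collections.singletonList(deleteOp));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
```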





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r459262136



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+servers = makeServers(1, enableControlledShutdown = false)
+val controller = getController().kafkaController
+val count = new AtomicInteger(2)
+val latch = new CountDownLatch(1)
+val spyThread = spy(controller.eventManager.thread)
+controller.eventManager.setControllerEventThread(spyThread)
+val processedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = latch.await()
+  override def preempt(): Unit = {}
+}
+val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = {}
+  override def preempt(): Unit = count.decrementAndGet()
+}
+
+controller.eventManager.put(processedEvent)
+controller.eventManager.put(preemptedEvent)
+controller.eventManager.put(preemptedEvent)
+
+doAnswer((_: InvocationOnMock) => {
+  latch.countDown()
+}).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
   Could we have the test call `latch.countDown()` in a background thread 
with a delay, right before we call controller.shutdown? I guess it's possible 
to have race conditions with that solution





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-07-23 Thread GitBox


showuon opened a new pull request #9062:
URL: https://github.com/apache/kafka/pull/9062


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2020-07-23 Thread Paul Webb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163344#comment-17163344
 ] 

Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:52 AM:


Hi. Please note that we were recently affected by this issue. I tried several 
Kafka clients/brokers and JDKs. The actual cause in the end was that the Java 
security provider list was being changed. Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By inserting it at position 1, it appeared that the new provider was returning 
an invalid application buffer size. This may explain why this issue is 
difficult to reproduce.

This is some logging to show the issue: 
{noformat}
2020-07-23 08:13:55.963Z INFO  -  Adding provider: OpenSSLProvider: Conscrypt 
info: Android's OpenSSL-backed security provider
2020-07-23 08:13:55.963Z INFO  -   SECURITY PROVIDERS -
2020-07-23 08:13:55.965Z INFO  -  -> Sun: SUN info: SUN (DSA key/parameter 
generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; 
JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, 
Collection CertStores, JavaPolicy Po
licy; JavaLoginConfig Configuration)
2020-07-23 08:13:55.965Z INFO  -  -> SunRsaSign: SunRsaSign info: Sun RSA 
signature provider
2020-07-23 08:13:55.965Z INFO  -  -> SunEC: SunEC info: Sun Elliptic Curve 
provider (EC, ECDSA, ECDH)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunJSSE info: Sun JSSE 
provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2)
2020-07-23 08:13:55.966Z INFO  -  -> SunJCE: SunJCE info: SunJCE Provider 
(implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, 
Diffie-Hellman, HMAC)
2020-07-23 08:13:55.966Z INFO  -  -> SunProvider: SunJGSS info: Sun (Kerberos 
v5, SPNEGO)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunSASL info: Sun SASL 
provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, 
CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2020-07-23 08:13:55.966Z INFO  -  -> XMLDSigRI: XMLDSig info: XMLDSig (DOM 
XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, 
Base64, Enveloped, XPath, XPath2, XSLT TransformServices)
2020-07-23 08:13:55.966Z INFO  -  -> SunPCSC: SunPCSC info: Sun PC/SC provider
2020-07-23 08:13:55.966Z INFO  -  -> BouncyCastleProvider: BC info: 
BouncyCastle Security Provider v1.61
2020-07-23 08:13:55.966Z INFO  -   SECURITY PROVIDERS -{noformat}
So my hypothesis is that the new provider (in this case from Conscrypt) is 
inserted at the head of the list. When SSLSession.getApplicationBufferSize() 
is called, it returns MAX_PLAINTEXT_LENGTH, which is 2^14 (16384), as per 
[https://tools.ietf.org/html/rfc5246#section-6.2.1].

When the new provider is added to the back of the list, Kafka behaved fine and 
this issue disappeared completely.

Hope this helps. 
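
A rough sketch of this workaround; the provider instance and how it is obtained depend on the library (e.g. Conscrypt), so the class below is illustrative only:
{code:java}
import java.security.Provider;
import java.security.Security;

public class ProviderOrderExample {
    // Workaround sketch: append the custom provider to the end of the provider list
    // instead of inserting it at position 1, so the default JSSE provider keeps serving
    // Kafka's SSL engines and getApplicationBufferSize() stays consistent.
    static void registerProviderSafely(Provider providerToAdd) {
        // Problematic form from the comment above:
        // Security.insertProviderAt(providerToAdd, 1);

        // Appends at the end of the list (lowest preference):
        Security.addProvider(providerToAdd);
    }
}
{code}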

 


was (Author: pwebb.itrs):
Hi. Please note that we were recently affected by this issue. I tried several 
Kafka clients/brokers and JDKs. The actual cause in the end was that the Java 
security provider list was being changed. Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By inserting it at position 1, it appeared that the new provider was returning 
an invalid application buffer size. This may explain why this issue is 
difficult to reproduce. When the new provider is added to the back of the 
list, Kafka behaved fine.

Hope this helps. 

 

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.

2020-07-23 Thread Shuo Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Zhang reassigned KAFKA-9343:
-

Assignee: Shuo Zhang

> Add ps command for Kafka and zookeeper process on z/OS.
> ---
>
> Key: KAFKA-9343
> URL: https://issues.apache.org/jira/browse/KAFKA-9343
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 2.4.0
> Environment: z/OS, OS/390
>Reporter: Shuo Zhang
>Assignee: Shuo Zhang
>Priority: Major
>  Labels: OS/390, z/OS
> Fix For: 2.5.0, 2.4.2
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> +Note: since the final change scope changed, I changed the summary and 
> description.+ 
> The existing method of checking for the Kafka process on other platforms isn't 
> applicable on z/OS; there, the best keyword we can use is the JOBNAME. 
> PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
> '\{print $1}') 
> --> 
> PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v 
> grep | awk '\{print $1}') 
> The same applies to the ZooKeeper process.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2020-07-23 Thread Paul Webb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163344#comment-17163344
 ] 

Paul Webb commented on KAFKA-8154:
--

Hi. Please note that we were recently affected by this issue. I tried several 
Kafka clients/brokers and JDKs. The actual cause in the end was that the Java 
security provider list was being changed. Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By inserting it at position 1, it appeared that the new provider was returning 
an invalid application buffer size. This may explain why this issue is 
difficult to reproduce. When the new provider is added to the back of the 
list, Kafka behaved fine.

Hope this helps. 

 

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] leonardge opened a new pull request #9063: Fixed deprecated Gradle build Properties.

2020-07-23 Thread GitBox


leonardge opened a new pull request #9063:
URL: https://github.com/apache/kafka/pull/9063


   The Gradle properties `baseName`, `classifier` and `version` have been deprecated, so I have changed these to `archiveBaseName`, `archiveClassifier` and `archiveVersion`. More information [here](https://docs.gradle.org/6.5/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8098) Flaky Test AdminClientIntegrationTest#testConsumerGroups

2020-07-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163397#comment-17163397
 ] 

Luke Chen commented on KAFKA-8098:
--

In the test, we first remove 1 member from the group and then remove the other 
2 members, and it sometimes failed on the second member-count assertion. After 
investigation, I found it's because auto commit is enabled for the consumers 
(the default setting), so the removed consumer's offset commit gets an 
{{UNKNOWN_MEMBER_ID}} error, which then makes the member rejoin (see 
ConsumerCoordinator#OffsetCommitResponseHandler). That's why, after removing 
the second batch of members, the group membership is sometimes not empty.

I disabled auto commit in the consumer config to fix this issue. Thanks.
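
Concretely, the fix amounts to overriding the consumer default in the test setup; a minimal sketch (the property name is the standard consumer config, the wrapper class is just for illustration):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TestConsumerProps {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Prevent the removed member's offset auto-commit from hitting UNKNOWN_MEMBER_ID
        // and rejoining the group behind the test's back.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return props;
    }
}
{code}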

> Flaky Test AdminClientIntegrationTest#testConsumerGroups
> 
>
> Key: KAFKA-8098
> URL: https://issues.apache.org/jira/browse/KAFKA-8098
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3459/tests]
> {quote}java.lang.AssertionError: expected:<2> but was:<0>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:647)
> at org.junit.Assert.assertEquals(Assert.java:633)
> at 
> kafka.api.AdminClientIntegrationTest.testConsumerGroups(AdminClientIntegrationTest.scala:1194){quote}
> STDOUT
> {quote}2019-03-12 10:52:33,482] ERROR [ReplicaFetcher replicaId=2, 
> leaderId=1, fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:33,485] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:35,880] WARN Unable to read additional data from client 
> sessionid 0x104458575770003, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376)
> [2019-03-12 10:52:38,596] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition elect-preferred-leaders-topic-1-0 at offset 
> 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:38,797] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition elect-preferred-leaders-topic-2-0 at offset 
> 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:51,998] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:52,005] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,750] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition mytopic2-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,754] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition mytopic2-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,755] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition mytopic2-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition mytopic2-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 

[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data

2020-07-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163485#comment-17163485
 ] 

Bruno Cadonna edited comment on KAFKA-8037 at 7/23/20, 12:04 PM:
-

I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with deserializing records 
during restoration, promising. With this we would have in the state store the 
same data as in the source topic, minus the bad data.

Having these two mechanisms in place would allow us to switch on the 
optimization by default without any further restrictions on serdes, since only 
the deserializer is used and never the serializer.
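
For reference, the read-side mechanism the ticket description refers to is the deserialization exception handler; a minimal Streams configuration that drops corrupted records on read could look like this sketch (application id and bootstrap servers are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DropCorruptedRecordsConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-example"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
        // Drop (and log) records that fail deserialization on read. As the ticket notes,
        // this path is bypassed on restore, where plain bytes are copied from the changelog.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
{code}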


was (Author: cadonna):
I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with deserializing records 
during restoration, interesting. With this we would have in the state store 
the same data as in the source topic, minus the bad data.

Having these two mechanisms in place would allow us to switch on the 
optimization by default without any further restrictions on serdes, since only 
the deserializer is used and never the serializer.

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data

2020-07-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163485#comment-17163485
 ] 

Bruno Cadonna edited comment on KAFKA-8037 at 7/23/20, 12:21 PM:
-

I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with deserializing records 
during restoration, promising. With this we would have in the state store the 
same data as in the source topic, minus the bad data.

Having these two mechanisms in place would allow us to switch on the 
optimization by default without any further restrictions on serdes, since only 
a single deserializer is used and never the serializer.


was (Author: cadonna):
I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with deserializing records 
during restoration, promising. With this we would have in the state store the 
same data as in the source topic, minus the bad data.

Having these two mechanisms in place would allow us to switch on the 
optimization by default without any further restrictions on serdes, since only 
the deserializer is used and never the serializer.

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

2020-07-23 Thread Igor Piddubnyi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163529#comment-17163529
 ] 

Igor Piddubnyi commented on KAFKA-8582:
---

Hi [~mjsax], as discussed in the PR, please assign this ticket to me.

> Consider adding an ExpiredWindowRecordHandler to Suppress
> -
>
> Key: KAFKA-8582
> URL: https://issues.apache.org/jira/browse/KAFKA-8582
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> I got some feedback on Suppress:
> {quote}Specifying how to handle events outside the grace period does seem 
> like a business concern, and simply discarding them thus seems risky (for 
> example imagine any situation where money is involved).
> This sort of situation is addressed by the late-triggering approach 
> associated with watermarks 
> (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
> this I wondered if you were considering adding anything similar?{quote}
> It seems like, if a record has arrived past the grace period for its window, 
> then the state of the windowed aggregation would already have been lost, so 
> if we were to compute an aggregation result, it would be incorrect. Plus, 
> since the window is already expired, we can't store the new (incorrect, but 
> more importantly expired) aggregation result either, so any subsequent 
> super-late records would also face the same blank-slate. I think this would 
> wind up looking like this: if you have three timely records for a window, and 
> then three more that arrive after the grace period, and you were doing a 
> count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 
> 1, 1, 1]. I guess we could add a flag to the post-expiration results to 
> indicate that they're broken, but this seems like the wrong approach. The 
> post-expiration aggregation _results_ are meaningless, but I could see 
> wanting to send the past-expiration _input records_ to a dead-letter queue or 
> something instead of dropping them.
> Along this line of thinking, I wonder if we should add an optional 
> past-expiration record handler interface to the suppression operator. Then, 
> you could define your own logic, whether it's a dead-letter queue, sending it 
> to some alerting pipeline, or even just crashing the application before it 
> can do something wrong. This would be a similar pattern to how we allow 
> custom logic to handle deserialization errors by supplying a 
> org.apache.kafka.streams.errors.DeserializationExceptionHandler.
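
A purely illustrative sketch of the shape such a handler could take, by analogy with the existing DeserializationExceptionHandler (this interface does not exist in Kafka; all names are hypothetical):
{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical interface, analogous to org.apache.kafka.streams.errors.DeserializationExceptionHandler.
// It would let applications route past-expiration input records to a dead-letter queue or an
// alerting pipeline, or fail fast, instead of suppress() silently dropping them.
public interface ExpiredWindowRecordHandler {

    enum ExpiredWindowHandlerResponse { CONTINUE, FAIL }

    ExpiredWindowHandlerResponse handle(ProcessorContext context,
                                        Object key,
                                        Object value,
                                        long recordTimestamp);
}
{code}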



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME

2020-07-23 Thread Thiago Santos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163590#comment-17163590
 ] 

Thiago Santos edited comment on KAFKA-9531 at 7/23/20, 1:45 PM:


I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update on this issue?
{code:java}
java.net.UnknownHostException: kafka3java.net.UnknownHostException: kafka3 at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281) at 
java.net.InetAddress.getAllByName(InetAddress.java:1193) at 
java.net.InetAddress.getAllByName(InetAddress.java:1127) at 
org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
 at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
 at 
org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
 at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962) 
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294) at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code}


was (Author: tcsantos):
I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update on this issue?
{code:java}
kafka-connect                                     | 
java.net.UnknownHostException: kafka3kafka-connect                              
       | java.net.UnknownHostException: kafka3kafka-connect                     
                |  at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect          
                           |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect           
                          |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect           
                          |  at 
org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect 
                                    |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code}

> java.net.UnknownHostException loop on VM rolling update using CNAME
> ---
>
> Key: KAFKA-9531
> URL: https://issues.apache.org/jira/browse/KAFKA-9531
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, controller, network, producer 
>Affects Versions: 2.4.0
>Reporter: Rui Abreu
>Priority: Major
>
> Hello,
>  
> My cluster setup is based on VMs behind a DNS CNAME.
> Example: node.internal is a CNAME to either nodeA.internal or nodeB.internal
> Since kafka-client 1.2.1, it has been observed that Kafka clients sometimes 
> get stuck in a loop with the exception:
> Example after nodeB.internal is replaced with nodeA.internal 
>  
> {code:java}
> 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer 
> clientId=consumer-6, groupId=consumer.group] Error connecting to node 
> nodeB.internal:9092 (id: 2 rack: null)
> java.net.UnknownHostException: nodeB.internal:9092
>   at java.net.InetAddress.getAllByName0(InetAddress.java:1281) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1193) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1127) 

[jira] [Comment Edited] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME

2020-07-23 Thread Thiago Santos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163590#comment-17163590
 ] 

Thiago Santos edited comment on KAFKA-9531 at 7/23/20, 1:44 PM:


I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update on this issue?
{code:java}
kafka-connect                                     | 
java.net.UnknownHostException: kafka3kafka-connect                              
       | java.net.UnknownHostException: kafka3kafka-connect                     
                |  at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect          
                           |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect           
                          |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect           
                          |  at 
org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect 
                                    |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code}


was (Author: tcsantos):
I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update on this issue?
{code:java}
// code placeholder
kafka-connect                                     | 
java.net.UnknownHostException: kafka3kafka-connect                              
       | java.net.UnknownHostException: kafka3kafka-connect                     
                |  at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect          
                           |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect           
                          |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect           
                          |  at 
org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect 
                                    |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)kafka-connect
                                     |  at java.lang.Thread.run(Thread.java:748)
{code}

> java.net.UnknownHostException loop on VM rolling update using CNAME
> ---
>
> Key: KAFKA-9531
> URL: https://issues.apache.org/jira/browse/KAFKA-9531
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, controller, network, producer 
>Affects Versions: 2.4.0
>Reporter: Rui Abreu
>

[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


dajac commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459258170



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   If we want to guarantee that the `deadlineMs` is respected, I think that 
we must set the timeout of the AdminClient's call accordingly: 
`CreateTopicsOptions.timeoutMs`. With the default, I think that the call could 
be longer than half of `MAX_POLL_INTERVAL_MS_CONFIG`.
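
   A rough sketch of what that could look like (illustrative only; `deadlineMs` and `time` correspond to the diff above, while the helper and its other parameters are made up):
```java
import java.util.Collection;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.Time;

class BoundedCreateTopics {
    // Cap the AdminClient call by the time remaining until deadlineMs, so a single
    // createTopics() call cannot outlive the overall retry timeout.
    static CreateTopicsResult createWithDeadline(Admin adminClient,
                                                 Collection<NewTopic> newTopics,
                                                 Time time,
                                                 long deadlineMs) {
        int remainingMs = (int) Math.max(0L, deadlineMs - time.milliseconds());
        return adminClient.createTopics(newTopics, new CreateTopicsOptions().timeoutMs(remainingMs));
    }
}
```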





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


dajac commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459258170



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   If we want to guarantee that the `deadlineMs` is respected, I think that 
we must set the timeout of the AdminClient's call accordingly: 
`CreateTopicsOptions.timeoutMs`. With the default, I think that the call could 
be longer than half of `MAX_POLL_INTERVAL_MS_CONFIG`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2020-07-23 Thread Paul Webb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163344#comment-17163344
 ] 

Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:53 AM:


[~rsivaram] Hi. Please note that we were recently affected by this issue. I 
tried several Kafka clients/brokers and JDKs with no change.

The actual cause in the end was that the Java security provider list was being 
changed. Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By inserting it at position 1, it appeared that the new provider was returning 
an invalid application buffer size. This may explain why this issue is 
difficult to reproduce.

This is some logging to show the issue: 
{noformat}
2020-07-23 08:13:55.963Z INFO  -  Adding provider: OpenSSLProvider: Conscrypt 
info: Android's OpenSSL-backed security provider
2020-07-23 08:13:55.963Z INFO  -   SECURITY PROVIDERS -
2020-07-23 08:13:55.965Z INFO  -  -> Sun: SUN info: SUN (DSA key/parameter 
generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; 
JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, 
Collection CertStores, JavaPolicy Po
licy; JavaLoginConfig Configuration)
2020-07-23 08:13:55.965Z INFO  -  -> SunRsaSign: SunRsaSign info: Sun RSA 
signature provider
2020-07-23 08:13:55.965Z INFO  -  -> SunEC: SunEC info: Sun Elliptic Curve 
provider (EC, ECDSA, ECDH)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunJSSE info: Sun JSSE 
provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2)
2020-07-23 08:13:55.966Z INFO  -  -> SunJCE: SunJCE info: SunJCE Provider 
(implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, 
Diffie-Hellman, HMAC)
2020-07-23 08:13:55.966Z INFO  -  -> SunProvider: SunJGSS info: Sun (Kerberos 
v5, SPNEGO)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunSASL info: Sun SASL 
provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, 
CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2020-07-23 08:13:55.966Z INFO  -  -> XMLDSigRI: XMLDSig info: XMLDSig (DOM 
XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, 
Base64, Enveloped, XPath, XPath2, XSLT TransformServices)
2020-07-23 08:13:55.966Z INFO  -  -> SunPCSC: SunPCSC info: Sun PC/SC provider
2020-07-23 08:13:55.966Z INFO  -  -> BouncyCastleProvider: BC info: 
BouncyCastle Security Provider v1.61
2020-07-23 08:13:55.966Z INFO  -   SECURITY PROVIDERS -{noformat}
So my hypothesis is that the new provider (in this case from Conscrypt) is 
inserted at the head of the list.  When SSLSession.getApplicationBufferSize() is 
called, it returns MAX_PLAINTEXT_LENGTH, which is 2^14 
(16384) as per [https://tools.ietf.org/html/rfc5246#section-6.2.1] 

When the new provider is added to the back of the list, Kafka behaved fine and 
this issue disappeared completely. 
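For reference, a minimal sketch of the two registration styles (assuming 
providerToAdd is the Conscrypt provider instance):
{code:java}
import java.security.Security;

// Problematic: position 1 makes the new provider preferred for every algorithm
// it supports, including TLS, which is what appears to have triggered this issue.
Security.insertProviderAt(providerToAdd, 1);

// Workaround: appending keeps the JDK's SunJSSE provider first in line for TLS.
// (Use one or the other -- addProvider() is equivalent to inserting at the end.)
Security.addProvider(providerToAdd);
{code}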

Hope this helps. 

 


was (Author: pwebb.itrs):
Hi.  Please note that we were recently affected by this issue.  I tried several 
kafka clients/ brokers and JDKs. The actual cause in the end was that the java 
security providers was being changed.  Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By adding at position 1, it appeared that the new provider was returning an 
invalid application buffer size. This may explain why this could be difficult 
to reproduce.  

This is some logging to show the issue: 
{noformat}
2020-07-23 08:13:55.963Z INFO  -  Adding provider: OpenSSLProvider: Conscrypt 
info: Android's OpenSSL-backed security provider
2020-07-23 08:13:55.963Z INFO  -   SECURITY PROVIDERS -
2020-07-23 08:13:55.965Z INFO  -  -> Sun: SUN info: SUN (DSA key/parameter 
generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; 
JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, 
Collection CertStores, JavaPolicy Po
licy; JavaLoginConfig Configuration)
2020-07-23 08:13:55.965Z INFO  -  -> SunRsaSign: SunRsaSign info: Sun RSA 
signature provider
2020-07-23 08:13:55.965Z INFO  -  -> SunEC: SunEC info: Sun Elliptic Curve 
provider (EC, ECDSA, ECDH)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunJSSE info: Sun JSSE 
provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2)
2020-07-23 08:13:55.966Z INFO  -  -> SunJCE: SunJCE info: SunJCE Provider 
(implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, 
Diffie-Hellman, HMAC)
2020-07-23 08:13:55.966Z INFO  -  -> SunProvider: SunJGSS info: Sun (Kerberos 
v5, SPNEGO)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunSASL info: Sun SASL 
provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, 
CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2020-07-23 08:13:55.966Z INFO  -  -> XMLDSigRI: XMLDSig info: XMLDSig (DOM 
XMLSignatureFactory; 

[jira] [Updated] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.

2020-07-23 Thread Shuo Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Zhang updated KAFKA-9343:
--
Fix Version/s: 2.6.0

> Add ps command for Kafka and zookeeper process on z/OS.
> ---
>
> Key: KAFKA-9343
> URL: https://issues.apache.org/jira/browse/KAFKA-9343
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 2.4.0
> Environment: z/OS, OS/390
>Reporter: Shuo Zhang
>Assignee: Shuo Zhang
>Priority: Major
>  Labels: OS/390, z/OS
> Fix For: 2.5.0, 2.6.0, 2.4.2
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> +Note: since the final change scope changed, I changed the summary and 
> description.+ 
> The existing method to check the Kafka process on other platforms isn't 
> applicable for z/OS; on z/OS, the best keyword we can use is the JOBNAME. 
> PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
> '\{print $1}') 
> --> 
> PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v 
> grep | awk '\{print $1}') 
> So does the zookeeper process.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-23 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-662968116


   Rebased on trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-07-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163485#comment-17163485
 ] 

Bruno Cadonna commented on KAFKA-8037:
--

I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with the deserialization of 
records during restoration, interesting. With this we would have in the state 
store the same data as in the source topic minus the bad data.  

Having these two mechanisms in place would allow us to switch on the optimization 
by default without any further restrictions on serdes, since only the 
deserializer is used and never the serializer.  
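(As a side note, the read-side drop mechanism referenced in the issue description 
below is the deserialization exception handler; a minimal configuration sketch, 
assuming a plain Streams Properties object:)
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

final Properties props = new Properties();
// Log and skip records that cannot be deserialized instead of failing the task.
// As described in this ticket, the handler is bypassed when state is restored
// from the changelog, because plain bytes are copied on restore.
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
{code}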

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME

2020-07-23 Thread Thiago Santos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163590#comment-17163590
 ] 

Thiago Santos commented on KAFKA-9531:
--

I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update about this issue?
{code:java}
kafka-connect    | java.net.UnknownHostException: kafka3
kafka-connect    | java.net.UnknownHostException: kafka3
kafka-connect    |  at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
kafka-connect    |  at java.net.InetAddress.getAllByName(InetAddress.java:1193)
kafka-connect    |  at java.net.InetAddress.getAllByName(InetAddress.java:1127)
kafka-connect    |  at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
kafka-connect    |  at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
kafka-connect    |  at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
kafka-connect    |  at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
kafka-connect    |  at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)
kafka-connect    |  at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
kafka-connect    |  at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)
kafka-connect    |  at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)
kafka-connect    |  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
kafka-connect    |  at java.lang.Thread.run(Thread.java:748)
{code}

> java.net.UnknownHostException loop on VM rolling update using CNAME
> ---
>
> Key: KAFKA-9531
> URL: https://issues.apache.org/jira/browse/KAFKA-9531
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, controller, network, producer 
>Affects Versions: 2.4.0
>Reporter: Rui Abreu
>Priority: Major
>
> Hello,
>  
> My cluster setup in based on VMs behind DNS CNAME .
> Example:  node.internal is a CNAME to either nodeA.internal or nodeB.internal
> Since kafka-client 1.2.1, it has been observed that sometimes Kafka clients 
> get stuck in a loop with the exception:
> Example after nodeB.internal is replaced with nodeA.internal 
>  
> {code:java}
> 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer 
> clientId=consumer-6, groupId=consumer.group] Error connecting to node 
> nodeB.internal:9092 (id: 2 rack: null)
> java.net.UnknownHostException: nodeB.internal:9092
>   at java.net.InetAddress.getAllByName0(InetAddress.java:1281) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1193) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1127) 
> ~[?:1.8.0_222]
>   at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005)
>  ~[stormjar.jar:?]
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>  ~[stormjar.jar:?]
>   

[jira] [Comment Edited] (KAFKA-10205) NullPointerException in StreamTask

2020-07-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163394#comment-17163394
 ] 

Igor Soarez edited comment on KAFKA-10205 at 7/23/20, 2:01 PM:
---

I'm happy to take this one on.

I've opened [https://github.com/apache/kafka/pull/9064]


was (Author: soarez):
I'm happy take this one on.

 

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Assignee: Igor Soarez
>Priority: Minor
>  Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vitojeng opened a new pull request #9069: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2020-07-23 Thread GitBox


vitojeng opened a new pull request #9069:
URL: https://github.com/apache/kafka/pull/9069


   follow-up #8200 
   
   KAFKA-5876's PR is broken into multiple parts; this PR is part 2: apply 
UnknownStateStoreException 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459793713



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   I am happy to add a check in `StreamsConfig` and either throw or log a 
WARN depending on how strict we want to be.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


guozhangwang commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459799708



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately
+
+// hence, a `TimeoutException` indicates a bug and 
thus we rethrow it as fatal `IllegalStateException`
+throw new IllegalStateException(error);
+}
+
 stateRestoreAdapter.restoreBatch(restoreRecords);
 stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
 restoreCount += restoreRecords.size();

Review comment:
   My understanding is that for non-global state stores, we would also 
start ticking if we cannot make progress, either due to exceptions or because `poll()` 
returned no data, is that right? If yes, I'm +1 on covering the same for the global 
state store here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


guozhangwang commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459800202



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -154,27 +164,34 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
 "Error message was: {}", topicName, 
cause.toString());
 throw new StreamsException(String.format("Could 
not create topic %s.", topicName), cause);
 }
+} catch (final TimeoutException retryableException) {
+log.error("Creating topic {} timed out.\n" +
+"Error message was: {}", topicName, 
retryableException.toString());
 }
 }
 }
 
 
 if (!topicsNotReady.isEmpty()) {
-log.info("Topics {} can not be made ready with {} retries 
left", topicsNotReady, remainingRetries);
+currentWallClockMs = time.milliseconds();
 
-Utils.sleep(retryBackOffMs);
+if (currentWallClockMs >= deadlineMs) {
+final String timeoutError = String.format("Could not 
create topics within %d milliseconds. " +
+"This can happen if the Kafka cluster is temporary not 
available.", retryTimeoutMs);
+log.error(timeoutError);
+// TODO: should we throw a different exception instead and 
catch it, to return a `INCOMPLETE_SOURCE_TOPIC_METADATA` error code

Review comment:
   I'm thinking that moving forward we should try to not create internal 
topics during rebalance but try to pre-create them at startup; but for now, assuming 
this is still the case, I think letting the whole application die is fine --- 
i.e. treat it the same as missing source topics. Hence I'm leaning towards encoding 
INCOMPLETE_SOURCE_TOPIC_METADATA to shut down the whole app, across all clients.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #9069: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2020-07-23 Thread GitBox


vitojeng commented on a change in pull request #9069:
URL: https://github.com/apache/kafka/pull/9069#discussion_r459805054



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -693,6 +695,16 @@ public void 
shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing(
 streams.queryMetadataForKey("store", "key", (topic, key, value, 
numPartitions) -> 0);
 }
 
+@Test(expected = UnknownStateStoreException.class)

Review comment:
   I am not sure if using the annotation is a good choice. Would it be better 
to use `assertThrows`?
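   A minimal sketch of the `assertThrows` form (illustrative; it reuses the call 
from the existing test above and assumes a static import of 
`org.junit.Assert.assertThrows`):
   ```java
   @Test
   public void shouldThrowUnknownStateStoreExceptionOnUnknownStore() {
       // scopes the expected exception to this single statement
       // instead of the whole test method
       assertThrows(UnknownStateStoreException.class,
           () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
   }
   ```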
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17164063#comment-17164063
 ] 

Guozhang Wang commented on KAFKA-10284:
---

What [~ableegoldman] described seems aligned with this, BUT I thought that the fenced 
instance Y should still try to rejoin the group, whereas in [~ableegoldman]'s case that 
thread did not try to rejoin at all?

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10268) dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-23 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-10268.

Fix Version/s: 2.7.0
   Resolution: Fixed

> dynamic config like "--delete-config log.retention.ms" doesn't work
> ---
>
> Key: KAFKA-10268
> URL: https://issues.apache.org/jira/browse/KAFKA-10268
> Project: Kafka
>  Issue Type: Bug
>  Components: log, log cleaner
>Affects Versions: 2.1.1
>Reporter: zhifeng.peng
>Assignee: huxihx
>Priority: Major
> Fix For: 2.7.0
>
> Attachments: server.log.2020-07-13-14
>
>
> After I set "log.retention.ms=301000" to clean the data, I used the cmd
> "bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type 
> brokers --entity-default --alter --delete-config log.retention.ms" to reset 
> it to the default.
> The static broker configuration log.retention.hours is 168h and there is no 
> topic-level configuration like retention.ms.
> It did not actually take effect, although server.log printed the broker 
> configuration like that.
> log.retention.check.interval.ms = 30
>  log.retention.hours = 168
>  log.retention.minutes = null
>  {color:#ff}log.retention.ms = null{color}
>  log.roll.hours = 168
>  log.roll.jitter.hours = 0
>  log.roll.jitter.ms = null
>  log.roll.ms = null
>  log.segment.bytes = 1073741824
>  log.segment.delete.delay.ms = 6
>  
> Then we can see that retention time is still 301000ms from the server.log and 
> segments have been deleted.
> [2020-07-13 14:30:00,958] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Found deletable segments with base offsets 
> [5005329,6040360] due to retention time 301000ms breach (kafka.log.Log)
>  [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 5005329, size 
> 1073741222] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 6040360, size 
> 1073728116] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Incrementing log start offset to 7075648 
> (kafka.log.Log)
>  [2020-07-13 14:30:00,960] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Found deletable segments with base offsets 
> [5005330,6040410] {color:#FF}due to retention time 301000ms{color} breach 
> (kafka.log.Log)
>  [2020-07-13 14:30:00,960] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 5005330, size 
> 1073732368] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 6040410, size 
> 1073735366] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Incrementing log start offset to 7075685 
> (kafka.log.Log)
>  [2020-07-13 14:31:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Deleting segment 5005329 (kafka.log.Log)
>  [2020-07-13 14:31:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Deleting segment 6040360 (kafka.log.Log)
>  [2020-07-13 14:31:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Deleting segment 5005330 (kafka.log.Log)
>  [2020-07-13 14:31:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Deleting segment 6040410 (kafka.log.Log)
>  [2020-07-13 14:31:01,144] INFO Deleted log 
> /data/kafka_logs-test/test_retention-2/06040360.log.deleted. 
> (kafka.log.LogSegment)
>  [2020-07-13 14:31:01,144] INFO Deleted offset index 
> /data/kafka_logs-test/test_retention-2/06040360.index.deleted. 
> (kafka.log.LogSegment)
>  [2020-07-13 14:31:01,144] INFO Deleted time index 
> /data/kafka_logs-test/test_retention-2/06040360.timeindex.deleted.
>  (kafka.log.LogSegment)
>  
> Here are a few steps to reproduce it.
> 1、set log.retention.ms=301000:
> bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type 
> brokers --entity-default --alter --add-config log.retention.ms=301000
> 2、produce messages to the topic:
> bin/kafka-producer-perf-test.sh --topic test_retention --num-records 1000 
> --throughput -1 --producer-props bootstrap.servers=10.129.104.15:9092 
> --record-size 1024
> 3、reset log.retention.ms to the default:
> bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type 
> brokers --entity-default --alter --delete-config log.retention.ms
>  
> I have attached server.log. You can see the log from row 238 to row 731. 



--
This message was sent by Atlassian Jira

[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459790103



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
 }
 
 final Set changelogTopics = new HashSet<>();
-for (final StateStore stateStore : globalStateStores) {
+
+long deadlineMs = NO_DEADLINE;
+final List storesToInitialize = new 
LinkedList<>(globalStateStores);
+
+while (!storesToInitialize.isEmpty()) {
+// we remove and add back on failure to round-robin through all 
stores
+final StateStore stateStore = storesToInitialize.remove(0);
 globalStoreNames.add(stateStore.name());
 final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
 changelogTopics.add(sourceTopic);
-stateStore.init(globalProcessorContext, stateStore);
+
+try {
+stateStore.init(globalProcessorContext, stateStore);
+deadlineMs = NO_DEADLINE;
+} catch (final RetryableErrorException retryableException) {
+if (taskTimeoutMs == 0L) {
+throw new StreamsException(retryableException.getCause());
+}
+
+storesToInitialize.add(stateStore);
+
+final long currentWallClockMs = time.milliseconds();
+if (deadlineMs == NO_DEADLINE) {
+final long newDeadlineMs = currentWallClockMs + 
taskTimeoutMs;
+deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : 
newDeadlineMs;
+} else if (currentWallClockMs > deadlineMs) {
+throw new TimeoutException(String.format(
+"Global task did not make progress to restore state 
within %d ms. Adjust `task.timeout.ms` if needed.",
+currentWallClockMs - deadlineMs + taskTimeoutMs
+));
+}
+
+log.debug(retryableException.getMessage() + " Will retry. 
Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);

Review comment:
   I understood what you meant, but I thought that because we actually do 
retry, it's better/cleaner/less noisy to not log the full stack trace?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459790323



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {

Review comment:
   > I can't comment above, but can the poll call itself throw a timeout 
exception?
   
   No, it cannot. It would just return an empty result.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459791370



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately

Review comment:
   I guess strictly speaking, it's not necessary, but I personally prefer 
this strict style as it surfaces bugs more quickly. If we write "robust" code, it 
could hide a bug. -- It's just a personal preference and we have similar code 
elsewhere. If you feel strongly about it, I am also ok to change it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459791741



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately
+
+// hence, a `TimeoutException` indicates a bug and 
thus we rethrow it as fatal `IllegalStateException`
+throw new IllegalStateException(error);
+}
+
 stateRestoreAdapter.restoreBatch(restoreRecords);
 stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
 restoreCount += restoreRecords.size();

Review comment:
   Yes, it would block forever today. Curious to hear what @guozhangwang 
thinks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-23 Thread GitBox


huxihx commented on pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#issuecomment-663309863


   @rajinisivaram Thanks for the review, merging to trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx merged pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-23 Thread GitBox


huxihx merged pull request #9051:
URL: https://github.com/apache/kafka/pull/9051


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-23 Thread GitBox


showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r459818087



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -62,6 +65,7 @@
 private static final int NUM_PARTITIONS = 10;
 private static final int RECORD_TRANSFER_DURATION_MS = 20_000;
 private static final int CHECKPOINT_DURATION_MS = 20_000;
+private static final int OFFSET_SYNC_DURATION_MS = 30_000;
 
 private Time time = Time.SYSTEM;

Review comment:
   Nice catch! I also removed the unused import `import 
org.apache.kafka.common.utils.Time;` Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] piddubnyi commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation

2020-07-23 Thread GitBox


piddubnyi commented on pull request #9017:
URL: https://github.com/apache/kafka/pull/9017#issuecomment-663037679


   Hi @mjsax, I commented on the ticket to get it assigned.
   Regarding the KIP, I just created a fresh account in Confluence; however, it 
looks like I don't have the rights to create the KIP. Please suggest the best way 
to proceed. 
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-23 Thread GitBox


mimaison commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r459460614



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -62,6 +65,7 @@
 private static final int NUM_PARTITIONS = 10;
 private static final int RECORD_TRANSFER_DURATION_MS = 20_000;
 private static final int CHECKPOINT_DURATION_MS = 20_000;
+private static final int OFFSET_SYNC_DURATION_MS = 30_000;
 
 private Time time = Time.SYSTEM;

Review comment:
   We can remove this field now that it's unused

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
 backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
 }
 
+private void waitForConsumerGroupOffsetSync(Consumer 
consumer, List topics, String consumerGroupId)
+throws InterruptedException {
+Admin backupClient = backup.kafka().createAdminClient();
+List tps = new ArrayList<>(NUM_PARTITIONS * 
topics.size());
+IntStream.range(0, NUM_PARTITIONS).forEach(

Review comment:
   I'm not sure this is much better than a simple `for` loop. WDYT?
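   For illustration, the plain-loop version might look like this (a sketch only; 
I'm assuming the stream body just builds one `TopicPartition` per partition for 
each topic):
   ```java
   final List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
   for (final String topic : topics) {
       for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
           tps.add(new TopicPartition(topic, partition));
       }
   }
   ```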





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-07-23 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r459536318



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -132,7 +132,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-.filter(x -> x.downstreamOffset() > 0)  // ignore offsets we 
cannot translate accurately
+.filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we 
cannot translate accurately

Review comment:
   @ryannedolan @heritamas If you guys think this is safe enough I can 
remove that filter. But it doesn't hurt to leave it there just in case a rogue 
negative offset comes through for whatever reason/bug... Please let me know 
what you think.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459563205



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   Would `remoteReplicasMap --= removedReplicas` work here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#discussion_r459581717



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a 
deterministic way.

Review comment:
   Nit: insert `` tag to actually get the new paragraph rendered.
   
   Nit: `Topology` -> `{@link Topology}`
   
   It's not really clear what "deterministic" means. We should elaborate more.

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a 
deterministic way.
+ * If different instances build different runtime code logic the resulting 
behavior may be unexpected.

Review comment:
   "different" for sure, but this implies that one might have an operator 
the other does not. The observed issue is, that even if both contain the same 
operator, they might be added in different order (and thus be named 
differently) to the `Topology`, thus we should stretch that order matters.
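   For illustration, a minimal sketch of the ordering problem (hypothetical 
topics; the node names follow the usual auto-generated pattern):
   ```java
   final StreamsBuilder builderA = new StreamsBuilder();
   builderA.stream("topic-1");   // e.g. KSTREAM-SOURCE-0000000000
   builderA.stream("topic-2");   // e.g. KSTREAM-SOURCE-0000000001

   final StreamsBuilder builderB = new StreamsBuilder();
   builderB.stream("topic-2");   // e.g. KSTREAM-SOURCE-0000000000
   builderB.stream("topic-1");   // e.g. KSTREAM-SOURCE-0000000001

   // Both builders contain the same operators, but because they were added in a
   // different order they get different auto-generated names, so the resulting
   // Topologies are not identical.
   ```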

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1984,6 +1985,24 @@ public void 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
 task.initializeStateStores();
 }
 
+@Test(expected = TopologyException.class)

Review comment:
   We should not use this annotation but rather use `assertThrows` (we 
still have some code that does not use `assertThrows` but we try to lazily 
migrate our tests, as it provides a better test pattern).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -181,8 +182,16 @@ public StreamTask(final TaskId id,
 
 final TimestampExtractor defaultTimestampExtractor = 
config.defaultTimestampExtractor();
 final DeserializationExceptionHandler 
defaultDeserializationExceptionHandler = 
config.defaultDeserializationExceptionHandler();
+final Set sourceTopics = topology.sourceTopics();
 for (final TopicPartition partition : partitions) {
-final SourceNode source = topology.source(partition.topic());
+final String topicName = partition.topic();
+if (!sourceTopics.contains(topicName)) {
+throw new TopologyException(
+"Topic not found " + topicName + ". Is the Streams 
Topology built in a deterministic way?"

Review comment:
   `Topic not found` sounds as if the topic was not found in the 
cluster -- however, what actually happened is that we received a record but the 
record's topic is unknown in the sub-topology.
   
   Similar to above, "deterministic" is not really easy to understand. I would 
also not phrase it as a question, but as a statement:
   ```
   ... This may happen if different KafkaStreams instances of the same 
application execute different Topologies. Note that Topologies are only 
identical if all operators are added in the same order.
   ```
   
   Or similar.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163759#comment-17163759
 ] 

Guozhang Wang commented on KAFKA-10134:
---

[~zhowei] Did your run include both the log4j improvement and the other PR 
depending on fetchable partitions to do long polling?

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we try to kill an instance to let a rebalance happen 
> while there is some load (some are long-running tasks >30s), the CPU will 
> go sky-high. It reads ~700% in our metrics, so there should be several threads 
> in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible with both the 
> new CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high 
> CPU usage dropped after the rebalance was done. But when using the cooperative 
> one, it seems the consumer threads are stuck on something and couldn't 
> finish the rebalance, so the high CPU usage won't drop until we stopped our 
> load. Also, a small load without long-running tasks won't cause continuous 
> high CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> We also tried the same on 2.4.1, which doesn't seem to have this issue. So it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10301:

Description: 
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
 - Partition#updateFollowerFetchState
 - DelayedDeleteRecords#tryComplete
 - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sort of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira 
https://issues.apache.org/jira/browse/KAFKA-10302 tracks that.

  was:
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
- Partition#updateFollowerFetchState
- DelayedDeleteRecords#tryComplete
- Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sort of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira X tracks that.


> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
>  - Partition#updateFollowerFetchState
>  - DelayedDeleteRecords#tryComplete
>  - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sort of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira 
> https://issues.apache.org/jira/browse/KAFKA-10302 tracks that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10302) Ensure thread-safe access to Partition#remoteReplicasMap

2020-07-23 Thread Stanislav Kozlovski (Jira)
Stanislav Kozlovski created KAFKA-10302:
---

 Summary: Ensure thread-safe access to Partition#remoteReplicasMap
 Key: KAFKA-10302
 URL: https://issues.apache.org/jira/browse/KAFKA-10302
 Project: Kafka
  Issue Type: Bug
Reporter: Stanislav Kozlovski


A recent Jira (https://issues.apache.org/jira/browse/KAFKA-10301) exposed how 
easy it is to introduce nasty race conditions with the 
Partition#remoteReplicasMap data structure. It is a concurrent map which is 
modified inside a write lock but it is not always accessed through that lock.

Therefore it's possible for callers to access an intermediate state of the map, 
for instance in between updating the replica assignment for a given partition.



It would be good to ensure thread-safe access to the data structure in a way 
which makes it harder to introduce such regressions in the future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski opened a new pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski opened a new pull request #9065:
URL: https://github.com/apache/kafka/pull/9065


   We would previously update the map by adding the new replicas to the map and 
then removing the old ones. During a recent refactoring, we changed the logic 
to first clear the map and then add all the replicas to it.
   
   While this is done in a write lock, not all callers that access the map 
structure use a lock. It is safer to revert to the previous behavior of showing 
the intermediate state of the map with extra replicas, rather than an 
intermediate state of the map with no replicas.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663069630


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663088134


   Currently working on introducing a test case for this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459577643



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   Good question. The default `max.poll.interval.ms` is 5 minutes (i.e., the 
deadline is set to 2.5 minutes by default) while the default 
`api.default.timeout.ms` is 1 minute. Thus we might be ok?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-23 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663109443


   @chia7712: Only 6 test failures in the latest run with your PR. 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-23--001.1595503536--chia7712--fix_8334_avoid_deadlock--3462b0008/report.html
   
   I will do another run on trunk for comparison.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10301) RemoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)
Stanislav Kozlovski created KAFKA-10301:
---

 Summary: RemoteReplicasMap can be empty in certain race conditions
 Key: KAFKA-10301
 URL: https://issues.apache.org/jira/browse/KAFKA-10301
 Project: Kafka
  Issue Type: Bug
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
- Partition#updateFollowerFetchState
- DelayedDeleteRecords#tryComplete
- Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sort of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira X tracks that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10301:

Summary: Partition#remoteReplicasMap can be empty in certain race 
conditions  (was: RemoteReplicasMap can be empty in certain race conditions)

> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
> - Partition#updateFollowerFetchState
> - DelayedDeleteRecords#tryComplete
> - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sort of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira X 
> tracks that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459527097



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   I decided to not get fancy with refactorings - this is literally the old 
code 
(https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163686#comment-17163686
 ] 

Stanislav Kozlovski commented on KAFKA-10301:
-

cc [~rhauch] - this would be good to get in 2.6

> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
>  - Partition#updateFollowerFetchState
>  - DelayedDeleteRecords#tryComplete
>  - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sort of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira 
> https://issues.apache.org/jira/browse/KAFKA-10302 tracks further 
> modifications to the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-23 Thread GitBox


mimaison commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r459540733



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
 return new DescribeLogDirsResult(new HashMap<>(futures));
 }
 
+private Map<String, LogDirDescription> 
logDirDescriptions(DescribeLogDirsResponse response) {

Review comment:
   This can be static. Also should we keep it in `DescribeLogDirsResponse`?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+return prepareDescribeLogDirsResponse(error, logDir,
+prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+}
+
+private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+long partitionSize, long offsetLag, String topic, int partition, 
boolean isFuture) {
+return singletonList(new DescribeLogDirsTopic()
+.setName(topic)
+.setPartitions(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsPartition()
+.setPartitionIndex(partition)
+.setPartitionSize(partitionSize)
+.setIsFutureKey(isFuture)
+.setOffsetLag(offsetLag))));
+}
+
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir,
+   
List<DescribeLogDirsTopic> topics) {
+return new DescribeLogDirsResponse(
+new DescribeLogDirsResponseData().setResults(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
+.setErrorCode(error.code())
+.setLogDir(logDir)
+.setTopics(topics)
+)));
+}
+
+@Test
+public void testDescribeLogDirs() throws ExecutionException, 
InterruptedException {
+Set<Integer> brokers = Collections.singleton(0);
+String logDir = "/var/data/kafka";
+TopicPartition tp = new TopicPartition("topic", 12);
+long partitionSize = 1234567890;
+long offsetLag = 24;
+
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponseFrom(
+prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, 
partitionSize, offsetLag),
+env.cluster().nodeById(0));
+
+DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(brokers);
+
+Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> 
descriptions = result.descriptions();
+assertEquals(brokers, descriptions.keySet());
+assertNotNull(descriptions.get(0));
+assertDescriptionContains(descriptions.get(0).get(), logDir, tp, 
partitionSize, offsetLag);
+
+Map<Integer, Map<String, LogDirDescription>> allDescriptions = 
result.allDescriptions().get();
+assertEquals(brokers, allDescriptions.keySet());
+assertDescriptionContains(allDescriptions.get(0), logDir, tp, 
partitionSize, offsetLag);
+}
+}
+
+private void assertDescriptionContains(Map<String, LogDirDescription> 
descriptionsMap, String logDir,

Review comment:
   This can be static

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {

Review comment:
   This can be static

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+return prepareDescribeLogDirsResponse(error, logDir,
+prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+}
+
+private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(

Review comment:
   This can be static

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 

[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-23 Thread GitBox


mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r459499911



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} 
objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on 
this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is 
made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send 
consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying 
record-set's transfer logic.
+ *
+ * For example,
+ *
+ * 
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * 
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care 
must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is 
flushed to the consumer. This class is
+ * intended to be used with {@link 
org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+private final String dest;
+private final Consumer<Send> sendConsumer;
+private final ByteBufferOutputStream byteArrayOutputStream;
+private final DataOutput output;
+private int mark;
+
+public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+this.dest = dest;
+this.sendConsumer = sendConsumer;
+this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+this.output = new DataOutputStream(this.byteArrayOutputStream);
+this.mark = 0;
+}
+
+@Override
+public void writeByte(byte val) {
+try {
+output.writeByte(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeShort(short val) {
+try {
+output.writeShort(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeInt(int val) {
+try {
+output.writeInt(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeLong(long val) {
+try {
+output.writeLong(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeDouble(double val) {
+try {
+ByteUtils.writeDouble(val, output);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeByteArray(byte[] arr) {
+try {
+output.write(arr);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeUnsignedVarint(int i) {
+  

[GitHub] [kafka] AshishRoyJava commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE

2020-07-23 Thread GitBox


AshishRoyJava commented on pull request #9034:
URL: https://github.com/apache/kafka/pull/9034#issuecomment-663066844


   @abbccdda Unit test added.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


dajac commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459582710



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   That's right. I misread the default value of `max.poll.interval.ms`, too 
many zeros for my eyes ;). The default works fine then. Do we want to protect 
ourselves if the user changes the default? Or shall we just call out that 
`api.default.timeout.ms` should be lower than `max.poll.interval.ms` somewhere?
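   For illustration only, one hypothetical way to protect against a user-supplied combination that cannot work would be a fail-fast check along these lines (the config names are the real ones, but this check is only a sketch and is not part of this PR):
   
   ```java
   // Hypothetical sanity check: the admin API timeout should fit within the
   // validation deadline derived from max.poll.interval.ms / 2.
   final class TimeoutSanityCheck {
       static void check(final int maxPollIntervalMs, final int apiDefaultTimeoutMs) {
           final int deadlineBudgetMs = maxPollIntervalMs / 2; // 150000 ms with the 5 minute default
           if (apiDefaultTimeoutMs >= deadlineBudgetMs) {
               throw new IllegalArgumentException(
                   "api.default.timeout.ms (" + apiDefaultTimeoutMs + " ms) should be lower than half of "
                       + "max.poll.interval.ms (" + maxPollIntervalMs + " ms) so that internal topic "
                       + "validation can finish before the rebalance deadline");
           }
       }
   }
   ```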





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10301:

Description: 
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
 - Partition#updateFollowerFetchState
 - DelayedDeleteRecords#tryComplete
 - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sort of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira 
https://issues.apache.org/jira/browse/KAFKA-10302 tracks further modifications 
to the code.

  was:
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
 - Partition#updateFollowerFetchState
 - DelayedDeleteRecords#tryComplete
 - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sort of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira 
https://issues.apache.org/jira/browse/KAFKA-10302 tracks that.


> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
>  - Partition#updateFollowerFetchState
>  - DelayedDeleteRecords#tryComplete
>  - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sort of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira 
> https://issues.apache.org/jira/browse/KAFKA-10302 tracks further 
> modifications to the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663065197


   cc @ijuma 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10303) kafka producer says connect failed in cluster mode

2020-07-23 Thread Yogesh BG (Jira)
Yogesh BG created KAFKA-10303:
-

 Summary: kafka producer says connect failed in cluster mode
 Key: KAFKA-10303
 URL: https://issues.apache.org/jira/browse/KAFKA-10303
 Project: Kafka
  Issue Type: Bug
Reporter: Yogesh BG


Hi

 

I am using kafka broker version 2.3.0

We have two setups: standalone (one node) and a 3-node cluster.

We pump a lot of data: ~25 MBps, ~80K messages per second.

It all works well in the one-node mode,

but in the cluster case the producer starts throwing connect failed (librdkafka);

after some time it is able to connect again and starts sending traffic.

What could be the issue? Some of the configurations are:

 
replica.fetch.max.bytes=10485760
num.network.threads=12

num.replica.fetchers=6

queued.max.requests=5

# The number of threads doing disk I/O

num.io.threads=12

replica.socket.receive.buffer.bytes=1

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-23 Thread GitBox


mimaison commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663094946


   While not part of your changes, I noticed the test's assumptions are pretty 
loose. For example, we assume 
https://github.com/apache/kafka/pull/9029/files#diff-a03d58195cfe119d0b1ed2693cd0d691L362
 always consumes all 100 messages. The test also assumes there are no 
duplicates. While this may be fine when running in memory, Connect semantics 
are at least once.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters

2020-07-23 Thread GitBox


mjsax commented on pull request #9052:
URL: https://github.com/apache/kafka/pull/9052#issuecomment-663107263


   Only flaky tests failed. I updated the corresponding Jira tickets.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163749#comment-17163749
 ] 

Matthias J. Sax commented on KAFKA-9013:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7491/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/]

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:02 PM 

[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163747#comment-17163747
 ] 

Matthias J. Sax commented on KAFKA-10255:
-

Two more:
 * 
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1630/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/]
 * 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3480/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/]

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-23 Thread GitBox


chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-662810186


   > Could you rebase again? I will run the system tests after that. Thanks.
   
   Done. The known flaky ```group_mode_transactions_test.py``` is tracked by 
#9059.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163770#comment-17163770
 ] 

Matthias J. Sax commented on KAFKA-8582:


Added you to the list of contributors and assigned the ticket to you. You can 
now also self-assign tickets.

> Consider adding an ExpiredWindowRecordHandler to Suppress
> -
>
> Key: KAFKA-8582
> URL: https://issues.apache.org/jira/browse/KAFKA-8582
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Igor Piddubnyi
>Priority: Major
>
> I got some feedback on Suppress:
> {quote}Specifying how to handle events outside the grace period does seem 
> like a business concern, and simply discarding them thus seems risky (for 
> example imagine any situation where money is involved).
> This sort of situation is addressed by the late-triggering approach 
> associated with watermarks 
> (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
> this I wondered if you were considering adding anything similar?{quote}
> It seems like, if a record has arrived past the grace period for its window, 
> then the state of the windowed aggregation would already have been lost, so 
> if we were to compute an aggregation result, it would be incorrect. Plus, 
> since the window is already expired, we can't store the new (incorrect, but 
> more importantly expired) aggregation result either, so any subsequent 
> super-late records would also face the same blank-slate. I think this would 
> wind up looking like this: if you have three timely records for a window, and 
> then three more that arrive after the grace period, and you were doing a 
> count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 
> 1, 1, 1]. I guess we could add a flag to the post-expiration results to 
> indicate that they're broken, but this seems like the wrong approach. The 
> post-expiration aggregation _results_ are meaningless, but I could see 
> wanting to send the past-expiration _input records_ to a dead-letter queue or 
> something instead of dropping them.
> Along this line of thinking, I wonder if we should add an optional 
> past-expiration record handler interface to the suppression operator. Then, 
> you could define your own logic, whether it's a dead-letter queue, sending it 
> to some alerting pipeline, or even just crashing the application before it 
> can do something wrong. This would be a similar pattern to how we allow 
> custom logic to handle deserialization errors by supplying a 
> org.apache.kafka.streams.errors.DeserializationExceptionHandler.
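A hypothetical shape for such a handler, modeled on the existing DeserializationExceptionHandler pattern (this interface does not exist in Kafka Streams; the name and signature below are illustrative only), might look like:

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical callback for input records that arrive after their window's grace period.
public interface ExpiredWindowRecordHandler<K, V> {

    enum ExpiredWindowHandlerResponse {
        CONTINUE,   // drop the record and keep processing
        FAIL        // shut the application down
    }

    // Invoked instead of silently dropping the past-expiration input record, so an
    // application could forward it to a dead-letter topic, emit a metric, crash, etc.
    ExpiredWindowHandlerResponse handle(ProcessorContext context, K key, V value, long recordTimestamp);
}
{code}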



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

2020-07-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8582:
--

Assignee: Igor Piddubnyi

> Consider adding an ExpiredWindowRecordHandler to Suppress
> -
>
> Key: KAFKA-8582
> URL: https://issues.apache.org/jira/browse/KAFKA-8582
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Igor Piddubnyi
>Priority: Major
>
> I got some feedback on Suppress:
> {quote}Specifying how to handle events outside the grace period does seem 
> like a business concern, and simply discarding them thus seems risky (for 
> example imagine any situation where money is involved).
> This sort of situation is addressed by the late-triggering approach 
> associated with watermarks 
> (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
> this I wondered if you were considering adding anything similar?{quote}
> It seems like, if a record has arrived past the grace period for its window, 
> then the state of the windowed aggregation would already have been lost, so 
> if we were to compute an aggregation result, it would be incorrect. Plus, 
> since the window is already expired, we can't store the new (incorrect, but 
> more importantly expired) aggregation result either, so any subsequent 
> super-late records would also face the same blank-slate. I think this would 
> wind up looking like this: if you have three timely records for a window, and 
> then three more that arrive after the grace period, and you were doing a 
> count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 
> 1, 1, 1]. I guess we could add a flag to the post-expiration results to 
> indicate that they're broken, but this seems like the wrong approach. The 
> post-expiration aggregation _results_ are meaningless, but I could see 
> wanting to send the past-expiration _input records_ to a dead-letter queue or 
> something instead of dropping them.
> Along this line of thinking, I wonder if we should add an optional 
> past-expiration record handler interface to the suppression operator. Then, 
> you could define your own logic, whether it's a dead-letter queue, sending it 
> to some alerting pipeline, or even just crashing the application before it 
> can do something wrong. This would be a similar pattern to how we allow 
> custom logic to handle deserialization errors by supplying a 
> org.apache.kafka.streams.errors.DeserializationExceptionHandler.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-23 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163796#comment-17163796
 ] 

Sophie Blee-Goldman commented on KAFKA-10284:
-

You know, I think we actually hit this too, but weren't able to recognize the 
problem at the time. A few weeks ago one of our StreamThreads/Consumers seemed 
to "take off" from the group at some point, as evidenced by the steadily 
increasing last-rebalance-seconds-ago metric (whereas the other members had 
rebalanced multiple times since then). Right before this occurred we saw that 
same error message in the logs:

 
{code:java}
ERROR given member.id X is identified as a known static member 1,but not 
matching the expected member.id Y (kafka.coordinator.group.GroupMetadata)
{code}
Unfortunately we killed the client trying to remotely debug it, so we couldn't 
get any more useful information. Would you say that this mysterious encounter 
was likely due to the bug reported here? [~guozhang] [~bchen225242]

 

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-23 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163871#comment-17163871
 ] 

Boyang Chen commented on KAFKA-10284:
-

[~akshaysh] I didn't see any trace that the group coordinator gets migrated in 
the pasted ticket, so it might be a separate issue.

[~ableegoldman] Well, the symptom matches, but I don't know for sure if this is 
the same cause :)

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9161) Close gaps in Streams configs documentation

2020-07-23 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9161:
---
Fix Version/s: 2.6.0

> Close gaps in Streams configs documentation
> ---
>
> Key: KAFKA-9161
> URL: https://issues.apache.org/jira/browse/KAFKA-9161
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: beginner, newbie, newbie++
> Fix For: 2.6.0
>
>
> There are a number of Streams configs that aren't documented in the 
> "Configuring a Streams Application" section of the docs. As of 2.3 the 
> missing configs are:
>  # default.windowed.key.serde.inner ^
>  # default.windowed.value.serde.inner ^
>  # max.task.idle.ms
>  # rocksdb.config.setter. ^^
>  # topology.optimization
>  # -upgrade.from- fixed
>  ^ these configs are also missing the corresponding DOC string
>  ^^ this one actually does appear on that page, but instead of being included 
> in the list of Streams configs it is for some reason under  "Consumer and 
> Producer Configuration Parameters" ?
> There are also a few configs whose documented name is slightly incorrect, as 
> it is missing the "default" prefix that appears in the actual code. The 
> "missing-default" configs are:
>  # key.serde
>  # value.serde
>  # timestamp.extractor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459588501



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   Oh, this is a `Pool`, so we would have to add a `removeAll` method. 
Seems easy enough though since it can call the relevant method in 
`ConcurrentMap`.
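   For illustration only, a `removeAll` of the kind being suggested could delegate to the backing `ConcurrentMap`'s key set (sketched in Java here; `Pool` itself is Scala and this is not the actual implementation):
   
   ```java
   import java.util.Collection;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   
   // Illustrative stand-in for kafka.utils.Pool's backing map.
   class PoolSketch<K, V> {
       private final ConcurrentMap<K, V> pool = new ConcurrentHashMap<>();
   
       // Bulk removal that delegates to the underlying ConcurrentMap's key-set view.
       void removeAll(final Collection<K> keys) {
           pool.keySet().removeAll(keys);
       }
   }
   ```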





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163761#comment-17163761
 ] 

Guozhang Wang commented on KAFKA-10134:
---

BTW I found that the main latency during the rebalance is in discovering the 
coordinator, while we keep getting "Join group failed with 
org.apache.kafka.common.errors.DisconnectException" and it kept retrying for 
about a minute. But I think you did not shut down the broker in your 
experiment; is there anything else happening that caused that broker node to 
be unreachable?

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we try to kill an instance to let a rebalance 
> happen while there is some load (some are long-running tasks >30s), the CPU 
> goes sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high CPU usage 
> dropped after the rebalance was done. But when using the cooperative one, it 
> seems the consumer threads are stuck on something and couldn't finish the 
> rebalance, so the high CPU usage won't drop until we stopped our load. Also, 
> a small load without long-running tasks won't cause continuous high CPU 
> usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found it looks like the clients are in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue. So it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-07-23 Thread GitBox


mjsax commented on pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#issuecomment-663113463


   Btw: The PR should be against `trunk`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation

2020-07-23 Thread GitBox


mjsax commented on pull request #9017:
URL: https://github.com/apache/kafka/pull/9017#issuecomment-663122659


   I found your wiki account and granted write access. You should be all set.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-07-23 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163802#comment-17163802
 ] 

Ning Zhang commented on KAFKA-10000:


Hi Chris, the purpose of this ticket is very interesting. I wonder what its 
priority is in the overall Kafka Connect backlog, and how the progress is so 
far (needs-KIP)? Thanks

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


mjsax commented on pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#issuecomment-663135369


   Jenkins failed on known flaky tests only.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163803#comment-17163803
 ] 

Matthias J. Sax commented on KAFKA-7540:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3496/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
{quote}java.lang.AssertionError: Assignment did not complete on time at 
org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.assertTrue(Assert.java:42) at 
kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486) at 
kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257)
 at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220){quote}

> Flaky Test ConsumerBounceTest#testClose
> ---
>
> Key: KAFKA-7540
> URL: https://issues.apache.org/jira/browse/KAFKA-7540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.7.0, 2.6.1
>
>
> Observed on Java 8: 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
>  
> Stacktrace:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> 

[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163804#comment-17163804
 ] 

Matthias J. Sax commented on KAFKA-10255:
-

And one more: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1645/]

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soondenana commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-23 Thread GitBox


soondenana commented on a change in pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#discussion_r459603473



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -1151,6 +1144,12 @@ class LogManager(logDirs: Seq[File],
   }
 }
   }
+
+  private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: 
TopicPartition): Log = {
+val removedLog = logs.remove(tp)
+if (removedLog != null) removedLog.removeLogMetrics()
+removedLog

Review comment:
   nit: Let's return Option(removedLog) to ease null checking by callers.
   
   It looks like the same object is returned by `asyncDelete` but is only used 
in one place in test code, so we may want to change that return value too. 
The less "null" we have, the better.
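
   A minimal sketch of the Option-returning variant suggested here, assuming the 
same `LogManager` context as the quoted diff (the exact shape is up to the PR author):

```scala
// Inside kafka.log.LogManager: Option(...) turns the possibly-null result of
// Pool.remove into Some/None, so callers can use foreach/map instead of null checks.
private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: TopicPartition): Option[Log] = {
  val removedLog = Option(logs.remove(tp))
  removedLog.foreach(_.removeLogMetrics())
  removedLog
}
```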

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -199,27 +199,22 @@ class LogManager(logDirs: Seq[File],
   if (cleaner != null)
 cleaner.handleLogDirFailure(dir)
 
-  val offlineCurrentTopicPartitions = currentLogs.collect {
-case (tp, log) if log.parentDir == dir => tp
-  }
-  offlineCurrentTopicPartitions.foreach { topicPartition => {
-val removedLog = currentLogs.remove(topicPartition)
-if (removedLog != null) {
-  removedLog.closeHandlers()
-  removedLog.removeLogMetrics()
+  def removeOfflineLogs(logs: Pool[TopicPartition, Log]): 
Iterable[TopicPartition] = {

Review comment:
   Thanks for deduping this code.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2020-07-23 Thread GitBox


abbccdda commented on a change in pull request #8512:
URL: https://github.com/apache/kafka/pull/8512#discussion_r459648674



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1388,9 +1388,9 @@ public void commitSync() {
  */
 @Override
 public void commitSync(Duration timeout) {
+maybeThrowInvalidGroupIdException();

Review comment:
   If we throw here, we will not execute the current `finally` block to 
call `release()`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459596274



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   `remoteReplicasMap --= removedReplicas` doesn't compile: `remoteReplicasMap` 
is a Kafka `Pool`, which wraps a Java Map, and that doesn't support the `--=` 
notation.
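
   A minimal standalone sketch of the kind of bulk-removal helper this points 
towards (close in spirit to the `removeAll` later discussed for `kafka.utils.Pool`; 
the names here are illustrative):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Pool wraps a Java ConcurrentHashMap, so bulk removal goes through the Java
// keySet view rather than Scala's `--=` operator.
object PoolRemoveAllSketch {
  def removeAll[K, V](pool: ConcurrentHashMap[K, V], keys: Iterable[K]): Unit =
    pool.keySet.removeAll(keys.asJavaCollection)
}
```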
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459599939



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   Sounds good to introduce the method!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


mjsax commented on pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#issuecomment-663134416


   Only the `StreamsStandbyTask.test_standby_tasks_rebalance` system test 
failed, and it's known to be buggy.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-23 Thread GitBox


ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663147296


   Totally agree that it may be better to revisit the tests in MM2. I created a 
ticket https://issues.apache.org/jira/browse/KAFKA-10304 and assigned it to 
myself. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-07-23 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10304:
--

 Summary: Revisit and improve the tests of MirrorMaker 2
 Key: KAFKA-10304
 URL: https://issues.apache.org/jira/browse/KAFKA-10304
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Ning Zhang
Assignee: Ning Zhang


Due to the quick development of Kafka MirrorMaker 2, its unit and integration 
tests were written just to cover each individual feature, and some of them are 
simply copied from existing tests with small tweaks. It may be a good time to 
revisit and improve the tests, possibly along the following lines:

(1) Are 100 messages enough for the integration tests?
(2) What happens if there is a failure in the middle of an integration test?
(3) Should we verify other messages (e.g. checkpoint, heartbeat, offset sync) 
beyond the mirrored messages in the integration tests?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459645411



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0
+val future = executorService.submit((() => {
+  // Flip assignment between two replica sets
+  while (active.get) {
+val replicas = if (i % 2 == 0) {
+  firstReplicaSet
+} else {
+  secondReplicaSet
+}
+
+partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+i += 1
+Thread.sleep(1) // just to avoid tight loop
+  }
+}): Runnable)
+
+val deadline = 5.seconds.fromNow

Review comment:
   Lowered to 1s





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeffkbkim commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-23 Thread GitBox


jeffkbkim commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r459647273



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+servers = makeServers(1, enableControlledShutdown = false)
+val controller = getController().kafkaController
+val count = new AtomicInteger(2)
+val latch = new CountDownLatch(1)
+val spyThread = spy(controller.eventManager.thread)
+controller.eventManager.setControllerEventThread(spyThread)
+val processedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = latch.await()
+  override def preempt(): Unit = {}
+}
+val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = {}
+  override def preempt(): Unit = count.decrementAndGet()
+}
+
+controller.eventManager.put(processedEvent)
+controller.eventManager.put(preemptedEvent)
+controller.eventManager.put(preemptedEvent)
+
+doAnswer((_: InvocationOnMock) => {
+  latch.countDown()
+}).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
   @stanislavkozlovski thanks for the comment. I've tried this approach 
before; the test passes but sometimes outputs:
   ```
   [2020-07-23 11:12:43,316] ERROR [RequestSendThread controllerId=0] Controller 0 fails to send a request to broker localhost:51542 (id: 0 rack: null) (kafka.controller.RequestSendThread:76)
   java.lang.InterruptedException
   ```
   I also see a bit of flakiness with this approach, since we cannot control 
exactly when `latch.countDown()` is called. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459598439



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {

Review comment:
   This fails incredibly quickly 100/100 times without the Partition.scala 
changes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663130390


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459607476



##
File path: core/src/test/scala/unit/kafka/utils/PoolTest.scala
##
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.utils

Review comment:
   Remove `unit`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163807#comment-17163807
 ] 

Matthias J. Sax commented on KAFKA-10255:
-

[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/]

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-663136110


   Flaky tests only.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459616154



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0

Review comment:
   Shouldn't this be inside the thread state?

##
File path: core/src/main/scala/kafka/utils/Pool.scala
##
@@ -69,6 +69,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends 
Iterable[(K, V)] {
 
   def remove(key: K, value: V): Boolean = pool.remove(key, value)
 
+  def removeAll(keys: Iterable[K]): Unit = 
pool.keySet().removeAll(keys.asJavaCollection)

Review comment:
   Nit: `()` is not needed.
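   
   i.e., a sketch of the suggested form (the same helper from the quoted diff, 
with the redundant parentheses dropped):
   
```scala
def removeAll(keys: Iterable[K]): Unit = pool.keySet.removeAll(keys.asJavaCollection)
```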

##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()

Review comment:
   No need to repeat `LeaderAndIsrPartitionState` twice.
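   
   A sketch of the nit, i.e. the local helper from the quoted test with the 
explicit return type annotation dropped so the class name appears only once:
   
```scala
// Return type is inferred from the builder expression (LeaderAndIsrPartitionState
// is already imported in PartitionLockTest).
def partitionState(replicas: java.util.List[Integer]) = new LeaderAndIsrPartitionState()
  .setControllerEpoch(1)
  .setLeader(replicas.get(0))
  .setLeaderEpoch(1)
  .setIsr(replicas)
  .setZkVersion(1)
  .setReplicas(replicas)
  .setIsNew(true)
```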

##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0
+val future = executorService.submit((() => {
+  // Flip assignment between two replica sets
+  while (active.get) {
+val replicas = if (i % 2 == 0) {
+  firstReplicaSet
+} else {
+  secondReplicaSet
+}
+
+partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+i += 1
+Thread.sleep(1) // just to avoid tight loop
+  }
+}): Runnable)
+
+val deadline = 5.seconds.fromNow
+while(deadline.hasTimeLeft()) {

Review comment:
   Nit: space missing after `while`.

##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
