[jira] [Assigned] (KAFKA-8078) Flaky Test TableTableJoinIntegrationTest#testInnerInner

2019-06-20 Thread Khaireddine Rezgui (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khaireddine Rezgui reassigned KAFKA-8078:
-

Assignee: Khaireddine Rezgui

> Flaky Test TableTableJoinIntegrationTest#testInnerInner
> ---
>
> Key: KAFKA-8078
> URL: https://issues.apache.org/jira/browse/KAFKA-8078
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3445/tests]
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Never received expected final result.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:246)
> at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner(TableTableJoinIntegrationTest.java:196){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8536) Error creating ACL Alter Topic in 2.2

2019-06-20 Thread Alvaro Peris (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868337#comment-16868337
 ] 

Alvaro Peris commented on KAFKA-8536:
-

Hi @[Evelyn 
Bayes|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=EeveeB]

The error is minor: it only affects the kafka-acls.sh tool. If we create the ACL 
with the Java API, Kafka works correctly as indicated in the documentation.

The kafka-acls.sh problem seems to be already solved by omkreddy in the link 
that you sent.
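
For reference, a minimal sketch of creating the same ACL through the Java 
AdminClient API (the bootstrap address and principal below are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateAlterTopicAcl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // ALLOW User:MyUser to ALTER the topic named "topic", from any host.
            AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "topic", PatternType.LITERAL),
                new AccessControlEntry("User:MyUser", "*", AclOperation.ALTER,
                    AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}
{code}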

Thanks :)

> Error creating ACL Alter Topic in 2.2
> -
>
> Key: KAFKA-8536
> URL: https://issues.apache.org/jira/browse/KAFKA-8536
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.2.1
>Reporter: Alvaro Peris
>Priority: Critical
> Fix For: 2.2.2
>
>
> When we try to execute the statement to create an Alter Topic ACL in version 
> 2.2 of Kafka through the kafka-acls tool:
> """
> kafka-acls --authorizer-properties 
> zookeeper.connect=fastdata-zk-discovery:2181 \ 
>  --add \
>  --allow-principal User:MyUser \
>  --operation Alter \
>  --topic topic \
> """
> We get the following error: 
>  
> ResourceType TOPIC only supports operations
>  
> """
> Read,All,AlterConfigs,DescribeConfigs,Delete,Write,Create,Describe
> """
> It should be possible to create an Alter Topic ACL, according to the 
> documentation.
> Thanks



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8564) NullPointerException when loading logs at startup

2019-06-20 Thread Edoardo Comar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar resolved KAFKA-8564.
--
   Resolution: Fixed
Fix Version/s: 2.2.2
   2.1.2
   2.3.0

> NullPointerException when loading logs at startup
> -
>
> Key: KAFKA-8564
> URL: https://issues.apache.org/jira/browse/KAFKA-8564
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Mickael Maison
>Assignee: Edoardo Comar
>Priority: Blocker
> Fix For: 2.3.0, 2.1.2, 2.2.2
>
>
> If brokers restart when topics are being deleted, it's possible to end up 
> with a partition folder with the deleted suffix but without any log segments:
> {quote}ls -la 
> ./kafka-logs/3part3rep5-1.f2ce83b86df9416abe50d2e2299009c2-delete/
> total 8
> drwxr-xr-x@  4 mickael  staff   128  6 Jun 14:35 .
> drwxr-xr-x@ 61 mickael  staff  1952  6 Jun 14:35 ..
> -rw-r--r--@  1 mickael  staff    10  6 Jun 14:32 23261863.snapshot
> -rw-r--r--@  1 mickael  staff 0  6 Jun 14:35 leader-epoch-checkpoint
> {quote}
> From 2.2.1, brokers fail to start when loading such folders:
> {quote}[2019-06-19 09:40:48,123] ERROR There was an error in one of the 
> threads during logs loading: java.lang.NullPointerException 
> (kafka.log.LogManager)
>  [2019-06-19 09:40:48,126] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
>  java.lang.NullPointerException
>  at kafka.log.Log.activeSegment(Log.scala:1896)
>  at kafka.log.Log.<init>(Log.scala:295)
>  at kafka.log.Log$.apply(Log.scala:2186)
>  at kafka.log.LogManager.loadLog(LogManager.scala:275)
>  at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:345)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> {quote}
> With 2.2.0, upon loading such folders, brokers create a new empty log segment 
> and load that successfully.
> The change of behaviour was introduced in 
> [https://github.com/apache/kafka/commit/f000dab5442ce49c4852823c257b4fb0cdfe15aa]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8564) NullPointerException when loading logs at startup

2019-06-20 Thread Edoardo Comar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868400#comment-16868400
 ] 

Edoardo Comar commented on KAFKA-8564:
--

This has been fixed by [https://github.com/apache/kafka/pull/6968]

> NullPointerException when loading logs at startup
> -
>
> Key: KAFKA-8564
> URL: https://issues.apache.org/jira/browse/KAFKA-8564
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Mickael Maison
>Assignee: Edoardo Comar
>Priority: Blocker
>
> If brokers restart when topics are being deleted, it's possible to end up 
> with a partition folder with the deleted suffix but without any log segments:
> {quote}ls -la 
> ./kafka-logs/3part3rep5-1.f2ce83b86df9416abe50d2e2299009c2-delete/
> total 8
> drwxr-xr-x@  4 mickael  staff   128  6 Jun 14:35 .
> drwxr-xr-x@ 61 mickael  staff  1952  6 Jun 14:35 ..
> -rw-r--r--@  1 mickael  staff    10  6 Jun 14:32 23261863.snapshot
> -rw-r--r--@  1 mickael  staff 0  6 Jun 14:35 leader-epoch-checkpoint
> {quote}
> From 2.2.1, brokers fail to start when loading such folders:
> {quote}[2019-06-19 09:40:48,123] ERROR There was an error in one of the 
> threads during logs loading: java.lang.NullPointerException 
> (kafka.log.LogManager)
>  [2019-06-19 09:40:48,126] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
>  java.lang.NullPointerException
>  at kafka.log.Log.activeSegment(Log.scala:1896)
>  at kafka.log.Log.<init>(Log.scala:295)
>  at kafka.log.Log$.apply(Log.scala:2186)
>  at kafka.log.LogManager.loadLog(LogManager.scala:275)
>  at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:345)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> {quote}
> With 2.2.0, upon loading such folders, brokers create a new empty log segment 
> and load that successfully.
> The change of behaviour was introduced in 
> [https://github.com/apache/kafka/commit/f000dab5442ce49c4852823c257b4fb0cdfe15aa]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-06-20 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868422#comment-16868422
 ] 

Gunnar Morling commented on KAFKA-8523:
---

Good question, [~rhauch]; I didn't consider it, but I could see both approaches 
making sense. Should we make this an option (insert into tombstones vs. pass 
them on unmodified) for that SMT?

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made to work in this case by simply 
> building up a new value map from scratch.
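
One possible shape for that fix, as a sketch only (not the actual patch; the 
field name and value below are made up), inserting into tombstones rather than 
passing them through unchanged:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch of tombstone-friendly schemaless field insertion: when the incoming
// value is null (a tombstone), build a fresh map instead of requiring a Map.
public class InsertFieldSketch {

    @SuppressWarnings("unchecked")
    static Map<String, Object> insertField(Object value, String fieldName, Object fieldValue) {
        Map<String, Object> updated =
            value == null ? new HashMap<>() : new HashMap<>((Map<String, Object>) value);
        updated.put(fieldName, fieldValue);
        return updated;
    }

    public static void main(String[] args) {
        // A tombstone (null value) no longer throws; it becomes a single-field map.
        System.out.println(insertField(null, "origin", "my-connector"));
    }
}
{code}

Whether the record should instead stay a tombstone is exactly the option 
discussed above.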



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8572) Broker reports not leader partition as an error

2019-06-20 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-8572:


 Summary: Broker reports not leader partition as an error
 Key: KAFKA-8572
 URL: https://issues.apache.org/jira/browse/KAFKA-8572
 Project: Kafka
  Issue Type: Improvement
Reporter: Antony Stubbs


As this is an expected part of the broker protocol, is error an appropriate log 
level?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8573) kafka-topics.cmd OOM when connecting to a secure cluster without SSL properties

2019-06-20 Thread Jorg Heymans (JIRA)
Jorg Heymans created KAFKA-8573:
---

 Summary: kafka-topics.cmd OOM when connecting to a secure cluster 
without SSL properties
 Key: KAFKA-8573
 URL: https://issues.apache.org/jira/browse/KAFKA-8573
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 2.2.1
Reporter: Jorg Heymans


When using kafka-topics.cmd to connect to an SSL secured cluster, without 
specifying '--command-config=my-ssl.properties', an OOM is triggered:

 
{noformat}
[2019-06-20 14:25:07,998] ERROR Uncaught exception in thread 
'kafka-admin-client-thread | adminclient-1': 
(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1131)
at java.lang.Thread.run(Thread.java:748){noformat}
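
The immediate workaround is to pass the SSL client settings. A minimal sketch of 
the same lookup through the Java AdminClient (bootstrap address, truststore path 
and password are placeholders); the OOM above likely happens because the 
plaintext client reads the TLS handshake bytes as a message size and tries to 
allocate a huge buffer:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SslConfigs;

public class ListTopicsOverSsl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093"); // placeholder
        // Equivalent to passing --command-config my-ssl.properties to kafka-topics.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks"); // placeholder
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println(admin.listTopics().names().get());
        }
    }
}
{code}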



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer

2019-06-20 Thread Carlos Manuel Duclos Vergara (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868476#comment-16868476
 ] 

Carlos Manuel Duclos Vergara commented on KAFKA-7995:
-

Do you need help on this?

> Augment singleton protocol type to list for Kafka Consumer  
> 
>
> Key: KAFKA-7995
> URL: https://issues.apache.org/jira/browse/KAFKA-7995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Reporter: Boyang Chen
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
>
> Right now the Kafka consumer protocol uses a singleton marker to distinguish 
> a Kafka Connect worker from a normal consumer. This is not an upgrade-friendly 
> approach, since the protocol type could potentially change over time. A better 
> approach is to support multiple candidates so that a no-downtime protocol 
> type switch could be achieved.
> For example, if we are trying to upgrade a Kafka Streams application towards 
> a protocol type called "stream", right now there is no way to do this without 
> downtime, since the broker will reject changing the protocol type to a 
> different one unless the group is back to empty. If we allow a new member to 
> provide a list of protocol types instead ("consumer", "stream"), there would 
> be no compatibility issue.
> An alternative approach is to invent an admin API to change a group's protocol 
> type at runtime. However, the burden introduced on the administrator is not 
> trivial, since we need to guarantee the sequence of operations is correct; 
> otherwise we will see a limping upgrade experience at the midpoint, for example 
> if an unexpected rebalance while we are changing the protocol type causes old 
> members to fail to join.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-06-20 Thread Carlos Manuel Duclos Vergara (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868475#comment-16868475
 ] 

Carlos Manuel Duclos Vergara commented on KAFKA-8342:
-

Any progress on this? Do you need help?

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their application to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> proceeding to start the actual streams job. We could add an admin tool to help 
> them go through the process quicker by providing a command that could
>  # Read through the current stream topology
>  # Create the corresponding topics as needed, even including output topics.
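
A rough sketch of what such a command could do with the existing AdminClient, 
assuming the internal topic names, partition counts, and replication factor are 
already known (all names and numbers below are hypothetical):

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateStreamsInternalTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Hypothetical internal topic names, e.g. collected from Topology#describe().
        List<String> internalTopics = Arrays.asList(
            "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog",
            "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition");

        // Partition count and replication factor are assumptions for this sketch.
        List<NewTopic> newTopics = internalTopics.stream()
            .map(name -> new NewTopic(name, 8, (short) 3))
            .collect(Collectors.toList());

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(newTopics).all().get();
        }
    }
}
{code}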



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-560) Garbage Collect obsolete topics

2019-06-20 Thread Carlos Manuel Duclos Vergara (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868489#comment-16868489
 ] 

Carlos Manuel Duclos Vergara commented on KAFKA-560:


I guess nothing has happened here, any progress on this?

> Garbage Collect obsolete topics
> ---
>
> Key: KAFKA-560
> URL: https://issues.apache.org/jira/browse/KAFKA-560
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
>Priority: Major
>  Labels: project
>
> Old junk topics tend to accumulate over time. Code may migrate to use new 
> topics leaving the old ones orphaned. Likewise there are some use cases for 
> temporary transient topics. It would be good to have a tool that could delete 
> any topic that had not been written to in a configurable period of time and 
> had no active consumer groups. Something like
>./bin/delete-unused-topics.sh --last-write [date] --zookeeper [zk_connect]
> This requires API support to get the last update time. I think it may be 
> possible to do this through the OffsetRequest now?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8573) kafka-topics.cmd OOM when connecting to a secure cluster without SSL properties

2019-06-20 Thread Jorg Heymans (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorg Heymans updated KAFKA-8573:

Environment: 
Windows 7

openjdk version "1.8.0_212-1-ojdkbuild"
OpenJDK Runtime Environment (build 1.8.0_212-1-ojdkbuild-b04)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)
Description: 
When using kafka-topics.cmd to connect to an SSL secured cluster, without 
specifying '--command-config=my-ssl.properties' an OOM is triggered:

 
{noformat}
[2019-06-20 14:25:07,998] ERROR Uncaught exception in thread 
'kafka-admin-client-thread | adminclient-1': 
(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1131)
at java.lang.Thread.run(Thread.java:748){noformat}
 

  was:
When using kafka-topics.cmd to connect to an SSL secured cluster, without 
specifying '--command-config=my-ssl.properties' on OOM is triggered:

 
{noformat}
[2019-06-20 14:25:07,998] ERROR Uncaught exception in thread 
'kafka-admin-client-thread | adminclient-1': 
(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1131)
at java.lang.Thread.run(Thread.java:748){noformat}


> kafka-topics.cmd OOM when connecting to a secure cluster without SSL 
> properties
> ---
>
> Key: KAFKA-8573
> URL: https://issues.apache.org/jira/browse/KAFKA-8573
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 2.2.1
> Environment: Windows 7
> openjdk version "1.8.0_212-1-ojdkbuild"
> OpenJDK Runtime Environment (build 1.8.0_212-1-ojdkbuild-b04)
> OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)
>Reporter: Jorg Heymans
>Priority: Minor
>
> When using kafka-topics.cmd to connect to an SSL secured cluster, without 
> specifying '--command-config=my-ssl.properties' an OOM is triggered:
>  
> {noformat}
> [2019-06-20 14:25:07,998] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1131)
> at java.lang.Thread.run(Thread.java:748){noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1

2019-06-20 Thread William Greer (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

William Greer updated KAFKA-8574:
-
Description: 
*Overview*
 While using EOS in Kafka Streams there is a race condition where the checkpoint 
file is written by the previous owning thread (Thread A) after the new owning 
thread (Thread B) reads the checkpoint file. Thread B then starts a restoration 
since no checkpoint file was found. A re-balance occurs before Thread B 
completes the restoration, and a third thread (Thread C) becomes the owning 
thread. Thread C reads the checkpoint file written by Thread A, which does not 
correspond to the current state of the RocksDB state store. When this race 
condition occurs, the state store will have the most recent records and some 
amount of the oldest records but will be missing some amount of records in 
between. If A->Z represents the entire changelog to the present, then when this 
scenario occurs the state store would contain records [A->K and Y->Z], i.e. the 
state store is missing records K->Y.
  
 This race condition is possible due to dirty writes and dirty reads of the 
checkpoint file.
  
 *Example:*
 Thread refers to a Kafka Streams StreamThread [0]
 Thread A, B and C are running in the same JVM in the same streams application.
  
 Scenario:
 Thread-A is in RUNNING state and up to date on partition 1.
 Thread-A is suspended on 1. This does not write a checkpoint file because EOS 
is enabled [1]
 Thread-B is assigned to 1
 Thread-B does not find checkpoint in StateManager [2]
 Thread-A is assigned a different partition. Task writes suspended tasks 
checkpoints to disk. Checkpoint for 1 is written. [3]
 Thread-B deletes LocalStore and starts restoring. The deletion of the 
LocalStore does not delete checkpoint file. [4]
 Thread-C is revoked
 Thread-A is revoked
 Thread-B is revoked from the assigned status. Does not write a checkpoint file
 - Note Thread-B never reaches the running state, it remains in the 
PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state

Thread-C is assigned 1
 Thread-C finds the checkpoint in StateManager. This checkpoint corresponds to 
where Thread-A left the state store for partition 1, not where Thread-B 
left it.
 Thread-C begins restoring from the checkpoint. The state store is missing an 
unknown number of records at this point.
 Thread-B is assigned; it does not write a checkpoint file for partition 1 
because it had not reached a running status before being revoked
  
 [0] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
 [1] 
[https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
 [2] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
 [3] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
 & 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
 [4] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
 & 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
 Specifically 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119]
 is where the state store is deleted but the checkpoint file is not.
  
 *How we recovered:*
 1. Deleted the impacted state store. This triggered multiple exceptions and 
initiated a re-balance.
  
 *Possible approaches to address this issue:*
 1. Add a collection of global task locks for concurrency protection of the 
checkpoint file. With the lock for suspended tasks being released after 
closeNonAssignedSuspendedTasks and the locks being acquired after lock release 
for the assigned tasks.
 2. Delete checkpoint file in EOS when partitions are revoked. This doesn't 
address the race condition but would make it so that the checkpoint file would 
never be ahead of the LocalStore in EOS; this would increase the likelihood of 
triggering a full restoration of a LocalStore on partition movement between 
threads on one host.
 3. Configure task stickiness for StreamThreads. E.G. if a host with multiple 
StreamThreads is assigned a task the host had before prefer to assign the task 
to the thread on the host that had the task before.
 4. Add a new state that splits the PARTITIONS_ASSIGNED state to a clean up 
previous assignment step and a bootstrap new assignment. This would require all 
valid threads to 

[jira] [Updated] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value

2019-06-20 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8452:

Fix Version/s: 2.4.0
   3.0.0

> Possible Suppress buffer optimization: de-duplicate prior value
> ---
>
> Key: KAFKA-8452
> URL: https://issues.apache.org/jira/browse/KAFKA-8452
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.0.0, 2.4.0
>
>
> As of KAFKA-8199, the suppression buffers have to track the "prior value" in 
> addition to the "old" and "new" values for each record, to support 
> transparent downstream views.
> In many cases, the prior value is actually the same as the old value, and we 
> could avoid storing it separately. The challenge is that the old and new 
> values are already serialized into a common array (as a Change via the 
> FullChangeSerde), so the "prior" value would actually be a slice on the 
> underlying array. But, of course, Java does not have array slices.
> To get around this, we either need to switch to ByteBuffers (which support 
> slices) or break apart the serialized Change into just serialized old and new 
> values.
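
For illustration, a small standalone example of the ByteBuffer idea mentioned 
above: a view over part of a serialized array can be taken without copying, 
which is what a plain byte[] cannot express (the layout and lengths below are 
made up for the demo, not the real FullChangeSerde format):

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SliceDemo {
    public static void main(String[] args) {
        // Pretend this is a serialized Change: [old value bytes][new value bytes].
        byte[] serializedChange = "OLD-VALUE|NEW-VALUE".getBytes(StandardCharsets.UTF_8);
        int oldValueLength = 9; // length of "OLD-VALUE"; an assumption for the demo

        // A zero-copy view over just the old-value portion of the array.
        ByteBuffer priorValueView = ByteBuffer.wrap(serializedChange, 0, oldValueLength).slice();

        // Copy out only when the bytes are actually needed.
        byte[] priorValue = new byte[priorValueView.remaining()];
        priorValueView.get(priorValue);
        System.out.println(new String(priorValue, StandardCharsets.UTF_8)); // OLD-VALUE
    }
}
{code}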



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value

2019-06-20 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-8452.
-
Resolution: Fixed

> Possible Suppress buffer optimization: de-duplicate prior value
> ---
>
> Key: KAFKA-8452
> URL: https://issues.apache.org/jira/browse/KAFKA-8452
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> As of KAFKA-8199, the suppression buffers have to track the "prior value" in 
> addition to the "old" and "new" values for each record, to support 
> transparent downstream views.
> In many cases, the prior value is actually the same as the old value, and we 
> could avoid storing it separately. The challenge is that the old and new 
> values are already serialized into a common array (as a Change via the 
> FullChangeSerde), so the "prior" value would actually be a slice on the 
> underlying array. But, of course, Java does not have array slices.
> To get around this, we either need to switch to ByteBuffers (which support 
> slices) or break apart the serialized Change into just serialized old and new 
> values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-20 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868844#comment-16868844
 ] 

Lee Dongjin commented on KAFKA-8546:


Hi [~badai],

Here is the patch: [^KAFKA-8546.patch]. I confirmed that it cleanly applies to 
2.0.1 and passes all the tests. In short, it calls {{System#runFinalization}} 
when closing GZip[Input, Output]Stream to avoid the surge of [Inflater, Deflater] 
objects. Could you test this patch on your system?
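
For anyone who wants to try the idea independently, a minimal sketch of the 
approach as described above (a wrapper stream; this is not the code in 
[^KAFKA-8546.patch]):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

// Sketch only: nudge the JVM to process pending Finalizer objects whenever a
// gzip stream is closed, so Inflater instances do not pile up in the old gen.
public class FinalizingGzipInputStream extends GZIPInputStream {

    public FinalizingGzipInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    public void close() throws IOException {
        super.close();
        System.runFinalization();
    }
}
{code}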

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Priority: Minor
> Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 
> pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, sometimes you 
> can hit GC pauses greater than the zookeeper.session.timeout.ms of 6000ms. 
> This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created. 
>  * The GC finalizer thread is responsible for processing all Finalizer objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in the 
> old generation).
>  
> The following stack trace shows what happens when a process is frozen doing a 
> full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, 
> LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   

[jira] [Created] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1

2019-06-20 Thread William Greer (JIRA)
William Greer created KAFKA-8574:


 Summary: EOS race condition during task transition leads to 
LocalStateStore truncation in Kafka Streams 2.0.1
 Key: KAFKA-8574
 URL: https://issues.apache.org/jira/browse/KAFKA-8574
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.1
Reporter: William Greer


*Overview*
While using EOS in Kafka Streams there is a race condition where the checkpoint 
file is written by the previous owning thread (Thread A) after the new owning 
thread (Thread B) reads the checkpoint file. Thread B then starts a restoration 
since no checkpoint file was found. A re-balance occurs before Thread B 
completes the restoration, and a third thread (Thread C) becomes the owning 
thread. Thread C reads the checkpoint file written by Thread A, which does not 
correspond to the current state of the RocksDB state store. When this race 
condition occurs, the state store will have the most recent records and some 
amount of the oldest records but will be missing some amount of records in 
between. If A->Z represents the entire changelog to the present, then when this 
scenario occurs the state store would contain records [A->K and Y->Z], i.e. the 
state store is missing records K->Y.
 
This race condition is possible due to dirty writes and dirty reads of the 
checkpoint file.
 
*Example:*
Thread refers to a Kafka Streams StreamThread [0]
Thread A, B and C are running in the same JVM in the same streams application.
 
Scenario:
Thread-A is in RUNNING state and up to date on partition 1.
Thread-A is suspended on 1. This does not write a checkpoint file because EOS 
is enabled [1]
Thread-B is assigned to 1
Thread-B does not find checkpoint in StateManager [2]
Thread-A is assigned a different partition. Task writes suspended tasks 
checkpoints to disk. Checkpoint for 1 is written. [3]
Thread-B deletes LocalStore and starts restoring. The deletion of the 
LocalStore does not delete checkpoint file. [4]
Thread-C is revoked
Thread-A is revoked
Thread-B is revoked from the assigned status. Does not write a checkpoint file
- Note Thread-B never reaches the running state, it remains in the 
PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state
Thread-C is assigned 1
Thread-C finds the checkpoint in StateManager. This checkpoint corresponds to 
where Thread-A left the state store for partition 1, not where Thread-B left it.
Thread-C begins restoring from the checkpoint. The state store is missing an 
unknown number of records at this point.
Thread-B is assigned; it does not write a checkpoint file for partition 1 
because it had not reached a running status before being revoked
 
[0] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
[1] 
https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553
[2] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98
[3] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105
 & 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331
[4] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228
 & 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123
 Specifically 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119
 is where the state store is deleted but the checkpoint file is not.
 
*How we recovered:*
1. Deleted the impacted state store. This triggered multiple exceptions and 
initiated a re-balance.
 
*Possible approaches to address this issue:*
1. Add a collection of global task locks for concurrency protection of the 
checkpoint file. With the lock for suspended tasks being released after 
closeNonAssignedSuspendedTasks and the locks being acquired after lock release 
for the assigned tasks.
2. Delete checkpoint file in EOS when partitions are revoked. This doesn't 
address the race condition but would make it so that the checkpoint file would 
never be ahead of the LocalStore in EOS; this would increase the likelihood of 
triggering a full restoration of a LocalStore on partition movement between 
threads on one host.
3. Configure task stickiness for StreamThreads. E.G. if a host with multiple 
StreamThreads is assigned a task the host had before prefer to assign the task 
to the thread on the host that had the task 

[jira] [Assigned] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-20 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8546:
--

Assignee: Lee Dongjin

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Assignee: Lee Dongjin
>Priority: Minor
> Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 
> pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, sometimes you 
> can hit GC pauses greater than the zookeeper.session.timeout.ms of 6000ms. 
> This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created. 
>  * The GC finalizer thread is responsible for processing all Finalizer objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in the 
> old generation).
>  
> The following stack trace shows what happens when a process is frozen doing a 
> full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, 
> LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
>   kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) 
> CoreUtils.scala:257
>   

[jira] [Assigned] (KAFKA-8510) Update StreamsPartitionAssignor to use the built-in owned partitions to achieve stickiness (part 7)

2019-06-20 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8510:
--

Assignee: Sophie Blee-Goldman

> Update StreamsPartitionAssignor to use the built-in owned partitions to 
> achieve stickiness (part 7)
> ---
>
> Key: KAFKA-8510
> URL: https://issues.apache.org/jira/browse/KAFKA-8510
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Today this information is encoded as part of the user data bytes, we can now 
> remove it and leverage on the owned partitions of the protocol directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8494) Refactor Consumer#StickyAssignor and add CooperativeStickyAssignor (part 4)

2019-06-20 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8494:
--

Assignee: Sophie Blee-Goldman

> Refactor Consumer#StickyAssignor and add CooperativeStickyAssignor (part 4)
> ---
>
> Key: KAFKA-8494
> URL: https://issues.apache.org/jira/browse/KAFKA-8494
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8569) Integrate the poll timeout warning with leave group call

2019-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868932#comment-16868932
 ] 

ASF GitHub Bot commented on KAFKA-8569:
---

guozhangwang commented on pull request #6972: KAFKA-8569: integrate warning 
message under static membership
URL: https://github.com/apache/kafka/pull/6972
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Integrate the poll timeout warning with leave group call
> 
>
> Key: KAFKA-8569
> URL: https://issues.apache.org/jira/browse/KAFKA-8569
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Under static membership, we may be polluting our log by seeing a bunch of 
> consecutive warning messages upon poll timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8392) Kafka broker leaks metric when partition leader moves to another node.

2019-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868946#comment-16868946
 ] 

ASF GitHub Bot commented on KAFKA-8392:
---

tuvtran commented on pull request #6977: KAFKA-8392: Fix old metrics leakage by 
brokers that have no leadership over any partition for a topic
URL: https://github.com/apache/kafka/pull/6977
 
 
   Addresses: https://issues.apache.org/jira/browse/KAFKA-8392
   
   - Added `removeOldLeaderMetrics` in `BrokerTopicStats` to remove 
`MessagesInPerSec`, `BytesInPerSec`, `BytesOutPerSec` for any broker that is no 
longer a leader of any partition for a particular topic
   - Modified `ReplicaManager` to remove the metrics of any topic that the 
current broker has no leadership (meaning the broker either becomes a follower 
for all of the partitions in that topic or stops being a replica)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka broker leaks metric when partition leader moves to another node.
> --
>
> Key: KAFKA-8392
> URL: https://issues.apache.org/jira/browse/KAFKA-8392
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.2.0
>Reporter: Kamal Chandraprakash
>Assignee: Tu Tran
>Priority: Major
>
> When a partition leader moves from one node to another due to an imbalance in 
> leader.imbalance.per.broker.percentage, the old leader broker still emits the 
> static metric value.
> Steps to reproduce:
> 1. Create a cluster with 3 nodes.
> 2. Create a topic with 2 partitions and RF=3
> 3. Generate some data using the console producer.
> 4. Move any one of the partitions from one node to another using the 
> reassign-partitions and preferred-replica-election scripts.
> 5. Generate some data using the console producer.
> 6. Now all the 3 nodes emit bytesIn, bytesOut and MessagesIn for that topic.
> Is it the expected behavior?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7368) Support joining Windowed KTables

2019-06-20 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16867770#comment-16867770
 ] 

John Roesler edited comment on KAFKA-7368 at 6/20/19 5:23 PM:
--

Also, note that the same issue limits the utility of aggregation on windowed 
tables, for the same reason.

I recently spent some time reflecting on the ideal solution to this problem. 
There's a simple solution and a good solution.

The simple solution is to address KAFKA-4212. There's no semantic problem with 
using a KeyValueStore in the current API, just the problem that it's doomed to 
run out of space for most windowed tables. Adding a TTL on the KeyValueStore 
API would directly address this problem. The risk is that, for high volume 
stores, the expiration itself could become expensive, in CPU, memory, and disk.

As an alternative, WindowStores already support an efficient expiration 
mechanism (they store the records in segments and then drop entire segments at 
a time). After some analysis, it seems we could allow swapping in a WindowStore 
instead of a KeyValueStore for such operations, but it requires:
* Replace the "bytes store" layer's interface with a new one. It's currently 
(e.g.) KeyValueStore<Bytes, byte[]>, and we'd just create an independent 
interface KeyValueBytesStore. This would let us also create a WindowBytesStore 
that extends KeyValueBytesStore. In turn, this allows us to supply the 
expiration-efficient store implementation when we know that the data is 
windowed.
* To resolve some related ergonomic concerns, we'd probably also want to create 
a new WindowStore interface that extends KeyValueStore<Windowed<K>, V>.

These two points sound simple, but their implications will spider out through 
the whole code base. Likely, the change would be in the neighborhood of 10,000 
lines of code.
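
A rough sketch of the type layering described above (names and shapes are 
illustrative only, not the actual Streams interfaces):

{code:java}
import org.apache.kafka.streams.kstream.Windowed;

// Illustrative only: a common bytes-level interface, plus a windowed variant
// that a segment-based implementation can expire efficiently.
interface KeyValueBytesStore {
    void put(byte[] key, byte[] value);
    byte[] get(byte[] key);
}

interface WindowBytesStore extends KeyValueBytesStore {
    // A segment-backed store can expire data by dropping whole segments.
    void dropSegmentsOlderThan(long streamTimeMs);
}

// The "ergonomic" typed view: a windowed store is just a key-value store keyed
// by Windowed<K>. (This local interface shadows the real Streams KeyValueStore
// purely to keep the sketch self-contained.)
interface KeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

interface WindowedKeyValueStore<K, V> extends KeyValueStore<Windowed<K>, V> { }
{code}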


was (Author: vvcephei):
Also, note that the same issue limits the utility of aggregation on windowed 
tables, for the same reason.

I recently spent some time reflecting on the ideal solution to this problem. 
There's a simple solution and a good solution.

The simple solution is to address KAFKA-4212. There's no semantic problem with 
using a KeyValueStore in the current API, just the problem that it's doomed to 
run out of space for most windowed tables. Adding a TTL on the KeyValueStore 
API would directly address this problem. The risk is that, for high volume 
stores, the expiration itself could become expensive, in CPU, memory, and disk.

As an alternative, WindowStores already support an efficient expiration 
mechanism (they store the records in segments and then drop entire segments at 
a time). After some analysis, it seems we could allow swapping in a WindowStore 
instead of a KeyValueStore for such operations, but it requires:
* Replace the "bytes store" layer's interface with a new one. It's currently 
(e.g.) KeyValueStore<Bytes, byte[]>, and we'd just create an independent 
interface KeyValueBytesStore. This would let us also create a WindowBytesStore 
that extends KeyValueBytesStore. In turn, this allows us to supply the 
expiration-efficient store implementation when we know that the data is 
windowed.
* To resolve some related ergonomic concerns, we'd probably also want to create 
a new WindowStore interface that extends KeyValueStore<Windowed<K>, V>.

These two points sound simple, but their implications will spider out through 
the whole code base. Likely, the change would be in the neighborhood of 20,000 
lines of code.

> Support joining Windowed KTables
> 
>
> Key: KAFKA-7368
> URL: https://issues.apache.org/jira/browse/KAFKA-7368
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Currently, there is no good way to join two `KTable<Windowed<K>, V>`, aka 
> windowed KTables.
> They are KTables, so they have a `join` operator available, but it currently 
> will use a regular KeyValue store instead of a Windowed store, so it will 
> grow without bound as new windows enter.
> One option is to convert both KTables into KStream, and join them (which is a 
> windowed join), and then convert them back into KTables for further 
> processing, but this is an awkward way to accomplish an apparently 
> straightforward task.
> It should instead be possible to directly support it, but the trick will be 
> to make it impossible to accidentally use a window store for normal (aka 
> non-windowed) KTables.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-20 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8546:
---
Attachment: KAFKA-8546.patch

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Priority: Minor
> Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 
> pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, sometimes you 
> can hit GC pauses greater than the zookeeper.session.timeout.ms of 6000ms. 
> This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created. 
>  * The GC finalizer thread is responsible for processing all Finalizer objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in the 
> old generation).
>  
> The following stack trace shows what happens when a process is frozen doing a 
> full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, 
> LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
>   kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) 
> CoreUtils.scala:257
>   
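
As a rough diagnostic for the backlog described in the bullet list above, the 
pending-finalization count can be sampled via the standard MemoryMXBean; a 
count that keeps climbing while the broker decompresses gzip batches is 
consistent with the finalizer thread falling behind. A minimal sketch:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class FinalizerBacklogProbe {
    public static void main(String[] args) throws InterruptedException {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        for (int i = 0; i < 10; i++) {
            // Approximate number of objects whose finalize() has not run yet.
            System.out.println("objects pending finalization: "
                    + memory.getObjectPendingFinalizationCount());
            Thread.sleep(1000L);
        }
    }
}
{code}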

[jira] [Commented] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868842#comment-16868842
 ] 

ASF GitHub Bot commented on KAFKA-8546:
---

dongjinleekr commented on pull request #6976: KAFKA-8546: Call 
System#runFinalization to avoid memory leak caused by JDK-6293787
URL: https://github.com/apache/kafka/pull/6976
 
 
   This PR prevents the surge of `Inflater`/`Deflater` objects by calling 
`System#runFinalization` on closing `GZIP[Input, Output]Stream`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
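
   A minimal sketch of the approach described in the PR body, assuming a 
   hypothetical wrapper around GZIPInputStream; this is an illustration, not 
   the actual patch:

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class FinalizationAwareGzip {
    // Hypothetical helper: decorate a GZIPInputStream so that closing it also
    // nudges the JVM to drain the finalizer queue, releasing Inflater native
    // resources before Finalizer objects pile up in the old generation.
    static InputStream wrap(InputStream compressed) throws IOException {
        return new GZIPInputStream(compressed) {
            @Override
            public void close() throws IOException {
                super.close();
                System.runFinalization();
            }
        };
    }
}
{code}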
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Priority: Minor
> Attachments: Screen Shot 2019-05-30 at 1.27.25 pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, sometimes you 
> can hit GC pauses greater than zookeeper.session.timeout.ms of 6000ms. This 
> is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created.
>  * The GC finalizer thread is responsible for processing all Finalizer objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in the 
> Old Gen).
>  
> The following stack trace shows what happens when a process is frozen doing a 
> full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 

[jira] [Commented] (KAFKA-8527) add dynamic maintenance broker config

2019-06-20 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868914#comment-16868914
 ] 

GEORGE LI commented on KAFKA-8527:
--

We had this implemented in a separate admin service outside of Kafka. One of 
its functions is topic-related changes, e.g. topic creation and topic partition 
expansion. There is a ZK path/node which indicates which broker_id is under 
"maintenance", so when creating a topic or expanding topic partitions, that 
live broker_id is excluded from getting any new replica assignments.
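
A rough sketch of that exclusion step, with invented names purely for 
illustration (this is not actual Kafka or admin-service code):

{code:java}
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class MaintenanceBrokerFilter {
    // Brokers that are alive but marked "maintenance" (e.g. via a ZK node or a
    // cluster-level dynamic config) are dropped before computing new replica
    // assignments for topic creation or partition expansion.
    static List<Integer> assignableBrokers(List<Integer> liveBrokers,
                                           Set<Integer> maintenanceBrokers) {
        return liveBrokers.stream()
                .filter(id -> !maintenanceBrokers.contains(id))
                .collect(Collectors.toList());
    }
}
{code}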

> add dynamic maintenance broker config
> -
>
> Key: KAFKA-8527
> URL: https://issues.apache.org/jira/browse/KAFKA-8527
> Project: Kafka
>  Issue Type: Improvement
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> Before we remove a broker for maintenance, we want to move all partitions 
> off the broker first to avoid introducing new Under Replicated Partitions 
> (URPs). That is because shutting down (or killing) a broker that still hosts 
> live partitions will lead to temporarily reduced replicas of those 
> partitions. Moving partitions off a broker can be done via partition 
> reassignment. However, during the partition reassignment process, new topics 
> can be created by Kafka and thereby new partitions can be added to the broker 
> that is pending removal. As a result, the removal process needs to 
> recursively move new topic partitions off the maintenance broker. In a 
> production environment in which topic creation is frequent and URPs caused by 
> broker removal cannot be tolerated, the removal process can take multiple 
> iterations to complete the partition reassignment. We want to provide a 
> mechanism to mark a broker as a maintenance broker (via a cluster-level 
> dynamic configuration). One action Kafka can take for a maintenance broker is 
> not to assign new topic partitions to it, thereby facilitating the broker's 
> removal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8576) Consumer failed to join the coordinator

2019-06-20 Thread yanrui (JIRA)
yanrui created KAFKA-8576:
-

 Summary: Consumer failed to join the coordinator
 Key: KAFKA-8576
 URL: https://issues.apache.org/jira/browse/KAFKA-8576
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1
Reporter: yanrui
 Attachments: image-2019-06-21-10-52-38-762.png

Environment:
 single-node Kafka (2.1.1), 6g / 4c
 client (0.11.0.1)
 number of consumer groups: 1170
After running for a while, consumers cannot join the coordinator. Describing 
the group reports an incorrect coordinator. The consumer is endlessly trapped 
in coordinator discovery and then marks the group coordinator dead. Asking for 
help analyzing the reason, thank you very much.

   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8575) Investigate cleaning up task suspension (part 7)

2019-06-20 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8575:
---
Component/s: streams

> Investigate cleaning up task suspension (part 7)
> 
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8575) Investigate cleaning up task suspension (part 7)

2019-06-20 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8575:
---
Summary: Investigate cleaning up task suspension (part 7)  (was: 
Investigate cleaning up task suspension)

> Investigate cleaning up task suspension (part 7)
> 
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8575) Investigate cleaning up task suspension

2019-06-20 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8575:
--

 Summary: Investigate cleaning up task suspension
 Key: KAFKA-8575
 URL: https://issues.apache.org/jira/browse/KAFKA-8575
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sophie Blee-Goldman


With KIP-429 the suspend/resume of tasks may have minimal gains while adding a 
lot of complexity and potential bugs. We should consider removing/cleaning it 
up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value

2019-06-20 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8452:
---
Fix Version/s: (was: 3.0.0)

> Possible Suppress buffer optimization: de-duplicate prior value
> ---
>
> Key: KAFKA-8452
> URL: https://issues.apache.org/jira/browse/KAFKA-8452
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.4.0
>
>
> As of KAFKA-8199, the suppression buffers have to track the "prior value" in 
> addition to the "old" and "new" values for each record, to support 
> transparent downstream views.
> In many cases, the prior value is actually the same as the old value, and we 
> could avoid storing it separately. The challenge is that the old and new 
> values are already serialized into a common array (as a Change via the 
> FullChangeSerde), so the "prior" value would actually be a slice on the 
> underlying array. But, of course, Java does not have array slices.
> To get around this, we either need to switch to ByteBuffers (which support 
> slices) or break apart the serialized Change into just serialized old and new 
> values.
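
A small illustration of the ByteBuffer-slice option mentioned above; the 
offsets are invented and do not reflect the actual Change wire format:

{code:java}
import java.nio.ByteBuffer;

public class ChangeSliceExample {
    public static void main(String[] args) {
        // Pretend this array is a serialized Change holding old and new values.
        byte[] serializedChange = {0, 1, 2, 3, 4, 5, 6, 7};
        ByteBuffer whole = ByteBuffer.wrap(serializedChange);

        // Suppose bytes [2, 6) hold the serialized old value: a slice exposes
        // that region without copying, which a plain byte[] cannot do.
        whole.position(2);
        whole.limit(6);
        ByteBuffer oldValueView = whole.slice();   // shares the backing array

        System.out.println("old value length = " + oldValueView.remaining()); // 4
    }
}
{code}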



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1

2019-06-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869051#comment-16869051
 ] 

Matthias J. Sax commented on KAFKA-8574:


Is this the same as https://issues.apache.org/jira/browse/KAFKA-8187 or 
different?

> EOS race condition during task transition leads to LocalStateStore truncation 
> in Kafka Streams 2.0.1
> 
>
> Key: KAFKA-8574
> URL: https://issues.apache.org/jira/browse/KAFKA-8574
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Priority: Major
>
> *Overview*
>  While using EOS in Kafka Streams there is a race condition where the 
> checkpoint file is written by the previous owning thread (Thread A) after the 
> new owning thread (Thread B) reads the checkpoint file. Thread B then starts 
> a restoration since no checkpoint file was found. A re-balance occurs before 
> Thread B completes the restoration, and a third thread (Thread C) becomes the 
> owning thread. Thread C reads the checkpoint file written by Thread A, which 
> does not correspond to the current state of the RocksDB state store. When 
> this race condition occurs the state store will have the most recent records 
> and some amount of the oldest records but will be missing some amount of 
> records in between. If A->Z represents the entire changelog to the present, 
> then when this scenario occurs the state store would contain records [A->K 
> and Y->Z], i.e. the state store is missing records K->Y.
>   
>  This race condition is possible due to dirty writes and dirty reads of the 
> checkpoint file.
>   
>  *Example:*
>  Thread refers to a Kafka Streams StreamThread [0]
>  Thread A, B and C are running in the same JVM in the same streams 
> application.
>   
>  Scenario:
>  Thread-A is in RUNNING state and up to date on partition 1.
>  Thread-A is suspended on 1. This does not write a checkpoint file because 
> EOS is enabled [1]
>  Thread-B is assigned to 1
>  Thread-B does not find checkpoint in StateManager [2]
>  Thread-A is assigned a different partition. The suspended tasks' checkpoints 
> are written to disk; the checkpoint for 1 is written. [3]
>  Thread-B deletes LocalStore and starts restoring. The deletion of the 
> LocalStore does not delete checkpoint file. [4]
>  Thread-C is revoked
>  Thread-A is revoked
>  Thread-B is revoked from the assigned status. Does not write a checkpoint 
> file
>  - Note Thread-B never reaches the running state, it remains in the 
> PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state
> Thread-C is assigned 1
>  Thread-C finds checkpoint in StateManager. This checkpoint corresponds to 
> where Thread-A left the state store for partition 1 at and not where Thread-B 
> left the state store at.
>  Thread-C begins restoring from checkpoint. The state store is missing an 
> unknown number of records at this point
>  Thread-B is assigned but does not write a checkpoint file for partition 1, 
> because it had not reached a running status before being revoked
>   
>  [0] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
>  [1] 
> [https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
>  [2] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
>  [3] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
>  [4] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
>  Specifically 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119]
>  is where the state store is deleted but the checkpoint file is not.
>   
>  *How we recovered:*
>  1. Deleted the impacted state store. This triggered multiple exceptions and 
> initiated a re-balance.
>   
>  *Possible approaches to address this issue:*
>  1. Add a collection of global task locks for concurrency protection of the 
> checkpoint file. With the lock for suspended tasks being released after 
> closeNonAssignedSuspendedTasks and the locks being acquired after 

[jira] [Commented] (KAFKA-8405) Remove deprecated preferred leader RPC and Command

2019-06-20 Thread Jose Armando Garcia Sancio (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868669#comment-16868669
 ] 

Jose Armando Garcia Sancio commented on KAFKA-8405:
---

[~m.sandeep] thanks for working on this. Unfortunately, we don't have a good 
way of tracking this kind of PR. We won't be able to merge it for a very long 
time. There is also a risk that this PR will not merge without conflicts at 
that point.

> Remove deprecated preferred leader RPC and Command
> --
>
> Key: KAFKA-8405
> URL: https://issues.apache.org/jira/browse/KAFKA-8405
> Project: Kafka
>  Issue Type: Task
>  Components: admin
>Affects Versions: 3.0.0
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.0.0
>
>
> For version 2.4.0, we deprecated:
> # AdminClient.electPreferredLeaders
> # ElectPreferredLeadersResult
> # ElectPreferredLeadersOptions
> # PreferredReplicaLeaderElectionCommand.
> For version 3.0.0 we should remove all of these symbols and the references to 
> them. For the command, that includes:
> # bin/kafka-preferred-replica-election.sh
> # bin/windows/kafka-preferred-replica-election.bat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-06-20 Thread Francisco Juan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868545#comment-16868545
 ] 

Francisco Juan commented on KAFKA-8335:
---

Hi Boquan

Cleaning up the {{cleaner-offset-checkpoint}} seems to be solving the issue.

Thanks a lot for the quick reply!

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of the __consumer_offsets partitions grew 
> huge. Some partitions went over 30G. This caused Kafka to take quite a long 
> time to load the "__consumer_offsets" topic on startup (it loads the topic in 
> order to become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc.) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams commits transactions every 100ms (commit.interval.ms=100 when
> exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transaction records) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8405) Remove deprecated preferred leader RPC and Command

2019-06-20 Thread Sandeep (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868542#comment-16868542
 ] 

Sandeep commented on KAFKA-8405:


[~jagsancio] I have already completed the changes. Can I still go ahead and 
create a PR? We can merge it at the appropriate time.

> Remove deprecated preferred leader RPC and Command
> --
>
> Key: KAFKA-8405
> URL: https://issues.apache.org/jira/browse/KAFKA-8405
> Project: Kafka
>  Issue Type: Task
>  Components: admin
>Affects Versions: 3.0.0
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.0.0
>
>
> For version 2.4.0, we deprecated:
> # AdminClient.electPreferredLeaders
> # ElectPreferredLeadersResult
> # ElectPreferredLeadersOptions
> # PreferredReplicaLeaderElectionCommand.
> For version 3.0.0 we should remove all of these symbols and the references to 
> them. For the command, that includes:
> # bin/kafka-preferred-replica-election.sh
> # bin/windows/kafka-preferred-replica-election.bat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-06-20 Thread Adam Bellemare (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868544#comment-16868544
 ] 

Adam Bellemare commented on KAFKA-7500:
---

[~ryannedolan] - Thanks for driving this forward, it looks extremely useful.

I'm currently evaluating this for my company's needs. I have a few questions.

1) Are the offset translation topics included in the patch you have above? I 
ask because I see that https://issues.apache.org/jira/browse/KAFKA-7815 is 
still open, and I am not sure whether SourceTask is the provider of this info, 
nor about the overall status of the offset-translation work.

2) How do you go about switching a consumer from one cluster to another, say in 
the case of a total cluster failure? I have noted that there exist offsets 
topics that indicate the source offset and equivalent destination offset for a 
consumer group. Aside from the mechanics of redirecting the consumer to the 
replica cluster, what do you do in terms of ensuring that the consumer is able 
to read from this destination cluster? My first instinct is to have a service 
that consumes from the offsets-sync topic and sets the offsets for each 
consumer group as the new sync events come in (see the sketch below). If this 
is up to date with the latest events, then it should be possible to restart the 
consumer from the checkpoint in the destination cluster under the same consumer 
group id.

3) Do you use timestamps at all for failing over from one cluster to another? 
This is a bit of an extension to #2 above, and my current understanding is that 
it is not necessary to do so. I seem to recall that in previous versions of MM 
it was more common to use timestamps than offsets, since the guarantee for the 
offset wasn't present.

 

Anyways, thanks very much for your work here. I am eager to see this get into 
2.4 because I believe it will make multi-cluster much more approachable for 
smaller businesses.
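
A minimal sketch of the offset-seeding idea from question 2, assuming 
placeholder bootstrap servers, group id, topic, and translated offset; this is 
not MirrorMaker 2 code, and it assumes the target group has no active members:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class FailoverOffsetSeeder {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "target-cluster:9092");    // placeholder
        props.put("group.id", "my-consumer-group");                // group being failed over
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        // The translated offset would come from the checkpoint/offset-sync
        // data; the values here are placeholders.
        TopicPartition tp = new TopicPartition("source.my-topic", 0);
        long translatedOffset = 42L;

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp));
            consumer.commitSync(Collections.singletonMap(tp,
                    new OffsetAndMetadata(translatedOffset)));
        }
    }
}
{code}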

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Priority: Minor
> Fix For: 2.4.0
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)