[jira] [Commented] (KAFKA-8211) Flaky Test: ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan

2019-04-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815064#comment-16815064
 ] 

ASF GitHub Bot commented on KAFKA-8211:
---

huxihx commented on pull request #6561: KAFKA-8211: Flaky test: 
testResetOffsetsExportImportPlan
URL: https://github.com/apache/kafka/pull/6561
 
 
   https://issues.apache.org/jira/browse/KAFKA-8211
   
   Reduced the offset-commit interval from 5 seconds to 1 second, hoping that 
consumer#committed returns the offset more quickly. In addition, enriched the 
output message for the exceptional case.
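   For illustration, a minimal sketch of the shortened commit interval on a 
plain Java consumer (the actual test is Scala; the group id and bootstrap 
address below are illustrative, only the 1-second interval mirrors the change):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reset-test-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// commit every 1 second (down from 5) so consumer#committed reflects progress sooner
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
{code}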
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Flaky Test: ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan
> -
>
> Key: KAFKA-8211
> URL: https://issues.apache.org/jira/browse/KAFKA-8211
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.3.0
>Reporter: Bill Bejeck
>Assignee: huxihx
>Priority: Major
> Fix For: 2.3.0
>
>
> Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20778/]
>  
> {noformat}
> Error Message
> java.lang.AssertionError: Expected that consumer group has consumed all 
> messages from topic/partition.
> Stacktrace
> java.lang.AssertionError: Expected that consumer group has consumed all 
> messages from topic/partition.
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:381)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.awaitConsumerProgress(ResetConsumerGroupOffsetTest.scala:364)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.produceConsumeAndShutdown(ResetConsumerGroupOffsetTest.scala:359)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:323)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> 

[jira] [Assigned] (KAFKA-8211) Flaky Test: ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan

2019-04-10 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-8211:
-

Assignee: huxihx

> Flaky Test: ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan
> -
>
> Key: KAFKA-8211
> URL: https://issues.apache.org/jira/browse/KAFKA-8211
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.3.0
>Reporter: Bill Bejeck
>Assignee: huxihx
>Priority: Major
> Fix For: 2.3.0
>
>
> Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20778/]
>  
> {noformat}
> Error Message
> java.lang.AssertionError: Expected that consumer group has consumed all 
> messages from topic/partition.
> Stacktrace
> java.lang.AssertionError: Expected that consumer group has consumed all 
> messages from topic/partition.
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:381)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.awaitConsumerProgress(ResetConsumerGroupOffsetTest.scala:364)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.produceConsumeAndShutdown(ResetConsumerGroupOffsetTest.scala:359)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:323)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> 

[jira] [Resolved] (KAFKA-8161) Comma conflict when run script bin/kafka-configs.sh with config 'follower.replication.throttled.replicas'

2019-04-10 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-8161.
---
Resolution: Not A Problem

> Comma conflict when run script  bin/kafka-configs.sh with config 
> 'follower.replication.throttled.replicas'
> --
>
> Key: KAFKA-8161
> URL: https://issues.apache.org/jira/browse/KAFKA-8161
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Haiping
>Priority: Minor
>
> When executing the config command, it suggests that 
> follower.replication.throttled.replicas must match the format 
> [partitionId],[brokerId]:[partitionId],[brokerId]:[partitionId],[brokerId] 
> etc. But when configured like that, it fails with the following error:
> bin/kafka-configs.sh --entity-type topics --entity-name topic-test1  
> --zookeeper  127.0.0.1:2181/kafka --add-config 
> 'follower.replication.throttled.replicas=0,1:1,2' --alter
> Error while executing config command requirement failed: Invalid entity 
> config: all configs to be added must be in the format "key=val".
>  java.lang.IllegalArgumentException: requirement failed: Invalid entity 
> config: all configs to be added must be in the format "key=val".
>      at scala.Predef$.require(Predef.scala:224)
>      at 
> kafka.admin.ConfigCommand$.parseConfigsToBeAdded(ConfigCommand.scala:162)
>      at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:81)
>      at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:68)
>      at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> It seems that the comma serves as the separator for both replicas, such as 
> ([partitionId],[brokerId]), and keys, such as 
> (key=val,key=val).





[jira] [Commented] (KAFKA-8161) Comma conflict when run script bin/kafka-configs.sh with config 'follower.replication.throttled.replicas'

2019-04-10 Thread Haiping (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815053#comment-16815053
 ] 

Haiping commented on KAFKA-8161:


@huxihx It worked. Thank you for your guidance.
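For reference, the value the validator accepts pairs each partition with a 
broker using a colon, with commas between pairs. A hedged sketch of such an 
invocation, assuming the ConfigCommand version in use supports square-bracket 
escaping for values that contain commas (topic name and IDs are illustrative):
{noformat}
bin/kafka-configs.sh --entity-type topics --entity-name topic-test1 \
  --zookeeper 127.0.0.1:2181/kafka --alter \
  --add-config 'follower.replication.throttled.replicas=[0:1,1:2]'
{noformat}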

> Comma conflict when run script  bin/kafka-configs.sh with config 
> 'follower.replication.throttled.replicas'
> --
>
> Key: KAFKA-8161
> URL: https://issues.apache.org/jira/browse/KAFKA-8161
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Haiping
>Priority: Minor
>
> When executing the config command, it suggests that 
> follower.replication.throttled.replicas must match the format 
> [partitionId],[brokerId]:[partitionId],[brokerId]:[partitionId],[brokerId] 
> etc. But when configured like that, it fails with the following error:
> bin/kafka-configs.sh --entity-type topics --entity-name topic-test1  
> --zookeeper  127.0.0.1:2181/kafka --add-config 
> 'follower.replication.throttled.replicas=0,1:1,2' --alter
> Error while executing config command requirement failed: Invalid entity 
> config: all configs to be added must be in the format "key=val".
>  java.lang.IllegalArgumentException: requirement failed: Invalid entity 
> config: all configs to be added must be in the format "key=val".
>      at scala.Predef$.require(Predef.scala:224)
>      at 
> kafka.admin.ConfigCommand$.parseConfigsToBeAdded(ConfigCommand.scala:162)
>      at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:81)
>      at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:68)
>      at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> It seems that the comma serves as the separator for both replicas, such as 
> ([partitionId],[brokerId]), and keys, such as 
> (key=val,key=val).





[jira] [Commented] (KAFKA-8215) Limit memory usage of RocksDB

2019-04-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814992#comment-16814992
 ] 

ASF GitHub Bot commented on KAFKA-8215:
---

ableegoldman commented on pull request #6560: KAFKA-8215: Pt I. Share block 
cache between instances
URL: https://github.com/apache/kafka/pull/6560
 
 
   As of v5.12, RocksDB provides an API for sharing the block cache across all 
instances in a single process. Because it involves passing the same Cache 
object to each instance's Options, this capability is not reachable through the 
current Streams configs.
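   For context, a way to approximate the shared cache today is a custom 
RocksDBConfigSetter; a minimal sketch (the class name and cache size are 
illustrative, and this predates whatever config surface the PR itself adds):
{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class SharedCacheConfigSetter implements RocksDBConfigSetter {
    // one static cache, shared by every RocksDB instance in this process
    private static final Cache SHARED_CACHE = new LRUCache(100 * 1024 * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(SHARED_CACHE);
        options.setTableFormatConfig(tableConfig);
    }
}
{code}
   Such a class would be registered via the rocksdb.config.setter Streams config.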
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Limit memory usage of RocksDB
> -
>
> Key: KAFKA-8215
> URL: https://issues.apache.org/jira/browse/KAFKA-8215
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> The memory usage of Streams is currently unbounded, in part because of 
> RocksDB, which consumes memory on a per-instance basis. Each instance (i.e., 
> each persistent state store) will have its own write buffer, index blocks, 
> and block cache. The size of each can be configured individually, but there 
> is currently no way for a Streams app to limit the total memory available 
> across instances.
>  





[jira] [Created] (KAFKA-8215) Limit memory usage of RocksDB

2019-04-10 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8215:
--

 Summary: Limit memory usage of RocksDB
 Key: KAFKA-8215
 URL: https://issues.apache.org/jira/browse/KAFKA-8215
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


The memory usage of Streams is currently unbounded, in part because of RocksDB, 
which consumes memory on a per-instance basis. Each instance (i.e., each 
persistent state store) will have its own write buffer, index blocks, and block 
cache. The size of each can be configured individually, but there is currently 
no way for a Streams app to limit the total memory available across instances.

 





[jira] [Commented] (KAFKA-8207) StickyPartitionAssignor for KStream

2019-04-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814976#comment-16814976
 ] 

Matthias J. Sax commented on KAFKA-8207:


That is by design. Kafka Streams' internal "StreamPartitionAssignor" also 
implements a "sticky" policy. I think we can close this ticket as invalid.

What are you trying to achieve, and why do you want to change the partition assignor?
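Concretely, when Streams builds its consumer configuration it pins the 
assignor, so a user-supplied value is overwritten. A simplified sketch of that 
behavior (not the literal StreamsConfig code; the internal class is named 
StreamsPartitionAssignor in recent versions):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;

// simplified: whatever the user set for partition.assignment.strategy is replaced
void pinAssignor(final Map<String, Object> consumerProps) {
    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            StreamsPartitionAssignor.class.getName());
}
{code}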

> StickyPartitionAssignor for KStream
> ---
>
> Key: KAFKA-8207
> URL: https://issues.apache.org/jira/browse/KAFKA-8207
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: neeraj
>Priority: Major
>
> In KStreams I am not able to plug in a sticky partition assignor or my custom 
> partition assignor.
> Overriding the property while building the streams properties does not work:
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CustomAssignor.class.getName());
>  
>  





[jira] [Commented] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814974#comment-16814974
 ] 

Matthias J. Sax commented on KAFKA-8201:


{quote}Another question here: Why use infinite retention and cleanup.policy 
delete instead of log compaction for this case?
{quote}
Compaction does not make sense for repartition topics, because they are just 
used to "shuffle" data. We use infinite retention to guard against data loss in 
case the application is offline for a longer period of time. Internally, Kafka 
Streams uses the purge-data API to delete processed data so that the 
repartition topics do not grow unbounded.
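The purge-data API referred to above is AdminClient#deleteRecords (KIP-107). A 
hedged sketch of a direct call, with the topic name, partition, and offset 
purely illustrative and adminProps assumed to hold the usual bootstrap config:
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

// delete everything below the given offset in one repartition-topic partition
static void purge(final Properties adminProps, final TopicPartition tp,
                  final long offset) throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
        final Map<TopicPartition, RecordsToDelete> toDelete =
                Collections.singletonMap(tp, RecordsToDelete.beforeOffset(offset));
        admin.deleteRecords(toDelete).all().get();
    }
}
{code}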

> Kafka streams repartitioning topic settings crashing multiple nodes
> ---
>
> Key: KAFKA-8201
> URL: https://issues.apache.org/jira/browse/KAFKA-8201
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Anders Aagaard
>Priority: Major
>
> We had an incident in a setup using Kafka Streams version 2.0.0 and Kafka 
> version 2.0.0 (protocol version 2.0-IV1). The cause is a combination of Kafka 
> Streams defaults and a bug in Kafka.
> Info about the setup: a Streams application reading a log-compacted input 
> topic and performing a groupBy operation requiring repartitioning.
> Kafka Streams automatically creates a repartitioning topic with 24 partitions 
> and the following options:
> segment.bytes=52428800, retention.ms=9223372036854775807, 
> segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=600000.
> This should mean we roll a new segment when the active one reaches 50 MB 
> or is older than 10 minutes. However, the varying timestamps coming into 
> the topic due to log compaction (sometimes differing by multiple days) mean 
> the server will see a message which is older than segment.ms and 
> automatically trigger a new segment roll. This causes a segment explosion, 
> where new segments are continuously rolled out.
> There seems to be a bug report for this on the server side here: 
> https://issues.apache.org/jira/browse/KAFKA-4336.
> This effectively took down several nodes and a broker in our cluster.
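As a client-side mitigation for the quoted defaults, internal topic configs can 
be overridden from the Streams app via StreamsConfig#topicPrefix. A sketch 
under the assumption that a longer segment.ms is acceptable (the one-hour value 
and app id are illustrative only):
{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// "topic."-prefixed configs are applied to the internal topics Streams creates,
// including repartition topics
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "3600000");
{code}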





[jira] [Created] (KAFKA-8214) Handling RecordTooLargeException in the main thread

2019-04-10 Thread Mohan Parthasarathy (JIRA)
Mohan Parthasarathy created KAFKA-8214:
--

 Summary: Handling RecordTooLargeException in the main thread
 Key: KAFKA-8214
 URL: https://issues.apache.org/jira/browse/KAFKA-8214
 Project: Kafka
  Issue Type: Bug
 Environment: Kafka version 1.0.2
Reporter: Mohan Parthasarathy


How can we handle this exception in the main application? If a task hits this 
exception, it does not commit the offset and hence goes into a loop after that. 
This happens during the aggregation process. We already have a 15 MB limit on 
the message size of the topic.
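One option, available from Kafka Streams 1.1.0 via KIP-210 (so it would require 
upgrading from 1.0.2), is a custom ProductionExceptionHandler that skips 
oversized records instead of failing the task. A sketch (class name is 
illustrative):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class SkipTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // drop records that exceed max.request.size; fail on anything else
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}
It would be registered via the default.production.exception.handler Streams config.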


{noformat}
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_6, processor=KSTREAM-SOURCE-16, topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-12-repartition, partition=6, offset=2049
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort sending since an error caught with a previous record (key fe80::a112:a206:bc15:8e86::743c:160:c0be:9e66&0 value [B@20dced9e timestamp 1554238297629) to topic -detection-KSTREAM-AGGREGATE-STATE-STORE-12-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 15728866 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at 
{noformat}
[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2019-04-10 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-6579:
--

Assignee: Sophie Blee-Goldman  (was: Ben Webb)

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, unit-test
>
> For key-value stores, we have an {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however, for window and session stores, each 
> class has its own independent unit test class that does not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key-value stores and consolidate 
> these test functions into a shared base class.
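A minimal sketch of the shared-base-class pattern being proposed, mirroring 
AbstractKeyValueStoreTest (class and method names below are illustrative, not 
from the codebase):
{code:java}
import org.apache.kafka.streams.state.WindowStore;
import org.junit.Test;

public abstract class AbstractWindowStoreTest {
    // each concrete test class (RocksDB, caching, ...) supplies its own store
    abstract WindowStore<Integer, String> createWindowStore();

    @Test
    public void shouldFetchWhatWasPut() {
        final WindowStore<Integer, String> store = createWindowStore();
        store.put(1, "a", 0L);
        // shared assertions run against every implementation ...
    }
}
{code}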





[jira] [Created] (KAFKA-8213) KStreams interactive query documentation typo

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8213:
---

 Summary: KStreams interactive query documentation typo
 Key: KAFKA-8213
 URL: https://issues.apache.org/jira/browse/KAFKA-8213
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis


In [the Interactive Queries 
docs|https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app],
 we have a minor typo:

Actual: You can use the corresponding local data in other parts of your 
application code, as long as it doesn’t required calling the Kafka Streams API.
Expected: You can use the corresponding local data in other parts of your 
application code, as long as it doesn’t require calling the Kafka Streams API.





[jira] [Created] (KAFKA-8212) KStreams documentation Maven artifact table is cut off

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8212:
---

 Summary: KStreams documentation Maven artifact table is cut off
 Key: KAFKA-8212
 URL: https://issues.apache.org/jira/browse/KAFKA-8212
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis
 Attachments: Screen Shot 2019-04-10 at 2.04.09 PM.png

In the [Writing a Streams Application 
doc|https://kafka.apache.org/21/documentation/streams/developer-guide/write-streams.html],
 the section "LIBRARIES AND MAVEN ARTIFACTS" has a table that lists the Maven 
artifacts. The items in the "group ID" column overflow and are cut off at the 
column boundary, even on a very large monitor.

Note that "artifact ID" seems to have its word-break property set correctly. 
See the attached image.







[jira] [Created] (KAFKA-8211) Flaky Test: ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan

2019-04-10 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-8211:
--

 Summary: Flaky Test: 
ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan
 Key: KAFKA-8211
 URL: https://issues.apache.org/jira/browse/KAFKA-8211
 Project: Kafka
  Issue Type: Bug
  Components: admin, clients, unit tests
Affects Versions: 2.3.0
Reporter: Bill Bejeck
 Fix For: 2.3.0


Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20778/]

 
{noformat}
Error Message
java.lang.AssertionError: Expected that consumer group has consumed all 
messages from topic/partition.
Stacktrace
java.lang.AssertionError: Expected that consumer group has consumed all 
messages from topic/partition.
at kafka.utils.TestUtils$.fail(TestUtils.scala:381)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791)
at 
kafka.admin.ResetConsumerGroupOffsetTest.awaitConsumerProgress(ResetConsumerGroupOffsetTest.scala:364)
at 
kafka.admin.ResetConsumerGroupOffsetTest.produceConsumeAndShutdown(ResetConsumerGroupOffsetTest.scala:359)
at 
kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:323)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)

[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup

2019-04-10 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814844#comment-16814844
 ] 

Bill Bejeck commented on KAFKA-7937:


Failed again [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20778/]

 
{noformat}
Error Message
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
Stacktrace
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupState(ConsumerGroupCommand.scala:389)
at 
kafka.admin.ResetConsumerGroupOffsetTest$$anonfun$1.apply$mcZ$sp(ResetConsumerGroupOffsetTest.scala:91)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:788)
at 
kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:90)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at 

[jira] [Created] (KAFKA-8210) Missing link for KStreams in Streams DSL docs

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8210:
---

 Summary: Missing link for KStreams in Streams DSL docs
 Key: KAFKA-8210
 URL: https://issues.apache.org/jira/browse/KAFKA-8210
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis


In [the Streams DSL 
docs|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html],
 there is some text under the KTable section that reads: "We have already seen 
an example of a changelog stream in the section streams_concepts_duality."

"streams_concepts_duality" seems to indicate that it should be a link, but it 
is not.





[jira] [Assigned] (KAFKA-8208) Broken link for out-of-order data in KStreams Core Concepts doc

2019-04-10 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-8208:
--

Assignee: Bill Bejeck

> Broken link for out-of-order data in KStreams Core Concepts doc
> ---
>
> Key: KAFKA-8208
> URL: https://issues.apache.org/jira/browse/KAFKA-8208
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Michael Drogalis
>Assignee: Bill Bejeck
>Priority: Minor
>
> In the [core concepts 
> doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there 
> is a link in the "Out-of-Order Handling" section for "out-of-order data". It 
> 404's to https://kafka.apache.org/21/documentation/streams/tbd.





[jira] [Created] (KAFKA-8209) Wrong link for KStreams DSL in Core Concepts doc

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8209:
---

 Summary: Wrong link for KStreams DSL in Core Concepts doc
 Key: KAFKA-8209
 URL: https://issues.apache.org/jira/browse/KAFKA-8209
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis


In the [core concepts 
doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there is 
a link in the "States" section for "Kafka Streams DSL". It points to the wrong 
link.

Actual: 
https://kafka.apache.org/21/documentation/streams/developer-guide/#streams_dsl
Expected: 
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html





[jira] [Updated] (KAFKA-8208) Broken link for out-of-order data in KStreams Core Concepts doc

2019-04-10 Thread Michael Drogalis (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Drogalis updated KAFKA-8208:

Issue Type: Bug  (was: Improvement)

> Broken link for out-of-order data in KStreams Core Concepts doc
> ---
>
> Key: KAFKA-8208
> URL: https://issues.apache.org/jira/browse/KAFKA-8208
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Michael Drogalis
>Priority: Minor
>
> In the [core concepts 
> doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there 
> is a link in the "Out-of-Order Handling" section for "out-of-order data". It 
> 404's to https://kafka.apache.org/21/documentation/streams/tbd.





[jira] [Created] (KAFKA-8208) Broken link for out-of-order data in KStreams Core Concepts doc

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8208:
---

 Summary: Broken link for out-of-order data in KStreams Core 
Concepts doc
 Key: KAFKA-8208
 URL: https://issues.apache.org/jira/browse/KAFKA-8208
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: Michael Drogalis


In the [core concepts 
doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there is 
a link in the "Out-of-Order Handling" section for "out-of-order data". It 404's 
to https://kafka.apache.org/21/documentation/streams/tbd.





[jira] [Commented] (KAFKA-7378) Consumer poll hangs if broker shutdown while consumer attempting to connect.

2019-04-10 Thread John Calcote (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814791#comment-16814791
 ] 

John Calcote commented on KAFKA-7378:
-

Ping - is anyone thinking about this issue? The problem shows up in non-test 
code as well. We follow very strict guidelines for shutdown time in our 
application, because we have restart timing requirements that are part of our 
product specification. We lose the ability to shut down our Kafka consumers in 
a timely fashion if consumer poll hangs when the broker goes away.

Yes, we _could_ ensure our consumers are down before we shut down the 
broker, but that inter-app dependency has other costly side effects that we've 
not wanted to incur, and it seems unreasonable for poll not to honor thread 
interruption (in Java).
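For context, the documented way to abort a blocked poll from another thread is 
KafkaConsumer#wakeup. A sketch of the usual shutdown pattern, assuming a 2.x 
client with poll(Duration) and with consumer construction omitted; whether it 
helps in the broker-gone case described here is exactly what this ticket 
questions:
{code:java}
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// poll loop; a second thread triggers shutdown via running.set(false) + consumer.wakeup()
static void pollLoop(final KafkaConsumer<String, String> consumer, final AtomicBoolean running) {
    try {
        while (running.get()) {
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // process records ...
        }
    } catch (final WakeupException e) {
        // expected when wakeup() is called during shutdown
    } finally {
        consumer.close();
    }
}
{code}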

> Consumer poll hangs if broker shutdown while consumer attempting to connect.
> 
>
> Key: KAFKA-7378
> URL: https://issues.apache.org/jira/browse/KAFKA-7378
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.1
> Environment: Java JDK 8, Linux RedHat 7.5
>Reporter: John Calcote
>Priority: Major
>
> I have an integration test that creates a Spring context, starting up the 
> application. In the test's @Before method, I create a zookeeper and 
> KafkaServer instance with available ephemeral service ports. The test's 
> @After method shuts down these service instances. The Spring context 
> registers a shutdown handler, which doesn't get called till after the @After 
> method is run. This means my application's consumer polling loop doesn't 
> detect a termination event until after the broker is gone. Here's what shows 
> up in my log4j log:
>  
> {noformat}
> 2018-09-05 11:56:13.980,1819313073756610 {} DEBUG o.a.k.c.NetworkClient 
> [hmdb-kafka-polling-thread] Initiating connection to node localhost:9092 (id: 
> -1 rack: null)
> 2018-09-05 11:56:13.980,1819313074091635 {} DEBUG o.a.k.c.n.Selector 
> [hmdb-kafka-polling-thread] Connection with localhost/127.0.0.1 disconnected
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_171]
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
> ~[?:1.8.0_171]
> at 
> org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:361) 
> ~[kafka-clients-0.11.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.poll(Selector.java:326) 
> ~[kafka-clients-0.11.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433) 
> ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:226)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:203)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  ~[kafka-clients-0.11.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> ~[kafka-clients-0.11.0.1.jar:?]
> {noformat}
>  
> This sequence continues forever because poll will not return and notice it's 
> time to terminate my polling loop.





[jira] [Updated] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7895:

Fix Version/s: (was: 2.2.0)
   2.2.1

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.1.2, 2.2.1
>
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944





[jira] [Updated] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8204:

Fix Version/s: 2.2.1
   2.1.2

> Streams may flush state stores in the incorrect order
> -
>
> Key: KAFKA-8204
> URL: https://issues.apache.org/jira/browse/KAFKA-8204
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.1.2, 2.2.1
>
>
> Cached state stores may forward records during a flush call, so Streams 
> should flush the stores in topological order. Otherwise, Streams may flush a 
> downstream store before an upstream one, resulting in sink results being 
> committed without the corresponding state changelog updates being committed.
> This behavior is partly responsible for the bug reported in KAFKA-7895.
> The fix is simply to flush the stores in topological order; then, when an 
> upstream store forwards records to a downstream stateful processor, the 
> corresponding state changes will be correctly flushed as well.
> An alternative would be to repeatedly call flush on all state stores until 
> they report there is nothing left to flush, but this requires a public API 
> change to enable state stores to report whether they need a flush or not.
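Illustratively, the proposed fix amounts to iterating the stores in topological 
order when flushing. A conceptual sketch only, not the actual Streams 
internals; the topologically sorted list is assumed given:
{code:java}
import java.util.List;
import org.apache.kafka.streams.processor.StateStore;

// flush upstream stores first, so records they forward during flush
// reach downstream stores before those stores are themselves flushed
void flushAll(final List<StateStore> storesInTopologicalOrder) {
    for (final StateStore store : storesInTopologicalOrder) {
        store.flush();
    }
}
{code}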





[jira] [Updated] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8204:

Affects Version/s: 2.1.0
   2.2.0
   2.1.1

> Streams may flush state stores in the incorrect order
> -
>
> Key: KAFKA-8204
> URL: https://issues.apache.org/jira/browse/KAFKA-8204
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>
> Cached state stores may forward records during a flush call, so Streams 
> should flush the stores in topological order. Otherwise, Streams may flush a 
> downstream store before an upstream one, resulting in sink results being 
> committed without the corresponding state changelog updates being committed.
> This behavior is partly responsible for the bug reported in KAFKA-7895.
> The fix is simply to flush the stores in topological order; then, when an 
> upstream store forwards records to a downstream stateful processor, the 
> corresponding state changes will be correctly flushed as well.
> An alternative would be to repeatedly call flush on all state stores until 
> they report there is nothing left to flush, but this requires a public API 
> change to enable state stores to report whether they need a flush or not.





[jira] [Updated] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7895:

Affects Version/s: 2.2.0

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944





[jira] [Updated] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7895:

Priority: Blocker  (was: Major)

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944





[jira] [Updated] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8204:

Priority: Blocker  (was: Major)

> Streams may flush state stores in the incorrect order
> -
>
> Key: KAFKA-8204
> URL: https://issues.apache.org/jira/browse/KAFKA-8204
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>
> Cached state stores may forward records during a flush call, so Streams 
> should flush the stores in topological order. Otherwise, Streams may flush a 
> downstream store before an upstream one, resulting in sink results being 
> committed without the corresponding state changelog updates being committed.
> This behavior is partly responsible for the bug reported in KAFKA-7895.
> The fix is simply to flush the stores in topological order; then, when an 
> upstream store forwards records to a downstream stateful processor, the 
> corresponding state changes will be correctly flushed as well.
> An alternative would be to repeatedly call flush on all state stores until 
> they report there is nothing left to flush, but this requires a public API 
> change to enable state stores to report whether they need a flush or not.





[jira] [Reopened] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-7895:
-

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds (e.g. windows of 30 seconds, sketched 
> below) and use system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944
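A sketch of the tighter-window configuration mentioned in the last criterion 
(illustrative values only; imports assume Kafka Streams 2.1+):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

public class TightWindowSketch {
    public static void main(String[] args) {
        // 30-second windows with a short grace period, so system tests can
        // verify suppress against system time without adding extra time.
        TimeWindows windows = TimeWindows.of(Duration.ofSeconds(30))
                                         .grace(Duration.ofSeconds(5));
        Suppressed<Windowed> suppression = Suppressed.untilWindowCloses(unbounded());
        System.out.println(windows.size() + " ms windows; " + suppression);
    }
}
{code}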



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4336) Frequent log rolling when there's a mix of delayed and current data

2019-04-10 Thread Anders Aagaard (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814662#comment-16814662
 ] 

Anders Aagaard commented on KAFKA-4336:
---

This issue can be triggered by kafka-streams with its default configuration. 
See the linked issue for more information.

> Frequent log rolling when there's a mix of delayed and current data
> ---
>
> Key: KAFKA-4336
> URL: https://issues.apache.org/jira/browse/KAFKA-4336
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.1.0
>Reporter: Ismael Juma
>Priority: Major
>
> Jun said in KAFKA-4099: suppose that you have 2 producers, one producing data 
> with the current timestamp and another producing data with a timestamp 7 days 
> old (e.g., if some data is delayed or some old data is replayed); this will 
> still cause the log segments to roll frequently. This may not be common, but 
> can definitely happen. So, it seems we will still need to improve how log 
> rolling works.
> I am creating a new JIRA because KAFKA-4099 is closed, so this is easy to lose 
> track of.
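To make the failure mode concrete, an illustrative sketch (not actual broker 
code) of the time-based roll check after KAFKA-4099, which compares message 
timestamps rather than wall-clock segment age:
{code:java}
public class RollSketch {
    public static void main(String[] args) {
        long segmentMs = 600_000L; // illustrative 10-minute segment.ms
        // A segment whose first record carries a 7-day-old timestamp...
        long firstTimestampInSegment =
                System.currentTimeMillis() - 7L * 24 * 3600 * 1000;
        // ...followed by a record from the current-time producer.
        long incomingTimestamp = System.currentTimeMillis();

        // Simplified roll condition: the timestamp spread exceeds segment.ms,
        // so interleaving old and current data forces a roll almost every time.
        boolean shouldRoll = incomingTimestamp - firstTimestampInSegment > segmentMs;
        System.out.println("shouldRoll = " + shouldRoll); // prints true
    }
}
{code}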



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-10 Thread Anders Aagaard (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814661#comment-16814661
 ] 

Anders Aagaard commented on KAFKA-8201:
---

Thank you, Bill.

 

I figured this issue wouldn't and shouldn't be solved client-side. I just 
wanted to make it clear that the default setup of kafka + kafka-streams 
currently risks taking down the brokers themselves.

 

We do have a workaround in place in the team, but we are a large organization, 
and forcing workarounds on everyone is extremely suboptimal. I hope this will 
put some pressure on the linked issue :) 
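A plausible sketch of such a client-side mitigation (an assumption, not 
necessarily this team's actual workaround): raise segment.ms for the internal 
topics Streams creates, via the topic. config prefix:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        // Applied to internal (repartition/changelog) topics created by
        // Streams: raise segment.ms far above the 10-minute repartition default.
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG),
                  "86400000"); // 1 day
    }
}
{code}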

> Kafka streams repartitioning topic settings crashing multiple nodes
> ---
>
> Key: KAFKA-8201
> URL: https://issues.apache.org/jira/browse/KAFKA-8201
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Anders Aagaard
>Priority: Major
>
> We had an incident in a setup using kafka streams version 2.0.0 and kafka 
> version 2.0.0 (protocol version 2.0-IV1). The cause is a combination of 
> kafka streams defaults and a bug in kafka.
> Info about the setup: a Streams application reading a log-compacted input 
> topic and performing a groupBy operation requiring repartitioning.
> Kafka streams automatically creates a repartitioning topic with 24 partitions 
> and the following options:
> segment.bytes=52428800, retention.ms=9223372036854775807, 
> segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=600000.
>  
> This should mean we roll a new segment when the active one reaches 50 MB 
> or is older than 10 minutes. However, the varying timestamps coming into 
> the topic due to log compaction (sometimes differing by multiple days) mean 
> the server will see a message which is older than segment.ms and 
> automatically trigger a new segment roll. This causes a segment explosion, 
> where new segments are continuously rolled out.
> There seems to be a bug report for this on the server side here: 
> https://issues.apache.org/jira/browse/KAFKA-4336.
> This effectively took down several nodes and a broker in our cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput

2019-04-10 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-8200:
--

Assignee: Patrik Kleindl

> TopologyTestDriver should offer an iterable signature of readOutput
> ---
>
> Key: KAFKA-8200
> URL: https://issues.apache.org/jira/browse/KAFKA-8200
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Drogalis
>Assignee: Patrik Kleindl
>Priority: Minor
>  Labels: needs-kip
>
> When using the TopologyTestDriver, one examines the output on a topic with 
> the readOutput method. This method returns one record at a time, until no 
> more records can be found, at which point it returns null.
> Many times, the usage pattern around readOutput involves writing a loop 
> to extract a number of records from the topic, building up a list of records 
> until it returns null.
> It would be helpful to offer an iterable signature of readOutput, which 
> returns either an iterator or a list over the records that are currently 
> available in the topic. This would effectively remove the loop that a user 
> needs to write for themselves each time.
> Such a signature might look like:
> {code:java}
> public Iterable<ProducerRecord<K, V>> readOutput(java.lang.String 
> topic);
> {code}
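For contrast, the hand-written loop this would replace might look like the 
following (a sketch assuming an existing TopologyTestDriver named driver, 
reading raw byte records):
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.TopologyTestDriver;

public class DrainSketch {
    // Drain all currently-available records from a topic into a list --
    // the boilerplate an iterable readOutput signature would eliminate.
    static List<ProducerRecord<byte[], byte[]>> drain(TopologyTestDriver driver,
                                                      String topic) {
        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
        ProducerRecord<byte[], byte[]> record;
        while ((record = driver.readOutput(topic)) != null) {
            records.add(record);
        }
        return records;
    }
}
{code}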



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8207) StickyPartitionAssignor for KStream

2019-04-10 Thread neeraj (JIRA)
neeraj created KAFKA-8207:
-

 Summary: StickyPartitionAssignor for KStream
 Key: KAFKA-8207
 URL: https://issues.apache.org/jira/browse/KAFKA-8207
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.0.0
Reporter: neeraj


In KStreams I am not able to plug in a sticky partition assignor or my own 
custom partition assignor.

Overriding the property while building the streams application does not work:

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
CustomAssignor.class.getName());
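A minimal sketch of the attempted override ("com.example.CustomAssignor" is a 
hypothetical fully-qualified class name; Streams manages assignment with its 
own internal partition assignor, so this consumer-level setting is not honored):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class AssignorOverrideSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        // The override attempted in this report; Streams pins its own internal
        // assignor for rebalancing, so this setting has no effect.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "com.example.CustomAssignor"); // hypothetical assignor class
    }
}
{code}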

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-10 Thread Tim Van Laer (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814356#comment-16814356
 ] 

Tim Van Laer commented on KAFKA-5998:
-

Running kafka-streams 2.0.1 in Docker:

{{$ cat /proc/version}}
{{Linux version 4.4.0-1075-aws (buildd@lgw01-amd64-035) (gcc version 5.4.0 
20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.10) ) #85-Ubuntu SMP Thu Jan 17 17:15:12 
UTC 2019}}

Filesystem is ext4.

For completeness, our kafka-streams state directory is a Docker volume, so it's 
a directory on the host machine that's mounted inside the container. Several 
application instances (containers) on the same host share the same state 
directory.
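For context, checkpoint writing follows the usual write-to-temp-then-rename 
pattern. A simplified sketch (not the actual OffsetCheckpoint code) shows why a 
concurrent cleanup of a shared state directory can surface as a 
FileNotFoundException on the .tmp file:
{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CheckpointSketch {
    // Simplified atomic checkpoint write. If another process removes the task
    // directory between these steps (e.g. a cleanup racing across containers
    // that share the state dir), new FileOutputStream(tmp) throws
    // FileNotFoundException, matching the stack traces below.
    static void writeCheckpoint(File taskDir, String contents) throws IOException {
        File tmp = new File(taskDir, ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write(contents.getBytes(StandardCharsets.UTF_8));
            out.getFD().sync(); // force to disk before the rename
        }
        File target = new File(taskDir, ".checkpoint");
        if (!tmp.renameTo(target)) {
            throw new IOException("Failed to rename " + tmp + " to " + target);
        }
    }
}
{code}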

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions. I have 16 
> partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-10 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814291#comment-16814291
 ] 

Patrik Kleindl commented on KAFKA-5998:
---

SUSE Linux Enterprise Server 12 SP3

4.4.176-94.88-default #1 SMP Thu Mar 21 10:52:54 UTC 2019 (dea44ca) x86_64 
x86_64 x86_64 GNU/Linux

xfs/btrfs

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions. I have 16 
> partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-10 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814233#comment-16814233
 ] 

Di Campo edited comment on KAFKA-5998 at 4/10/19 9:13 AM:
--

It also happens to me on kafka_2.12-2.1.0, run in AWS in Docker (based on the 
wurstmeister image) on Amazon Linux.

 

{{cat /proc/version}}
{{Linux version 4.14.77-69.57.amzn1.x86_64 (mockbuild@gobi-build-60003) (gcc 
version 7.2.1 20170915 (Red Hat 7.2.1-2) (GCC)) #1 SMP Tue Nov 6 21:32:55 UTC 
2018}}


was (Author: xmar):
It also happens to me on kafka_2.12-2.1.0, run in AWS in Docker (based on the 
wurstmeister image) on Amazon Linux.

 

{{$ cat /proc/version }}
{{Linux version 4.14.77-69.57.amzn1.x86_64 (mockbuild@gobi-build-60003) (gcc 
version 7.2.1 20170915 (Red Hat 7.2.1-2) (GCC)) #1 SMP Tue Nov 6 21:32:55 UTC 
2018}}

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions. I have 16 
> partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-10 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814233#comment-16814233
 ] 

Di Campo commented on KAFKA-5998:
-

It also happens to me on kafka_2.12-2.1.0, run in AWS in Docker (based on the 
wurstmeister image) on Amazon Linux.

 

{{$ cat /proc/version }}
{{Linux version 4.14.77-69.57.amzn1.x86_64 (mockbuild@gobi-build-60003) (gcc 
version 7.2.1 20170915 (Red Hat 7.2.1-2) (GCC)) #1 SMP Tue Nov 6 21:32:55 UTC 
2018}}

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions. I have 16 
> partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-4453) add request prioritization

2019-04-10 Thread Di Shang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814078#comment-16814078
 ] 

Di Shang commented on KAFKA-4453:
-

Hi

The new metric introduced here breaks our metric parser with a NaN value:

"kafka.network,SocketServer,ControlPlaneNetworkProcessorAvgIdlePercent": "NaN",

[https://github.com/apache/kafka/blob/2.2/core/src/main/scala/kafka/network/SocketServer.scala#L143]

Is this the best default value for this metric? Can we use a concrete number 
instead?
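A defensive parser-side guard (a hypothetical sketch; fetchGauge stands in for 
whatever metrics client is in use) that maps NaN gauges to a sentinel instead 
of failing:
{code:java}
public class NanGuardSketch {
    // Stand-in for the metrics client in use; returns the raw gauge value,
    // which is NaN for this gauge before any sample is recorded.
    static double fetchGauge(String name) {
        return Double.NaN;
    }

    public static void main(String[] args) {
        double idle = fetchGauge("ControlPlaneNetworkProcessorAvgIdlePercent");
        // Map NaN to a sentinel (-1.0 = "not available") so parsers that
        // expect concrete numbers do not break.
        double safeIdle = Double.isNaN(idle) ? -1.0 : idle;
        System.out.println("idle = " + safeIdle);
    }
}
{code}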

 

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to be processed based on stale state.
> Side effects may include giving clients stale metadata[1], rejecting 
> ProduceRequests and FetchRequests[2], and data loss (for some unofficial[3] 
> definition of data loss in terms of messages beyond the high watermark)[4].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
>  [1] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
>  [2] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
>  [3] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
>  [4] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
> KIP-291: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Separating+controller+connections+and+requests+from+the+data+plane]
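To illustrate the acks trade-off described in [4] (an editorial sketch, not 
from the issue itself):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        // acks=1 acknowledges after the leader's local write only, so a leader
        // change followed by truncation can erase "acknowledged" messages.
        // acks=all waits for the full in-sync replica set, closing that window.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
    }
}
{code}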



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)