[jira] [Updated] (KAFKA-8035) Add tests for generics in KStream API

2019-03-18 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8035:
-
Labels: unit-test  (was: )

> Add tests for generics in KStream API 
> --
>
> Key: KAFKA-8035
> URL: https://issues.apache.org/jira/browse/KAFKA-8035
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: unit-test
>
> During the work on KAFKA-4217, it was discovered that some combinations of 
> Java generics block the usage of lambda functions ([see this 
> discussion|https://github.com/apache/kafka/pull/5273#discussion_r216810275]).
> To avoid using those blocking combinations of generics, tests shall be 
> implemented that verify that lambda functions can be used with the KStream 
> API. Those tests may also serve as regression tests to ensure that future 
> changes to the generics in the KStream API may not block lambda functions and 
> make the API incompatible with previous versions.
> Unlike other tests, the tests required here pass if they compile. For 
> example, to verify that the parameter {{mapper}} in
> {code:java}
>  <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
> {code}
> accepts a {{KeyValueMapper}} specified as a lambda function that returns an 
> implementation of the {{Iterable}} interface, the following stream could be 
> specified in the test:
> {code:java}
> stream
> .flatMap((Integer key, Integer value) -> Arrays.asList(
> KeyValue.pair(key, value),
> KeyValue.pair(key, value),
> KeyValue.pair(key, value)))
> .foreach(action);
> {code}
> If the test compiles, the test passes.
> Other tests for {{flatMap}} need to check the bounds of the generics, e.g., 
> if the {{mapper}} accepts a {{KeyValueMapper}} specified as a lambda function 
> that takes a super class of K and V as inputs.
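> For example, such a bounds check could look like the following sketch (assuming, 
> as above, that {{stream}} is a {{KStream<Integer, Integer>}}): the mapper is 
> declared over the wider type {{Number}}, and the test passes if the {{flatMap}} 
> call compiles:
> {code:java}
> final KeyValueMapper<Number, Number, Iterable<KeyValue<String, String>>> mapper =
>     (key, value) -> Arrays.asList(KeyValue.pair(key.toString(), value.toString()));
> stream.flatMap(mapper);
> {code}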



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8126) Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask

2019-03-18 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8126:


 Summary: Flaky Test 
org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask
 Key: KAFKA-8126
 URL: https://issues.apache.org/jira/browse/KAFKA-8126
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, unit tests
Reporter: Guozhang Wang


{code}
Stacktrace
java.lang.AssertionError: 
  Expectation failure on verify:
WorkerSourceTask.run(): expected: 1, actual: 0
at org.easymock.internal.MocksControl.verify(MocksControl.java:242)
at 
org.powermock.api.easymock.internal.invocationcontrol.EasyMockMethodInvocationControl.verify(EasyMockMethodInvocationControl.java:126)
at org.powermock.api.easymock.PowerMock.verify(PowerMock.java:1476)
at org.powermock.api.easymock.PowerMock.verifyAll(PowerMock.java:1415)
at 
org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask(WorkerTest.java:589)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
{code}

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3330/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testAddRemoveTask/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-8106:


Assignee: Flower.min

> Remove unnecessary decompression operation when logValidator  do validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
> We ran performance tests of Kafka in the specific scenarios described 
> below. We built a Kafka cluster with one broker and created topics with different 
> numbers of partitions. Then we started many producer processes to send large 
> amounts of messages to one of the topics in each test run.
> *_Specific Scenario_*
>   
>  *_1. Main config of Kafka server_*
>  # num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartitions: 50~2000
>  # Size of a single message: 1024B
>  
>  *_2. Config of KafkaProducer_*
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3. Best result of the performance testing_*
> ||Network inflow rate||CPU used (%)||Disk write speed||Performance of production||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4. Phenomenon and my doubt_*
>     _The upper limit of CPU usage is reached, but the upper limit of the 
> server's network bandwidth is not. *We are unsure what costs so much CPU time, 
> and we want to improve performance and reduce the CPU usage of the Kafka 
> server.*_
>   
>  _*5. Analysis*_
>         We analyzed a JFR recording of the Kafka server captured during the 
> performance test. We found that the hot-spot methods are 
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and 
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. When 
> checking the thread stack information we also found that most CPU is occupied by 
> many threads that are busy decompressing messages. We then read the Kafka source 
> code.
>        There is a double-layer nested iterator when LogValidator validates every 
> record, and each message is decompressed while traversing every RecordBatch 
> iterator. Decompressing messages consumes CPU and hurts overall performance. 
> _*The only purpose of decompressing every message is to obtain the total size in 
> bytes of a record and the size in bytes of the record body, when the magic value 
> in use is above 1 and no format conversion or value overwriting is required for 
> compressed messages. This is bad for performance in common usage scenarios.*_ 
> Therefore, we suggest *_removing the unnecessary decompression operation_* when 
> validating compressed messages, when the magic value in use is above 1 and no 
> format conversion or value overwriting is required.
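> A rough sketch of the suggested direction (only an illustration with assumed 
> helper names, not the actual LogValidator code): when the magic value is 2 or 
> above and no conversion or overwriting is needed, sizes could be taken from the 
> batch-level headers instead of iterating, and thus decompressing, the individual 
> records:
> {code:java}
> import org.apache.kafka.common.record.MemoryRecords;
> import org.apache.kafka.common.record.Record;
> import org.apache.kafka.common.record.RecordBatch;
> 
> public class ValidationSizeSketch {
>     // Total size in bytes of the records; the header-only path avoids decompression.
>     static long totalRecordBytes(MemoryRecords records, boolean needsConversion) {
>         long totalBytes = 0L;
>         for (RecordBatch batch : records.batches()) {
>             if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && !needsConversion) {
>                 totalBytes += batch.sizeInBytes();   // batch header only, records stay compressed
>             } else {
>                 for (Record record : batch) {        // iterating forces decompression
>                     totalBytes += record.sizeInBytes();
>                 }
>             }
>         }
>         return totalBytes;
>     }
> }
> {code}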



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability

2019-03-20 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8135:


 Summary: Kafka Producer deadlocked on flush call with intermittent 
broker unavailability
 Key: KAFKA-8135
 URL: https://issues.apache.org/jira/browse/KAFKA-8135
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.1.0
Reporter: Guozhang Wang


In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, 
with a default value of 2 minutes. We've observed that when it is set to 
MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), the 
{{producer.flush()}} call can sometimes block while its destination 
brokers are undergoing a period of unavailability:

{code}
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
- parking to wait for  <0x0006aeb21a00> (a 
java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown 
Source)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown
 Source)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown
 Source)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown
 Source)
at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown 
Source)
at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
at 
org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}

Even after the broker went back to normal, producers would still be 
blocked. One suspicion is that when the broker is not able to handle the request in 
time, the responses are somehow dropped inside the Sender, and hence whoever is 
waiting on this response is blocked forever.

We've observed such scenarios when 1) the broker transiently failed for a while, 
2) the network was transiently partitioned, and 3) a bad broker config such as ACLs 
caused the broker to be unable to handle requests for a while.
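For reference, a minimal sketch of a producer configured with a bounded 
{{delivery.timeout.ms}} (broker address and topic below are placeholders), so that 
pending batches eventually expire and {{flush()}} can return instead of waiting 
indefinitely:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BoundedDeliveryTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Keep the timeout bounded (2 min is the default) rather than Integer.MAX_VALUE,
        // so that batches to an unavailable broker eventually fail and flush() unblocks.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // placeholder topic
            producer.flush(); // returns once the batch is acked or the delivery timeout expires
        }
    }
}
{code}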





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7243) Add unit integration tests to validate metrics in Kafka Streams

2019-03-21 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798581#comment-16798581
 ] 

Guozhang Wang commented on KAFKA-7243:
--

[~Khairy] Thanks for your contribution!

> Add unit integration tests to validate metrics in Kafka Streams
> ---
>
> Key: KAFKA-7243
> URL: https://issues.apache.org/jira/browse/KAFKA-7243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: integration-test, newbie++
> Fix For: 2.3.0
>
>
> We should add an integration test for Kafka Streams that validates:
> 1. After the streams application is started, all metrics from the different levels 
> (thread, task, processor, store, cache) are correctly created and display 
> recorded values.
> 2. When the streams application is shut down, all metrics are correctly 
> de-registered and removed.
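> A minimal sketch of how the first check could look, using {{KafkaStreams#metrics()}} 
> (the metric group names below are assumptions and would need to match the actual 
> sensor groups):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.common.Metric;
> import org.apache.kafka.common.MetricName;
> import org.apache.kafka.streams.KafkaStreams;
> 
> public class MetricsCheckSketch {
>     // streams is a running KafkaStreams instance created by the test harness.
>     static void verifyMetricsRegistered(KafkaStreams streams) {
>         Map<MetricName, ? extends Metric> metrics = streams.metrics();
>         String[] groups = {"stream-metrics", "stream-task-metrics",
>                 "stream-processor-node-metrics", "stream-state-metrics",
>                 "stream-record-cache-metrics"}; // assumed group names
>         for (String group : groups) {
>             boolean found = metrics.keySet().stream()
>                     .anyMatch(name -> name.group().equals(group));
>             if (!found)
>                 throw new AssertionError("no metrics registered for group " + group);
>         }
>     }
> }
> {code}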



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2019-03-26 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16802245#comment-16802245
 ] 

Guozhang Wang commented on KAFKA-6399:
--

Reviving this thread. Setting this config to MAX_VALUE does have some 
side-effects: e.g. when an instance is rebooted so quickly that it has not yet been 
kicked out of the group via session.timeout, the group's old member id will still 
be in the group, and with rebalance.timeout set to MAX_VALUE the rebalance will 
never complete, because the coordinator shuts off heartbeating during the 
prepare-rebalance phase and waits for the old member to re-join forever.

So I think we should reduce it from MAX_VALUE for sure; the question is to what 
default value. Personally I think the value should be biased towards a good OOTB 
experience (i.e. for people who do not override this value), and hence I prefer a 
larger default such as 5 min: if processing / restoring a task spikes up we will 
not be kicked out of the group, whereas if the above scenario happens we are not 
blocked for more than 5 minutes (note that the default request timeout is 30 
seconds, so the members may re-join 10 times, but due to KIP-394 this will not 
explode the coordinator's metadata memory any more). 
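For context, if the Streams override were removed, an application that genuinely 
needs a longer timeout could still set it explicitly on the embedded consumer. A 
minimal sketch (application id and bootstrap servers are placeholders):

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class MaxPollIntervalOverride {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Override the consumer default (5 min) only if task processing / restoration
        // is known to take longer than that.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                10 * 60 * 1000);
    }
}
{code}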

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of a consumer group even though they 
> were healthy, simply because they didn't call {{poll()}} during the state restore 
> phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now does call {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect misbehaving Kafka Streams applications 
> (i.e., targeting user code) that no longer make progress during regular 
> operations.
> The open question would be, what a good default might be. Maybe the actual 
> consumer default of 30 seconds might be sufficient. During one {{poll()}} 
> roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a 
> single batch of records. This should take way less time than 30 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2019-03-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803008#comment-16803008
 ] 

Guozhang Wang commented on KAFKA-6399:
--

The ConsumerConfig's default is already set to 5min, so if we agree that it is 
a good value we can just remove the StreamsConfig's override.

Yes it would require a KIP, but should be a very simple one.

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of a consumer group even though they 
> were healthy, simply because they didn't call {{poll()}} during the state restore 
> phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now does call {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect misbehaving Kafka Streams applications 
> (i.e., targeting user code) that no longer make progress during regular 
> operations.
> The open question would be, what a good default might be. Maybe the actual 
> consumer default of 30 seconds might be sufficient. During one {{poll()}} 
> roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a 
> single batch of records. This should take way less time than 30 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8159) Multi-key range queries with negative keyFrom results in unexpected behavior

2019-03-27 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8159:
-
Description: 
If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, Streams will silently miss the 
negative keys and return those from the range [0, keyTo]. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation, and the result should at least be consistent across types of 
store.

  was:
If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, Streams will silently miss and 
negative keys and return those from the range [0, keyTo]. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation and the result should at least be consist across types of 
store.


> Multi-key range queries with negative keyFrom results in unexpected behavior
> 
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> If a user creates a queryable state store using one of the signed built-in 
> serdes (eg Integer) for the key, there is nothing preventing records with 
> negative keys from being inserted and/or fetched individually. However if the 
> user tries to query the store for a range of keys starting with a negative 
> number, unexpected behavior results that is store-specific.
>  
> For RocksDB stores with caching disabled, Streams will silently miss the 
> negative keys and return those from the range [0, keyTo]. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception and crash.
>  
> This situation should be handled more gracefully, or users should be informed 
> of this limitation, and the result should at least be consistent across types of 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2019-03-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803153#comment-16803153
 ] 

Guozhang Wang commented on KAFKA-7190:
--

I agree. I think we should at least increase `segment.ms` for now and let the 
segments be bounded by `segment.bytes` only (with its 50MB default value this 
should still be effective in bounding repartition topic sizes). 

As for `segment.index.bytes`, looking at the original PR I think it should be 
fixed to `segment.bytes`.
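As an illustration of the Streams-side knob (the values below are placeholders, not 
a concrete proposal), internal repartition/changelog topic configs can be 
overridden through the topic prefix:

{code}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionTopicConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Let size, not time, bound the segments of internal (repartition) topics.
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 50 * 1024 * 1024);
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), Long.MAX_VALUE);
    }
}
{code}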

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the producerIDs for a more extended period, or on the streams side, developing 
> a more conservative approach to deleting offsets from repartition topics.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2019-03-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803180#comment-16803180
 ] 

Guozhang Wang commented on KAFKA-7190:
--

Will file a KIP / PR for this discussion, but this ticket itself should be 
resolved only after KIP-360 is done.

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the producerIDs for a more extended period, or on the streams side, developing 
> a more conservative approach to deleting offsets from repartition topics.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-1149) Please delete old releases from mirroring system

2019-03-27 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1149.
--
Resolution: Fixed

> Please delete old releases from mirroring system
> 
>
> Key: KAFKA-1149
> URL: https://issues.apache.org/jira/browse/KAFKA-1149
> Project: Kafka
>  Issue Type: Bug
> Environment: http://www.apache.org/dist/kafka/old_releases/
>Reporter: Sebb
>Assignee: Guozhang Wang
>Priority: Major
>
> To reduce the load on the ASF mirrors, projects are required to delete old 
> releases [1]
> Please can you remove all non-current releases?
> Thanks!
> [Note that older releases are always available from the ASF archive server]
> [1] http://www.apache.org/dev/release.html#when-to-archive



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1149) Please delete old releases from mirroring system

2019-03-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803328#comment-16803328
 ] 

Guozhang Wang commented on KAFKA-1149:
--

I've removed all old releases from https://www.apache.org/dist/kafka/.

> Please delete old releases from mirroring system
> 
>
> Key: KAFKA-1149
> URL: https://issues.apache.org/jira/browse/KAFKA-1149
> Project: Kafka
>  Issue Type: Bug
> Environment: http://www.apache.org/dist/kafka/old_releases/
>Reporter: Sebb
>Assignee: Guozhang Wang
>Priority: Major
>
> To reduce the load on the ASF mirrors, projects are required to delete old 
> releases [1]
> Please can you remove all non-current releases?
> Thanks!
> [Note that older releases are always available from the ASF archive server]
> [1] http://www.apache.org/dev/release.html#when-to-archive



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8166) Kafka 2.2 Javadoc broken

2019-03-27 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8166.
--
   Resolution: Fixed
 Assignee: Guozhang Wang
Fix Version/s: 2.2.0

> Kafka 2.2 Javadoc broken
> 
>
> Key: KAFKA-8166
> URL: https://issues.apache.org/jira/browse/KAFKA-8166
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 2.2.0
>Reporter: Sachin NS
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: documentation
> Fix For: 2.2.0
>
> Attachments: 2.2_javadoc.JPG
>
>
> The Kafka 2.2 javadocs link mentioned in the Apache Kafka API documentation 
> ([http://kafka.apache.org/documentation/#producerapi]) errors out and gives a 
> 404 Not Found error. Below is the link which is mentioned in the 
> documentation:
> [http://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html]
> I don't see a javadoc folder within [https://kafka.apache.org/22/]
> This error is not present in the Kafka 2.1 or earlier documentation.
> Attaching screenshot for reference.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8166) Kafka 2.2 Javadoc broken

2019-03-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803499#comment-16803499
 ] 

Guozhang Wang commented on KAFKA-8166:
--

I've copied the javadocs for 2.2. It should be fixed now.

> Kafka 2.2 Javadoc broken
> 
>
> Key: KAFKA-8166
> URL: https://issues.apache.org/jira/browse/KAFKA-8166
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 2.2.0
>Reporter: Sachin NS
>Priority: Major
>  Labels: documentation
> Attachments: 2.2_javadoc.JPG
>
>
> The Kafka 2.2 javadocs link mentioned in the Apache Kafka API documentation 
> ([http://kafka.apache.org/documentation/#producerapi]) errors out and gives a 
> 404 Not Found error. Below is the link which is mentioned in the 
> documentation:
> [http://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html]
> I don't see a javadoc folder within [https://kafka.apache.org/22/]
> This error is not present in the Kafka 2.1 or earlier documentation.
> Attaching screenshot for reference.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8159) Multi-key range queries with negative keyFrom results in unexpected behavior

2019-03-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803512#comment-16803512
 ] 

Guozhang Wang commented on KAFKA-8159:
--

Thanks for reporting this. This is indeed an overlooked issue when looking at 
integer serdes.

More generally speaking, today we are sort of assuming that all serdes used are 
aligned with the lexicographic ordering of their bytes: if a typed object 
A1.compareTo(A2) > 0, then their serialized bytes should have the same 
relationship lexicographically. However the Integer / Long serdes etc. definitely 
do not obey this. So I'd suggest upgrading this ticket to this broader scope and 
using the multi-key range query as a specific observable symptom of it.
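A small example of the mismatch for the built-in Integer serde (4-byte big-endian 
two's complement): under unsigned lexicographic byte ordering, -1 compares greater 
than 1, so a range starting from a negative keyFrom does not map to a contiguous 
byte range:

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class SignedKeyOrderingExample {
    public static void main(String[] args) {
        Serializer<Integer> ser = Serdes.Integer().serializer();
        byte[] minusOne = ser.serialize("topic", -1); // FF FF FF FF
        byte[] one = ser.serialize("topic", 1);       // 00 00 00 01
        // Unsigned lexicographic comparison, i.e. the order a byte-wise store sees:
        System.out.println(compareUnsigned(minusOne, one) > 0); // true: -1 sorts after 1
    }

    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }
}
{code}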

> Multi-key range queries with negative keyFrom results in unexpected behavior
> 
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> If a user creates a queryable state store using one of the signed built-in 
> serdes (eg Integer) for the key, there is nothing preventing records with 
> negative keys from being inserted and/or fetched individually. However if the 
> user tries to query the store for a range of keys starting with a negative 
> number, unexpected behavior results that is store-specific.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception.
>  
> This situation should be handled more gracefully, or users should be informed 
> of this limitation, and the result should at least be consistent across types of 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-03-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16804148#comment-16804148
 ] 

Guozhang Wang commented on KAFKA-7965:
--

[~enether] Saw another failure: 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3484

```
kafka.api.ConsumerBounceTest > 
testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup FAILED
08:14:24 java.util.concurrent.ExecutionException: Boxed Error
08:14:24 
08:14:24 Caused by:
08:14:24 java.lang.AssertionError: Received 0, expected at least 68
08:14:24 at org.junit.Assert.fail(Assert.java:89)
08:14:24 at org.junit.Assert.assertTrue(Assert.java:42)
08:14:24 at 
kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:563)
08:14:24 at 
kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$5(ConsumerBounceTest.scala:347)
```

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805543#comment-16805543
 ] 

Guozhang Wang commented on KAFKA-7447:
--

Is it related to https://issues.apache.org/jira/browse/KAFKA-8069?

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that the consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages only to those regarding 
> __consumer_offsets-29 because it's just too much to paste otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished 
> loading offse

[jira] [Resolved] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6681.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.0

Resolving as part of https://issues.apache.org/jira/browse/KAFKA-5154 now.

> Two instances of kafka consumer reading the same partition within a consumer 
> group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.2.1
>Reporter: Narayan Periwal
>Priority: Critical
> Fix For: 0.11.0.0
>
> Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the Kafka consumer, the new library that got 
> introduced in 0.9
> With this new client, the group management is done by the kafka coordinator, 
> which is one of the kafka brokers.
> We are using Kafka broker 0.10.2.1 and consumer client version is also 
> 0.10.2.1 
> The issue that we have faced is that, after rebalancing, some of the 
> partitions get consumed by 2 instances within a consumer group, leading to 
> duplication of the entire partition data. Both instances continue to read 
> until the next rebalancing, or the restart of those clients. 
> It looks like a particular consumer keeps fetching data from a 
> partition, but the broker is not able to identify this "stale" consumer 
> instance. 
> We have hit this twice in production. Please look into it at the earliest. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7142.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

Resolving the ticket since the PR has been merged into 2.1.0 already.

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, 
> 1.1.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Major
> Fix For: 2.1.0
>
>
> In our production cluster, we noticed that when a large consumer group (a few 
> thousand members) is rebalancing, the produce latency of the coordinator 
> broker can jump to several seconds.
>  
> Group rebalance is a very frequent operation, it can be triggered by adding / 
> removing / restarting a single member in the consumer group.
>  
> When this happens, jstack shows all the request handler threads of the broker 
> are waiting for group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 
> nid=0x1b985 waiting on condition [0x7f98f1adb000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00024aa73b20> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
>   
> Besides one thread that is either doing GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 
> nid=0x1b984 runnable [0x7f98f1bdc000]
>java.lang.Thread.State: RUNNABLE
> at scala.collection.immutable.List.map(List.scala:284)
> at 
> kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at scala.collection.immutable.List.map(List.scala:288)
> at 
> kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 
> nid=0x1ceff runnable [0x7fe8701ca000]
>java.lang.Thread.State: RUNNABLE
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anon$2$$anonfun$f

[jira] [Commented] (KAFKA-5586) Handle client disconnects during JoinGroup

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805554#comment-16805554
 ] 

Guozhang Wang commented on KAFKA-5586:
--

Hi [~hachikuji], with KIP-394 and KIP-91, that Streams is also going to remove 
its override on `max.poll.interval.ms` to use consumer default (5min) in 
KIP-442, I think this would not be an issue worth resolving now. WDYT?

> Handle client disconnects during JoinGroup
> --
>
> Key: KAFKA-5586
> URL: https://issues.apache.org/jira/browse/KAFKA-5586
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Priority: Major
>
> If a consumer disconnects with a JoinGroup in-flight, we do not remove it 
> from the group until after the Join phase completes. If the client 
> immediately re-sends the JoinGroup request and it already had a memberId, 
> then the callback will be replaced and there is no harm done. For the other 
> cases:
> 1. If the client disconnected due to a failure and does not re-send the 
> JoinGroup, the consumer will still be included in the new group generation 
> after the rebalance completes, but will immediately timeout and trigger a new 
> rebalance.
> 2. If the consumer was not a member of the group and re-sends JoinGroup, then 
> a new memberId will be created for that consumer and the old one will not be 
> removed. When the rebalance completes, the old memberId will timeout and a 
> rebalance will be triggered.
> To address these issues, we should add some additional logic to handle client 
> disconnections during the join phase. For newly generated memberIds, we 
> should simply remove them. For existing members, we should probably leave 
> them in the group and reset the heartbeat expiration task.
> Note that we currently have no facility to expose disconnects from the 
> network layer to the other layers, so we need to find a good approach for 
> this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6745) kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805561#comment-16805561
 ] 

Guozhang Wang commented on KAFKA-6745:
--

I think the root cause is that when you bounce a consumer instance, the 
consumer's member.id has not yet been kicked out of the group when it is 
re-started, and hence it re-joins as a new member. In this case the old member 
will never send a re-join group request, and the coordinator will always have to 
wait until the rebalance timeout (5 min) has elapsed to kick out that member.

Could you describe how you rebalanced the consumers? Did you gracefully 
shut down each instance and then restart them?
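For reference, a graceful-shutdown sketch (the topic name is a placeholder): a 
clean {{close()}} sends a LeaveGroup request, so the old member.id is removed right 
away rather than lingering until the rebalance timeout expires:

{code}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumerShutdown {
    // Call consumer.wakeup() from a shutdown hook to break out of poll().
    static void runUntilShutdown(KafkaConsumer<String, String> consumer) {
        try {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            while (true) {
                consumer.poll(1000).forEach(record -> { /* process record */ });
            }
        } catch (WakeupException e) {
            // expected on shutdown
        } finally {
            consumer.close(); // leaves the group promptly, avoiding a stale member.id
        }
    }
}
{code}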

> kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)
> -
>
> Key: KAFKA-6745
> URL: https://issues.apache.org/jira/browse/KAFKA-6745
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.11.0.0
>Reporter: Ramkumar
>Priority: Major
>
> Hi, we had a 3-node HTTP service around Kafka 0.8. This HTTP service acts 
> as a REST API for publishers and consumers to use the middleware instead of 
> using the Kafka client API directly. There, consumer rebalancing is not a major 
> issue.
> We wanted to upgrade to Kafka 0.11, and we have updated our HTTP services (3 
> node cluster) to use the new Kafka consumer API, but rebalancing of 
> consumers (multiple consumers under the same group) takes between a few seconds 
> and 5 minutes (max.poll.interval.ms). Because of this delay our HTTP clients time 
> out and fail over. This rebalancing time is a major issue. It is not clear from 
> the documentation whether the rebalance activity for the group takes place after 
> max.poll.interval.ms, or whether it starts after 3 secs and completes any time 
> within 5 minutes. We tried to reduce max.poll.interval.ms to 15 seconds, but this 
> also triggers rebalances internally.
> Below are the other parameters we have set in our service:
> max.poll.interval.ms = 30 seconds
> heartbeat.interval.ms = 1 minute
> session.timeout.ms = 4 minutes
> consumer.cache.timeout = 2 min
>  
>  
> below is the log
> ""2018-03-26 12:53:23,009 [qtp1404928347-11556] INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> (Re-)joining group firstnetportal_001
> ""2018-03-26 12:57:52,793 [qtp1404928347-11556] INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> Successfully joined group firstnetportal_001 with generation 7475
> Please let me know if there is any other application/client that uses an HTTP 
> interface across 3 nodes without having this issue.
>  
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6631.
--
Resolution: Fixed

Just a side note that we are working on KAFKA-7149 to reduce the assignment 
metadata size with many topic partitions in the assignment.

> Kafka Streams - Rebalancing exception in Kafka 1.0.0
> 
>
> Key: KAFKA-6631
> URL: https://issues.apache.org/jira/browse/KAFKA-6631
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Container Linux by CoreOS 1576.5.0
>Reporter: Alexander Ivanichev
>Priority: Critical
>
>  
> In Kafka Streams 1.0.0 we saw a strange rebalance error. Our stream app 
> performs window-based aggregations; sometimes on start, when all stream 
> workers join, the app just crashes. However, if we enable only one worker it 
> works fine, and sometimes 2 workers work just fine, but when a third joins the 
> app crashes again: some critical issue with the rebalance.
> {code:java}
> 018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: 
> Unexpected error from SyncGroup: The server experienced an unexpected error 
> when processing the request
> 2018-03-08T18:51:01.226557000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
> 2018-03-08T18:51:01.22686Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
> 2018-03-08T18:51:01.227328000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
> 2018-03-08T18:51:01.22763Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
> 2018-03-08T18:51:01.228152000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> 2018-03-08T18:51:01.228449000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> 2018-03-08T18:51:01.228897000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> 2018-03-08T18:51:01.229196000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
> 2018-03-08T18:51:01.229673000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
> 2018-03-08T18:51:01.229971000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
> 2018-03-08T18:51:01.230436000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
> 2018-03-08T18:51:01.230749000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
> 2018-03-08T18:51:01.231065000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
> 2018-03-08T18:51:01.231584000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> 2018-03-08T18:51:01.231911000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> 2018-03-08T18:51:01.23219Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
> 2018-03-08T18:51:01.232643000Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
> 2018-03-08T18:51:01.233121000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> 2018-03-08T18:51:01.233409000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> 2018-03-08T18:51:01.23372Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 2018-03-08T18:51:01.234196000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> 2018-03-08T18:51:01.234655000Z org.apache.kafka.common.KafkaException: 
> Unexpected error from SyncGroup: The server experienced an unexpected error 
> when processing the request
> 2018-03-08T18:51:01.234972000Z exception in thread, closing process
> 2018-03-08T18:51:01.23550Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
> 2018-03-08T18:51:01.235839000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.ja

[jira] [Resolved] (KAFKA-4799) session timeout during event processing shuts down stream

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4799.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.1

> session timeout during event processing shuts down stream
> -
>
> Key: KAFKA-4799
> URL: https://issues.apache.org/jira/browse/KAFKA-4799
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: kafka streams client running on os x, with docker 
> machine running broker
>Reporter: Jacob Gur
>Priority: Critical
> Fix For: 0.11.0.1
>
>
> I have a simple stream application like this:
> {code:title=Part of my class|borderStyle=solid}
>   private <T> IConsumerSubscription buildSubscriptionStream(
>           Class<T> clazz, Consumer<T> consumer, String group,
>           Function<KStreamBuilder, KStream<String, String>> topicStreamFunc)
>   {
>       KStreamBuilder builder = new KStreamBuilder();
>       KStream<String, String> stream = topicStreamFunc.apply(builder);
>       stream.foreach((k, v) -> {
>           try {
>               T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
>               consumer.accept(value);
>               Logger.trace("Consumed message {}", value);
>           } catch (Throwable th) {
>               Logger.warn("Error while consuming message", th);
>           }
>       });
>       final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
>       streams.start();
>       return streams::close;
>   }
> {code}
> There is just one client running this application stream.
> If I run the client in a debugger with a breakpoint on the event processor 
> (i.e., inside the foreach lambda) with debugger suspending all threads for 
> perhaps more than 10 seconds, then when I resume the application:
> Actual behavior - the stream shuts down
> Expected behavior - the stream should recover, perhaps temporarily removed 
> from partition but then re-added and recovered.
> It looks like what happens is this:
> 1) The kafka client session times out.
> 2) The partition is revoked
> 3) The streams library has a rebalance listener that tries to commit offsets, 
> but that commit fails due to a rebalance exception.
> 4) Stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
> It seems that it should be more resilient and recover just like a regular 
> KafkaConsumer would. Its partition would be revoked, and then it would get it 
> back again and resume processing at the last offset.
> Is current behavior expected and I'm not understanding the intention? Or is 
> this a bug?
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805565#comment-16805565
 ] 

Guozhang Wang commented on KAFKA-4600:
--

This seems to be the same issue as KAFKA-5154 and has been fixed in 0.11.0.0.

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-29 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4600.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.0

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
> Fix For: 0.11.0.0
>
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2617) Move protocol field default values to Protocol

2019-03-29 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805568#comment-16805568
 ] 

Guozhang Wang commented on KAFKA-2617:
--

I'm closing this ticket as it is resolved by KAFKA-7609 now.

> Move protocol field default values to Protocol
> --
>
> Key: KAFKA-2617
> URL: https://issues.apache.org/jira/browse/KAFKA-2617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Jakub Nowak
>Priority: Minor
>  Labels: newbie
>
> Right now the default values are scattered in the Request / Response classes, 
> and some duplicates already exist, like JoinGroupRequest.UNKNOWN_CONSUMER_ID 
> and OffsetCommitRequest.DEFAULT_CONSUMER_ID. We would like to move all 
> default values into org.apache.kafka.common.protocol.Protocol since 
> org.apache.kafka.common.requests depends on org.apache.kafka.common.protocol 
> anyways.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-31 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806234#comment-16806234
 ] 

Guozhang Wang commented on KAFKA-4600:
--

Hello [~dana.powers] The root cause is around when we should set the `need-rejoin` 
boolean flag to false. Prior to KAFKA-5154 it was reset right after the join-group 
response was received, so if an error occurred after that point, e.g. during the 
sync-group round trip or, as in this ticket, inside the onAssign callback, then the 
consumer would just continue fetching from the previously assigned partitions, as 
reportedly observed in this ticket. In KAFKA-5154 we moved `resetJoinGroupFuture()` 
to after the `onJoinComplete` code, which covers this case: if the error is thrown 
inside the callback, the consumer will not proceed to fetch from the previously 
assigned partitions.

As for error propagation, right now we already log an ERROR as `User provided 
listener {} failed on partition assignment`, and because of the fix in KAFKA-5154 it 
will block the consumer from proceeding.

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
> Fix For: 0.11.0.0
>
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-31 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806244#comment-16806244
 ] 

Guozhang Wang commented on KAFKA-4600:
--

The NPE inside KafkaStreams is just another symptom of the same root cause. Reading 
the attached PR (https://github.com/apache/kafka/pull/3181) may better illustrate the 
scenario. Note that for this ticket the more severe observation is that the consumer 
keeps `happily consuming messages from the new partition. When the state is relied 
upon for correct processing, this can be very bad, e.g. data loss can occur.`, and 
we'd want that at the `very least the assignment should fail so the consumer doesn't 
see any messages from the new partitions, and the rebalance can be reattempted.` The 
PR for KAFKA-5154 served exactly this purpose.

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
> Fix For: 0.11.0.0
>
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8179) Incremental Rebalance Protocol for Kafka Consumer

2019-04-01 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8179:


 Summary: Incremental Rebalance Protocol for Kafka Consumer
 Key: KAFKA-8179
 URL: https://issues.apache.org/jira/browse/KAFKA-8179
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Recently the Kafka community has been promoting cooperative rebalancing to mitigate 
the pain points of the stop-the-world rebalancing protocol. This ticket is created to 
bring that idea to the Kafka consumer client, which will be beneficial for heavily 
stateful consumers such as Kafka Streams applications.

In short, the scope of this ticket is to reduce unnecessary rebalance latency caused 
by heavy partition migration, i.e. partitions being revoked and re-assigned. This 
would make the built-in consumer assignors (range, round-robin, etc.) aware of 
previously assigned partitions and sticky on a best-effort basis.
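
For context, the consumer already ships a best-effort sticky assignor 
({{org.apache.kafka.clients.consumer.StickyAssignor}}, from KIP-54) that users can 
opt into via configuration; below is a minimal sketch of that opt-in (broker address, 
group and topic names are made up for illustration). It still runs under the eager, 
stop-the-world protocol, which is part of what this ticket aims to improve for the 
built-in assignors.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StickyAssignorOptIn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // made-up address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // made-up group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Opt into the existing best-effort sticky assignor; the assignment is still
        // computed under the eager (stop-the-world) rebalance protocol discussed here.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));        // made-up topic
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}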



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-04-02 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16807895#comment-16807895
 ] 

Guozhang Wang commented on KAFKA-4600:
--

Hello Braedon,

Could you confirm whether your issue is that 1) when the rebalance listener callback 
throws, "the error is logged and the consumer proceeds on as if nothing happened", or 
2) you want the consumer coordinator to not capture the exception at all and always 
expose it to the client callers?

For 1), because of KAFKA-5154 the consumer will NOT proceed as if nothing happened; 
it will keep trying to re-join, and while doing so it will not consume messages from 
the newly assigned partitions. If the exception in the rebalance listener is thrown 
consistently, then no new messages will be consumed and the consumer will repeatedly 
log an ERROR message. If the exception is transient, then the rebalance listener will 
succeed on the next rebalance.

For 2), that would be a user-facing interface change, since the protocol was 
originally designed so that an exception from the listener callback is not thrown all 
the way up to the user and does not crash the consumer client. So if you propose we 
change this behavior, with good reasons, we should discuss it within a KIP.

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability

2019-04-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809993#comment-16809993
 ] 

Guozhang Wang commented on KAFKA-8135:
--

Hmm that's possible, as ConcurrentModificationException will only be caught at 
the higher-level `run` method:

{code}
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
{code}

But before that exception is thrown, the expired batch has already been removed via 
{{iter.remove();}}, so it becomes a dangling object.
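
As a minimal, self-contained illustration of that pattern (this is NOT the actual 
Sender/RecordAccumulator code; the names below are made up), once the element is 
removed and an exception fires before its future is completed, anyone waiting on that 
future blocks forever, just like the blocked {{flush()}} call shown in the ticket's 
stack trace:

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DanglingBatchSketch {
    static class Batch {
        final CompletableFuture<Void> done = new CompletableFuture<>();
    }

    public static void main(String[] args) throws Exception {
        List<Batch> inFlight = new ArrayList<>();
        Batch batch = new Batch();
        inFlight.add(batch);

        // Analogous to KafkaProducer.flush(): blocks until the batch is completed.
        Thread waiter = new Thread(() -> batch.done.join());
        waiter.setDaemon(true);
        waiter.start();

        try {
            for (Iterator<Batch> iter = inFlight.iterator(); iter.hasNext(); ) {
                iter.next();
                iter.remove();                       // the batch is gone from the collection...
                throw new RuntimeException("boom");  // ...but we fail before completing its future
            }
        } catch (RuntimeException e) {
            // Swallowed, like the catch around run(time.milliseconds()) above;
            // nobody ever completes batch.done, so the waiter stays blocked.
        }

        waiter.join(1000);
        System.out.println("waiter still blocked: " + waiter.isAlive()); // prints true
    }
}
{code}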

> Kafka Producer deadlocked on flush call with intermittent broker 
> unavailability
> ---
>
> Key: KAFKA-8135
> URL: https://issues.apache.org/jira/browse/KAFKA-8135
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Guozhang Wang
>Assignee: Rajini Sivaram
>Priority: Major
>
> In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, 
> and the value is default to 2 minutes. We've observed that when it was set to 
> MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), at some times the 
> {{broker.flush}} call would be blocked during the time when its destination 
> brokers are undergoing some unavailability:
> {code}
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
> - parking to wait for  <0x0006aeb21a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown 
> Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown 
> Source)
> at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> And even after the broker went back to normal, producers would still be 
> blocked. One suspicion is that when broker's not able to handle the request 
> in time, the responses are dropped somehow inside the Sender, and hence 
> whoever waiting on this response would be blocked forever.
> We've observed such scenarios when 1) broker's transiently failed for a 
> while, 2) network partitioned transiently, and 3) broker's bad config like 
> ACL caused it to not be able to handle requests for a while.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability

2019-04-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809995#comment-16809995
 ] 

Guozhang Wang commented on KAFKA-8135:
--

I think we can leave this open for a while; if there are no more observations in 
soak testing or user-reported issues in 2.1.1 / 2.2.0, we can then close it.

> Kafka Producer deadlocked on flush call with intermittent broker 
> unavailability
> ---
>
> Key: KAFKA-8135
> URL: https://issues.apache.org/jira/browse/KAFKA-8135
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Guozhang Wang
>Assignee: Rajini Sivaram
>Priority: Major
>
> In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, 
> and the value is default to 2 minutes. We've observed that when it was set to 
> MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), at some times the 
> {{broker.flush}} call would be blocked during the time when its destination 
> brokers are undergoing some unavailability:
> {code}
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
> - parking to wait for  <0x0006aeb21a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown 
> Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown 
> Source)
> at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> And even after the broker went back to normal, producers would still be 
> blocked. One suspicion is that when broker's not able to handle the request 
> in time, the responses are dropped somehow inside the Sender, and hence 
> whoever waiting on this response would be blocked forever.
> We've observed such scenarios when 1) broker's transiently failed for a 
> while, 2) network partitioned transiently, and 3) broker's bad config like 
> ACL caused it to not be able to handle requests for a while.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-04-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810045#comment-16810045
 ] 

Guozhang Wang commented on KAFKA-4600:
--

Sure.

The issue in KAFKA-4600 is the following sequence (I did not include line numbers 
since they have changed in trunk compared with the original PR 
https://github.com/apache/kafka/pull/3181, but you can search for the function names 
to find the spots):

1. The first round trip completes, and "rejoinNeeded = false" is set in 
{{JoinGroupResponseHandler}}.
2. The second round trip completes; {{SyncGroupResponseHandler}} calls 
{{onJoinComplete}}, which first sets the new assignment and then calls 
{{RebalanceListener#onPartitionsAssigned}}, which throws an exception that gets 
swallowed and logged as an error message.
3. The consumer continues fetching from the newly assigned partitions; no re-join is 
issued, since rejoinNeeded was already set to false in step 1.

The fix in KAFKA-5154 is to delay "rejoinNeeded = false" to the end of 
{{SyncGroupResponseHandler}}, not {{JoinGroupResponseHandler}}. So in the above 
sequence, step 3 becomes:

3. The ConsumerCoordinator sees that rejoinNeeded is true, hence revokes its 
assignment immediately and sends a JoinGroupRequest again; no data from those 
partitions will be fetched.

Note, as I mentioned above, that if the error thrown from {{onPartitionsAssigned}} is 
just transient, then a later rebalance will (hopefully) succeed; if it is consistent 
due to a bug, then the consumer falls into an endless loop of re-joining the group 
and failing the callback, but no data will be fetched.
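
To make the ordering concrete, here is a minimal, self-contained simulation (this is 
NOT the actual ConsumerCoordinator code; the method and flag names merely mirror the 
description above):

{code:java}
public class RejoinOrderingSketch {

    static boolean rejoinNeeded;

    // Stands in for RebalanceListener#onPartitionsAssigned throwing from user code.
    static void onJoinComplete() {
        throw new RuntimeException("onPartitionsAssigned failed");
    }

    // Before KAFKA-5154: the flag is reset after the join-group response, too early.
    static void rebalanceBeforeFix() {
        rejoinNeeded = false;              // reset in the join-group handler
        try {
            onJoinComplete();              // sync-group handler runs the listener, which throws
        } catch (RuntimeException e) {
            // swallowed and logged; rejoinNeeded is already false,
            // so fetching from the new assignment just continues
        }
    }

    // After KAFKA-5154: the flag is only reset once onJoinComplete has succeeded.
    static void rebalanceAfterFix() {
        try {
            onJoinComplete();              // listener throws before the flag is reset
            rejoinNeeded = false;          // never reached on failure
        } catch (RuntimeException e) {
            // swallowed and logged; rejoinNeeded stays true, so the next poll re-joins
        }
    }

    public static void main(String[] args) {
        rejoinNeeded = true;
        rebalanceBeforeFix();
        System.out.println("before fix, rejoinNeeded = " + rejoinNeeded); // false

        rejoinNeeded = true;
        rebalanceAfterFix();
        System.out.println("after fix,  rejoinNeeded = " + rejoinNeeded); // true
    }
}
{code}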

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-04-04 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4600:
-
Comment: was deleted

(was: Sure.

The issue of KAFKA-4600 is the following sequence (I did not include the line 
number since in trunk it has been changed compared with the original PR 
https://github.com/apache/kafka/pull/3181, but you can search for the function 
name to get it):

1. First round-trip completed, "rejoinNeeded = false" called on 
{{JoinGroupResponseHandler}}.
2. Second round-trip complete, {{SyncGroupResponseHandler}} calls 
{{onJoinComplete}}, which first set the new assignment, and then calls the 
{{RebalanceListener#onPartitionsAssigned}}, which throws an exception and then 
got swallowed as an error message.
3. Consumer will continue fetching from the newly assigned partitions, no 
re-join group will be issued since in step 1) rejoinNeeded is already set to 
false.

The fix of KAFKA-5154 is that we delay "rejoinNeeded = false" to the end of 
{{SyncGroupResponseHandler}}, not {{JoinGroupResponseHandler}}. So in the above 
sequence, at step 3:

3. ConsumerCoordinator see rejoinNeeded is true, and hence revoke its 
assignment immediately and then send JoinGroupRequest again, no data from those 
partitions will be fetched.

Note as I mentioned above, if the error thrown from the 
{{onPartitionsAssigned}} is just transient, then a later rebalance will succeed 
(hopefully), if it is consistent due to bugs, then consumer will falls into 
this endless loop of rejoining group and failing the callback, but no data will 
be fetched.)

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-04-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810166#comment-16810166
 ] 

Guozhang Wang commented on KAFKA-4600:
--

[~braedon] [~dana.powers] I think I may understand the issue better now, after 
writing down the fix of KAFKA-5154 and comparing it with the issue reported here. My 
understanding is that, in this ticket, we want to NOT swallow the exception thrown 
from the callback, but let the consumer handle it accordingly (or re-throw it to 
users). Here are a few options we can consider (any of these changes would need to be 
discussed as part of a KIP, since they change the consumer's semantics for users 
anyway):

0. As today: the consumer swallows the error, logs it, and proceeds as if the 
callback had completed (i.e. the partitions are still revoked / assigned within this 
rebalance generation).
1. We never capture the exception thrown from the callback; instead it is treated as 
a fatal error and thrown all the way up to the caller (sketched below). The consumer 
instance will kill itself as well.
2. We let the consumer handle it, especially under KIP-429 where we no longer revoke 
everything before joining the group: e.g. suppose your currently assigned partitions 
are {{1, 2}} and the newly assigned partitions are {{2, 3}}, so the consumer will 
call onPartitionsAssigned(3) and onPartitionsRevoked(1). Suppose the former succeeds 
but the latter fails with an error; we would just let the consumer proceed as if 
assigned {{1, 2, 3}}, since {{3}} was added successfully but {{1}} was not 
successfully revoked.

Personally I would avoid consumer-specific handling like 2) above, since it has to 
assume what "throwing an exception" means for the callback: does it mean none of the 
consumer's state was cleaned / initialized, or only part of it? Comparing 0 and 1, 
I'm fine with 1), i.e. always treat it as a fatal error and kill the consumer. If 
[~braedon] likes this idea, please feel free to propose it as a new KIP changing the 
default behavior of the consumer.
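
Just to make option 1 concrete from the application's point of view, here is a purely 
hypothetical sketch (this is NOT today's behavior, where the exception is swallowed 
inside the coordinator): if the listener failure were rethrown as fatal, it would 
surface from {{poll()}} and the application could decide what to do. The broker 
address, group and topic names are made up.

{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Option1Sketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // e.g. loading the state needed to process these partitions failed
                throw new IllegalStateException("could not load state for " + partitions);
            }
        });

        try {
            while (true) {
                consumer.poll(Duration.ofMillis(500));
            }
        } catch (RuntimeException fatal) {
            // Under option 1 the listener failure would propagate here as a fatal error
            // instead of being swallowed; the application could then stop or restart.
        } finally {
            consumer.close();
        }
    }
}
{code}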


> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8194) MessagesInPerSec incorrect value when Stream produce messages

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810977#comment-16810977
 ] 

Guozhang Wang commented on KAFKA-8194:
--

[~odyldz...@gmail.com] do you know why the offsets of messages produced by Streams 
can be incremented by more than 1? Streams still uses an embedded producer client for 
producing, and hence its offsets are all relative ones and should be monotonically 
increasing.

> MessagesInPerSec incorrect value when Stream produce messages
> -
>
> Key: KAFKA-8194
> URL: https://issues.apache.org/jira/browse/KAFKA-8194
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 1.1.0, 2.2.0
>Reporter: Odyldzhon Toshbekov
>Priority: Trivial
> Attachments: Screen Shot 2019-04-05 at 17.51.03.png, Screen Shot 
> 2019-04-05 at 17.52.22.png
>
>
> Looks like metric
> {code:java}
> kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec{code}
> has incorrect value when messages come via Kafka Stream API.
> I noticed that offset for every message from Kafka Stream can be increased by 
> 1,2,... However if messages come to Broker from Kafka producer it's always 
> incremented by 1.
> Unfortunately the metric mentioned above calculated based on offset changes 
> and as result we cannot use streams because metric will be always incorrect.
> For Kafka 2.2.0
> !Screen Shot 2019-04-05 at 17.51.03.png|width=100%!
>  
> [https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/server/ReplicaManager.scala]
> And this is the method used to get "numAppendedMessages"
>  !Screen Shot 2019-04-05 at 17.52.22.png|width=100%!
> https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/log/Log.scala



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8157) Missing "key.serializer" exception when setting "segment index bytes"

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810992#comment-16810992
 ] 

Guozhang Wang commented on KAFKA-8157:
--

I can confirm that this fix is piggy-backed in the above commit, as [~ouertani] 
pointed out. However, I cannot cherry-pick that exact commit into the older branches 
as it also contains public interface changes.

I will try to file a new PR with only this fix for the older branches.

> Missing "key.serializer" exception when setting "segment index bytes"
> -
>
> Key: KAFKA-8157
> URL: https://issues.apache.org/jira/browse/KAFKA-8157
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: ubuntu 18.10, localhost and Aiven too
>Reporter: Cristian D
>Priority: Major
>  Labels: beginner, newbie
>
> As a `kafka-streams` user,
> When I set the "segment index bytes" property
> Then I would like to have internal topics with the specified allocated disk 
> space
>  
> At the moment, when setting the "topic.segment.index.bytes" property, the 
> application is exiting with following exception: 
> {code:java}
> Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
> Missing required configuration "key.serializer" which has no default value.
> {code}
> Tested with `kafka-streams` v2.0.0 and v2.2.0.
>  
> Stack trace:
> {code:java}
> Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
> Missing required configuration "key.serializer" which has no default value.
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:392)
>  at 
> org.apache.kafka.streams.StreamsConfig.getMainConsumerConfigs(StreamsConfig.java:1014)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:666)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:718)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:634)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:544)
>  at app.Main.main(Main.java:36)
> {code}
> A demo application simulating the exception:
> https://github.com/razorcd/java-snippets-and-demo-projects/tree/master/kafkastreamsdemo
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811023#comment-16811023
 ] 

Guozhang Wang commented on KAFKA-8187:
--

[~wgreerx] Thanks for reporting the bug! It was very well written and crystal clear 
about the root cause, thanks again.

I've read through the code and can confirm that it is indeed a bug, and it actually 
still exists in trunk (hence affecting not only 2.0.1 but later releases as well).

Are you interested in filing a PR to fix it? I think the proposed option 1) would be 
preferable: we should not let the thread transit to RUNNING until all tasks, active 
and standby, are ready. That additional wait time, in practice, should not be very 
long.
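
As an illustrative sketch of option 1 (this is NOT the actual StreamThread/TaskManager 
code; the names are made up), the readiness check would have to cover the standby 
tasks as well, so a StandbyTask that failed to grab its state-directory lock keeps the 
thread out of RUNNING and is retried rather than silently skipped:

{code:java}
import java.util.List;

public class StreamThreadReadinessSketch {
    interface Task {
        boolean isInitialized();   // hypothetical: "lock acquired and ready to run"
    }

    // Option 1: only transit to RUNNING once both active AND standby tasks are ready.
    // Today only the active tasks are checked, which is the gap described above.
    static boolean readyToTransitToRunning(List<Task> activeTasks, List<Task> standbyTasks) {
        boolean activesReady = activeTasks.stream().allMatch(Task::isInitialized);
        boolean standbysReady = standbyTasks.stream().allMatch(Task::isInitialized);
        return activesReady && standbysReady;
    }
}
{code}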

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock and to not retry acquiring the lock because all of the 
> active StreamTasks are running for that StreamThread. If the StandbyTask does 
> not acquire the lock before the StreamThread enters into the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner Thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs the offset that was read by the StandbyTask at creation 
> time before acquiring the lock will be written back to the state store 
> directory, this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event of the threads 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for it's 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then at a period that is 1 to 2 intervals of [6](which is default of 10 
> minutes) after the reassignment had completed the stateDirCleaner thread will 
> execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquire the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs. The offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before t

[jira] [Commented] (KAFKA-8003) Flaky Test TransactionsTest #testFencingOnTransactionExpiration

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811203#comment-16811203
 ] 

Guozhang Wang commented on KAFKA-8003:
--

Happened on 2.2 again:

{code}
10:00:48 kafka.api.TransactionsTest > testFencingOnTransactionExpiration FAILED
10:00:48 java.lang.AssertionError: expected:<1> but was:<0>
10:00:48 at org.junit.Assert.fail(Assert.java:88)
10:00:48 at org.junit.Assert.failNotEquals(Assert.java:834)
10:00:48 at org.junit.Assert.assertEquals(Assert.java:645)
10:00:48 at org.junit.Assert.assertEquals(Assert.java:631)
10:00:48 at 
kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:486)
{code}

> Flaky Test TransactionsTest #testFencingOnTransactionExpiration
> ---
>
> Key: KAFKA-8003
> URL: https://issues.apache.org/jira/browse/KAFKA-8003
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/34/]
> {quote}java.lang.AssertionError: expected:<1> but was:<0> at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotEquals(Assert.java:834) at 
> org.junit.Assert.assertEquals(Assert.java:645) at 
> org.junit.Assert.assertEquals(Assert.java:631) at 
> kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:510){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6824) Flaky Test DynamicBrokerReconfigurationTest#testAddRemoveSslListener

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811205#comment-16811205
 ] 

Guozhang Wang commented on KAFKA-6824:
--

Happened again:

{code}
10:41:20 kafka.server.DynamicBrokerReconfigurationTest > 
testAdvertisedListenerUpdate FAILED
10:41:20 java.util.concurrent.TimeoutException: Timeout after waiting for 
1 ms.
10:41:20 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
10:41:20 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
10:41:20 at 
kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:950)
10:41:20 at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
10:41:20 at scala.collection.Iterator.foreach(Iterator.scala:929)
10:41:20 at scala.collection.Iterator.foreach$(Iterator.scala:929)
10:41:20 at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
10:41:20 at scala.collection.IterableLike.foreach(IterableLike.scala:71)
10:41:20 at 
scala.collection.IterableLike.foreach$(IterableLike.scala:70)
10:41:20 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
10:41:20 at 
scala.collection.TraversableLike.map(TraversableLike.scala:234)
10:41:20 at 
scala.collection.TraversableLike.map$(TraversableLike.scala:227)
10:41:20 at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
10:41:20 at 
kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:950)
10:41:20 at 
kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate(DynamicBrokerReconfigurationTest.scala:636)
{code}

> Flaky Test DynamicBrokerReconfigurationTest#testAddRemoveSslListener
> 
>
> Key: KAFKA-6824
> URL: https://issues.apache.org/jira/browse/KAFKA-6824
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Anna Povzner
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> Observed two failures of this test (both in PR builds) :(
>  
> *Failure #1: (JDK 7 and Scala 2.11 )*
> *17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
> testAddRemoveSslListener FAILED
> *17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>
> *17:20:49*         at org.junit.Assert.fail(Assert.java:88)
> *17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
>  
> *Failure #2: (JDK 8)*
> *18:46:23* kafka.server.DynamicBrokerReconfigurationTest > 
> testAddRemoveSslListener FAILED
> *18:46:23*     java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.
> *18:46:23*         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
> *18:46:23*         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
> *18:46:23*         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
> *18:46:23*         at 
> kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:953)
> *18:46:23*         at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
> *18:46:23*         at scala.collection.Iterator.foreach(Iterator.scala:929)
> *18:46:23*         at scala.collection.Iterator.foreach$(Iterator.scala:929)
> *18:46:23*         at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> *18:46:23*         at 
> scala.collection.IterableLike.foreach(IterableLike.scala:71)
> *18:46:23*         at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> *18:46:23*         at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> *18:46:23*         at 
> scala.collection.TraversableLike.map(Trav

[jira] [Created] (KAFKA-8197) Flaky Test kafka.server.DynamicBrokerConfigTest > testPasswordConfigEncoderSecretChange

2019-04-05 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8197:


 Summary: Flaky Test kafka.server.DynamicBrokerConfigTest > 
testPasswordConfigEncoderSecretChange
 Key: KAFKA-8197
 URL: https://issues.apache.org/jira/browse/KAFKA-8197
 Project: Kafka
  Issue Type: Improvement
  Components: core, unit tests
Affects Versions: 1.1.1
Reporter: Guozhang Wang


{code}
09:18:23 kafka.server.DynamicBrokerConfigTest > 
testPasswordConfigEncoderSecretChange FAILED
09:18:23 org.junit.ComparisonFailure: expected:<[staticLoginModule 
required;]> but was:<[????O?i???A?c'??Ch?|?p]>
09:18:23 at org.junit.Assert.assertEquals(Assert.java:115)
09:18:23 at org.junit.Assert.assertEquals(Assert.java:144)
09:18:23 at 
kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange(DynamicBrokerConfigTest.scala:253)
{code}

https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/13466/consoleFull



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8194) MessagesInPerSec incorrect value when Stream produce messages

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811215#comment-16811215
 ] 

Guozhang Wang commented on KAFKA-8194:
--

AFAIK, only when transactional messaging is enabled or log compaction is enabled may 
the records end up with non-consecutive offsets. Note that in the former case the 
producer does not actually skip offsets; rather, consumers filter out internal 
messages such as txn markers, and hence the records appear to have "holes".

With that, I think the behavior of numMessages above is still valid, because there 
are indeed some records appended in between; it is just that these messages are used 
by the brokers themselves rather than being user-produced records, and hence they are 
not returned to consumers either. If we want the metric to explicitly exclude those 
messages --- I can understand that from a monitoring point of view it may be 
confusing to users --- then I think we should file a KIP to discuss this public 
behavior change.
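
A small sketch of the transactional case, assuming a locally running broker and a 
fresh single-partition topic named "demo" (all made up for illustration): each 
committed transaction appends a control marker that occupies an offset, so a consumer 
sees gaps even though the broker counted every appended record:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxnOffsetGapSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("transactional.id", "demo-txn");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            for (int i = 0; i < 3; i++) {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("demo", "key", "value-" + i));
                producer.commitTransaction();   // each commit writes a marker, taking one more offset
            }
        }
        // On a fresh partition the three records land at offsets 0, 2 and 4; offsets 1, 3 and 5
        // are occupied by the (filtered) transaction commit markers, so a consumer sees "holes".
    }
}
{code}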

> MessagesInPerSec incorrect value when Stream produce messages
> -
>
> Key: KAFKA-8194
> URL: https://issues.apache.org/jira/browse/KAFKA-8194
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 1.1.0, 2.2.0
>Reporter: Odyldzhon Toshbekov
>Priority: Trivial
> Attachments: Screen Shot 2019-04-05 at 17.51.03.png, Screen Shot 
> 2019-04-05 at 17.52.22.png
>
>
> Looks like metric
> {code:java}
> kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec{code}
> has incorrect value when messages come via Kafka Stream API.
> I noticed that offset for every message from Kafka Stream can be increased by 
> 1,2,... However if messages come to Broker from Kafka producer it's always 
> incremented by 1.
> Unfortunately the metric mentioned above calculated based on offset changes 
> and as result we cannot use streams because metric will be always incorrect.
> For Kafka 2.2.0
> !Screen Shot 2019-04-05 at 17.51.03.png|width=100%!
>  
> [https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/server/ReplicaManager.scala]
> And this is the method used to get "numAppendedMessages"
>  !Screen Shot 2019-04-05 at 17.52.22.png|width=100%!
> https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/log/Log.scala



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8032) Flaky Test UserQuotaTest#testQuotaOverrideDelete

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811233#comment-16811233
 ] 

Guozhang Wang commented on KAFKA-8032:
--

One more: {{java.lang.AssertionError: Client with id=QuotasTestProducer-1 
should have been throttled}} 

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3599/consoleFull

> Flaky Test UserQuotaTest#testQuotaOverrideDelete
> 
>
> Key: KAFKA-8032
> URL: https://issues.apache.org/jira/browse/KAFKA-8032
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2830/testReport/junit/kafka.api/UserQuotaTest/testQuotaOverrideDelete/]
> {quote}java.lang.AssertionError: Client with id=QuotasTestProducer-1 should 
> have been throttled at org.junit.Assert.fail(Assert.java:89) at 
> org.junit.Assert.assertTrue(Assert.java:42) at 
> kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229) 
> at kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215) 
> at 
> kafka.api.BaseQuotaTest.testQuotaOverrideDelete(BaseQuotaTest.scala:124){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811237#comment-16811237
 ] 

Guozhang Wang commented on KAFKA-8153:
--

Hey [~mmelsen], what is the `CleanupConfig` you mentioned in the ticket? That does 
not seem to be part of Apache Kafka's functionality.

> Streaming application with state stores takes up to 1 hour to restart
> -
>
> Key: KAFKA-8153
> URL: https://issues.apache.org/jira/browse/KAFKA-8153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Michael Melsen
>Priority: Major
>
> We are using spring cloud stream with Kafka streams 2.0.1 and utilizing the 
> InteractiveQueryService to fetch data from the stores. There are 4 stores 
> that persist data on disk after aggregating data. The code for the topology 
> looks like this:
> {code:java}
> @Slf4j
> @EnableBinding(SensorMeasurementBinding.class)
> public class Consumer {
>   public static final String RETENTION_MS = "retention.ms";
>   public static final String CLEANUP_POLICY = "cleanup.policy";
>   @Value("${windowstore.retention.ms}")
>   private String retention;
> /**
>  * Process the data flowing in from a Kafka topic. Aggregate the data to:
>  * - 2 minute
>  * - 15 minutes
>  * - one hour
>  * - 12 hours
>  *
>  * @param stream
>  */
> @StreamListener(SensorMeasurementBinding.ERROR_SCORE_IN)
> public void process(KStream stream) {
> Map topicConfig = new HashMap<>();
> topicConfig.put(RETENTION_MS, retention);
> topicConfig.put(CLEANUP_POLICY, "delete");
> log.info("Changelog and local window store retention.ms: {} and 
> cleanup.policy: {}",
> topicConfig.get(RETENTION_MS),
> topicConfig.get(CLEANUP_POLICY));
> createWindowStore(LocalStore.TWO_MINUTES_STORE, topicConfig, stream);
> createWindowStore(LocalStore.FIFTEEN_MINUTES_STORE, topicConfig, stream);
> createWindowStore(LocalStore.ONE_HOUR_STORE, topicConfig, stream);
> createWindowStore(LocalStore.TWELVE_HOURS_STORE, topicConfig, stream);
>   }
>   private void createWindowStore(
> LocalStore localStore,
> Map topicConfig,
> KStream stream) {
> // Configure how the statestore should be materialized using the provide 
> storeName
> Materialized> materialized 
> = Materialized
> .as(localStore.getStoreName());
> // Set retention of changelog topic
> materialized.withLoggingEnabled(topicConfig);
> // Configure how windows looks like and how long data will be retained in 
> local stores
> TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
> localStore.getTimeUnit(), 
> Long.parseLong(topicConfig.get(RETENTION_MS)));
> // Processing description:
> // The input data are 'samples' with key 
> :::
> // 1. With the map we add the Tag to the key and we extract the error 
> score from the data
> // 2. With the groupByKey we group  the data on the new key
> // 3. With windowedBy we split up the data in time intervals depending on 
> the provided LocalStore enum
> // 4. With reduce we determine the maximum value in the time window
> // 5. Materialized will make it stored in a table
> stream
> .map(getInstallationAssetModelAlgorithmTagKeyMapper())
> .groupByKey()
> .windowedBy(configuredTimeWindows)
> .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, 
> newValue), materialized);
>   }
>   private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long 
> retentionMs) {
> TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
> timeWindows.until(retentionMs);
> return timeWindows;
>   }
>   /**
>* Determine the max error score to keep by looking at the aggregated error 
> signal and
>* freshly consumed error signal
>*
>* @param aggValue
>* @param newValue
>* @return
>*/
>   private ErrorScore getMaxErrorScore(ErrorScore aggValue, ErrorScore 
> newValue) {
> if(aggValue.getErrorSignal() > newValue.getErrorSignal()) {
> return aggValue;
> }
> return newValue;
>   }
>   private KeyValueMapper KeyValue> 
> getInstallationAssetModelAlgorithmTagKeyMapper() {
> return (s, sensorMeasurement) -> new KeyValue<>(s + "::" + 
> sensorMeasurement.getT(),
> new ErrorScore(sensorMeasurement.getTs(), 
> sensorMeasurement.getE(), sensorMeasurement.getO()));
>   }
> }
> {code}
> So we are materializing aggregated data to four different stores after 
> determining the max value within a specific window for a specific key. Please 
> note that retention which is set to two months of data and the clean up 
> policy delete. We don't compact data.
> The size of the individual state stores on

[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811241#comment-16811241
 ] 

Guozhang Wang commented on KAFKA-7965:
--

Another failure:

{code}

12:19:11 kafka.api.ConsumerBounceTest > 
testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup FAILED
12:19:11 org.apache.kafka.common.errors.GroupMaxSizeReachedException: 
Consumer group group2 already has the configured maximum number of members.
{code}

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811242#comment-16811242
 ] 

Guozhang Wang commented on KAFKA-7965:
--

[~huxi_2b] Please feel free to submit a PR.

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811243#comment-16811243
 ] 

Guozhang Wang commented on KAFKA-7965:
--

https://github.com/apache/kafka/pull/6238 is merged but still see this error on 
2.2 branch.

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: huxihx
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-05 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-7965:


Assignee: huxihx  (was: Stanislav Kozlovski)

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: huxihx
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"

2019-04-07 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8198:
-
Labels: documentation newbie  (was: documentation)

> KStreams testing docs use non-existent method "pipe"
> 
>
> Key: KAFKA-8198
> URL: https://issues.apache.org/jira/browse/KAFKA-8198
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.1, 2.2.0, 2.1.1
>Reporter: Michael Drogalis
>Priority: Minor
>  Labels: documentation, newbie
>
> In [the testing docs for 
> KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html],
>  we use the following code snippet:
> {code:java}
> ConsumerRecordFactory factory = new 
> ConsumerRecordFactory<>("input-topic", new StringSerializer(), new 
> IntegerSerializer());
> testDriver.pipe(factory.create("key", 42L));
> {code}
> As of Apache Kafka 2.2.0, this method no longer exists. We should correct the 
> docs to use the pipeInput method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"

2019-04-07 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8198:
-
Affects Version/s: 1.1.1
   2.0.1
   2.2.0
   2.1.1

> KStreams testing docs use non-existent method "pipe"
> 
>
> Key: KAFKA-8198
> URL: https://issues.apache.org/jira/browse/KAFKA-8198
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.1, 2.2.0, 2.1.1
>Reporter: Michael Drogalis
>Priority: Minor
>  Labels: documentation
>
> In [the testing docs for 
> KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html],
>  we use the following code snippet:
> {code:java}
> ConsumerRecordFactory factory = new 
> ConsumerRecordFactory<>("input-topic", new StringSerializer(), new 
> IntegerSerializer());
> testDriver.pipe(factory.create("key", 42L));
> {code}
> As of Apache Kafka 2.2.0, this method no longer exists. We should correct the 
> docs to use the pipeInput method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"

2019-04-07 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812068#comment-16812068
 ] 

Guozhang Wang commented on KAFKA-8198:
--

I can confirm that ever since 1.1, when we added the KIP, the API was already `pipeInput`, 
so this doc fix should go all the way back to the 1.1 branch.
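
For reference, the corrected snippet would look roughly like the sketch below (just a 
sketch; I used a Long serializer to match the 42L value, and the actual doc fix may pick 
different types). {{testDriver}} is the already-constructed {{TopologyTestDriver}} from the 
surrounding docs example:

{code:java}
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

// testDriver is an existing TopologyTestDriver for the topology under test
ConsumerRecordFactory<String, Long> factory =
    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new LongSerializer());
testDriver.pipeInput(factory.create("key", 42L));
{code}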

> KStreams testing docs use non-existent method "pipe"
> 
>
> Key: KAFKA-8198
> URL: https://issues.apache.org/jira/browse/KAFKA-8198
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.1, 2.2.0, 2.1.1
>Reporter: Michael Drogalis
>Priority: Minor
>  Labels: documentation, newbie
>
> In [the testing docs for 
> KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html],
>  we use the following code snippet:
> {code:java}
> ConsumerRecordFactory factory = new 
> ConsumerRecordFactory<>("input-topic", new StringSerializer(), new 
> IntegerSerializer());
> testDriver.pipe(factory.create("key", 42L));
> {code}
> As of Apache Kafka 2.2.0, this method no longer exists. We should correct the 
> docs to use the pipeInput method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8194) MessagesInPerSec incorrect value when Stream produce messages

2019-04-07 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812070#comment-16812070
 ] 

Guozhang Wang commented on KAFKA-8194:
--

[~odyldz...@gmail.com] Would you like to initiate a KIP to propose this API change to the 
community, then? 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> MessagesInPerSec incorrect value when Stream produce messages
> -
>
> Key: KAFKA-8194
> URL: https://issues.apache.org/jira/browse/KAFKA-8194
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 1.1.0, 2.2.0
>Reporter: Odyldzhon Toshbekov
>Priority: Trivial
> Attachments: Screen Shot 2019-04-05 at 17.51.03.png, Screen Shot 
> 2019-04-05 at 17.52.22.png
>
>
> Looks like metric
> {code:java}
> kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec{code}
> has incorrect value when messages come via Kafka Stream API.
> I noticed that offset for every message from Kafka Stream can be increased by 
> 1,2,... However if messages come to Broker from Kafka producer it's always 
> incremented by 1.
> Unfortunately the metric mentioned above calculated based on offset changes 
> and as result we cannot use streams because metric will be always incorrect.
> For Kafka 2.2.0
> !Screen Shot 2019-04-05 at 17.51.03.png|width=100%!
>  
> [https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/server/ReplicaManager.scala]
> And this is the method used to get "numAppendedMessages"
>  !Screen Shot 2019-04-05 at 17.52.22.png|width=100%!
> https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/log/Log.scala



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8157) Missing "key.serializer" exception when setting "segment index bytes"

2019-04-08 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8157.
--
   Resolution: Fixed
 Assignee: Guozhang Wang
Fix Version/s: 2.2.1
   2.3.0

With the PR merged and cherry-picked to older branches as well, I'm resolving 
this ticket now.
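
As a side note for anyone who hit this before upgrading: per-topic overrides for the 
internal topics are normally passed through the {{topic.}} prefix rather than as a bare 
config key. A minimal sketch (the application id and bootstrap servers are placeholders, 
not the exact code in the PR):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// Forward segment.index.bytes to the internal repartition/changelog topics:
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG), "10485760");
{code}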

> Missing "key.serializer" exception when setting "segment index bytes"
> -
>
> Key: KAFKA-8157
> URL: https://issues.apache.org/jira/browse/KAFKA-8157
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: ubuntu 18.10, localhost and Aiven too
>Reporter: Cristian D
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: beginner, newbie
> Fix For: 2.3.0, 2.2.1
>
>
> As a `kafka-streams` user,
> When I set the "segment index bytes" property
> Then I would like to have internal topics with the specified allocated disk 
> space
>  
> At the moment, when setting the "topic.segment.index.bytes" property, the 
> application is exiting with following exception: 
> {code:java}
> Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
> Missing required configuration "key.serializer" which has no default value.
> {code}
> Tested with `kafka-streams` v2.0.0 and v2.2.0.
>  
> Stack trace:
> {code:java}
> Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
> Missing required configuration "key.serializer" which has no default value.
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:392)
>  at 
> org.apache.kafka.streams.StreamsConfig.getMainConsumerConfigs(StreamsConfig.java:1014)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:666)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:718)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:634)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:544)
>  at app.Main.main(Main.java:36)
> {code}
> A demo application simulating the exception:
> https://github.com/razorcd/java-snippets-and-demo-projects/tree/master/kafkastreamsdemo
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2019-04-08 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812597#comment-16812597
 ] 

Guozhang Wang commented on KAFKA-6474:
--

Hi [~h314to], could you provide an update on the remaining PRs you are planning in order 
to remove the deprecated KStreamTestDriver?

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: newbie
>
> With KIP-247 we added public TopologyTestDriver. We should rewrite out own 
> test to use this new test driver and remove the two classes 
> ProcessorTopoogyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8194) MessagesInPerSec incorrect value when Stream produce messages

2019-04-08 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812629#comment-16812629
 ] 

Guozhang Wang commented on KAFKA-8194:
--

[~odyldz...@gmail.com] I've added you to the confluence wiki space. Thanks!

> MessagesInPerSec incorrect value when Stream produce messages
> -
>
> Key: KAFKA-8194
> URL: https://issues.apache.org/jira/browse/KAFKA-8194
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 1.1.0, 2.2.0
>Reporter: Odyldzhon Toshbekov
>Priority: Trivial
> Attachments: Screen Shot 2019-04-05 at 17.51.03.png, Screen Shot 
> 2019-04-05 at 17.52.22.png
>
>
> Looks like metric
> {code:java}
> kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec{code}
> has incorrect value when messages come via Kafka Stream API.
> I noticed that offset for every message from Kafka Stream can be increased by 
> 1,2,... However if messages come to Broker from Kafka producer it's always 
> incremented by 1.
> Unfortunately the metric mentioned above calculated based on offset changes 
> and as result we cannot use streams because metric will be always incorrect.
> For Kafka 2.2.0
> !Screen Shot 2019-04-05 at 17.51.03.png|width=100%!
>  
> [https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/server/ReplicaManager.scala]
> And this is the method used to get "numAppendedMessages"
>  !Screen Shot 2019-04-05 at 17.52.22.png|width=100%!
> https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/log/Log.scala



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5784) Add a sensor for dropped records in window stores

2019-04-08 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5784.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> Add a sensor for dropped records in window stores
> -
>
> Key: KAFKA-5784
> URL: https://issues.apache.org/jira/browse/KAFKA-5784
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: siva santhalingam
>Priority: Major
>  Labels: newbie
> Fix For: 2.1.0
>
>
> Today when a {{put(record)}} call on a windowed store does not find the 
> corresponding segment, i.e. its corresponding's window has expired, we simply 
> returns a {{null}} and hence silently drops it.
> We should consider 1) add log4j entries when it happens, 2) add metrics (we 
> can discuss whether it should be a processor-node level, or store level 
> sensor) for such cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5784) Add a sensor for dropped records in window stores

2019-04-08 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812664#comment-16812664
 ] 

Guozhang Wang commented on KAFKA-5784:
--

I think this is already added as part of [~vvcephei]'s PR on the suppression 
operator.

> Add a sensor for dropped records in window stores
> -
>
> Key: KAFKA-5784
> URL: https://issues.apache.org/jira/browse/KAFKA-5784
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: siva santhalingam
>Priority: Major
>  Labels: newbie
>
> Today when a {{put(record)}} call on a windowed store does not find the 
> corresponding segment, i.e. its corresponding's window has expired, we simply 
> returns a {{null}} and hence silently drops it.
> We should consider 1) add log4j entries when it happens, 2) add metrics (we 
> can discuss whether it should be a processor-node level, or store level 
> sensor) for such cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8108) Flaky Test kafka.api.ClientIdQuotaTest.testThrottledProducerConsumer

2019-04-08 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812847#comment-16812847
 ] 

Guozhang Wang commented on KAFKA-8108:
--

Happened again: 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3618/testReport/junit/kafka.api/UserQuotaTest/testThrottledProducerConsumer/

{code}
java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have been 
throttled
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.assertTrue(Assert.java:42)
at 
kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:230)
at 
kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:216)
at 
kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:83){code}

> Flaky Test kafka.api.ClientIdQuotaTest.testThrottledProducerConsumer
> 
>
> Key: KAFKA-8108
> URL: https://issues.apache.org/jira/browse/KAFKA-8108
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> java.lang.AssertionError: Client with id=QuotasTestProducer-!@#$%^&*() should 
> have been throttled
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229)
>   at 
> kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215)
>   at 
> kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3230/testReport/junit/kafka.api/ClientIdQuotaTest/testThrottledProducerConsumer/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-04-08 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812848#comment-16812848
 ] 

Guozhang Wang commented on KAFKA-4600:
--

[~braedon] I think your argument makes sense. But we still need to define what should 
happen when such an exception gets thrown to the user from the `consumer.poll` call and 
the user decides to continue and retry instead of shutting down the consumer; back to my 
example above, as in option 2), how should we treat the partitions that failed in 
revocation / assignment.

So how about this: let me piggy-back this proposal on KIP-429, since it will change the 
behavior of revocation and assignment orderings anyway. After I'm done adding a section 
regarding the error handling I'll ping you on this ticket, and we can continue our 
discussion to fix it in the next release.

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-08 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5998:
-
Priority: Critical  (was: Major)

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one kafka broker and one kafka stream running... I am running its 
> since two days under load of around 2500 msgs per second.. On third day am 
> getting below exception for some of the partitions, I have 16 partitions only 
> 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.j

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-08 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813009#comment-16813009
 ] 

Guozhang Wang commented on KAFKA-5998:
--

This seems to be affecting lots of people's file systems.

Could everyone share your environment's OS / FS by editing this ticket's 
{{Environment}} field (you can click "edit" above and search for that field), 
so that we can look deeper into it and try to find a proper fix? [~mparthas] 
[~yaojingguo] [~j-white] [~dminkovsky] [~lbdai3190]

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one kafka broker and one kafka stream running... I am running its 
> since two days under load of around 2500 msgs per second.. On third day am 
> getting below exception for some of the partitions, I have 16 partitions only 
> 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependenci

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-12 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16816770#comment-16816770
 ] 

Guozhang Wang commented on KAFKA-5998:
--

Thank you guys! I will go over that info and get back to you soon.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one kafka broker and one kafka stream running... I am running its 
> since two days under load of around 2500 msgs per second.. On third day am 
> getting below exception for some of the partitions, I have 16 partitions only 
> 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.Strea

[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-04-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819547#comment-16819547
 ] 

Guozhang Wang commented on KAFKA-4600:
--

[~braedon] Please check out this section of the KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-RebalanceCallbackErrorHandling

and share your thoughts. Note that the main point is that if the user decides to retry 
after catching the exception, the callback procedure is treated as having taken no side 
effects.
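
In other words, a consumer loop could look roughly like the sketch below. This is 
hypothetical and reflects the proposed KIP-429 behavior rather than what the consumer 
does today; the processing logic is a placeholder:

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;

class RebalanceRetryLoop {
    static void run(final KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // ... application processing of `records` goes here (placeholder) ...
            } catch (final KafkaException e) {
                // A failed rebalance callback would surface here; since the callback is
                // considered to have taken no effect, calling poll() again retries it.
            }
        }
    }
}
{code}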

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-04-18 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7652.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

It should be fixed by the latest PR (details about the performance benchmarks 
can be found inside the PR).

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.3.0
>
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> experience a severe performance degradation. The highest amount of CPU time 
> seems spent in retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation reveals that it appears we're performing about 3 orders 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached logs of the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-22 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8204.
--
   Resolution: Fixed
Fix Version/s: (was: 2.1.2)
   (was: 2.0.2)
   (was: 1.1.2)
   2.3.0

The latest PR has been merged to 2.2 / trunk; we will add more fix versions as we 
cherry-pick it to older branches via separate PRs.

> Streams may flush state stores in the incorrect order
> -
>
> Key: KAFKA-8204
> URL: https://issues.apache.org/jira/browse/KAFKA-8204
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0, 2.2.1
>
>
> Cached state stores may forward records during a flush call, so Streams 
> should flush the stores in topological order. Otherwise, Streams may flush a 
> downstream store before an upstream one, resulting in sink results being 
> committed without the corresponding state changelog updates being committed.
> This behavior is partly responsible for the bug reported in KAFKA-7895 .
> The fix is simply to flush the stores in topological order, then when the 
> upstream store forwards records to a downstream stateful processor, the 
> corresponding state changes will be correctly flushed as well.
> An alternative would be to repeatedly call flush on all state stores until 
> they report there is nothing left to flush, but this requires a public API 
> change to enable state stores to report whether they need a flush or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset

2019-04-23 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16824716#comment-16824716
 ] 

Guozhang Wang commented on KAFKA-8236:
--

In order to understand the scope of this proposed versioning, we should first consider 
what the versioning would be used for. A couple of questions on my mind:

1) Consider an app upgraded from version A to version B while there is still some 
un-consumed data in the intermediate topics: how should the application handle it? Or in 
other words, should we, first of all, consider shutting down an app at a consistent 
snapshot with no un-consumed intermediate data?

2) If an app's version A and version B have different repartition topics or 
state stores, how should the application handle it? Or in other words, how 
should we determine whether new versions are compatible with old versions and hence 
can be restarted smoothly?

3) What if an app's version does not change, but the scalability unit changes? For 
example, the input topic's num.partitions changes.

> Incorporate version control for Kafka Streams Application Reset
> ---
>
> Key: KAFKA-8236
> URL: https://issues.apache.org/jira/browse/KAFKA-8236
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Priority: Minor
>  Labels: needs-kip
>
> Inspired by Spark mlflow which supports versioning log, we should be 
> considering expose a special versioning tag for KStream applications to easy 
> rollback bad code deploy. The naive approach is to store the versioning info 
> in consumer offset topic so that when we perform rollback, we know where to 
> read from the input, and where to cleanup the changelog topic. Essentially, 
> this is an extension to our current application reset tool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8291) System test consumer_test.py failed on trunk

2019-04-25 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8291.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> System test consumer_test.py failed on trunk
> 
>
> Key: KAFKA-8291
> URL: https://issues.apache.org/jira/browse/KAFKA-8291
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.3.0
>
>
> Looks like trunk is failing as for now 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2537/]
> Potentially due to this PR: 
> [https://github.com/apache/kafka/commit/409fabc5610443f36574bdea2e2994b6c20e2829]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8254) Suppress incorrectly passes a null topic to the serdes

2019-04-26 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8254.
--
   Resolution: Fixed
Fix Version/s: (was: 2.1.2)

Resolved for trunk / 2.2 now; will continue with the fix for 2.1 soon.
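
For context, the {{Serializer}} contract mentioned in the description below expects the 
actual topic name as the first argument rather than {{null}}. A minimal sketch of a 
contract-respecting call (the changelog topic name here is a hypothetical parameter, not 
the exact code in the fix):

{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;

class SerializeWithTopic {
    // Passes the real (changelog) topic to the serializer, as the Serializer contract expects.
    static <K> Bytes serializeKey(final Serde<K> keySerde, final String changelogTopic, final K key) {
        return Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
    }
}
{code}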

> Suppress incorrectly passes a null topic to the serdes
> --
>
> Key: KAFKA-8254
> URL: https://issues.apache.org/jira/browse/KAFKA-8254
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.3.0, 2.2.1
>
>
> For example, in KTableSuppressProcessor, we do:
> {noformat}
> final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, 
> key));
> {noformat}
> This violates the contract of Serializer (and Deserializer), and breaks 
> integration with known Serde implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8029) Add in-memory bytes-only session store implementation

2019-04-26 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8029.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Add in-memory bytes-only session store implementation
> -
>
> Key: KAFKA-8029
> URL: https://issues.apache.org/jira/browse/KAFKA-8029
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.3.0
>
>
> As titled. We've already added the in-memory window store implementation; what's 
> left now is the in-memory session store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8298) ConcurrentModificationException Possible when optimizing for repartition nodes

2019-04-26 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8298:
-
Description: 
As indicated in the title.

When processing multiple key-changing operations during the optimization phase 
a ConcurrentModificationException is possible. 

  was:As indicated in the title


> ConcurrentModificationException Possible when optimizing for repartition nodes
> --
>
> Key: KAFKA-8298
> URL: https://issues.apache.org/jira/browse/KAFKA-8298
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.3.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> As indicated in the title.
> When processing multiple key-changing operations during the optimization 
> phase a ConcurrentModificationException is possible. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8254) Suppress incorrectly passes a null topic to the serdes

2019-04-26 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8254:
-
Fix Version/s: 2.1.2

> Suppress incorrectly passes a null topic to the serdes
> --
>
> Key: KAFKA-8254
> URL: https://issues.apache.org/jira/browse/KAFKA-8254
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> For example, in KTableSuppressProcessor, we do:
> {noformat}
> final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, 
> key));
> {noformat}
> This violates the contract of Serializer (and Deserializer), and breaks 
> integration with known Serde implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility

2019-04-30 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8307:
-
Labels: user-experience  (was: )

> Kafka Streams should provide some mechanism to determine topology equality 
> and compatibility
> 
>
> Key: KAFKA-8307
> URL: https://issues.apache.org/jira/browse/KAFKA-8307
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: user-experience
>
> Currently, Streams provides no mechanism to compare two topologies. This is a 
> common operation when users want to have tests verifying that they don't 
> accidentally alter their topology. They would save the known-good topology 
> and then add a unit test verifying the current code against that known-good 
> state.
> However, because there's no way to do this comparison properly, everyone is 
> reduced to using the string format of the topology (from 
> `Topology#describe().toString()`). The major drawback is that the string 
> format is meant for human consumption. It is neither machine-parseable nor 
> stable. So, these compatibility tests are doomed to fail when any minor, 
> non-breaking, change is made either to the application, or to the library. 
> This trains everyone to update the test whenever it fails, undermining its 
> utility.
> We should fix this problem, and provide both a mechanism to serialize the 
> topology and to compare two topologies for compatibility. All in all, I think 
> we need:
> # a way to serialize/deserialize topology structure in a machine-parseable 
> format that is future-compatible. Offhand, I'd recommend serializing the 
> topology structure as JSON, and establishing a policy that attributes should 
> only be added to the object graph, never removed. Note, it's out of scope to 
> be able to actually run a deserialized topology; we only want to save and 
> load the structure (not the logic) to facilitate comparisons.
> # a method to verify the *equality* of two topologies... This method tells 
> you that the two topologies are structurally identical. We can't know if the 
> logic of any operator has changed, only if the structure of the graph is 
> changed. We can consider whether other graph properties, like serdes, should 
> be included.
> # a method to verify the *compatibility* of two topologies... This method 
> tells you that moving from topology A to topology B does not require an 
> application reset. Note that this operation is not commutative: 
> `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss 
> whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I 
> think not necessarily, because we may want "equality" to be stricter than 
> "compatibility").



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8285) Handle thread-id random switch on JVM for KStream

2019-05-02 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8285.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Handle thread-id random switch on JVM for KStream
> -
>
> Key: KAFKA-8285
> URL: https://issues.apache.org/jira/browse/KAFKA-8285
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently we are potentially at risk of being bitten by interleaving stream 
> thread ids. This is because we share the same initial config number when two 
> stream instances happen to be scheduled under one JVM. This would be a bad 
> scenario because we could end up with different thread-ids across restarts, 
> which invalidates static membership.
> For example, at one point the thread ids assigned were 1,2,3,4 for instance A and 
> 5,6,7,8 for instance B. On restart of both instances, the same atomic 
> update could instead yield 1,3,5,7 for A and 2,4,6,8 for B, which changes 
> their group.instance.ids.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-02 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832117#comment-16832117
 ] 

Guozhang Wang commented on KAFKA-8317:
--

It seems https://issues.apache.org/jira/browse/KAFKA-8199 reports the same issue. Is it 
indeed blocking https://issues.apache.org/jira/browse/KAFKA-8289?

> ClassCastException using KTable.suppress()
> --
>
> Key: KAFKA-8317
> URL: https://issues.apache.org/jira/browse/KAFKA-8317
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error :
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed 
> cannot be cast to java.lang.String
>     at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows :
> {Code}
>     final KTable, GenericRecord> groupTable = 
> groupedStream
>     .aggregate(lastAggregator, lastAggregator, materialized);
>     final KTable, GenericRecord> suppressedTable = 
> groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>     // write the change-log stream to the topic
>     suppressedTable.toStream((k, v) -> k.key())
>     .mapValues(joinValueMapper::apply)
>     .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works... what am i doing wrong.
> Someone else has encountered the same issue :
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation : 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-07 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835204#comment-16835204
 ] 

Guozhang Wang commented on KAFKA-8317:
--

Hello [~the4thamigo_uk], in the operator just before suppress you specified 
null as the key / value serdes:

{code}
final Materialized> 
materialized =
Materialized.>with(null, null)
.withRetention(props.groupWindowRetention());
{code}

Hence the default serde from the config will be used (that part of the config 
was not uploaded, but it seems to be String). I think you should set the key 
serde to a windowed serde, sth. like {{TimeWindowedSerde(inner)}}; see the sketch below.
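A minimal sketch of constructing such a serde, assuming the inner key is indeed a String (where exactly it needs to be supplied depends on the topology, so this is only an illustration):

{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// Wrap the inner (String) serde into a time-windowed serde so that a
// Windowed<String> key can be serialized, instead of falling back to the
// default StringSerializer that throws the ClassCastException above.
final Serde<Windowed<String>> windowedKeySerde =
        WindowedSerdes.timeWindowedSerdeFrom(String.class);
{code}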

> ClassCastException using KTable.suppress()
> --
>
> Key: KAFKA-8317
> URL: https://issues.apache.org/jira/browse/KAFKA-8317
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error :
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed 
> cannot be cast to java.lang.String
>     at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows :
> {Code}
>     final KTable, GenericRecord> groupTable = 
> groupedStream
>     .aggregate(lastAggregator, lastAggregator, materialized);
>     final KTable, GenericRecord> suppressedTable = 
> groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>     // write the change-log stream to the topic
>     suppressedTable.toStream((k, v) -> k.key())
>     .mapValues(joinValueMapper::apply)
>     .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works... what am I doing wrong?
> Someone else has encountered the same issue :
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation : 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8318) Session Window Aggregations generate an extra tombstone

2019-05-07 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8318:
-
Labels: newbie++  (was: )

> Session Window Aggregations generate an extra tombstone
> ---
>
> Key: KAFKA-8318
> URL: https://issues.apache.org/jira/browse/KAFKA-8318
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie++
>
> See the discussion 
> https://github.com/apache/kafka/pull/6654#discussion_r280231439
> The session merging logic generates a tombstone in addition to an update when 
> the session window already exists. It's not a correctness issue, just a small 
> performance hit, because that tombstone is immediately invalidated by the 
> update.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8311) Better consumer timeout exception handling

2019-05-07 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8311:
-
Labels: newbie  (was: )

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit timeout, 
> we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers in KStream: the thread 
> consumer, the global consumer and the restore consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap it 
> with a more meaningful message either on the consumer or the stream level to let 
> the user know which consumer is having trouble.
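For illustration, a hedged sketch of the tuning knob mentioned in point 1 above (the 2-minute value is an arbitrary example, not a recommendation from this ticket):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Raise the consumer's API timeout so slow commits are tolerated instead of
// surfacing as a TimeoutException; the default is 60 seconds.
final Properties props = new Properties();
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120_000);
{code}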



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-09 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836770#comment-16836770
 ] 

Guozhang Wang commented on KAFKA-8342:
--

[~pkleindl] thanks for sharing your use case. I think we indeed need to think 
more about what a pleasant procedure would look like for users to pre-create 
topics with an admin tool, preferably without them having to manually export the 
topology themselves. As for schemas, I think they should be properly registered 
at runtime by Streams' producer clients.

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their application to production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask admin team to manually create those topics before proceeding 
> to start the actual stream job. We could add an admin tool to help them go 
> through the process quicker by providing a command that could
>  # Read through current stream topology
>  # Create corresponding topics as needed, even including output topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown

2019-05-09 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836802#comment-16836802
 ] 

Guozhang Wang commented on KAFKA-8313:
--

Just a few more words on why the threads are being shut down: during the rebalance 
triggered by the deletion of the topic, the leader realizes that the target source 
topic does not exist (any more), and hence it propagates this error code to all 
the members telling them to shut down, which is normal.

The issue here though, as you already described, is that when threads are 
shutting down gracefully, the state listener was not being notified because of 
the issue above. I think it should have been fixed by now.

I'm resolving this ticket now but in case you found it was not the case, please 
feel free to re-open.
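For reference, a minimal sketch (not from the ticket) of the state-listener check being discussed; the {{streams}} instance and the logging destination are assumptions:

{code:java}
import org.apache.kafka.streams.KafkaStreams;

// Register a listener so a full shutdown is observable: once fixed, the
// instance should transition to NOT_RUNNING after all stream threads have
// died, instead of staying stuck in REBALANCING.
streams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.NOT_RUNNING) {
        System.err.println("Kafka Streams has fully shut down (was " + oldState + ")");
    }
});
{code}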

> KafkaStreams state not being updated properly after shutdown
> 
>
> Key: KAFKA-8313
> URL: https://issues.apache.org/jira/browse/KAFKA-8313
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Single broker running on Ubuntu Linux.
>Reporter: Eric
>Priority: Minor
> Attachments: kafka-8313-src.tgz, log.txt
>
>
> I am running a KafkaStreams inside a DropWizard server and I am trying to 
> detect when my stream shuts down (in case a non-recoverable error occurs).  I 
> was hoping I could use KafkaStreams.setStateListener() to be notified when a 
> state change occurs.  When I query the state, KafkaStreams is stuck in the 
> REBALANCING state even though its threads are all DEAD.
>  
> You can easily reproduce this by doing the following:
>  # Create a topic (I have one with 5 partitions)
>  # Create a simple Kafka stream consuming from that topic
>  # Create a StateListener and register it on that KafkaStreams
>  # Start the Kafka stream
>  # Once everything runs, delete the topic using kafka-topics.sh
> When deleting the topic, you will see the StreamThreads' state transition 
> from RUNNING to PARTITION_REVOKED and you will be notified with the 
> KafkaStreams REBALANCING state.  That's all good and expected.  Then the 
> StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the 
> KafkaStreams state is stuck in the REBALANCING state.  I was expecting to 
> see a NOT_RUNNING state eventually... am I right?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown

2019-05-09 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836801#comment-16836801
 ] 

Guozhang Wang commented on KAFKA-8313:
--

Hello [~ladoe00], looking at your description I feel it is likely related to 
https://issues.apache.org/jira/browse/KAFKA-8062, which is resolved in 2.2.1 / 2.3.0.

Could you try out those versions by building directly from the corresponding 
branches and see if the issue goes away?

> KafkaStreams state not being updated properly after shutdown
> 
>
> Key: KAFKA-8313
> URL: https://issues.apache.org/jira/browse/KAFKA-8313
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Single broker running on Ubuntu Linux.
>Reporter: Eric
>Priority: Minor
> Attachments: kafka-8313-src.tgz, log.txt
>
>
> I am running a KafkaStreams inside a DropWizard server and I am trying to 
> detect when my stream shuts down (in case a non-recoverable error occurs).  I 
> was hoping I could use KafkaStreams.setStateListener() to be notified when a 
> state change occurs.  When I query the state, KafkaStreams is stuck in the 
> REBALANCING state even though its threads are all DEAD.
>  
> You can easily reproduce this by doing the following:
>  # Create a topic (I have one with 5 partitions)
>  # Create a simple Kafka stream consuming from that topic
>  # Create a StateListener and register it on that KafkaStreams
>  # Start the Kafka stream
>  # Once everything runs, delete the topic using kafka-topics.sh
> When deleting the topic, you will see the StreamThreads' state transition 
> from RUNNING to PARTITION_REVOKED and you will be notified with the 
> KafkaStreams REBALANCING state.  That's all good and expected.  Then the 
> StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the 
> KafkaStreams state is stuck in the REBALANCING state.  I was expecting to 
> see a NOT_RUNNING state eventually... am I right?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown

2019-05-09 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8313.
--
   Resolution: Fixed
 Assignee: Guozhang Wang
Fix Version/s: 2.3.0

> KafkaStreams state not being updated properly after shutdown
> 
>
> Key: KAFKA-8313
> URL: https://issues.apache.org/jira/browse/KAFKA-8313
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Single broker running on Ubuntu Linux.
>Reporter: Eric
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 2.3.0
>
> Attachments: kafka-8313-src.tgz, log.txt
>
>
> I am running a KafkaStreams inside a DropWizard server and I am trying to 
> detect when my stream shuts down (in case a non-recoverable error occurs).  I 
> was hoping I could use KafkaStreams.setStateListener() to be notified when a 
> state change occurs.  When I query the state, KafkaStreams is stuck in the 
> REBALANCING state even though its threads are all DEAD.
>  
> You can easily reproduce this by doing the following:
>  # Create a topic (I have one with 5 partitions)
>  # Create a simple Kafka stream consuming from that topic
>  # Create a StateListener and register it on that KafkaStreams
>  # Start the Kafka stream
>  # Once everything runs, delete the topic using kafka-topics.sh
> When deleting the topic, you will see the StreamThreads' state transition 
> from RUNNING to PARTITION_REVOKED and you will be notified with the 
> KafkaStreams REBALANCING state.  That's all good and expected.  Then the 
> StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the 
> KafkaStreams state is stuck in the REBALANCING state.  I was expecting to 
> see a NOT_RUNNING state eventually... am I right?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837653#comment-16837653
 ] 

Guozhang Wang commented on KAFKA-8335:
--

[~boquan] [~weichu] Thanks for reporting this issue. Jason and I have talked 
about this issue and Jason has proposed a solution which we'd try to get in to 
the upcoming 2.3.0 release.

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Assignee: Jason Gustafson
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My Colleague Weichu already sent out a mail to kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We are using Kafka Streams with exactly-once enabled on a Kafka cluster for
> a while.
> Recently we found that the size of __consumer_offsets partitions grew huge.
> Some partition went over 30G. This caused Kafka to take quite long to load
> "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. Looks like that since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits per 100ms (commit.interval.ms=100 when
> exactly-once) so the __consumer_offsets is growing really fast.
> Is this (to keep all transactions) by design, or is that a bug for
> LogCleaner?  What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7206) Enable batching in FindCoordinator

2019-05-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837729#comment-16837729
 ] 

Guozhang Wang commented on KAFKA-7206:
--

[~sagarrao] I think [~shung] has tried to implement this idea before but it 
turned out to be quite complex (maybe Yishun can give you some pointers to his 
past contributions). If this is your first piece of work in the codebase it may 
be very ambitious. Anyway, it is up to you and Yishun.

> Enable batching in FindCoordinator
> --
>
> Key: KAFKA-7206
> URL: https://issues.apache.org/jira/browse/KAFKA-7206
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Yishun Guan
>Assignee: Yishun Guan
>Priority: Critical
>  Labels: needs-discussion, needs-kip, newbie++
>
> To quote [~guozhang] :
> "The proposal is that, we extend FindCoordinatorRequest to have multiple 
> consumer ids: today each FindCoordinatorRequest only contains a single 
> consumer id, so in our scenario we need to send N request for N consumer 
> groups still. If we can request for coordinators in a single request, then 
> the workflow could be simplified to:
>  # send a single FindCoordinatorRequest to a broker asking for coordinators 
> of all consumer groups.
>  1.a) note that the response may still succeed in finding some coordinators 
> while error on others, and we need to handle them on that granularity (see 
> below).
>  # and then for the collected coordinator, group them by coordinator id and 
> send one request per coordinator destination.
> Note that this change would require the version to be bumped up, to 
> FIND_COORDINATOR_REQUEST_V3 for such protocol changes, also the RESPONSE 
> version should be bumped up in order to include multiple coordinators."
> A KIP is needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2019-05-17 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7190.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
> Fix For: 2.3.0
>
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the producer IDs for a more extended period, or on the streams side, developing 
> a more conservative approach to deleting offsets from repartition topics
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8389) Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest

2019-05-17 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8389:


 Summary: Duplicated MockProcessorSupplier / MockProcessor in 
TopologyTestDriverTest
 Key: KAFKA-8389
 URL: https://issues.apache.org/jira/browse/KAFKA-8389
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


We have stand-alone classes of MockProcessorSupplier / MockProcessor classes, 
yet we have those in TopologyTestDriverTest as well. We should consider 
removing them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2019-05-18 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6474.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: newbie
> Fix For: 2.3.0
>
>
> With KIP-247 we added public TopologyTestDriver. We should rewrite out own 
> test to use this new test driver and remove the two classes 
> ProcessorTopoogyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8389) Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest

2019-05-18 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-8389:


Assignee: Guozhang Wang

> Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest
> --
>
> Key: KAFKA-8389
> URL: https://issues.apache.org/jira/browse/KAFKA-8389
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> We have stand-alone classes of MockProcessorSupplier / MockProcessor classes, 
> yet we have those in TopologyTestDriverTest as well. We should consider 
> removing them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-20 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16844075#comment-16844075
 ] 

Guozhang Wang commented on KAFKA-8367:
--

Could the {{new BloomFilter()}} creation in KAFKA-4850 just be the culprit? 
Depending on which trunk commit [~pavelsavov] tried out, the fix for that may or 
may not be included.

[~pavelsavov] Could you double check that the latest trunk you built from 
includes commit `a1b1e088b98763818e933dce335b580d02916640`, which fixed the 
BloomFilter object creation?

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8373) Add group.instance.id field into Sync/Heartbeat/OffsetCommit protocols

2019-05-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8373:
-
Fix Version/s: 2.3.0

> Add group.instance.id field into Sync/Heartbeat/OffsetCommit protocols 
> ---
>
> Key: KAFKA-8373
> URL: https://issues.apache.org/jira/browse/KAFKA-8373
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8371) Remove ReplicaManager dependence from Partition

2019-05-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8371:
-
Labels: tech-debt  (was: )

> Remove ReplicaManager dependence from Partition
> ---
>
> Key: KAFKA-8371
> URL: https://issues.apache.org/jira/browse/KAFKA-8371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: tech-debt
>
> The dependence on ReplicaManager from the Partition logic makes testing this 
> class very cumbersome. Often we are just using ReplicaManager as a way to get 
> access to an additional dependency. We should make the actual dependencies 
> explicit and we should introduce smaller traits which encapsulate the state 
> we actually need.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8396) Clean up Transformer API

2019-05-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8396:
-
Labels: needs-kip user-experience  (was: needs-kip)

> Clean up Transformer API
> 
>
> Key: KAFKA-8396
> URL: https://issues.apache.org/jira/browse/KAFKA-8396
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
>
> Currently, KStream operators transformValues and flatTransformValues disable 
> context forwarding, and force operators to just return the new values.
> The reason is that we wanted to prevent the key from changing, since the 
> whole point of a `xValues` transformation is that we _do not_ change the key, 
> and hence don't need to repartition.
> However, the chosen mechanism has some drawbacks: The Transform concept is 
> basically a way to plug in a custom Processor within the Streams DSL, but 
> these restrictions make it more like a MapValues with access to the context. 
> For example, even though you can still schedule punctuations, there's no way 
> to forward values as a result of them. So, as a user, it's hard to build a 
> mental model of how to use a TransformValues (because it's not quite a 
> Transformer and not quite a Mapper).
> Also, logically, a Transformer can call forward as much as it wants, so a 
> Transformer and a FlatTransformer are effectively the same thing. Then, we 
> also have TransformValues and FlatTransformValues that are also two more 
> versions of the same thing, just to implement the key restrictions. 
> Internally, some of these can send downstream by returning OR forwarding, and 
> others can only return. It's a lot for users to keep in mind.
> We can clean up this API significantly by just allowing all transformers to 
> call `forward`. In the `Values` case, we can wrap the ProcessorContext in one 
> that checks the key is `equal` to the one that got passed in (i.e., saves a 
> reference and enforces equality with that reference in any call to 
> `forward`). Then, we can actually deprecate the `*ValueTransformer*` 
> interfaces and remove the restriction about calling forward.
> We can consider a further cleanup (TBD) to deprecate the existing Transformer 
> interface entirely, and replace it with one with a `void` return type. Then, 
> the Transform and FlatTransform cases collapse together, and we just need 
> Transform and TransformValues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8396) Clean up Transformer API

2019-05-20 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16844095#comment-16844095
 ] 

Guozhang Wang commented on KAFKA-8396:
--

I like the idea.

About the further cleanup (TBD), note that one difference between the return 
value and context.forward is that the former is strongly typed while the latter 
is not, so technically users can pass anything via that call and it would only 
fail with a runtime exception.
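A small sketch to illustrate the difference; the transformer below and its types are hypothetical, not taken from the ticket:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// The declared output type KeyValue<String, String> is checked at compile time
// for the return value, while context.forward(...) accepts arbitrary objects
// and a mismatch only surfaces at runtime.
public class UppercaseTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        // strongly typed path: must return a KeyValue<String, String>
        return KeyValue.pair(key, value == null ? null : value.toUpperCase());
        // untyped path (would compile regardless of downstream expectations):
        // context.forward(key, 42L);
    }

    @Override
    public void close() { }
}
{code}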

> Clean up Transformer API
> 
>
> Key: KAFKA-8396
> URL: https://issues.apache.org/jira/browse/KAFKA-8396
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
>
> Currently, KStream operators transformValues and flatTransformValues disable 
> context forwarding, and force operators to just return the new values.
> The reason is that we wanted to prevent the key from changing, since the 
> whole point of a `xValues` transformation is that we _do not_ change the key, 
> and hence don't need to repartition.
> However, the chosen mechanism has some drawbacks: The Transform concept is 
> basically a way to plug in a custom Processor within the Streams DSL, but 
> these restrictions make it more like a MapValues with access to the context. 
> For example, even though you can still schedule punctuations, there's no way 
> to forward values as a result of them. So, as a user, it's hard to build a 
> mental model of how to use a TransformValues (because it's not quite a 
> Transformer and not quite a Mapper).
> Also, logically, a Transformer can call forward as much as it wants, so a 
> Transformer and a FlatTransformer are effectively the same thing. Then, we 
> also have TransformValues and FlatTransformValues that are also two more 
> versions of the same thing, just to implement the key restrictions. 
> Internally, some of these can send downstream by returning OR forwarding, and 
> others can only return. It's a lot for users to keep in mind.
> We can clean up this API significantly by just allowing all transformers to 
> call `forward`. In the `Values` case, we can wrap the ProcessorContext in one 
> that checks the key is `equal` to the one that got passed in (i.e., saves a 
> reference and enforces equality with that reference in any call to 
> `forward`). Then, we can actually deprecate the `*ValueTransformer*` 
> interfaces and remove the restriction about calling forward.
> We can consider a further cleanup (TBD) to deprecate the existing Transformer 
> interface entirely, and replace it with one with a `void` return type. Then, 
> the Transform and FlatTransform cases collapse together, and we just need 
> Transform and TransformValues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-05-20 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16844103#comment-16844103
 ] 

Guozhang Wang commented on KAFKA-4600:
--

[~braedon] Thanks for your feedback. The reason I chose to keep the partition 
after an unsuccessful revocation is that, in the rebalance protocol, the 
partition would NOT be re-assigned until it is clear that no one currently owns 
it -- i.e. it is re-assignable. In the case above, for example, if 
`consumer.poll` is called again, then the consumer will send the join-group 
request again claiming that it still owns partition 1, and hence the partition 
would not be re-assigned elsewhere; if a new generation has already been formed 
before the consumer retries `poll`, then its join-group request, or its 
commit-offset request, would be rejected as a fatal error and the consumer would 
have to clear all its owned partitions as "lost" and rejoin as a new member. In 
either case, users need not worry that they have consumed some messages unsafely 
due to a partial revocation: either they still own the partition and no one else 
would get messages from it, or they have already lost it, and even though they 
may get some messages before the next heartbeat / offset commit, those commits 
are doomed to fail, just like in the current situation. Does that make sense to you?

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.
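For context, a minimal sketch (with hypothetical names) of the listener pattern described above, where a failure while loading state in onPartitionsAssigned() is currently only logged by the consumer rather than propagated:

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// State is loaded per partition before any records from it are processed;
// if loadStateFor() throws, the error is currently swallowed and consumption
// proceeds without the state, which is what this ticket is about.
public class StateLoadingRebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        for (final TopicPartition tp : partitions) {
            loadStateFor(tp);   // hypothetical helper; may throw
        }
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // flush or persist state for the revoked partitions (omitted)
    }

    private void loadStateFor(final TopicPartition tp) {
        // load processing state for this partition from an external store (omitted)
    }
}
{code}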



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

2019-05-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8377:
-
Labels: newbie++  (was: )

> KTable#transformValue might lead to incorrect result in joins
> -
>
> Key: KAFKA-8377
> URL: https://issues.apache.org/jira/browse/KAFKA-8377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If 
> a non-materialized KTable is input to a join, the lookup into the table 
> results in a lookup of the parents table plus a call to the operator. For 
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed 
> as a lookup into materializedTable plus applying the filter().
> For stateless operation like filter, this is safe. However, 
> #transformValues() might have an attached state store. Hence, when an input 
> record r is processed by #transformValues() with current state S, it might 
> produce an output record r' (that is not materialized). When the join later 
> does a lookup to get r from the parent table, there is no guarantee that 
> #transformValues() again produces r' because its state might not be the same 
> any longer.
> Hence, it seems to be required, to always materialize the result of a 
> KTable#transformValues() operation if there is state. Note, that if there 
> would be a consecutive filter() after tranformValue(), it would also be ok to 
> materialize the filter() result. Furthermore, if there is no downstream 
> join(), materialization is also not required.
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful 
> #transformValues()` operator.
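In the meantime, a hedged sketch of the safer pattern the description points at: explicitly materializing a stateful transformValues() result so that a downstream join reads the stored values instead of re-running the transformer. Topic names, store names, and the trivial transformer are hypothetical.

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class TransformValuesMaterializationSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> input = builder.table("input-topic");
        final KTable<String, Long> other = builder.table("other-topic");

        // Materializing the transformValues() result means the downstream join
        // reads the stored value instead of re-invoking the (stateful) transformer.
        final KTable<String, Long> transformed = input.transformValues(
                () -> new ValueTransformerWithKey<String, Long, Long>() {
                    private long invocations = 0; // illustrative per-task state only

                    @Override
                    public void init(final ProcessorContext context) { }

                    @Override
                    public Long transform(final String key, final Long value) {
                        invocations++;
                        return value;
                    }

                    @Override
                    public void close() { }
                },
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transformed-store"));

        final KTable<String, Long> joined = other.join(transformed, Long::sum);
    }
}
{code}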



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8364) Avoid decompression of record when validate record at server in the scene of inPlaceAssignment .

2019-05-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8364:
-
Description: 
We do performance testing about Kafka server in specific scenarios .We build a 
kafka cluster with one broker,and create topics with different number of 
partitions.Then we start lots of producer processes to send large amounts of 
messages to one of the topics at one  testing .And  we found that when the 
upper limit of CPU usage has been reached  But  it does not reach the upper 
limit of the bandwidth of the server  network(Network inflow 
rate:600M/s;CPU(%):>97%). 

We analysis the JFIR of Kafka server when doing performance testing .After we 
checked and completed the performance test again, we located the code 
*"*ByteBuffer recordBuffer = 
ByteBuffer.allocate(sizeOfBodyInBytes);(Class:DefaultRecord,Function:readFrom())''which
 consumed CPU resources and caused a lot of GC .So we remove the allocation and 
copying of ByteBuffer at our modified code, the test performance is greatly 
improved(Network inflow rate:1GB/s;CPU(%):<60%) .This issue already have been 
raised and solved at {color:#33}*[KAFKA-8106]*{color}.

*We also analysis the code of validation to record at server. Currently the 
broker will decompress whole record including 'key' and 'value' to validate 
record timestamp, key, offset, uncompressed size bytes, and magic . We remove 
the decompression operation and then do performance testing again . we found 
the CPU's stable usage is below 30% even lower.* *Removing decompression 
operation to record can minimize CPU usage and improve performance greatly.*

Should we think of preventing decompress record when validate record at server 
in the scene of inPlaceAssignment?

*We think we should optimize the process of server-side validation record for 
achieving the purpose of verifying the message without decompressing the 
message.* 
 Maybe we can add some properties ('batch.min.timestamp'(Long) 
,'records.number'(Integer),'all.key.is.null'(boolean)) *in client side to the 
batch level for validation*, *so that we don't need decompress record for 
validate 'offset','timestamp' and key(The value of 'all.key.is.null' will false 
when there is w key is null).*

  was:
We do performance testing about Kafka server in specific scenarios .We build a 
kafka cluster with one broker,and create topics with different number of 
partitions.Then we start lots of producer processes to send large amounts of 
messages to one of the topics at one  testing .And  we found that when the 
upper limit of CPU usage has been reached  But  it does not reach the upper 
limit of the bandwidth of the server  network(Network inflow 
rate:600M/s;CPU(%):>97%). 

We analysis the JFIR of Kafka server when doing performance testing .After we 
checked and completed the performance test again, we located the code 
*"*ByteBuffer recordBuffer = 
ByteBuffer.allocate(sizeOfBodyInBytes);(Class:DefaultRecord,Function:readFrom())''which
 consumed CPU resources and caused a lot of GC .So we remove the allocation and 
copying of ByteBuffer at our modified code, the test performance is greatly 
improved(Network inflow rate:1GB/s;CPU(%):<60%) .This issue already have been 
raised and solved at {color:#33}*[KAFKA-8106]*{color}.

*We also analysis the code of validation to  record at server. Currently the 
broker will decompress whole record including 'key' and 'value' to validate 
record timestamp, key, offset, uncompressed size bytes, and magic . We remove 
the decompression operation and then do performance testing again . we found 
the CPU's stable usage is below 30% even lower.* *Removing decompression 
operation to record can minimize CPU usage and improve performance greatly.*

Should we think of preventing decompress record  when validate record at server 
in the scene of inPlaceAssignment ? 

*We think we should optimize the process of server-side validation record  for 
achieving the purpose of verifying the message without decompressing the 
message.* 
 Maybe we can add some properties ('batch.min.timestamp'(Long) 
,'records.number'(Integer),'all.key.is.null'(boolean)) *in client side to the 
batch level for validation*, *so that we don't need decompress record for 
validate 'offset','timestamp' and key(The value of 'all.key.is.null' will false 
when there is w key is null).*


> Avoid decompression of record when validate record  at server in the scene of 
> inPlaceAssignment .
> -
>
> Key: KAFKA-8364
> URL: https://issues.apache.org/jira/browse/KAFKA-8364
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>
> We do perf

[jira] [Commented] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2019-05-22 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846261#comment-16846261
 ] 

Guozhang Wang commented on KAFKA-8410:
--

[~vvcephei] Thanks for creating this ticket. I think it is good to tighten up 
the typing system in the DSL.

For the PAPI though, remember that we need to allow a processor node to flexibly 
send to multiple children which expect different input types, so we need to be 
careful when trying to enforce a bound to specific types.

> Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as 
> well
> --
>
> Key: KAFKA-8410
> URL: https://issues.apache.org/jira/browse/KAFKA-8410
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: tech-debt
>
> Presently, it's very difficult to have confidence when adding to or modifying 
> processors in the DSL. There's a lot of raw types, duck-typing, and casting 
> that contribute to this problem.
> The root, though, is that the generic types on `Processor` refer only to 
> the _input_ key and value types. No information is captured or verified about 
> what the _output_ types of a processor are. For example, this leads to 
> widespread confusion in the code base about whether a processor produces `V`s 
> or `Change`s. The type system actually makes matters worse, since we use 
> casts to make the processors conform to declared types that are in fact 
> wrong, but are never checked due to erasure.
> We can start to make some headway on this tech debt by adding some types to 
> the ProcessorContext that bound the `<K, V>` types that may be passed to 
> `context.forward`. Then, we can build on this by fully specifying the input 
> and output types of the Processors, which in turn would let us eliminate the 
> majority of unchecked casts in the DSL operators.
> I'm not sure whether adding these generic types to the existing 
> ProcessorContext and Processor interfaces, which would also affect the PAPI 
> has any utility, or whether we should make this purely an internal change by 
> introducing GenericProcessorContext and GenericProcessor peer interfaces for 
> the DSL to use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8389) Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest

2019-05-22 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8389.
--
Resolution: Won't Fix

> Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest
> --
>
> Key: KAFKA-8389
> URL: https://issues.apache.org/jira/browse/KAFKA-8389
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> We have stand-alone classes of MockProcessorSupplier / MockProcessor classes, 
> yet we have those in TopologyTestDriverTest as well. We should consider 
> removing them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign

2019-05-23 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8420:


 Summary: Graceful handling when consumer switches from subscribe 
to manual assign
 Key: KAFKA-8420
 URL: https://issues.apache.org/jira/browse/KAFKA-8420
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


Today if a consumer switches between subscribe (and hence relies on group 
rebalance to get assignment) and manual assign, it may cause unnecessary 
rebalances. For example:

1. consumer.subscribe();
2. consumer.poll(); // join-group request sent, returns empty because the poll 
times out
3. consumer.unsubscribe();
4. consumer.assign(..);
5. consumer.poll(); // sync-group response received, and the assigned 
partitions do not match the current subscription state. In this case it will 
try to re-join, which is not necessary.

In the worst case (i.e. the leader keeps sending an incompatible assignment), 
this would cause the consumer to fall into endless re-joins.

Although it is not a very common usage scenario, it is still worth handling 
better than the status quo. A minimal sketch of the sequence is shown below.
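A hedged sketch of the sequence above; the bootstrap server, group id, topic, and partition are hypothetical placeholders:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 1-2: subscribe and poll once; the join-group request is sent but the poll
//      may return empty when it times out before the rebalance completes.
consumer.subscribe(Collections.singletonList("input-topic"));
consumer.poll(Duration.ofMillis(100));

// 3-5: switch to manual assignment; a late sync-group response no longer
//      matches the subscription state and can trigger an unnecessary re-join.
consumer.unsubscribe();
consumer.assign(Collections.singletonList(new TopicPartition("input-topic", 0)));
consumer.poll(Duration.ofMillis(100));
{code}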



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-05-23 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8421:


 Summary: Allow consumer.poll() to return data in the middle of 
rebalance
 Key: KAFKA-8421
 URL: https://issues.apache.org/jira/browse/KAFKA-8421
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


With KIP-429 in place, today when a consumer is about to send join-group 
request its owned partitions may not be empty, meaning that some of its fetched 
data can still be returned. Nevertheless, today the logic is strict:

{code}
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
{code}

I.e. if the consumer enters a rebalance it always returns no data.

As an optimization, we can consider letting the consumer still return messages 
that belong to its owned partitions even while it is within a rebalance, because 
we know it is safe: no one else would claim those partitions during this 
rebalance yet, and we can still commit their offsets if, after this rebalance, 
the partitions need to be revoked.

One thing we need to take care of though is the rebalance timeout: while the 
consumer is processing those records it may not call the next poll() in time 
(think: Kafka Streams' num.iterations mechanism), which may lead to the consumer 
dropping out of the group during the rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-05-24 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847746#comment-16847746
 ] 

Guozhang Wang commented on KAFKA-8412:
--

[~mjsax] I looked through the code and the original JIRA 
(https://issues.apache.org/jira/browse/KAFKA-7285) again, and I think the case I 
mentioned above still exists. The root cause is that in `suspend` we close the 
record collector when EOS is turned on, while in `close` we may want to flush 
(from the state commit) and close the collector again.

What I'm thinking is that we can refactor the fix from KAFKA-7285 so that in 
`suspend` we do the "close-and-then-recreate" completely and only leave the 
`initTxn` call in the `resume` function. In that case, whenever we're closing we 
are assured there is still an open producer. With this, we no longer need the 
closed-check in the close() function either.

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> 
>
> Key: KAFKA-8412
> URL: https://issues.apache.org/jira/browse/KAFKA-8412
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Sebastiaan
>Priority: Minor
>
> I found a closed issue and replied there but decided to open one myself 
> because although they're related they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there has been to implement a null check around closing a producer 
> because in some cases the producer is already null there (has been closed 
> already)
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>     this.producer.close();
>     this.producer = null;
>     }
>     this.checkForException();
> }{code}
> Seems to my (ignorant) eye that the flush method should also be wrapped in a 
> null check in the same way as has been done for close.



--
This message was sent by Atlas
