[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852686#comment-16852686 ] Matthias J. Sax commented on KAFKA-8377:

Not sure I can follow. The decision whether to materialize or not is made before we start the application, while building the `Topology` (please correct me if I am wrong). However, `KTableValueGetterSupplier` is only used at runtime, so it can no longer change a decision that was made earlier, can it? Or are you referring to `KTableImpl#valueGetterSupplier()`? Maybe we could do the check there and force materialization of the parent KTable for this case, similar to how we force materialization of source KTables?

Btw: while digging into the code, I found that we have the classes `KTableMaterializedValueGetterSupplier` and `KTableSourceValueGetterSupplier`, which are basically the same. It might be worth removing one as a side cleanup with this ticket.

> KTable#transformValue might lead to incorrect result in joins
> -
>
> Key: KAFKA-8377
> URL: https://issues.apache.org/jira/browse/KAFKA-8377
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: newbie++
>
> Kafka Streams uses an optimization that avoids materializing every result KTable. If a non-materialized KTable is input to a join, the lookup into the table results in a lookup of the parent table plus a call to the operator. For example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized, ...);
> {code}
> If there is a table2 input record, the lookup to the other side is performed as a lookup into materializedTable plus applying the filter().
> For a stateless operation like filter(), this is safe. However, #transformValues() might have an attached state store.
> Hence, when an input record r is processed by #transformValues() with current state S, it might produce an output record r' (that is not materialized). When the join later does a lookup to get r from the parent table, there is no guarantee that #transformValues() produces r' again, because its state may have changed in the meantime.
> Therefore, it seems necessary to always materialize the result of a KTable#transformValues() operation if it has state. Note that if there is a subsequent filter() after transformValues(), it would also be OK to materialize the filter() result instead. Furthermore, if there is no downstream join(), materialization is not required either.
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful `#transformValues()` operator.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
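The failure mode described above can be reproduced in miniature. The following plain-Java model is a sketch, not Kafka Streams code: the class and its methods (`StatefulGetterDemo`, `process`, `lookupViaParent`, `lookupMaterialized`) are hypothetical, and a simple counter stands in for the state store attached to `#transformValues()`. Re-running the stateful transformer on lookup returns a different value from the one originally forwarded, while reading a materialized result does not.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Kafka Streams code: a materialized parent table, a
// stateful "transformValues" step, and two ways of looking up its result.
final class StatefulGetterDemo {

    // Materialized parent table (key -> value).
    private final Map<String, String> parentStore = new HashMap<>();

    // Materialized result of the transformer, used by the "fixed" lookup path.
    private final Map<String, String> resultStore = new HashMap<>();

    // Stand-in for the transformer's state store: records processed so far.
    private int processedCount = 0;

    // Stateful transformation: the output depends on (and updates) the state.
    private String transform(String value) {
        processedCount++;
        return value + "#" + processedCount;
    }

    // Forward path: update the parent table, transform, and emit r'.
    String process(String key, String value) {
        parentStore.put(key, value);
        String result = transform(value);
        resultStore.put(key, result);
        return result;
    }

    // Non-materialized lookup: re-read the parent and re-run the transformer.
    String lookupViaParent(String key) {
        return transform(parentStore.get(key));
    }

    // Materialized lookup: read the result exactly as it was forwarded.
    String lookupMaterialized(String key) {
        return resultStore.get(key);
    }

    public static void main(String[] args) {
        StatefulGetterDemo table = new StatefulGetterDemo();
        String forwarded = table.process("k", "v"); // state 0 -> 1
        table.process("other", "w");                // state 1 -> 2
        System.out.println(forwarded);                     // v#1
        System.out.println(table.lookupViaParent("k"));    // v#3, differs from v#1
        System.out.println(table.lookupMaterialized("k")); // v#1
    }
}
```

Only the lookup that recomputes through the parent disagrees with what was forwarded, which is why materializing the stateful result avoids the problem.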
[jira] [Updated] (KAFKA-8446) Kafka Streams restoration crashes with NPE when the record value is null
[ https://issues.apache.org/jira/browse/KAFKA-8446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-8446:
---
Description:
To add more context here: an NPE is triggered in *RecordConverters#ByteBuffer.allocate(8 + rawValue.length)* if the record's raw value is null. A scenario to reproduce it:
# Have a source KTable (timestampedKeyValueStore) read in tombstone records, i.e. records whose value field is null. Note that you need to turn off the Streams-level cache; otherwise the null value will not be forwarded.
# Shut down the application and clear the local state stores.
# Restart the application before the changelog topic is compacted. If we are "lucky", the tombstone record will trigger a NullPointerException during restoration when we try to read the raw value's length.

> Kafka Streams restoration crashes with NPE when the record value is null
>
> Key: KAFKA-8446
> URL: https://issues.apache.org/jira/browse/KAFKA-8446
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Blocker
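A minimal sketch of a null-safe conversion, assuming (as the expression quoted in the ticket suggests) that the timestamped format prepends an 8-byte timestamp to the raw value. `TimestampedValueConverter` and `toTimestampedValue` are hypothetical names, not the actual `RecordConverters` API; the point is only that a tombstone should stay a tombstone instead of reaching `rawValue.length`.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not the actual RecordConverters code. Assumes the
// timestamped value layout is [timestamp: 8 bytes][raw value bytes].
final class TimestampedValueConverter {

    static byte[] toTimestampedValue(long timestamp, byte[] rawValue) {
        if (rawValue == null) {
            return null; // tombstone: keep it null instead of dereferencing it
        }
        return ByteBuffer.allocate(8 + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();
    }

    public static void main(String[] args) {
        byte[] converted = toTimestampedValue(42L, new byte[] {1, 2, 3});
        System.out.println(converted.length);               // 11
        System.out.println(toTimestampedValue(42L, null)); // null
    }
}
```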
[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852661#comment-16852661 ] ASF GitHub Bot commented on KAFKA-8187:
---
hustclf commented on pull request #6849: KAFKA-8187 Bugfix for branch 2.0
URL: https://github.com/apache/kafka/pull/6849

Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks.
- Do not let the thread transition to RUNNING until all tasks (including standby tasks) are ready.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.1
> Reporter: William Greer
> Assignee: Lifei Chen
> Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
> Overview:
> There is a race condition that can cause a partitioned state store to be missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another StreamThread/TaskManager on the same JVM, there can be lock contention that prevents the StandbyTask on the currently assigned StreamThread from acquiring the lock, and the StandbyTask does not retry acquiring it because all of the active StreamTasks for that StreamThread are running. If the StandbyTask does not acquire the lock before the StreamThread enters the RUNNING state, then the StandbyTask will not consume any records.
> If there is no subsequent reassignment before the second execution of the stateDirCleaner thread, then the task directory for the StandbyTask will be deleted. When the next reassignment occurs, the offset that was read by the StandbyTask at creation time, before acquiring the lock, will be written back to the state store directory; this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same streams application.
> StreamThread(A) has StandbyTask 1_0.
> StreamThread(B) has no tasks.
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's one task.
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event for a standby task for 1_0.
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask, which reads the current checkpoint from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the check at [4], which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING.
> StreamThread(A) closes the previous StandbyTask, specifically calling closeStateManager() [5].
> StreamThread(A) state is set to RUNNING.
> The streams application on this host has completed rebalancing and is now in the RUNNING state.
> State at this point: the state directory exists for 1_0 and all data is present.
> Then, 1 to 2 intervals of [6] (default 10 minutes) after the reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds that there is no active lock for that directory, acquires the lock, and deletes the directory.
> State at this point: the state directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by StreamThread(B) during construction of the StandbyTask for 1_0 will be written back to disk. This write re-creates the state store directory and writes the .checkpoint file with the old offset.
> State at this point: the state directory exists for 1_0 with a '.checkpoint' file in it, but there is no other state store data in the
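The fix proposed in PR #6849 (do not let the thread transition to RUNNING until standby tasks are ready as well) can be illustrated with a small model of the scenario above. Everything here is hypothetical (`StandbyReadinessSketch`, `tryLock`, and the two readiness checks); it is not the real `TaskManager` logic, only the shape of the check at [4] compared with the proposed one. `List.of` assumes Java 9 or later.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the readiness check; not the actual TaskManager code.
final class StandbyReadinessSketch {

    // Task directories whose lock is currently held by some StreamThread.
    private final Set<String> lockedTaskDirs = new HashSet<>();

    // Returns false if another thread already holds the lock (cf. the LockException at [3]).
    boolean tryLock(String taskId) {
        return lockedTaskDirs.add(taskId);
    }

    private static boolean allTrue(Collection<Boolean> flags) {
        return flags.stream().allMatch(Boolean::booleanValue);
    }

    // Check as described in the ticket: standbyReady is ignored, which is the problem.
    static boolean readyActiveOnly(Collection<Boolean> activeReady, Collection<Boolean> standbyReady) {
        return allTrue(activeReady);
    }

    // Check as described in the PR: standby tasks must be ready too.
    static boolean readyIncludingStandby(Collection<Boolean> activeReady, Collection<Boolean> standbyReady) {
        return allTrue(activeReady) && allTrue(standbyReady);
    }

    public static void main(String[] args) {
        StandbyReadinessSketch dirs = new StandbyReadinessSketch();
        dirs.tryLock("1_0");                         // StreamThread(A) still holds 1_0
        boolean standbyLocked = dirs.tryLock("1_0"); // StreamThread(B) fails: false

        List<Boolean> active = List.of();            // StreamThread(B) has no active tasks
        List<Boolean> standby = List.of(standbyLocked);

        System.out.println(readyActiveOnly(active, standby));       // true: RUNNING too early
        System.out.println(readyIncludingStandby(active, standby)); // false: stays out of RUNNING
    }
}
```

Note that the active-only check passes trivially here because `allMatch` on an empty collection is `true`, which matches StreamThread(B) having no active tasks in the example.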
[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852643#comment-16852643 ] Richard Yu commented on KAFKA-8421:
---
Might try to take a hack at this one. It certainly is an interesting issue. :)

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Guozhang Wang
> Priority: Major
>
> With KIP-429 in place, when a consumer is about to send a join-group request, its owned partitions may not be empty, meaning that some of its fetched data could still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
>     return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance, it always returns no data.
> As an optimization, we can consider letting consumers still return messages that belong to their owned partitions even within a rebalance, because we know it is safe: no one else would claim those partitions in this rebalance yet, and we can still commit offsets if the partitions need to be revoked after this rebalance.
> One thing we need to take care of, though, is the rebalance timeout: while the consumer is processing those records, it may not call the next poll() in time (think: the Kafka Streams num.iterations mechanism), which may lead to the consumer dropping out of the group during the rebalance.
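A rough model of the proposed change, assuming (per KIP-429 as described above) that the partitions still owned during a rebalance are safe to serve. `RebalancePollSketch` and its methods are hypothetical; records are plain strings grouped by partition name rather than `ConsumerRecords`. The sketch does not address the rebalance-timeout concern raised in the last paragraph.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the poll() decision; not KafkaConsumer code.
final class RebalancePollSketch {

    // Current behaviour: while a rebalance is in progress, return nothing.
    static List<String> pollStrict(boolean rebalanceInProgress,
                                   Set<String> ownedPartitions,
                                   Map<String, List<String>> fetched) {
        if (rebalanceInProgress) {
            return List.of(); // mirrors "return ConsumerRecords.empty();"
        }
        return recordsFor(ownedPartitions, fetched);
    }

    // Proposed behaviour: partitions still owned mid-rebalance are served,
    // so the rebalance flag no longer gates the result.
    static List<String> pollRelaxed(Set<String> ownedPartitions,
                                    Map<String, List<String>> fetched) {
        return recordsFor(ownedPartitions, fetched);
    }

    // Collects fetched records for owned partitions only, in partition-name order.
    private static List<String> recordsFor(Set<String> ownedPartitions,
                                           Map<String, List<String>> fetched) {
        List<String> out = new ArrayList<>();
        for (String partition : new TreeSet<>(ownedPartitions)) {
            out.addAll(fetched.getOrDefault(partition, List.of()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> fetched = Map.of(
                "topic-0", List.of("a", "b"),
                "topic-1", List.of("c")); // topic-1 is no longer owned
        Set<String> stillOwned = Set.of("topic-0");

        System.out.println(pollStrict(true, stillOwned, fetched)); // []
        System.out.println(pollRelaxed(stillOwned, fetched));      // [a, b]
    }
}
```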
[jira] [Comment Edited] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure
[ https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852639#comment-16852639 ] Boyang Chen edited comment on KAFKA-8438 at 5/31/19 4:20 AM:
-
Hey Richard, thanks for the update. I'm still not quite clear on the purpose of adding this callback. Quoted from your original statement:
> When called, this method could be used to log a consumer failure or, should the user wish it, create a new thread which would then rejoin the consumer group (which could also include the required {{group.instance.id}} so that a rebalance wouldn't be re-triggered; we would need to think about that).

I don't think logging a consumer failure is that important, because either way the user has to be made aware of the failure, so this logging needs to talk to an external service, generally along a path like application -> local metrics agent -> remote time series DB -> pager system. The new callback doesn't save much effort there.

Also, to reboot a new consumer, that consumer needs initialization configs and so on. Where are we planning to store this configuration data? If the current consumer is already dead, who will be responsible for bringing up an almost identical consumer?

Could you brainstorm more use cases other than error logging and rebooting here?

> Add API to allow user to define end behavior of consumer failure
>
> Key: KAFKA-8438
> URL: https://issues.apache.org/jira/browse/KAFKA-8438
> Project: Kafka
> Issue Type: New Feature
> Components: consumer
> Reporter: Richard Yu
> Priority: Major
> Labels: needs-dicussion, needs-kip
>
> Recently, in a concerted effort to make Kafka's rebalances less painful, various approaches have been used to reduce the number and impact of rebalances. Often, the trigger of a rebalance is a failure of some sort or an exception thrown during processing, in which case the workload is redistributed among the surviving threads. To reduce rebalances caused by random consumer crashes, a recent change to Kafka internals (which introduces the concept of static membership) prevents a rebalance from occurring within {{session.timeout.ms}}, in the hope that the crashed consumer thread will recover within that interval and rejoin the group.
> However, in some cases, consumer threads may go down permanently or remain dead for long periods of time.
> In these scenarios, users may not become aware of the crash until hours after it happened, which forces them to manually start a new KafkaConsumer process a considerable period of time after the failure occurred. That is where a callback such as {{onConsumerFailure}} would help.
> There are multiple use cases for this callback (which is defined by the user). {{onConsumerFailure}} is called when a particular consumer thread has been down for some specified time interval (i.e. a config called {{acceptable.consumer.failure.timeout.ms}}). When called, this method could be used to log a consumer failure or, should the user wish it, create a new thread which would then rejoin the consumer group (which could also include the required {{group.instance.id}} so that a rebalance wouldn't be re-triggered; we would need to think about that).
> Should the old thread recover and attempt to rejoin the consumer group (with the substitute thread being part of the group), the old thread would be denied access and an exception would be thrown (to
[jira] [Commented] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure
[ https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852639#comment-16852639 ] Boyang Chen commented on KAFKA-8438:

Hey Richard, thanks for the update. I'm still not quite clear on the purpose of adding this callback. Quoted from your original statement:
> When called, this method could be used to log a consumer failure or, should the user wish it, create a new thread which would then rejoin the consumer group (which could also include the required {{group.instance.id}} so that a rebalance wouldn't be re-triggered; we would need to think about that).

I don't think logging a consumer failure is that important, because either way the user has to be made aware of the failure, so this logging needs to talk to an external service, generally along a path like application -> local metrics agent -> remote time series DB -> pager system. The new callback doesn't save much effort there.

Also, to reboot a new consumer, that consumer needs initialization configs and so on. Where are we planning to store this configuration data? If the current consumer is already dead, who will be responsible for bringing up an almost identical consumer?

Could you brainstorm more use cases other than error logging and rebooting here?

> Add API to allow user to define end behavior of consumer failure
>
> Key: KAFKA-8438
> URL: https://issues.apache.org/jira/browse/KAFKA-8438
> Project: Kafka
> Issue Type: New Feature
> Components: consumer
> Reporter: Richard Yu
> Priority: Major
> Labels: needs-dicussion, needs-kip
>
> Recently, in a concerted effort to make Kafka's rebalances less painful, various approaches have been used to reduce the number and impact of rebalances. Often, the trigger of a rebalance is a failure of some sort or an exception thrown during processing, in which case the workload is redistributed among the surviving threads.
> To reduce rebalances caused by random consumer crashes, a recent change to Kafka internals (which introduces the concept of static membership) prevents a rebalance from occurring within {{session.timeout.ms}}, in the hope that the crashed consumer thread will recover within that interval and rejoin the group.
> However, in some cases, consumer threads may go down permanently or remain dead for long periods of time. In these scenarios, users may not become aware of the crash until hours after it happened, which forces them to manually start a new KafkaConsumer process a considerable period of time after the failure occurred. That is where a callback such as {{onConsumerFailure}} would help.
> There are multiple use cases for this callback (which is defined by the user). {{onConsumerFailure}} is called when a particular consumer thread has been down for some specified time interval (i.e. a config called {{acceptable.consumer.failure.timeout.ms}}). When called, this method could be used to log a consumer failure or, should the user wish it, create a new thread which would then rejoin the consumer group (which could also include the required {{group.instance.id}} so that a rebalance wouldn't be re-triggered; we would need to think about that).
> Should the old thread recover and attempt to rejoin the consumer group (with the substitute thread being part of the group), the old thread would be denied access and an exception would be thrown (to indicate that another process has already taken its place).
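To make the proposal concrete, here is a sketch of what a user-side `onConsumerFailure` hook might look like. Both `ConsumerFailureListener` and `ConsumerFailureWatchdog` are hypothetical (the feature is still labeled needs-kip and no such API exists), and the constructor's timeout simply plays the role of the proposed `acceptable.consumer.failure.timeout.ms`. Timestamps are passed in explicitly so the logic stays deterministic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical callback type; does not exist in Kafka.
interface ConsumerFailureListener {
    void onConsumerFailure(String memberId, long downForMs);
}

// Hypothetical watchdog; the timeout stands in for acceptable.consumer.failure.timeout.ms.
final class ConsumerFailureWatchdog {
    private final long acceptableFailureTimeoutMs;
    private final ConsumerFailureListener listener;
    private final Map<String, Long> lastSeenMs = new HashMap<>();
    private final Set<String> alreadyReported = new HashSet<>();

    ConsumerFailureWatchdog(long acceptableFailureTimeoutMs, ConsumerFailureListener listener) {
        this.acceptableFailureTimeoutMs = acceptableFailureTimeoutMs;
        this.listener = listener;
    }

    // Called whenever a member shows signs of life (e.g. a completed poll()).
    void recordAlive(String memberId, long nowMs) {
        lastSeenMs.put(memberId, nowMs);
        alreadyReported.remove(memberId);
    }

    // Fires the callback once per outage for members silent longer than the timeout.
    void check(long nowMs) {
        for (Map.Entry<String, Long> entry : lastSeenMs.entrySet()) {
            long downForMs = nowMs - entry.getValue();
            if (downForMs > acceptableFailureTimeoutMs && alreadyReported.add(entry.getKey())) {
                listener.onConsumerFailure(entry.getKey(), downForMs);
            }
        }
    }

    public static void main(String[] args) {
        ConsumerFailureWatchdog watchdog = new ConsumerFailureWatchdog(1_000L,
                (memberId, downForMs) -> System.out.println(memberId + " down for " + downForMs + " ms"));
        watchdog.recordAlive("consumer-1", 0L);
        watchdog.recordAlive("consumer-2", 0L);
        watchdog.recordAlive("consumer-2", 1_200L);
        watchdog.check(1_500L); // prints: consumer-1 down for 1500 ms
    }
}
```

Whatever runs `check` has to outlive the failed consumer, which is exactly the open question in the comment about who brings up a replacement consumer.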
[jira] [Updated] (KAFKA-8451) transaction support multiple producer instance
[ https://issues.apache.org/jira/browse/KAFKA-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wenxuanguan updated KAFKA-8451:
---
Description:
It's common in distributed systems for multiple producer instances to send messages in one transaction concurrently, with the transaction committed only when all the producers have sent their messages successfully. Otherwise, if one producer fails, the transaction is aborted and none of the messages will be consumed.
However, when multiple producers share the same transactional ID, the following exception is thrown:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
    at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

was:
It's common in distributed systems for multiple producer instances to send messages in one transaction concurrently, and if one producer fails, the other producers abort the transaction.
However, when multiple producers share the same transactional ID, the following exception is thrown:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
    at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

> transaction support multiple producer instance
> --
>
> Key: KAFKA-8451
> URL: https://issues.apache.org/jira/browse/KAFKA-8451
> Project: Kafka
> Issue Type: Bug
> Reporter: wenxuanguan
> Priority: Major
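The exception follows from how transactional IDs are fenced: each `initTransactions()` call for a given `transactional.id` bumps that ID's producer epoch, and operations from a producer holding an older epoch are rejected. The simplified model below (hypothetical class `EpochFencingModel`, with `IllegalStateException` standing in for `ProducerFencedException`) is a sketch of that rule, not the broker's transaction coordinator.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of producer fencing by epoch; not the broker's coordinator.
final class EpochFencingModel {

    // Latest epoch handed out per transactional ID.
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // Models initTransactions(): registering a transactional ID bumps its epoch.
    int initTransactions(String transactionalId) {
        return latestEpoch.merge(transactionalId, 0, (current, ignored) -> current + 1);
    }

    // Models a transactional call: a producer holding an older epoch is fenced.
    void beginTransaction(String transactionalId, int producerEpoch) {
        Integer current = latestEpoch.get(transactionalId);
        if (current == null) {
            throw new IllegalStateException("initTransactions() was not called for " + transactionalId);
        }
        if (producerEpoch < current) {
            throw new IllegalStateException("fenced: epoch " + producerEpoch + " < " + current);
        }
    }

    public static void main(String[] args) {
        EpochFencingModel coordinator = new EpochFencingModel();
        int epochA = coordinator.initTransactions("shared-id"); // instance A gets 0
        int epochB = coordinator.initTransactions("shared-id"); // instance B gets 1
        coordinator.beginTransaction("shared-id", epochB);      // accepted
        try {
            coordinator.beginTransaction("shared-id", epochA);  // instance A is fenced
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // fenced: epoch 0 < 1
        }
    }
}
```

With two live instances sharing one ID, whichever called `initTransactions()` first is fenced as soon as the second one does, which matches the stack trace above.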
[jira] [Commented] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value
[ https://issues.apache.org/jira/browse/KAFKA-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852540#comment-16852540 ] ASF GitHub Bot commented on KAFKA-8452:
---
vvcephei commented on pull request #6848: KAFKA-8452: Compressed BufferValue
URL: https://github.com/apache/kafka/pull/6848

De-duplicate the common case in which the prior value is the same as the old value.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> Possible Suppress buffer optimization: de-duplicate prior value
> ---
>
> Key: KAFKA-8452
> URL: https://issues.apache.org/jira/browse/KAFKA-8452
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Priority: Major
>
> As of KAFKA-8199, the suppression buffers have to track the "prior value" in addition to the "old" and "new" values for each record, to support transparent downstream views.
> In many cases, the prior value is actually the same as the old value, and we could avoid storing it separately. The challenge is that the old and new values are already serialized into a common array (as a Change via the FullChangeSerde), so the "prior" value would actually be a slice of the underlying array. But, of course, Java does not have array slices.
> To get around this, we either need to switch to ByteBuffers (which support slices) or break apart the serialized Change into just serialized old and new values.
[jira] [Created] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value
John Roesler created KAFKA-8452:
---
Summary: Possible Suppress buffer optimization: de-duplicate prior value
Key: KAFKA-8452
URL: https://issues.apache.org/jira/browse/KAFKA-8452
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: John Roesler

As of KAFKA-8199, the suppression buffers have to track the "prior value" in addition to the "old" and "new" values for each record, to support transparent downstream views.
In many cases, the prior value is actually the same as the old value, and we could avoid storing it separately. The challenge is that the old and new values are already serialized into a common array (as a Change via the FullChangeSerde), so the "prior" value would actually be a slice of the underlying array. But, of course, Java does not have array slices.
To get around this, we either need to switch to ByteBuffers (which support slices) or break apart the serialized Change into just serialized old and new values.
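The ByteBuffer option mentioned above can be sketched with the JDK alone: `ByteBuffer.wrap(array, offset, length).slice()` yields a view over part of an array without copying it. The layout used here (`[oldLen][old][newLen][new]`) and the class `PriorValueSliceSketch` are hypothetical and do not reflect the actual `FullChangeSerde` format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical layout [oldLen:int][old][newLen:int][new]; not the FullChangeSerde format.
final class PriorValueSliceSketch {

    static byte[] serializeChange(byte[] oldValue, byte[] newValue) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + oldValue.length + 4 + newValue.length);
        buffer.putInt(oldValue.length).put(oldValue);
        buffer.putInt(newValue.length).put(newValue);
        return buffer.array();
    }

    // Read-only view of the old value that shares the serialized array.
    // When prior == old, this view can serve as the prior value instead of a copy.
    static ByteBuffer oldValueSlice(byte[] serializedChange) {
        int oldLength = ByteBuffer.wrap(serializedChange).getInt(0);
        return ByteBuffer.wrap(serializedChange, 4, oldLength).slice().asReadOnlyBuffer();
    }

    public static void main(String[] args) {
        byte[] change = serializeChange(
                "old".getBytes(StandardCharsets.UTF_8),
                "new".getBytes(StandardCharsets.UTF_8));
        ByteBuffer prior = oldValueSlice(change);
        byte[] copy = new byte[prior.remaining()];
        prior.get(copy);
        System.out.println(new String(copy, StandardCharsets.UTF_8)); // old
    }
}
```

The slice shares its content with the original array, so storing a flag such as "prior equals old" plus this view would avoid serializing the prior value a second time.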
[jira] [Created] (KAFKA-8451) transaction support multiple producer instance
wenxuanguan created KAFKA-8451:
--
Summary: transaction support multiple producer instance
Key: KAFKA-8451
URL: https://issues.apache.org/jira/browse/KAFKA-8451
Project: Kafka
Issue Type: Bug
Reporter: wenxuanguan

It's common in distributed systems for multiple producer instances to send messages in one transaction concurrently, and if one producer fails, the other producers abort the transaction.
However, when multiple producers share the same transactional ID, the following exception is thrown:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
    at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
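Within Kafka's transaction model, the usual way to avoid this fencing is to give every producer instance its own stable `transactional.id`. The sketch below builds such per-instance settings with plain `java.util.Properties`; the keys are standard producer configs, while the `orders-writer` prefix, the broker address, and the helper name `producerConfig` are hypothetical, and serializer settings would still be needed to construct a real `KafkaProducer`. This only removes the fencing error: each instance then commits or aborts its own transaction independently, so it does not provide the all-or-nothing behaviour across instances that this ticket asks for.

```java
import java.util.Properties;

// Sketch of per-instance producer settings; names other than the config keys are hypothetical.
final class PerInstanceTransactionalIds {

    static Properties producerConfig(String bootstrapServers, String idPrefix, int instanceIndex) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("enable.idempotence", "true");
        // One stable ID per instance: reusing an ID across live instances fences the older one.
        props.put("transactional.id", idPrefix + "-" + instanceIndex);
        return props;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            Properties props = producerConfig("localhost:9092", "orders-writer", i);
            System.out.println(props.getProperty("transactional.id"));
        }
    }
}
```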
[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852478#comment-16852478 ] Guozhang Wang commented on KAFKA-8377: -- How about this: update `valueGetterSupplier` to special-case `KTableTransformValues` such that: 1. If transformValues(..) is given non-empty `stateStoreNames` (we need to keep track of this flag somewhere), always materialize the result. 2. Otherwise, still delegate to the parent getters. > KTable#transformValue might lead to incorrect result in joins > - > > Key: KAFKA-8377 > URL: https://issues.apache.org/jira/browse/KAFKA-8377 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: newbie++ > > Kafka Streams uses an optimization to not materialize every result KTable. If > a non-materialized KTable is input to a join, the lookup into the table > results in a lookup of the parent table plus a call to the operator. For > example, > {code:java} > KTable nonMaterialized = materializedTable.filter(...); > KTable table2 = ... > table2.join(nonMaterialized,...){code} > If there is a table2 input record, the lookup to the other side is performed > as a lookup into materializedTable plus applying the filter(). > For stateless operations like filter(), this is safe. However, > #transformValues() might have an attached state store. Hence, when an input > record r is processed by #transformValues() with current state S, it might > produce an output record r' (that is not materialized). When the join later > does a lookup to get r from the parent table, there is no guarantee that > #transformValues() again produces r' because its state might not be the same > any longer. > Hence, it seems to be required to always materialize the result of a > KTable#transformValues() operation if there is state.
Note that if there > is a subsequent filter() after transformValues(), it would also be OK to > materialize the filter() result instead. Furthermore, if there is no downstream > join(), materialization is not required. > Basically, it seems to be unsafe to apply `KTableValueGetter` to a stateful > `#transformValues()` operator.
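The proposal rests on one observation from the issue description: re-applying a stateful transformer during a join lookup can yield something other than the r' that was emitted. Below is a toy, in-memory sketch (all names hypothetical, not Kafka Streams internals) that contrasts a lookup through the parent's getter with a lookup against a materialized result, plus the proposed rule "materialize iff `stateStoreNames` is non-empty":

```java
import java.util.HashMap;
import java.util.Map;

// Toy model with hypothetical names; not Kafka Streams internals.
final class StatefulTransformValuesDemo {
    final Map<String, String> parentStore = new HashMap<>(); // materialized parent KTable
    final Map<String, String> resultStore = new HashMap<>(); // optional materialized result
    private int invocations = 0;                              // transformer state S

    // Stateful transformer: its output depends on the current state.
    String transform(String value) {
        invocations++;
        return value + "#" + invocations;
    }

    // Forward path: the parent update is transformed once and r' is emitted downstream.
    String process(String key, String value, boolean materializeResult) {
        parentStore.put(key, value);
        String emitted = transform(value);
        if (materializeResult) {
            resultStore.put(key, emitted);
        }
        return emitted;
    }

    // Join lookup through the parent's getter: re-runs the transformer against
    // whatever the state is now (and mutates that state as a side effect).
    String lookupViaParentGetter(String key) {
        String parentValue = parentStore.get(key);
        return parentValue == null ? null : transform(parentValue);
    }

    // Join lookup against the materialized result: returns exactly the emitted r'.
    String lookupMaterialized(String key) {
        return resultStore.get(key);
    }

    // Proposed rule: materialize iff transformValues(..) was given state store names.
    static boolean mustMaterialize(String... stateStoreNames) {
        return stateStoreNames.length > 0;
    }
}
```

Processing `("k", "v")` emits `v#1`, while a later lookup through the parent getter recomputes `v#2`; only the materialized lookup reproduces `v#1`.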
[jira] [Commented] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)
[ https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852473#comment-16852473 ] ASF GitHub Bot commented on KAFKA-8448: --- jolshan commented on pull request #6847: KAFKA-8448: Too many kafka.log.Log instances (Memory Leak) URL: https://github.com/apache/kafka/pull/6847 Added a method to delete the PeriodicProducerExpirationCheck task that kept adding to the log. Includes a test to show that the task exists and then is removed when the log is closed. > Too many kafka.log.Log instances (Memory Leak) > -- > > Key: KAFKA-8448 > URL: https://issues.apache.org/jira/browse/KAFKA-8448 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 > Environment: Red Hat 4.4.7-16, java version "1.8.0_152", > kafka_2.12-2.2.0 >Reporter: Juan Olivares >Assignee: Justine Olshan >Priority: Major > > We have a custom Kafka health check which creates a topic, adds some ACLs > (read/write topic and group), produces and consumes a single message, and then > quickly removes it and all the related ACLs created. We close the consumer > involved, but not the producer. > We have observed that the number of {{kafka.log.Log}} instances keeps growing, while > there's no evidence of topics being leaked, neither from running > {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} > nor from looking at the disk directory where topics are stored. > After looking at the heap dump we've observed the following: > - None of the {{kafka.log.Log}} references ({{currentLogs}}, > {{logsToBeDeleted}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is > holding the large number of {{kafka.log.Log}} instances.
> - The only reference preventing {{kafka.log.Log}} from being garbage collected > seems to be > {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which > contains scheduled tasks created with the name > {{PeriodicProducerExpirationCheck}}. > I can see in the code that for every {{kafka.log.Log}} a task with this name > is scheduled. > {code:java} > scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { > lock synchronized { > producerStateManager.removeExpiredProducers(time.milliseconds) > } > }, period = producerIdExpirationCheckIntervalMs, delay = > producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) > {code} > However, it seems those tasks are never unscheduled/cancelled.
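The retention path in the heap dump can be reproduced with plain `java.util.concurrent`: a periodic task whose closure captures its owner keeps that owner reachable from the executor's work queue until the task is cancelled. Below is a minimal sketch of the fix pattern (keep the `ScheduledFuture`, cancel it on close). `PeriodicTaskOwner` is a hypothetical stand-in for `kafka.log.Log`. This is not the actual change in PR #6847, and Kafka's Scala scheduler wrapper is not shown.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an object (like a log) that owns a periodic task.
final class PeriodicTaskOwner implements AutoCloseable {
    private final ScheduledFuture<?> expirationCheck;

    PeriodicTaskOwner(ScheduledThreadPoolExecutor scheduler, long periodMs) {
        // The method reference captures 'this', so the executor's queue keeps
        // this object reachable for as long as the task stays queued.
        this.expirationCheck = scheduler.scheduleAtFixedRate(
                this::removeExpiredProducers, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    private void removeExpiredProducers() {
        // placeholder for the periodic work
    }

    @Override
    public void close() {
        // Cancelling releases the queue's reference to the task (and to 'this').
        expirationCheck.cancel(false);
    }
}
```

With `setRemoveOnCancelPolicy(true)` on the executor, a cancelled task leaves the queue immediately. Under the default policy, a cancelled task stays queued until its delay elapses, so the reference is released only later.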
[jira] [Updated] (KAFKA-8446) Kafka Streams restoration crashes with NPE when the record value is null
[ https://issues.apache.org/jira/browse/KAFKA-8446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8446: --- Summary: Kafka Streams restoration crashes with NPE when the record value is null (was: KStream restoration will crash with NPE when the record value is null) > Kafka Streams restoration crashes with NPE when the record value is null > > > Key: KAFKA-8446 > URL: https://issues.apache.org/jira/browse/KAFKA-8446 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Blocker
[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852453#comment-16852453 ] Matthias J. Sax commented on KAFKA-8377: If we decide to materialize the upstream store for this case (and I cannot think of any other fix atm, cc [~guozhang], [~vvcephei]), it should not be too hard. We need to detect the cases in which we need to materialize, and force materialization. From my current understanding, the fix would go into the translation/optimization layer that translates the logical topology into the physical Topology (cc [~bbejeck] to confirm if this makes sense).
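Detecting the cases that need materialization at topology-build time can be pictured as a pass over the logical plan. The following is a minimal sketch over a hypothetical `PlanNode` type (not the Kafka Streams graph-node API). For each join input, it walks up through non-materialized ancestors and forces materialization of the input itself if a stateful transformer would otherwise be replayed. This matches the issue's note that materializing a downstream filter() result is also acceptable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical logical-plan node; not the Kafka Streams graph-node API.
final class PlanNode {
    final String name;
    final PlanNode parent;            // null for a source table
    final boolean statefulTransform;  // e.g. transformValues(...) with state stores attached
    boolean materialized;

    PlanNode(String name, PlanNode parent, boolean statefulTransform, boolean materialized) {
        this.name = name;
        this.parent = parent;
        this.statefulTransform = statefulTransform;
        this.materialized = materialized;
    }
}

final class MaterializationPass {
    // True if a value-getter lookup on 'node' would have to re-run a stateful transformer,
    // i.e. one appears before the nearest materialized ancestor.
    static boolean lookupReplaysState(PlanNode node) {
        for (PlanNode n = node; n != null && !n.materialized; n = n.parent) {
            if (n.statefulTransform) {
                return true;
            }
        }
        return false;
    }

    // Forces materialization of each join input whose lookup would replay state;
    // returns the names of the nodes that were changed.
    static List<String> apply(List<PlanNode> joinInputs) {
        List<String> forced = new ArrayList<>();
        for (PlanNode input : joinInputs) {
            if (lookupReplaysState(input)) {
                input.materialized = true;
                forced.add(input.name);
            }
        }
        return forced;
    }
}
```

Inputs with no downstream join are never visited, so, as the issue notes, they stay non-materialized.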
[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852428#comment-16852428 ] Richard Yu commented on KAFKA-3539: --- [~radai] I don't know whether this problem has been resolved. Issue 6705 was closed because the changes needed to fix this behavior were considered too complex. > KafkaProducer.send() may block even though it returns the Future > > > Key: KAFKA-3539 > URL: https://issues.apache.org/jira/browse/KAFKA-3539 > Project: Kafka > Issue Type: Bug > Components: producer >Reporter: Oleg Zhurakousky >Priority: Critical > Labels: needs-discussion, needs-kip > > You can get more details from the us...@kafka.apache.org mailing list by searching for the > thread with the subject "KafkaProducer block on send". > The bottom line is that a method that returns a Future must never block, since blocking essentially violates the Future contract: such a method is specifically designed to > return immediately, passing control back to the user to check for completion, > cancel, etc.
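The contract described in the issue can be met by moving the potentially blocking step off the caller's thread. The following is a minimal sketch with a hypothetical `NonBlockingSender` (not the KafkaProducer API), where a `CountDownLatch` stands in for whatever `send()` might otherwise wait on, such as metadata:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sender; the latch stands in for a blocking precondition
// (for example, waiting for metadata) that send() would otherwise wait on.
final class NonBlockingSender implements AutoCloseable {
    private final ExecutorService background = Executors.newSingleThreadExecutor();
    private final CountDownLatch metadataReady;

    NonBlockingSender(CountDownLatch metadataReady) {
        this.metadataReady = metadataReady;
    }

    // Returns immediately; the blocking wait runs on the background thread.
    Future<String> send(String record) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                metadataReady.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return "sent:" + record;
        }, background);
    }

    @Override
    public void close() {
        background.shutdownNow();
    }
}
```

The trade-off is that pending requests now queue on the background executor instead of blocking the caller, so some other bound on outstanding sends would be needed. This is one reason a change of this kind is not trivial.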
[jira] [Comment Edited] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851300#comment-16851300 ] Richard Yu edited comment on KAFKA-8377 at 5/30/19 10:45 PM: - cc [~mjsax] How hard would it be to fix this bug? was (Author: yohan123): cc [~mjsax] How hard would this bug be when tackled?
[jira] [Commented] (KAFKA-8199) ClassCastException when trying to groupBy after suppress
[ https://issues.apache.org/jira/browse/KAFKA-8199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852422#comment-16852422 ] ASF GitHub Bot commented on KAFKA-8199: --- guozhangwang commented on pull request #6781: KAFKA-8199: Implement ValueGetter for Suppress URL: https://github.com/apache/kafka/pull/6781 > ClassCastException when trying to groupBy after suppress > > > Key: KAFKA-8199 > URL: https://issues.apache.org/jira/browse/KAFKA-8199 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Bill Bejeck >Assignee: Jose Lopez >Priority: Major > Fix For: 2.3.0 > > > A topology with a groupBy after a suppress operation results in a > ClassCastException. > The following sample topology > {noformat} > Properties properties = new Properties(); > properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid"); > properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost"); > StreamsBuilder builder = new StreamsBuilder(); > builder.stream("topic") > .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(30))).count() > .suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), > BufferConfig.unbounded())) > .groupBy((k, v) -> KeyValue.pair(k,v)).count().toStream(); > builder.build(properties); > {noformat} > results in this exception: > {noformat} > java.lang.ClassCastException: > org.apache.kafka.streams.kstream.internals.KTableImpl$$Lambda$4/2084435065 > cannot be cast to > org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier{noformat}
[jira] [Created] (KAFKA-8450) Augment processed in MockProcessor as KeyValueAndTimestamp
Guozhang Wang created KAFKA-8450: Summary: Augment processed in MockProcessor as KeyValueAndTimestamp Key: KAFKA-8450 URL: https://issues.apache.org/jira/browse/KAFKA-8450 Project: Kafka Issue Type: Improvement Components: streams, unit tests Reporter: Guozhang Wang Today, MockProcessor keeps its `processed` book-keeping list as Strings: we just call the key and value types' toString() methods to record each entry. This loses the type information and drops the record's timestamp. It would be better to change the element type to `KeyValueAndTimestamp` and refactor the impacted unit tests.
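The following is a minimal sketch of what typed bookkeeping could look like. The ticket names the `KeyValueAndTimestamp` type, but the fields, constructor, and the `RecordingMockProcessor` class below are assumptions for illustration, not the existing test utilities:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical typed entry; field names and shape are assumptions.
final class KeyValueAndTimestamp<K, V> {
    final K key;
    final V value;
    final long timestamp;

    KeyValueAndTimestamp(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof KeyValueAndTimestamp)) return false;
        KeyValueAndTimestamp<?, ?> other = (KeyValueAndTimestamp<?, ?>) o;
        return timestamp == other.timestamp
                && Objects.equals(key, other.key)
                && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value, timestamp);
    }

    @Override
    public String toString() {
        return "KeyValueAndTimestamp(" + key + ", " + value + ", " + timestamp + ")";
    }
}

// Hypothetical mock processor that records typed entries instead of Strings.
final class RecordingMockProcessor<K, V> {
    final List<KeyValueAndTimestamp<K, V>> processed = new ArrayList<>();

    void process(K key, V value, long timestamp) {
        processed.add(new KeyValueAndTimestamp<>(key, value, timestamp));
    }
}
```

Tests can then assert on typed keys, values, and timestamps directly instead of comparing `toString()` output.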
[jira] [Commented] (KAFKA-8389) Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest
[ https://issues.apache.org/jira/browse/KAFKA-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852397#comment-16852397 ] ASF GitHub Bot commented on KAFKA-8389: --- guozhangwang commented on pull request #6761: KAFKA-8389: Remove redundant bookkeeping from MockProcessor URL: https://github.com/apache/kafka/pull/6761 > Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest > -- > > Key: KAFKA-8389 > URL: https://issues.apache.org/jira/browse/KAFKA-8389 > Project: Kafka > Issue Type: Improvement >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > We have stand-alone MockProcessorSupplier / MockProcessor classes, > yet TopologyTestDriverTest defines its own copies as well. We should consider > removing them.
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852379#comment-16852379 ] ASF GitHub Bot commented on KAFKA-5998: --- bbejeck commented on pull request #6846: KAFKA-5998: Create state dir and checkpoint file if it doesn't exist when checkpointing URL: https://github.com/apache/kafka/pull/6846 A condition can occur where the state directory for a task gets deleted by the cleanup thread while the directory is still needed by that task for checkpointing. This PR will catch the `FileNotFoundException` and create the required task directory so the checkpoint file can be created. Tests have been updated to validate that the task directory and checkpoint file are created when a `FileNotFoundException` is thrown. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: Bill Bejeck >Priority: Critical > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one Kafka broker and one Kafka Streams application running. I have been running it > for two days under a load of around 2500 msgs per second.
On the third day I am > getting the exception below for some of the partitions; I have 16 partitions, and only > 0_0 and 0_1 give this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at
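The approach in PR #6846 (recreate the task directory when the checkpoint write fails because the directory is missing) can be sketched with `java.nio.file`. There, the missing-directory failure surfaces as `NoSuchFileException` rather than the `FileNotFoundException` thrown by `FileOutputStream`. The `CheckpointWriter` class and its text format are hypothetical, not Kafka's `OffsetCheckpoint`:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical checkpoint writer: writes ".checkpoint.tmp", then renames it to
// ".checkpoint". If the task directory was deleted concurrently (e.g. by a
// cleanup thread), it is recreated and the write is retried once.
final class CheckpointWriter {
    static void write(Path taskDir, Map<String, Long> offsets) throws IOException {
        try {
            writeOnce(taskDir, offsets);
        } catch (NoSuchFileException missingDir) {
            Files.createDirectories(taskDir);
            writeOnce(taskDir, offsets);
        }
    }

    private static void writeOnce(Path taskDir, Map<String, Long> offsets) throws IOException {
        StringBuilder content = new StringBuilder();
        for (Map.Entry<String, Long> e : new TreeMap<>(offsets).entrySet()) {
            content.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
        }
        Path tmp = taskDir.resolve(".checkpoint.tmp");
        Files.write(tmp, content.toString().getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, taskDir.resolve(".checkpoint"),
                StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Writing to `.checkpoint.tmp` first and then renaming keeps a crash mid-write from leaving a truncated `.checkpoint`. The single retry assumes the cleanup thread does not delete the directory again in between.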
[jira] [Resolved] (KAFKA-8441) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
[ https://issues.apache.org/jira/browse/KAFKA-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-8441. -- Resolution: Invalid > Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated > > > Key: KAFKA-8441 > URL: https://issues.apache.org/jira/browse/KAFKA-8441 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.3.0 >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {noformat} > java.lang.AssertionError: Condition not met within timeout 3. Topics not > deleted after 3 milli seconds. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:265) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAndRecreateTopics(EmbeddedKafkaCluster.java:288) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.setUp(RegexSourceIntegrationTest.java:118) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) > at
[jira] [Resolved] (KAFKA-8440) Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows
[ https://issues.apache.org/jira/browse/KAFKA-8440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-8440. -- Resolution: Invalid > Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows > --- > > Key: KAFKA-8440 > URL: https://issues.apache.org/jira/browse/KAFKA-8440 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.3.0 >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {noformat} > java.lang.AssertionError: > Expected: > but: was null > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldReduceSessionWindows(KStreamAggregationIntegrationTest.java:663) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) > at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at >
[jira] [Commented] (KAFKA-8155) Update Streams system tests for 2.2.0 and 2.1.1 releases
[ https://issues.apache.org/jira/browse/KAFKA-8155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852283#comment-16852283 ] ASF GitHub Bot commented on KAFKA-8155: --- mjsax commented on pull request #6596: KAFKA-8155: Add 2.1.1 release to system tests URL: https://github.com/apache/kafka/pull/6596 > Update Streams system tests for 2.2.0 and 2.1.1 releases > > > Key: KAFKA-8155 > URL: https://issues.apache.org/jira/browse/KAFKA-8155 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: John Roesler >Assignee: Matthias J. Sax >Priority: Major
[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852279#comment-16852279 ] Andrew Olson commented on KAFKA-8421: - Agreed, this would potentially be a quite beneficial enhancement. > Allow consumer.poll() to return data in the middle of rebalance > --- > > Key: KAFKA-8421 > URL: https://issues.apache.org/jira/browse/KAFKA-8421 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Priority: Major > > With KIP-429 in place, today when a consumer is about to send a join-group > request its owned partitions may not be empty, meaning that some of its > fetched data can still be returned. Nevertheless, today the logic is strict: > {code} > if (!updateAssignmentMetadataIfNeeded(timer)) { > return ConsumerRecords.empty(); > } > {code} > I.e. if the consumer enters a rebalance, it always returns no data. > As an optimization, we can consider letting consumers still return > messages that belong to their owned partitions even while they are within a > rebalance, because we know it is safe: no one else would claim those > partitions in this rebalance yet, and if the partitions need to be revoked after > this rebalance, we can still commit offsets then. > One thing we need to take care of, though, is the rebalance timeout: while > consumers process those records they may not call the next poll() in time > (think: the Kafka Streams num.iterations mechanism), which may lead to the consumer > dropping out of the group during the rebalance.
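The difference between today's strict behavior and the proposed one can be illustrated with a toy model. `ToyRebalancingConsumer` and its methods are hypothetical, not `KafkaConsumer` internals. This sketch ignores offset commits and the rebalance-timeout concern raised in the ticket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (hypothetical, not KafkaConsumer internals).
final class ToyRebalancingConsumer {
    private final Map<String, List<String>> buffered; // partition -> fetched records
    private final Set<String> owned;                   // partitions currently owned
    boolean rebalanceInProgress;

    ToyRebalancingConsumer(Map<String, List<String>> buffered, Set<String> owned) {
        this.buffered = new HashMap<>(buffered);
        this.owned = new LinkedHashSet<>(owned);
    }

    // Current behaviour: any rebalance means no data is returned.
    List<String> pollStrict() {
        return rebalanceInProgress ? new ArrayList<>() : drainOwned();
    }

    // Proposed behaviour: keep returning data for partitions still owned, since
    // no other member can claim them until this rebalance completes.
    List<String> pollDuringRebalance() {
        return drainOwned();
    }

    private List<String> drainOwned() {
        List<String> out = new ArrayList<>();
        for (String partition : owned) {
            List<String> records = buffered.remove(partition);
            if (records != null) {
                out.addAll(records);
            }
        }
        return out;
    }
}
```

Records buffered for partitions the consumer does not own are never returned in either mode.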
[jira] [Assigned] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-5998: -- Assignee: Bill Bejeck > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: Bill Bejeck >Priority: Critical > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one Kafka broker and one Kafka Streams instance running. I have been running it > for two days under a load of around 2500 msgs per second. On the third day I am > getting the exception below for some of the partitions: I have 16 partitions, and only > 0_0 and 0_1 give this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) >
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) 
~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) >
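The trace shows `OffsetCheckpoint.write` opening a `.checkpoint.tmp` file inside the task directory, which suggests a write-to-temp-then-rename scheme, and "No such file or directory" on that open usually means the parent task directory (`0_1`) did not exist at that moment. The sketch below illustrates that pattern with plain `java.nio`. `CheckpointSketch`, `writeCheckpoint`, and the one-line `<partition> <offset>` format are hypothetical and are not the actual `OffsetCheckpoint` file format or implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

final class CheckpointSketch {
    // Writes "<partition> <offset>" lines to <taskDir>/.checkpoint via <taskDir>/.checkpoint.tmp.
    // The line format is a hypothetical simplification for illustration.
    static void writeCheckpoint(Path taskDir, Map<String, Long> offsets) throws IOException {
        if (!Files.isDirectory(taskDir)) {
            // Same situation as the log above: the task directory is gone, so the temp
            // file cannot be created. Fail explicitly instead of silently re-creating it.
            throw new NoSuchFileException(taskDir.toString());
        }
        Path tmp = taskDir.resolve(".checkpoint.tmp");
        Path target = taskDir.resolve(".checkpoint");
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            sb.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
        }
        Files.write(tmp, sb.toString().getBytes(StandardCharsets.UTF_8));
        // Atomic rename so a reader never sees a half-written checkpoint.
        // On POSIX file systems this is rename(2), which replaces an existing .checkpoint.
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Refusing to re-create a missing task directory is a deliberate choice in this sketch: a re-created directory could end up holding a `.checkpoint` file without the matching store data.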
[jira] [Resolved] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8418. -- Resolution: Fixed Reviewer: Randall Hauch Merged and backported all the way to the `1.0` branch. > Connect System tests are not waiting for REST resources to be registered > > > Key: KAFKA-8418 > URL: https://issues.apache.org/jira/browse/KAFKA-8418 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.0 >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.1 > > > I am getting an error while running Kafka tests: > {code} > Traceback (most recent call last): File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", > line 132, in run data = self.run_test() File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", > line 189, in run_test return self.test_context.function(self.test) File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", > line 89, in test_rest_api assert set([connector_plugin['class'] for > connector_plugin in self.cc.list_connector_plugins()]) == > {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR} File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", > line 218, in list_connector_plugins return self._rest('/connector-plugins/', > node=node) File > "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", > line 234, in _rest raise ConnectRestError(resp.status_code, resp.text, > resp.url) ConnectRestError > {code} > From the logs, I see two messages: > {code} > [2019-05-23 16:09:59,373] INFO REST server listening at > http://172.31.39.205:8083/, 
advertising URL http://worker1:8083/ > (org.apache.kafka.connect.runtime.rest.RestServer) > {code} > and {code} > [2019-05-23 16:10:00,738] INFO REST resources initialized; server is started > and ready to handle requests > (org.apache.kafka.connect.runtime.rest.RestServer) > {code} > It takes 1365 ms to actually load the REST resources, but the test only waits for > the port to be listening. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
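The core of the report is that an open port is not the same as a ready server: the listener is up at 16:09:59,373, but the REST resources only finish initializing at 16:10:00,738, a 1365 ms gap. A more robust readiness check polls an actual resource until it succeeds or a deadline passes. The `Readiness.waitUntil` helper below is a hypothetical sketch, not the kafkatest service code or the merged fix. The probe is abstracted as a `BooleanSupplier` so the sketch runs without a Connect worker.

```java
import java.util.function.BooleanSupplier;

final class Readiness {
    // Calls `probe` every `intervalMs` until it returns true or `timeoutMs` elapses.
    // Returns true if the probe succeeded before the deadline, false otherwise.
    static boolean waitUntil(BooleanSupplier probe, long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (probe.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(intervalMs);
        }
    }
}
```

Against a real worker, the probe would issue a request such as GET `/connector-plugins/` and return true only on a successful response, instead of merely checking that the port accepts connections.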
[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-8418: - Fix Version/s: 2.2.1 2.1.2 2.0.2 1.1.2 1.0.3 > Connect System tests are not waiting for REST resources to be registered > > > Key: KAFKA-8418 > URL: https://issues.apache.org/jira/browse/KAFKA-8418 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.0 >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered
[ https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852247#comment-16852247 ] ASF GitHub Bot commented on KAFKA-8418: --- rhauch commented on pull request #6840: KAFKA-8418: Wait until REST resources are loaded when starting a Connect Worker. URL: https://github.com/apache/kafka/pull/6840 > Connect System tests are not waiting for REST resources to be registered > > > Key: KAFKA-8418 > URL: https://issues.apache.org/jira/browse/KAFKA-8418 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.0 >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > Fix For: 2.3.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8293) Messages undelivered when small quotas applied
[ https://issues.apache.org/jira/browse/KAFKA-8293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852160#comment-16852160 ] Rens Groothuijsen commented on KAFKA-8293: -- I haven't been able to replicate this issue with a local single-node configuration. Are you sending these messages over a network, or is it entirely local? > Messages undelivered when small quotas applied > --- > > Key: KAFKA-8293 > URL: https://issues.apache.org/jira/browse/KAFKA-8293 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: Kirill Kulikov >Priority: Major > Labels: quotas > > I observe a strange Kafka behavior when using small quotas. > For ex. I set quotas for the consumer like > > {code:java} > kafka-configs --zookeeper zookeeper:2181 --entity-type users --entity-name > kafka --alter --add-config 'producer_byte_rate=2048000, > consumer_byte_rate=256'{code} > > If I send a small batch of messages as > > {code:java} > kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties > --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 > --num-records 10 --record-size 20 --throughput 1000 --print-metrics --topic > test > {code} > they go through without problems. > But when the batch is bigger > > {code:java} > kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties > --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 > --num-records 100 --record-size 20 --throughput 1000 --print-metrics --topic > test > {code} > ... I do not get any messages on the consumer side *at all*. 
> On the other hand, if `kafka-producer-perf-test` throughput is limited like > > {code:java} > kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties > --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 > --num-records 1000 --record-size 10 --throughput 10 --print-metrics --topic > test > {code} > I can see only the first 20-30 messages in `kafka-console-consumer`. But then > it gets stuck (throttled perhaps) and other queued messages never come > through. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
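For a sense of scale, a back-of-the-envelope lower bound on delivery time under a byte-rate quota is the payload size divided by `consumer_byte_rate` (bytes per second). The sketch below only counts record payload (`--num-records` times `--record-size` from the commands above) and ignores record, batch, and protocol overhead, so real times will be longer. `QuotaMath` is a hypothetical helper. It does not by itself explain why no messages arrive at all, but it shows how small a 256 B/s quota is relative to these runs.

```java
final class QuotaMath {
    // Lower bound in seconds to deliver `records` payloads of `recordSize` bytes each
    // at `byteRate` bytes/second. Ignores all per-record and per-request overhead.
    static double minSeconds(long records, long recordSize, long byteRate) {
        return (double) (records * recordSize) / byteRate;
    }
}
```

With `consumer_byte_rate=256`, the three runs give 200 B (about 0.78 s), 2,000 B (about 7.8 s), and 10,000 B (about 39 s) of payload alone.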
[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852127#comment-16852127 ] Guozhang Wang commented on KAFKA-8367: -- [~pavelsavov] Could you also share the code of the transformer/processor that accesses the `(stores: [performance_windowed_store])`? > Non-heap memory leak in Kafka Streams > - > > Key: KAFKA-8367 > URL: https://issues.apache.org/jira/browse/KAFKA-8367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 >Reporter: Pavel Savov >Priority: Major > Attachments: memory-prod.png, memory-test.png > > > We have been observing a non-heap memory leak after upgrading to Kafka > Streams 2.2.0 from 2.0.1. We suspect the source to be RocksDB, as the > leak only happens when we enable stateful stream operations (utilizing > stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 > and ported the fix scheduled for release in 2.2.1 to our fork. It did not > stop the leak, however. > We are having this memory leak in our production environment, where the > consumer group is auto-scaled in and out in response to changes in traffic > volume, and in our test environment, where we have two consumers, no > autoscaling and relatively constant traffic. > Below is some information I'm hoping will be of help: > * RocksDB Config: > Block cache size: 4 MiB > Write buffer size: 2 MiB > Block size: 16 KiB > Cache index and filter blocks: true > Manifest preallocation size: 64 KiB > Max write buffer number: 3 > Max open files: 6144 > > * Memory usage in production > The attached graph (memory-prod.png) shows memory consumption for each > instance as a separate line. The horizontal red line at 6 GiB is the memory > limit. > As illustrated in the attached graph from production, memory consumption in > running instances goes up around autoscaling events (scaling the consumer > group either in or out) and associated rebalancing.
It stabilizes until the > next autoscaling event but it never goes back down. > An example of scaling out can be seen from around 21:00 hrs where three new > instances are started in response to a traffic spike. > Just after midnight traffic drops and some instances are shut down. Memory > consumption in the remaining running instances goes up. > Memory consumption climbs again from around 6:00AM due to increased traffic > and new instances are being started until around 10:30AM. Memory consumption > never drops until the cluster is restarted around 12:30. > > * Memory usage in test > As illustrated by the attached graph (memory-test.png) we have a fixed number > of two instances in our test environment and no autoscaling. Memory > consumption rises linearly until it reaches the limit (around 2:00 AM on > 5/13) and Mesos restarts the offending instances, or we restart the cluster > manually. > > * No heap leaks observed > * Window retention: 2 or 11 minutes (depending on operation type) > * Issue not present in Kafka Streams 2.0.1 > * No memory leak for stateless stream operations (when no RocksDB stores are > used) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
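To separate expected RocksDB usage from unexplained growth, a rough, commonly used estimate for one RocksDB instance is block cache plus write buffers (write buffer size times max write buffer number). With "Cache index and filter blocks: true", index and filter blocks are counted inside the cache. This ignores iterators, pinned blocks, and allocator overhead, and assumes each instance has its own cache. The sketch below assumes that every store partition, and for windowed stores every segment, is a separate RocksDB instance. `RocksDbMemoryEstimate` and the partition/segment counts used with it are hypothetical and do not come from the ticket.

```java
final class RocksDbMemoryEstimate {
    static final long MIB = 1024L * 1024L;

    // Rough upper bound for one RocksDB instance: block cache + all memtables.
    // Assumes index/filter blocks live in the block cache (cache_index_and_filter_blocks=true).
    static long perInstanceBytes(long blockCacheBytes, long writeBufferBytes, int maxWriteBuffers) {
        return blockCacheBytes + writeBufferBytes * maxWriteBuffers;
    }

    // Total across instances, assuming each store partition and each window segment
    // is a separate RocksDB instance with its own cache and memtables.
    static long totalBytes(long perInstance, int stores, int partitionsPerStore, int segmentsPerPartition) {
        return perInstance * stores * partitionsPerStore * segmentsPerPartition;
    }
}
```

With the reported settings this gives 4 MiB + 2 MiB x 3 = 10 MiB per instance. A hypothetical single windowed store with 16 partitions and 3 live segments each would be about 480 MiB. Because this is a steady-state bound, it cannot by itself account for memory that keeps climbing after every rebalance.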
[jira] [Resolved] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-8187. Resolution: Fixed Merged to trunk and cherry-picked to 2.3, 2.2 and 2.1 > State store record loss across multiple reassignments when using standby tasks > -- > > Key: KAFKA-8187 > URL: https://issues.apache.org/jira/browse/KAFKA-8187 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: William Greer >Assignee: Lifei Chen >Priority: Major > Fix For: 2.3.0, 2.1.2, 2.2.1 > > > Overview: > There is a race condition that can cause a partitioned state store to be > missing records up to an offset when using standby tasks. > When a reassignment occurs and a task is migrated to a StandbyTask in another > StreamThread/TaskManager on the same JVM, there can be lock contention that > prevents the StandbyTask on the currently assigned StreamThread from > acquiring the lock, and the lock acquisition is not retried because all of the > active StreamTasks for that StreamThread are running. If the StandbyTask does > not acquire the lock before the StreamThread enters the RUNNING state, > then the StandbyTask will not consume any records. If there is no subsequent > reassignment before the second execution of the stateDirCleaner thread, then > the task directory for the StandbyTask will be deleted. When the next > reassignment occurs, the offset that was read by the StandbyTask at creation > time, before acquiring the lock, will be written back to the state store > directory, which re-creates the state store directory. > An example: > StreamThread(A) and StreamThread(B) are running on the same JVM in the same > streams application. > StreamThread(A) has StandbyTask 1_0 > StreamThread(B) has no tasks > A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's > one task > StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby > task for 1_0 > Here begins the race condition. > StreamThread(B) creates the StandbyTask, which reads the current checkpoint > from disk. > StreamThread(B) then attempts to updateNewAndRestoringTasks() for its > assigned tasks. [0] > StreamThread(B) initializes the new tasks for the active and standby tasks. > [1] [2] > StreamThread(B) attempts to lock the state directory for task 1_0 but fails > with a LockException [3], since StreamThread(A) still holds the lock. > StreamThread(B) returns true from updateNewAndRestoringTasks() due to the > check at [4], which only checks that the active assigned tasks are running. > StreamThread(B) state is set to RUNNING > StreamThread(A) closes the previous StandbyTask, specifically calling > closeStateManager() [5] > StreamThread(A) state is set to RUNNING > The Streams application on this host has completed rebalancing and is now in > the RUNNING state. > State at this point is the following: State directory exists for 1_0 and all > data is present. > Then, 1 to 2 intervals of [6] (which defaults to 10 > minutes) after the reassignment has completed, the stateDirCleaner thread will > execute [7]. > The stateDirCleaner will then do [8], which finds the directory 1_0, finds > that there isn't an active lock for that directory, acquires the lock, and > deletes the directory. > State at this point is the following: State directory does not exist for 1_0. > When the next reassignment occurs, the offset that was read by > StreamThread(B) during construction of the StandbyTask for 1_0 will be > written back to disk. This write re-creates the state store directory and > writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a > '.checkpoint' file in it, but there is no other state store data in the > directory. > If this host is assigned the active task for 1_0 then all the history in the > state store will be missing from before the offset that was read at the > previous reassignment. > If this host is assigned the standby task for 1_0 then the lock will be > acquired and the standby will start to consume records, but it will still be > missing all records from before the offset that was read at the previous > reassignment. > If this host is not assigned 1_0, then the state directory will get cleaned > up by the stateDirCleaner thread 10 to 20 minutes later and the record loss > issue will be hidden. > [0] > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869 > [1] >
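The pivot of the race is the readiness check at [4]: StreamThread(B) has no active tasks and one standby task that failed to lock, yet it is reported ready and moves to RUNNING. The sketch below is a simplified model of that reasoning with hypothetical `TaskReadiness`/`TaskState` names, not the Streams internals, and it is not presented as the fix that was merged. It shows that an "all active tasks are initialized" check is vacuously true for a thread with no active tasks, while a check that also covers standby tasks would keep the thread retrying.

```java
import java.util.List;

final class TaskReadiness {
    // Hypothetical, simplified task state (not Kafka's Task interface).
    static final class TaskState {
        final String id;
        final boolean initialized; // false if, e.g., the state-directory lock could not be acquired

        TaskState(String id, boolean initialized) {
            this.id = id;
            this.initialized = initialized;
        }
    }

    // Models the check described at [4]: only active tasks are considered;
    // `standby` is intentionally ignored.
    static boolean readyActiveOnly(List<TaskState> active, List<TaskState> standby) {
        return active.stream().allMatch(t -> t.initialized);
    }

    // Variant that also requires standby tasks to be initialized, so the thread
    // would keep retrying the lock instead of entering RUNNING with an idle standby.
    static boolean readyIncludingStandby(List<TaskState> active, List<TaskState> standby) {
        return readyActiveOnly(active, standby) && standby.stream().allMatch(t -> t.initialized);
    }
}
```

For StreamThread(B) in the example (no active tasks, standby 1_0 not locked), the first check returns true and the second returns false.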
[jira] [Updated] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-8187: --- Fix Version/s: 2.2.1 2.1.2 2.3.0 > State store record loss across multiple reassignments when using standby tasks > -- > > Key: KAFKA-8187 > URL: https://issues.apache.org/jira/browse/KAFKA-8187 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: William Greer >Assignee: Lifei Chen >Priority: Major > Fix For: 2.3.0, 2.1.2, 2.2.1
[jira] [Commented] (KAFKA-6455) Improve timestamp propagation at DSL level
[ https://issues.apache.org/jira/browse/KAFKA-6455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852060#comment-16852060 ] ASF GitHub Bot commented on KAFKA-6455: --- mjsax commented on pull request #6751: KAFKA-6455: Update integration tests to verify result timestamps URL: https://github.com/apache/kafka/pull/6751 > Improve timestamp propagation at DSL level > -- > > Key: KAFKA-6455 > URL: https://issues.apache.org/jira/browse/KAFKA-6455 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: needs-kip > Fix For: 2.3.0 > > > At the DSL level, we inherit the timestamp propagation "contract" from the > Processor API. This contract is not optimal at the DSL level, and we should > define a DSL-level contract that matches the semantics of the corresponding > DSL operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)
[ https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852041#comment-16852041 ] Ismael Juma commented on KAFKA-8448: Nice find, thanks for the report. > Too many kafka.log.Log instances (Memory Leak) > -- > > Key: KAFKA-8448 > URL: https://issues.apache.org/jira/browse/KAFKA-8448 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 > Environment: Red Hat 4.4.7-16, java version "1.8.0_152", > kafka_2.12-2.2.0 >Reporter: Juan Olivares >Assignee: Justine Olshan >Priority: Major > > We have a custom Kafka health check which creates a topic, adds some ACLs > (read/write topic and group), produces and consumes a single message, and then > quickly removes it and all the related ACLs created. We close the consumer > involved, but not the producer. > We have observed that the number of {{kafka.log.Log}} instances keeps growing, while > there's no evidence of topics being leaked, either from running > {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} > or from looking at the disk directory where topics are stored. > After looking at the heap dump we've observed the following: > - None of the {{kafka.log.Log}} references ({{currentLogs}}, > {{logsToBeDeleted}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is > holding the large number of {{kafka.log.Log}} instances. > - The only reference preventing {{kafka.log.Log}} from being garbage collected > seems to be > {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which > contains scheduled tasks created with the name > {{PeriodicProducerExpirationCheck}}. > I can see in the code that for every {{kafka.log.Log}} a task with this name > is scheduled.
> {code:java} > scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { > lock synchronized { > producerStateManager.removeExpiredProducers(time.milliseconds) > } > }, period = producerIdExpirationCheckIntervalMs, delay = > producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) > {code} > However, it seems those tasks are never unscheduled/cancelled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
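The heap-dump finding is consistent with standard `ScheduledThreadPoolExecutor` behavior: a periodic task, and everything its closure captures, stays in the executor's `DelayedWorkQueue` until it is cancelled. By default even a cancelled task lingers there until its delay elapses, unless `setRemoveOnCancelPolicy(true)` is set. A minimal sketch of the ownership pattern, with a hypothetical `PeriodicCheckOwner` class standing in for `kafka.log.Log`, keeps the `ScheduledFuture` returned at schedule time and cancels it on close. Kafka's own scheduler wrapper and the eventual fix may differ.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical owner of a periodic check, standing in for kafka.log.Log.
final class PeriodicCheckOwner implements AutoCloseable {
    private final ScheduledFuture<?> future;

    PeriodicCheckOwner(ScheduledThreadPoolExecutor scheduler, Runnable check, long periodMs) {
        // Keep the handle: the executor's work queue holds a strong reference to the
        // task (and whatever it captures) until the task is cancelled.
        this.future = scheduler.scheduleAtFixedRate(check, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        // Without this call, the periodic task and everything it captures stay reachable.
        future.cancel(false);
    }
}
```

Usage: create the executor once, call `setRemoveOnCancelPolicy(true)` on it so cancellation frees the queue slot immediately, and close each owner when its log is deleted.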
[jira] [Assigned] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)
[ https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-8448: - Assignee: Justine Olshan > Too many kafka.log.Log instances (Memory Leak) > -- > > Key: KAFKA-8448 > URL: https://issues.apache.org/jira/browse/KAFKA-8448 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 > Environment: Red Hat 4.4.7-16, java version "1.8.0_152", > kafka_2.12-2.2.0 >Reporter: Juan Olivares >Assignee: Justine Olshan >Priority: Major > > We have a custom Kafka health check which creates a topic, add some ACLs > (read/write topic and group), produce & consume a single message and then > quickly remove it and all the related ACLs created. We close the consumer > involved, but no the producer. > We have observed that # of instances of {{kafka.log.Log}} keep growing, while > there's no evidence of topics being leaked, neither running > {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} , > nor looking at the disk directory where topics are stored. > After looking at the heapdump we've observed the following > - None of the {{kafka.log.Log}} references ({{currentLogs}}, > {{logsToBeDeleted }} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is > holding the big amount of {{kafka.log.Log}} instances. > - The only reference preventing {{kafka.log.Log}} to be Garbage collected > seems to be > {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}} which > contains schedule tasks created with the name > {{PeriodicProducerExpirationCheck}}. > I can see in the code that for every {{kafka.log.Log}} a task with this name > is scheduled. 
> {code:java} > scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { > lock synchronized { > producerStateManager.removeExpiredProducers(time.milliseconds) > } > }, period = producerIdExpirationCheckIntervalMs, delay = > producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) > {code} > However, it seems those tasks are never unscheduled or cancelled.
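The retention path described in this report can be reproduced in plain Java. The sketch below is a minimal illustration, not Kafka's actual code or fix: {{PartitionLog}} and {{ScheduledLeakDemo}} are hypothetical names, and it assumes the scheduler is backed by a {{java.util.concurrent.ScheduledThreadPoolExecutor}}, as the heap dump suggests. A periodic task whose callback captures its owner keeps that owner reachable from the executor's queue until the task is cancelled; keeping the returned {{ScheduledFuture}} and cancelling it on close lets both be collected.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a per-partition log that schedules a periodic
// maintenance task when it is created (compare PeriodicProducerExpirationCheck).
class PartitionLog implements AutoCloseable {
    private final ScheduledFuture<?> expirationCheck;

    PartitionLog(ScheduledThreadPoolExecutor scheduler, long periodMs) {
        // The method reference captures `this`, so the executor's queue keeps
        // this object reachable for as long as the task remains scheduled.
        this.expirationCheck = scheduler.scheduleAtFixedRate(
                this::removeExpiredProducers, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    private void removeExpiredProducers() {
        // Placeholder for the periodic work.
    }

    @Override
    public void close() {
        // Cancel the periodic task so the queue no longer references this log.
        expirationCheck.cancel(false);
    }
}

class ScheduledLeakDemo {
    // Creates `count` logs, optionally closes each one, and reports how many
    // periodic tasks are still sitting in the executor's work queue.
    static int queuedAfter(int count, boolean closeLogs) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        // Without this, cancelled tasks stay queued until their delay elapses.
        scheduler.setRemoveOnCancelPolicy(true);
        try {
            for (int i = 0; i < count; i++) {
                PartitionLog log = new PartitionLog(scheduler, 60_000L);
                if (closeLogs) {
                    log.close();
                }
            }
            return scheduler.getQueue().size();
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Calling {{queuedAfter(5, false)}} leaves five tasks (and five logs) referenced by the queue, while {{queuedAfter(5, true)}} leaves none. The remove-on-cancel policy matters here: by default a cancelled task is only dropped from the queue once its delay elapses.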
[jira] [Resolved] (KAFKA-8429) Consumer should handle offset change while OffsetForLeaderEpoch is inflight
[ https://issues.apache.org/jira/browse/KAFKA-8429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8429. Resolution: Fixed Fix Version/s: (was: 2.3.0) > Consumer should handle offset change while OffsetForLeaderEpoch is inflight > --- > > Key: KAFKA-8429 > URL: https://issues.apache.org/jira/browse/KAFKA-8429 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > It is possible for the offset of a partition to be changed while we are in > the middle of validation. If the OffsetForLeaderEpoch request is in-flight > and the offset changes, we need to redo the validation after it returns.
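The re-validation rule in this description can be sketched as a comparison between the position a request was built for and the position at the time its response arrives. This is a hypothetical Java model, not the consumer's actual implementation: {{PartitionPosition}} and its methods are illustrative names, and the response is reduced to the offset it validated.

```java
// Hypothetical model of one partition's fetch position while an
// OffsetForLeaderEpoch request is in flight.
class PartitionPosition {
    private long offset;
    private boolean awaitingValidation = true;

    PartitionPosition(long offset) {
        this.offset = offset;
    }

    // Called when a validation request is sent; the returned value is the
    // position that this request is validating.
    long beginValidation() {
        return offset;
    }

    // For example, the application calls seek() before the response arrives.
    void seek(long newOffset) {
        offset = newOffset;
        awaitingValidation = true;
    }

    // Called when the response arrives. The result is applied only if the
    // position is unchanged; otherwise the response is stale and the partition
    // stays in the awaiting-validation state so a new request is sent.
    boolean completeValidation(long requestOffset) {
        if (!awaitingValidation || requestOffset != offset) {
            return false;
        }
        awaitingValidation = false;
        return true;
    }

    boolean isAwaitingValidation() {
        return awaitingValidation;
    }
}
```

If the application seeks from offset 100 to 250 while the request for 100 is in flight, the response for 100 is ignored and a second request validates 250.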
[jira] [Commented] (KAFKA-8429) Consumer should handle offset change while OffsetForLeaderEpoch is inflight
[ https://issues.apache.org/jira/browse/KAFKA-8429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852015#comment-16852015 ] ASF GitHub Bot commented on KAFKA-8429: --- hachikuji commented on pull request #6811: KAFKA-8429; Handle offset change when OffsetForLeaderEpoch inflight URL: https://github.com/apache/kafka/pull/6811 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Consumer should handle offset change while OffsetForLeaderEpoch is inflight > --- > > Key: KAFKA-8429 > URL: https://issues.apache.org/jira/browse/KAFKA-8429 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.3.0 > > > It is possible for the offset of a partition to be changed while we are in > the middle of validation. If the OffsetForLeaderEpoch request is in-flight > and the offset changes, we need to redo the validation after it returns.
[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851996#comment-16851996 ] ASF GitHub Bot commented on KAFKA-8187: --- bbejeck commented on pull request #6818: KAFKA-8187: Add wait time for other thread in the same jvm to free the locks URL: https://github.com/apache/kafka/pull/6818 > State store record loss across multiple reassignments when using standby tasks > -- > > Key: KAFKA-8187 > URL: https://issues.apache.org/jira/browse/KAFKA-8187 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: William Greer >Assignee: Lifei Chen >Priority: Major > > Overview: > There is a race condition that can cause a partitioned state store to be > missing records up to an offset when using standby tasks. > When a reassignment occurs and a task is migrated to a StandbyTask in another > StreamThread/TaskManager on the same JVM, there can be lock contention that > prevents the StandbyTask on the currently assigned StreamThread from > acquiring the lock, and the StandbyTask does not retry acquiring the lock because all of the > active StreamTasks are running for that StreamThread. If the StandbyTask does > not acquire the lock before the StreamThread enters the RUNNING state, > then the StandbyTask will not consume any records. If there is no subsequent > reassignment before the second execution of the stateDirCleaner thread, then > the task directory for the StandbyTask will be deleted. When the next > reassignment occurs, the offset that was read by the StandbyTask at creation > time, before acquiring the lock, will be written back to the state store > directory, which re-creates the state store directory.
> An example: > StreamThread(A) and StreamThread(B) are running on the same JVM in the same > streams application. > StreamThread(A) has StandbyTask 1_0 > StreamThread(B) has no tasks > A reassignment is triggered by another host in the streams application fleet. > StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's > one task > StreamThread(B) is notified with a PARTITIONS_ASSIGNED event for a standby > task for 1_0 > Here begins the race condition. > StreamThread(B) creates the StandbyTask, which reads the current checkpoint > from disk. > StreamThread(B) then attempts to updateNewAndRestoringTasks() for its > assigned tasks. [0] > StreamThread(B) initializes the new tasks for the active and standby tasks. > [1] [2] > StreamThread(B) attempts to lock the state directory for task 1_0 but fails > with a LockException [3], since StreamThread(A) still holds the lock. > StreamThread(B) returns true from updateNewAndRestoringTasks() due to the > check at [4], which only checks that the active assigned tasks are running. > StreamThread(B) state is set to RUNNING > StreamThread(A) closes the previous StandbyTask, specifically calling > closeStateManager() [5] > StreamThread(A) state is set to RUNNING > The Streams application for this host has completed rebalancing and is now in > the RUNNING state. > State at this point is the following: State directory exists for 1_0 and all > data is present. > Then, 1 to 2 intervals of [6] (which defaults to 10 > minutes) after the reassignment has completed, the stateDirCleaner thread will > execute [7]. > The stateDirCleaner will then do [8], which finds the directory 1_0, finds > that there isn't an active lock for that directory, acquires the lock, and > deletes the directory. > State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by > StreamThread(B) during construction of the StandbyTask for 1_0 will be > written back to disk. This write re-creates the state store directory and > writes the .checkpoint file with the old offset. > State at this point is the following: State directory exists for 1_0 with a > '.checkpoint' file in it, but there is no other state store data in the > directory. > If this host is assigned the active task for 1_0, then all the history in the > state store will be missing from before the offset that was read at the > previous reassignment. > If this host is assigned the standby task for 1_0, then the lock will be > acquired and the standby will start to consume records, but it will still be > missing all records from before the offset that was read at the
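The linked pull request's title describes the remedy as adding wait time for another thread in the same JVM to free the lock, instead of giving up after one failed attempt. A minimal sketch of that idea, assuming a lock that can only be polled without blocking: {{TaskDirectoryLocker.acquireWithRetry}} is a hypothetical helper, not the code in that PR, and the timeout and backoff values are illustrative.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: retry a non-blocking lock attempt (for example, on a
// task's state directory) until it succeeds or a deadline passes.
class TaskDirectoryLocker {
    // Returns true once tryLock succeeds; returns false on timeout or
    // interruption so the caller can try again on a later loop iteration.
    static boolean acquireWithRetry(BooleanSupplier tryLock, long maxWaitMs, long backoffMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
        while (!tryLock.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // the other thread still holds the lock
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        return true;
    }
}
```

Returning {{false}} rather than throwing is one possible design: the caller can keep the standby task in its not-yet-initialized set and retry, instead of letting the thread transition to RUNNING with a task that never acquired its lock.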
[jira] [Created] (KAFKA-8449) Restart task on reconfiguration under incremental cooperative rebalancing
Konstantine Karantasis created KAFKA-8449: - Summary: Restart task on reconfiguration under incremental cooperative rebalancing Key: KAFKA-8449 URL: https://issues.apache.org/jira/browse/KAFKA-8449 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.3.0 Reporter: Konstantine Karantasis Assignee: Konstantine Karantasis Fix For: 2.3.0 Under incremental cooperative rebalancing, tasks that are already running and are not redistributed are currently not restarted when their configuration changes. With eager rebalancing, the restart was triggered, and therefore implied, by the rebalance itself. Now, however, existing tasks will not read the new configuration unless they are restarted via the REST API.
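Under incremental cooperative rebalancing, the worker itself has to detect which of its running tasks received a new configuration. A hypothetical sketch of that check follows: task IDs are plain strings and {{TaskReconfiguration.tasksToRestart}} is an illustrative name, not part of the Connect runtime's API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical helper: pick the running tasks that must be restarted because
// their configuration changed, since a cooperative rebalance that does not
// move a task no longer restarts it implicitly.
class TaskReconfiguration {
    // Tasks absent from newConfigs are assumed to be stopped through the
    // normal revocation path, so they are not restarted here.
    static Set<String> tasksToRestart(Map<String, Map<String, String>> runningConfigs,
                                      Map<String, Map<String, String>> newConfigs) {
        Set<String> restart = new HashSet<>();
        for (Map.Entry<String, Map<String, String>> entry : runningConfigs.entrySet()) {
            Map<String, String> updated = newConfigs.get(entry.getKey());
            if (updated != null && !Objects.equals(updated, entry.getValue())) {
                restart.add(entry.getKey());
            }
        }
        return restart;
    }
}
```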
[jira] [Updated] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)
[ https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juan Olivares updated KAFKA-8448: - Description: We have a custom Kafka health check which creates a topic, adds some ACLs (read/write topic and group), produces and consumes a single message, and then quickly removes the topic and all the related ACLs it created. We close the consumer involved, but not the producer. We have observed that the number of {{kafka.log.Log}} instances keeps growing, while there's no evidence of topics being leaked, either when running {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} or when looking at the disk directory where topics are stored. After looking at the heap dump, we've observed the following: - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{logsToBeDeleted}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the large number of {{kafka.log.Log}} instances. - The only reference preventing {{kafka.log.Log}} from being garbage collected seems to be {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which contains scheduled tasks created with the name {{PeriodicProducerExpirationCheck}}. I can see in the code that a task with this name is scheduled for every {{kafka.log.Log}} instance. {code:java} scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { lock synchronized { producerStateManager.removeExpiredProducers(time.milliseconds) } }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) {code} However, it seems those tasks are never unscheduled or cancelled. was: We have a custom Kafka health check which creates a topic, add some ACLs (read/write topic and group), produce & consume a single message and then quickly remove it and all the related ACLs created.
We have observed that # of instances of {{kafka.log.Log}} keep growing, while there's no evidence of topics being leaked, neither running {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} , nor looking at the disk directory where topics are stored. After looking at the heapdump we've observed the following - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{logsToBeDeleted }} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the big amount of {{kafka.log.Log}} instances. - The only reference preventing {{kafka.log.Log}} to be Garbage collected seems to be {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}} which contains schedule tasks created with the name {{PeriodicProducerExpirationCheck}}. I can see in the code that for every {{kafka.log.Log}} a task with this name is scheduled. {code:java} scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { lock synchronized { producerStateManager.removeExpiredProducers(time.milliseconds) } }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) {code} However it seems those tasks are never unscheduled/cancelled > Too many kafka.log.Log instances (Memory Leak) > -- > > Key: KAFKA-8448 > URL: https://issues.apache.org/jira/browse/KAFKA-8448 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 > Environment: Red Hat 4.4.7-16, java version "1.8.0_152", > kafka_2.12-2.2.0 >Reporter: Juan Olivares >Priority: Major > > We have a custom Kafka health check which creates a topic, adds some ACLs > (read/write topic and group), produces and consumes a single message, and then > quickly removes the topic and all the related ACLs it created. We close the consumer > involved, but not the producer.
> We have observed that the number of {{kafka.log.Log}} instances keeps growing, while > there's no evidence of topics being leaked, either when running > {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} > or when looking at the disk directory where topics are stored. > After looking at the heap dump, we've observed the following: > - None of the {{kafka.log.Log}} references ({{currentLogs}}, > {{logsToBeDeleted}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is > holding the large number of {{kafka.log.Log}} instances. > - The only reference preventing {{kafka.log.Log}} from being garbage collected > seems to be > {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which > contains scheduled tasks created with the name > {{PeriodicProducerExpirationCheck}}. > I can see in the code that a task with this name is scheduled for every {{kafka.log.Log}} > instance. > {code:java} > scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { > lock synchronized { >
[jira] [Updated] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)
[ https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juan Olivares updated KAFKA-8448: - Description: We have a custom Kafka health check which creates a topic, adds some ACLs (read/write topic and group), produces and consumes a single message, and then quickly removes the topic and all the related ACLs it created. We have observed that the number of {{kafka.log.Log}} instances keeps growing, while there's no evidence of topics being leaked, either when running {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} or when looking at the disk directory where topics are stored. After looking at the heap dump, we've observed the following: - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{logsToBeDeleted}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the large number of {{kafka.log.Log}} instances. - The only reference preventing {{kafka.log.Log}} from being garbage collected seems to be {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which contains scheduled tasks created with the name {{PeriodicProducerExpirationCheck}}. I can see in the code that a task with this name is scheduled for every {{kafka.log.Log}} instance. {code:java} scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { lock synchronized { producerStateManager.removeExpiredProducers(time.milliseconds) } }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) {code} However, it seems those tasks are never unscheduled or cancelled. was: We have a custom Kafka health check which creates a topic add some ACLs (read/write topic and group), produce & consume a single message and then quickly remove it and all the related ACLs created.
We have observed that # of instances of {{kafka.log.Log}} keep growing, while there's no evidence of topics being leaked, neither running {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} , nor looking at the disk directory where topics are stored. After looking at the heapdump we've observed the following - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{logsToBeDeleted }} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the big amount of {{kafka.log.Log}} instances. - The only reference preventing {{kafka.log.Log}} to be Garbage collected seems to be {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}} which contains schedule tasks created with the name {{PeriodicProducerExpirationCheck}}. I can see in the code that for every {{kafka.log.Log}} a task with this name is scheduled. {code:java} scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { lock synchronized { producerStateManager.removeExpiredProducers(time.milliseconds) } }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) {code} However it seems those tasks are never unscheduled/cancelled > Too many kafka.log.Log instances (Memory Leak) > -- > > Key: KAFKA-8448 > URL: https://issues.apache.org/jira/browse/KAFKA-8448 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 > Environment: Red Hat 4.4.7-16, java version "1.8.0_152", > kafka_2.12-2.2.0 >Reporter: Juan Olivares >Priority: Major > > We have a custom Kafka health check which creates a topic, adds some ACLs > (read/write topic and group), produces and consumes a single message, and then > quickly removes the topic and all the related ACLs it created.
> We have observed that the number of {{kafka.log.Log}} instances keeps growing, while > there's no evidence of topics being leaked, either when running > {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} > or when looking at the disk directory where topics are stored. > After looking at the heap dump, we've observed the following: > - None of the {{kafka.log.Log}} references ({{currentLogs}}, > {{logsToBeDeleted}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is > holding the large number of {{kafka.log.Log}} instances. > - The only reference preventing {{kafka.log.Log}} from being garbage collected > seems to be > {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which > contains scheduled tasks created with the name > {{PeriodicProducerExpirationCheck}}. > I can see in the code that a task with this name is scheduled for every {{kafka.log.Log}} > instance. > {code:java} > scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { > lock synchronized { > producerStateManager.removeExpiredProducers(time.milliseconds) > } > }, period = producerIdExpirationCheckIntervalMs, delay = > producerIdExpirationCheckIntervalMs,
[jira] [Created] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)
Juan Olivares created KAFKA-8448: Summary: Too many kafka.log.Log instances (Memory Leak) Key: KAFKA-8448 URL: https://issues.apache.org/jira/browse/KAFKA-8448 Project: Kafka Issue Type: Bug Affects Versions: 2.2.0 Environment: Red Hat 4.4.7-16, java version "1.8.0_152", kafka_2.12-2.2.0 Reporter: Juan Olivares We have a custom Kafka health check which creates a topic, adds some ACLs (read/write topic and group), produces and consumes a single message, and then quickly removes the topic and all the related ACLs it created. We have observed that the number of {{kafka.log.Log}} instances keeps growing, while there's no evidence of topics being leaked, either when running {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} or when looking at the disk directory where topics are stored. After looking at the heap dump, we've observed the following: - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{logsToBeDeleted}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the large number of {{kafka.log.Log}} instances. - The only reference preventing {{kafka.log.Log}} from being garbage collected seems to be {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which contains scheduled tasks created with the name {{PeriodicProducerExpirationCheck}}. I can see in the code that a task with this name is scheduled for every {{kafka.log.Log}} instance. {code:java} scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { lock synchronized { producerStateManager.removeExpiredProducers(time.milliseconds) } }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) {code} However, it seems those tasks are never unscheduled or cancelled.
[jira] [Commented] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851913#comment-16851913 ] SuryaTeja Duggi commented on KAFKA-7995: [~sagarrao] Could I get your email address so that I have someone to chat and work with on this? > Augment singleton protocol type to list for Kafka Consumer > > > Key: KAFKA-7995 > URL: https://issues.apache.org/jira/browse/KAFKA-7995 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Reporter: Boyang Chen >Assignee: Sagar Rao >Priority: Major > Labels: newbie > > Right now the Kafka consumer protocol uses a singleton marker to distinguish > a Kafka Connect worker from a normal consumer. This is not an upgrade-friendly > approach, since the protocol type could potentially change over time. A better > approach is to support multiple candidate protocol types so that a protocol > type switch can be achieved without downtime. > For example, if we are trying to upgrade a Kafka Streams application to > a protocol type called "stream", right now there is no way to do this without > downtime, since the broker will reject changing the protocol type to a different one > unless the group is back to empty. If we allow a new member to provide a list > of protocol types instead ("consumer", "stream"), there would be no > compatibility issue. > An alternative approach is to add an admin API to change a group's protocol > type at runtime. However, the burden this places on administrators is not > trivial, since we need to guarantee that the sequence of operations is correct; > otherwise we will see a limping upgrade midway, for example, > if an unexpected rebalance happens while we are changing the protocol type and > causes old members to fail to join. >
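The list-based proposal can be illustrated by intersecting the protocol types that members advertise: the group keeps a common protocol type, and so avoids downtime, as long as the intersection is non-empty. This is a hypothetical sketch of the idea in the description, not broker code; {{ProtocolTypeNegotiation}} is an illustrative name, and choosing the first common type in the first member's order is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: each member advertises an ordered list of protocol
// types, and the group uses a type that every member supports.
class ProtocolTypeNegotiation {
    // Protocol types supported by every member, in the first member's order.
    static List<String> commonTypes(List<List<String>> memberTypes) {
        if (memberTypes.isEmpty()) {
            return new ArrayList<>();
        }
        List<String> common = new ArrayList<>(memberTypes.get(0));
        for (List<String> types : memberTypes.subList(1, memberTypes.size())) {
            common.retainAll(types);
        }
        return common;
    }

    // Empty when members share no protocol type, i.e. a new member would be rejected.
    static Optional<String> selectedType(List<List<String>> memberTypes) {
        List<String> common = commonTypes(memberTypes);
        return common.isEmpty() ? Optional.empty() : Optional.of(common.get(0));
    }
}
```

During a rolling upgrade, a group mixing old members ("consumer") and new members ("stream", "consumer") stays on "consumer"; once every member is upgraded, the same rule selects "stream".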
[jira] [Commented] (KAFKA-4815) Idempotent/transactional Producer (KIP-98)
[ https://issues.apache.org/jira/browse/KAFKA-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851627#comment-16851627 ] wenxuanguan commented on KAFKA-4815: How do transactions support multiple producer instances? When multiple producers share the same transactional ID, the following exception is thrown: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215) at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606) at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68) Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. > Idempotent/transactional Producer (KIP-98) > -- > > Key: KAFKA-4815 > URL: https://issues.apache.org/jira/browse/KAFKA-4815 > Project: Kafka > Issue Type: New Feature > Components: clients, core, producer >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: kip > Fix For: 0.11.0.0 > > > This issue tracks implementation progress for KIP-98: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging.
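In KIP-98, a {{transactional.id}} identifies a single logical producer. When a new producer instance calls {{initTransactions()}} with an ID that is already in use, the coordinator bumps the producer epoch and fences the older instance, which is what the {{ProducerFencedException}} in the trace above reports. Concurrently active producers therefore need distinct {{transactional.id}} values. The simplified model below illustrates only the fencing rule; {{TxnCoordinatorModel}} is hypothetical and omits transaction timeouts and the rest of the protocol.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of producer fencing: every initTransactions()
// call for a transactional.id bumps its epoch, and requests carrying an older
// epoch are rejected.
class TxnCoordinatorModel {
    private final Map<String, Integer> epochs = new HashMap<>();

    // Models initTransactions(): returns the epoch assigned to this instance.
    int initTransactions(String transactionalId) {
        return epochs.merge(transactionalId, 0, (current, unused) -> current + 1);
    }

    // Models the coordinator's check on a transactional request.
    void checkEpoch(String transactionalId, int producerEpoch) {
        Integer current = epochs.get(transactionalId);
        if (current == null) {
            throw new IllegalStateException("unknown transactional.id " + transactionalId);
        }
        if (producerEpoch < current) {
            // Corresponds to the ProducerFencedException seen by the older instance.
            throw new IllegalStateException("fenced: epoch " + producerEpoch
                    + " < current epoch " + current + " for " + transactionalId);
        }
    }
}
```

Two instances initialized with "txn-1" get epochs 0 and 1, so the first one is fenced; an instance using "txn-2" is unaffected.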
[jira] [Updated] (KAFKA-8447) New Metric to Measure Number of Tasks on a Connector
[ https://issues.apache.org/jira/browse/KAFKA-8447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cyrus Vafadari updated KAFKA-8447: -- Summary: New Metric to Measure Number of Tasks on a Connector (was: Add Connector-level metric to count tasks) > New Metric to Measure Number of Tasks on a Connector > > > Key: KAFKA-8447 > URL: https://issues.apache.org/jira/browse/KAFKA-8447 > Project: Kafka > Issue Type: New Feature >Reporter: Cyrus Vafadari >Priority: Major > > KIP-475 > Worker-level metrics expose the number of tasks on a worker, but for many > applications it is useful to have metrics on how many tasks each connector > has.
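KIP-475 asks for task counts per connector rather than only per worker. As a rough, hypothetical sketch of the aggregation (not the Connect metrics API), per-connector counts can be derived from the task IDs a worker is running, assuming IDs of the form {{<connector>-<taskNumber>}}; {{ConnectorTaskCounts}} is an illustrative name.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: derive per-connector task counts from the task IDs
// running on a worker, as a complement to the worker-level total.
class ConnectorTaskCounts {
    // Task IDs are assumed to look like "<connector>-<taskNumber>"; splitting on
    // the last dash keeps connector names that themselves contain dashes intact.
    static Map<String, Integer> countByConnector(Collection<String> taskIds) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int dash = id.lastIndexOf('-');
            String connector = dash > 0 ? id.substring(0, dash) : id;
            counts.merge(connector, 1, Integer::sum);
        }
        return counts;
    }
}
```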