[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

2019-05-30 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852686#comment-16852686
 ] 

Matthias J. Sax commented on KAFKA-8377:


Not sure if I can follow. The decision to materialize or not is made before we 
start the application, while building the `Topology` (please correct me if 
I am wrong). However, `KTableValueGetterSupplier` is only used at runtime, so 
it can no longer change the decision we made earlier?

Or do you refer to `KTableImpl#valueGetterSupplier()` – maybe we could do the 
check there and force materialization of the parent KTable for this case, 
similar to forcing materialization of source KTables?

Btw: while digging into the code, I found that we have classes 
`KTableMaterializedValueGetterSupplier` and `KTableSourceValueGetterSupplier` 
that are basically the same; it might be worth removing one as a side cleanup 
with this ticket.

> KTable#transformValue might lead to incorrect result in joins
> -
>
> Key: KAFKA-8377
> URL: https://issues.apache.org/jira/browse/KAFKA-8377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If 
> a non-materialized KTable is input to a join, the lookup into the table 
> results in a lookup into the parent table plus a call to the operator. For 
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed 
> as a lookup into materializedTable plus applying the filter().
> For stateless operations like filter(), this is safe. However, 
> #transformValues() might have an attached state store. Hence, when an input 
> record r is processed by #transformValues() with current state S, it might 
> produce an output record r' (that is not materialized). When the join later 
> does a lookup to get r from the parent table, there is no guarantee that 
> #transformValues() again produces r', because its state might not be the same 
> any longer.
> Hence, it seems to be required to always materialize the result of a 
> KTable#transformValues() operation if there is state. Note that if there 
> were a consecutive filter() after transformValues(), it would also be ok to 
> materialize the filter() result instead. Furthermore, if there is no downstream 
> join(), materialization is also not required.
> Basically, it seems to be unsafe to apply a `KTableValueGetter` to a stateful 
> `#transformValues()` operator.
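> To illustrate the hazard, a hedged sketch of the unsafe pattern; the store 
> name, types, and transformer logic are made up for this example:
> {code:java}
> // A stateful transformValues() feeding a join: a lookup through the value
> // getter would re-run the transformer against the *current* state, which
> // may no longer produce the r' that was originally forwarded downstream.
> KTable<String, Long> transformed = materializedTable.transformValues(
>     () -> new ValueTransformerWithKey<String, Long, Long>() {
>         private KeyValueStore<String, Long> store;  // attached state
>         public void init(final ProcessorContext context) {
>             store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
>         }
>         public Long transform(final String key, final Long value) {
>             final Long seen = store.get(key);            // depends on state S
>             store.put(key, value);
>             return seen == null ? value : seen + value;  // state-dependent r'
>         }
>         public void close() {}
>     },
>     "my-store");
> table2.join(transformed, (v2, vt) -> v2 + vt);  // unsafe without materialization
> {code}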





[jira] [Updated] (KAFKA-8446) Kafka Streams restoration crashes with NPE when the record value is null

2019-05-30 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-8446:
---
Description: 
To add more context here:

An NPE will be triggered in *RecordConverters#ByteBuffer.allocate(8 + 
rawValue.length)* if the record's raw value is null. A scenario to reproduce it:
 # Have a source KTable (timestampedKeyValueStore) read in tombstone records 
where the value field is null. Note that you need to turn off the stream-level 
cache, otherwise the null value will not be forwarded.
 # Shut down the application and clear the local state stores.
 # Restart the application before the changelog topic starts compaction. If we 
are "lucky", the tombstone record will trigger a null pointer exception during 
restoration when we try to extract the raw value length.
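
A minimal sketch of the guard that would avoid the NPE; this is an assumed 
shape of the fix, not the committed patch (the class and method names are 
illustrative):
{code:java}
import java.nio.ByteBuffer;

// Sketch: prefix a value with the record timestamp, but pass tombstones
// (null values) through unchanged, since ByteBuffer.allocate(8 +
// rawValue.length) would throw an NPE for a null value.
public final class NullSafeRecordConverter {
    public static byte[] prefixWithTimestamp(final long timestamp, final byte[] rawValue) {
        if (rawValue == null) {
            return null;  // tombstone: nothing to convert
        }
        final ByteBuffer buffer = ByteBuffer.allocate(8 + rawValue.length);
        buffer.putLong(timestamp);
        buffer.put(rawValue);
        return buffer.array();
    }
}
{code}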

 

> Kafka Streams restoration crashes with NPE when the record value is null
> 
>
> Key: KAFKA-8446
> URL: https://issues.apache.org/jira/browse/KAFKA-8446
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
>
> To add more context here:
> An NPE will be triggered in *RecordConverters#ByteBuffer.allocate(8 + 
> rawValue.length)* if the record's raw value is null. A scenario to reproduce 
> it:
>  # Have a source KTable (timestampedKeyValueStore) read in tombstone 
> records where the value field is null. Note that you need to turn off the 
> stream-level cache, otherwise the null value will not be forwarded.
>  # Shut down the application and clear the local state stores.
>  # Restart the application before the changelog topic starts compaction. If we 
> are "lucky", the tombstone record will trigger a null pointer exception during 
> restoration when we try to extract the raw value length.
>  





[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852661#comment-16852661
 ] 

ASF GitHub Bot commented on KAFKA-8187:
---

hustclf commented on pull request #6849: KAFKA-8187 Bugfix for branch 2.0
URL: https://github.com/apache/kafka/pull/6849
 
 
   Fix KAFKA-8187: State store record loss across multiple reassignments when 
using standby tasks.
   
   - Do not let the thread transition to RUNNING until all tasks (including 
standby tasks) are ready; a rough sketch of the gating is below.
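
   A hedged sketch of that gating; the method and parameter names are 
illustrative, not the actual TaskManager API:
{code:java}
// Sketch: the thread may transition to RUNNING only once active AND standby
// tasks are all initialized, e.g. have acquired their state-directory locks.
static boolean readyToTransitionToRunning(final boolean[] activeTasksRunning,
                                          final boolean[] standbyTasksRunning) {
    for (final boolean running : activeTasksRunning) {
        if (!running) return false;
    }
    // The pre-fix check stopped at the active tasks, so a StandbyTask that
    // failed to acquire its lock was silently skipped; require standbys too.
    for (final boolean running : standbyTasksRunning) {
        if (!running) return false;
    }
    return true;
}
{code}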
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock; the acquisition is not retried, because all of the 
> active StreamTasks for that StreamThread are running. If the StandbyTask does 
> not acquire the lock before the StreamThread enters the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time, before acquiring the lock, will be written back to the state store 
> directory, which re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event of the threads 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> 

[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-05-30 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852643#comment-16852643
 ] 

Richard Yu commented on KAFKA-8421:
---

Might try to take a hack at this one. It certainly is an interesting issue. :)

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send a join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
>     return ConsumerRecords.empty();
> }
> {code}
> I.e., if the consumer enters a rebalance it always returns no data.
> As an optimization, we can consider letting a consumer still return 
> messages that belong to its owned partitions even while it is within a 
> rebalance, because we know it is safe that no one else will claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked. A sketch of the relaxed 
> check is below.
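> A hedged sketch of that relaxed gate; `fetchedRecordsForOwnedPartitions()` 
> and `ownedPartitions` are hypothetical names, not the actual consumer 
> internals:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
>     // Hypothetical: mid-rebalance, return buffered records for partitions
>     // this consumer still owns instead of unconditionally returning nothing.
>     ConsumerRecords<K, V> owned = fetchedRecordsForOwnedPartitions(ownedPartitions);
>     return owned.isEmpty() ? ConsumerRecords.<K, V>empty() : owned;
> }
> {code}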
> One thing we need to take care of, though, is the rebalance timeout: while 
> consumers are processing those records they may not call the next poll() in 
> time (think: Kafka Streams' num.iterations mechanism), which may lead to the 
> consumer dropping out of the group during the rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure

2019-05-30 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852639#comment-16852639
 ] 

Boyang Chen edited comment on KAFKA-8438 at 5/31/19 4:20 AM:
-

Hey Richard, thanks for the update. I'm still not quite clear on the purpose of 
adding this callback at the moment. Quoted from your original statement:

> When called, this method could be used to log a consumer failure or, should 
> the user wish it, create a new thread which would then rejoin the consumer 
> group (which could also include the required {{group.instance.id}} so that a 
> rebalance wouldn't be re-triggered; we would need to think about that). 

I think logging a consumer failure is not that important: either way the 
user has to be aware of the failure, so this logging needs to talk to an 
external service, generally a path from application -> local metrics agent -> 
remote time series DB -> pager system. This new callback does not save much 
effort there.

Also, for rebooting a new consumer, this consumer needs initialization configs 
and so on. Where are we planning to store this configuration data? If the 
current consumer is already dead, who will be responsible for bringing up an 
almost-the-same consumer?

Could you brainstorm more use cases other than the error logging and rebooting 
here?


was (Author: bchen225242):
Hey Richard, thanks for the update. I'm still not quite clear on the purpose of 
adding this callback at the moment. Quoted from your original statement:
```When called, this method could be used to log a consumer failure or, should 
the user wish it, create a new thread which would then rejoin the consumer 
group (which could also include the required {{group.instance.id}} so that a 
rebalance wouldn't be re-triggered; we would need to think about that). ```
I think logging a consumer failure is not that important: either way the 
user has to be aware of the failure, so this logging needs to talk to an 
external service, generally a path from application -> local metrics agent -> 
remote time series DB -> pager system. This new callback does not save much 
effort there.

Also, for rebooting a new consumer, this consumer needs initialization configs 
and so on. Where are we planning to store this configuration data? If the 
current consumer is already dead, who will be responsible for bringing up an 
almost-the-same consumer?

Could you brainstorm more use cases other than the error logging and rebooting 
here?

> Add API to allow user to define end behavior of consumer failure
> 
>
> Key: KAFKA-8438
> URL: https://issues.apache.org/jira/browse/KAFKA-8438
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>  Labels: needs-dicussion, needs-kip
>
> Recently, in a concerted effort to make Kafka's rebalances less painful, 
> various approaches have been used to reduce the number and impact of 
> rebalances. Often, the trigger of a rebalance is a failure of some sort or a 
> thrown exception during processing, in which case the workload will be 
> redistributed among surviving threads. Working to reduce rebalances due to 
> random consumer crashes, a recent change to Kafka internals has been made 
> (which introduces the concept of static membership) that prevents a rebalance 
> from occurring within {{session.timeout.ms}}, in the hope that the consumer 
> thread which crashed would recover in that time interval and rejoin the group.
> However, in some cases, some consumer threads would permanently go down or 
> remain dead for long periods of time. In these scenarios, users of Kafka 
> would possibly not be aware of such a crash until hours after it 
> happened, which forces Kafka users to manually start a new KafkaConsumer 
> process a considerable period of time after the failure occurred. That is 
> where the addition of a callback such as {{onConsumerFailure}} would help. 
> There are multiple use cases for this callback (which is defined by the 
> user). {{onConsumerFailure}} is called when a particular consumer thread goes 
> down for some specified time interval (i.e., a config called 
> {{acceptable.consumer.failure.timeout.ms}}). When called, this method could 
> be used to log a consumer failure or, should the user wish it, create a new 
> thread which would then rejoin the consumer group (which could also include 
> the required {{group.instance.id}} so that a rebalance wouldn't be 
> re-triggered; we would need to think about that). 
> Should the old thread recover and attempt to rejoin the consumer group (with 
> the substitute thread being part of the group), the old thread will be denied 
> access and an exception would be thrown (to 

[jira] [Commented] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure

2019-05-30 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852639#comment-16852639
 ] 

Boyang Chen commented on KAFKA-8438:


Hey Richard, thanks for the update. I'm still not quite clear on the purpose of 
adding this callback at the moment. Quoted from your original statement:
```When called, this method could be used to log a consumer failure or, should 
the user wish it, create a new thread which would then rejoin the consumer 
group (which could also include the required {{group.instance.id}} so that a 
rebalance wouldn't be re-triggered; we would need to think about that). ```
I think logging a consumer failure is not that important: either way the 
user has to be aware of the failure, so this logging needs to talk to an 
external service, generally a path from application -> local metrics agent -> 
remote time series DB -> pager system. This new callback does not save much 
effort there.

Also, for rebooting a new consumer, this consumer needs initialization configs 
and so on. Where are we planning to store this configuration data? If the 
current consumer is already dead, who will be responsible for bringing up an 
almost-the-same consumer?

Could you brainstorm more use cases other than the error logging and rebooting 
here?

> Add API to allow user to define end behavior of consumer failure
> 
>
> Key: KAFKA-8438
> URL: https://issues.apache.org/jira/browse/KAFKA-8438
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>  Labels: needs-dicussion, needs-kip
>
> Recently, in a concerted effort to make Kafka's rebalances less painful, 
> various approaches have been used to reduce the number and impact of 
> rebalances. Often, the trigger of a rebalance is a failure of some sort or a 
> thrown exception during processing, in which case the workload will be 
> redistributed among surviving threads. Working to reduce rebalances due to 
> random consumer crashes, a recent change to Kafka internals has been made 
> (which introduces the concept of static membership) that prevents a rebalance 
> from occurring within {{session.timeout.ms}}, in the hope that the consumer 
> thread which crashed would recover in that time interval and rejoin the group.
> However, in some cases, some consumer threads would permanently go down or 
> remain dead for long periods of time. In these scenarios, users of Kafka 
> would possibly not be aware of such a crash until hours after it 
> happened, which forces Kafka users to manually start a new KafkaConsumer 
> process a considerable period of time after the failure occurred. That is 
> where the addition of a callback such as {{onConsumerFailure}} would help. 
> There are multiple use cases for this callback (which is defined by the 
> user). {{onConsumerFailure}} is called when a particular consumer thread goes 
> down for some specified time interval (i.e., a config called 
> {{acceptable.consumer.failure.timeout.ms}}). When called, this method could 
> be used to log a consumer failure or, should the user wish it, create a new 
> thread which would then rejoin the consumer group (which could also include 
> the required {{group.instance.id}} so that a rebalance wouldn't be 
> re-triggered; we would need to think about that). 
> Should the old thread recover and attempt to rejoin the consumer group (with 
> the substitute thread being part of the group), the old thread will be denied 
> access and an exception would be thrown (to indicate that another process has 
> already taken its place).
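> To make the proposal concrete, a hedged sketch of what such a callback could 
> look like; the interface, signature, and config key below follow this 
> ticket's proposal and are not an existing Kafka API:
> {code:java}
> // Hypothetical API from this proposal, not an existing Kafka interface.
> public interface ConsumerFailureListener {
>     // Invoked once a consumer thread has been down longer than the
>     // proposed acceptable.consumer.failure.timeout.ms.
>     void onConsumerFailure(String groupInstanceId, Throwable cause);
> }
>
> // Example user implementation: log the failure, and optionally start a
> // replacement consumer re-using the same group.instance.id so that no
> // extra rebalance is triggered.
> ConsumerFailureListener listener = (groupInstanceId, cause) -> {
>     System.err.println("Consumer " + groupInstanceId + " failed: " + cause);
>     // new Thread(() -> startReplacementConsumer(groupInstanceId)).start();
> };
> {code}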
>  
>  
>  





[jira] [Updated] (KAFKA-8451) transaction support multiple producer instance

2019-05-30 Thread wenxuanguan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wenxuanguan updated KAFKA-8451:
---
Description: 
It's common in a distributed system that multiple producer instances send 
messages in one transaction concurrently, and the transaction is committed when 
all the producers send messages successfully. Otherwise, if one producer fails, 
the transaction is aborted and no messages will be consumed.

However, when multiple producers share the same txn id, the following exception 
is thrown:

org.apache.kafka.common.KafkaException: Cannot execute transactional method 
because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
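
For context, a minimal sketch that reproduces the fencing; it illustrates the 
current Kafka semantics (one active producer per transactional.id), not the 
feature requested here (topic name and bootstrap address are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedTxnIdExample {
    static KafkaProducer<String, String> newProducer() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "shared-txn-id");  // same id for both
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        final KafkaProducer<String, String> p1 = newProducer();
        p1.initTransactions();
        final KafkaProducer<String, String> p2 = newProducer();
        p2.initTransactions();  // bumps the epoch and fences p1
        p1.beginTransaction();
        p1.send(new ProducerRecord<>("topic", "key", "value"));
        p1.commitTransaction(); // fails with ProducerFencedException (old epoch)
    }
}
{code}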

  was:
It's common in a distributed system that multiple producer instances send 
messages in one transaction concurrently, and if one producer fails, the other 
producers abort the transaction.

However, when multiple producers share the same txn id, the following exception 
is thrown:

org.apache.kafka.common.KafkaException: Cannot execute transactional method 
because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.


> transaction support multiple producer instance
> --
>
> Key: KAFKA-8451
> URL: https://issues.apache.org/jira/browse/KAFKA-8451
> Project: Kafka
>  Issue Type: Bug
>Reporter: wenxuanguan
>Priority: Major
>
> It's common in a distributed system that multiple producer instances send 
> messages in one transaction concurrently, and the transaction is committed 
> when all the producers send messages successfully. Otherwise, if one producer 
> fails, the transaction is aborted and no messages will be consumed.
> However, when multiple producers share the same txn id, the following 
> exception is thrown:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error state
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
> at 
> com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.





[jira] [Commented] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852540#comment-16852540
 ] 

ASF GitHub Bot commented on KAFKA-8452:
---

vvcephei commented on pull request #6848: KAFKA-8452: Compressed BufferValue
URL: https://github.com/apache/kafka/pull/6848
 
 
   De-duplicate the common case in which the prior value is the same as the old 
value.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Possible Suppress buffer optimization: de-duplicate prior value
> ---
>
> Key: KAFKA-8452
> URL: https://issues.apache.org/jira/browse/KAFKA-8452
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> As of KAFKA-8199, the suppression buffers have to track the "prior value" in 
> addition to the "old" and "new" values for each record, to support 
> transparent downstream views.
> In many cases, the prior value is actually the same as the old value, and we 
> could avoid storing it separately. The challenge is that the old and new 
> values are already serialized into a common array (as a Change via the 
> FullChangeSerde), so the "prior" value would actually be a slice on the 
> underlying array. But, of course, Java does not have array slices.
> To get around this, we either need to switch to ByteBuffers (which support 
> slices) or break apart the serialized Change into just serialized old and new 
> values.





[jira] [Created] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value

2019-05-30 Thread John Roesler (JIRA)
John Roesler created KAFKA-8452:
---

 Summary: Possible Suppress buffer optimization: de-duplicate prior 
value
 Key: KAFKA-8452
 URL: https://issues.apache.org/jira/browse/KAFKA-8452
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


As of KAFKA-8199, the suppression buffers have to track the "prior value" in 
addition to the "old" and "new" values for each record, to support transparent 
downstream views.

In many cases, the prior value is actually the same as the old value, and we 
could avoid storing it separately. The challenge is that the old and new values 
are already serialized into a common array (as a Change via the 
FullChangeSerde), so the "prior" value would actually be a slice on the 
underlying array. But, of course, Java does not have array slices.

To get around this, we either need to switch to ByteBuffers (which support 
slices) or break apart the serialized Change into just serialized old and new 
values.
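
A small illustration of the ByteBuffer option; the offsets are hypothetical, 
and the point is only that a slice references the shared array without copying:
{code:java}
import java.nio.ByteBuffer;

// Sketch: expose the "old" portion of a serialized Change array as a
// zero-copy slice, so the prior value need not be stored separately.
public final class SliceExample {
    public static ByteBuffer oldValueSlice(final byte[] serializedChange,
                                           final int oldValueOffset,
                                           final int oldValueLength) {
        // wrap() shares the backing array; slice() narrows the view to the
        // old value without copying any bytes.
        return ByteBuffer.wrap(serializedChange, oldValueOffset, oldValueLength).slice();
    }
}
{code}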





[jira] [Created] (KAFKA-8451) transaction support multiple producer instance

2019-05-30 Thread wenxuanguan (JIRA)
wenxuanguan created KAFKA-8451:
--

 Summary: transaction support multiple producer instance
 Key: KAFKA-8451
 URL: https://issues.apache.org/jira/browse/KAFKA-8451
 Project: Kafka
  Issue Type: Bug
Reporter: wenxuanguan


It's common in a distributed system that multiple producer instances send 
messages in one transaction concurrently, and if one producer fails, the other 
producers abort the transaction.

However, when multiple producers share the same txn id, the following exception 
is thrown:

org.apache.kafka.common.KafkaException: Cannot execute transactional method 
because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.





[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

2019-05-30 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852478#comment-16852478
 ] 

Guozhang Wang commented on KAFKA-8377:
--

How about this:

Update `valueGetterSupplier` to handle `KTableTransformValues` specially, such that

1. If transformValues(..) takes non-empty `stateStoreNames` (we need to 
book-keep this flag somewhere), then always materialize the result.
2. Otherwise, still delegate to the parent getters.
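
A rough sketch of the shape this could take; the field, store name, and 
delegate below are assumptions for illustration, not the actual implementation:
{code:java}
// Hypothetical sketch inside KTableTransformValues (names are assumed):
public KTableValueGetterSupplier<K, VR> view() {
    if (stateStoreNames.length > 0) {
        // Stateful transformValues(): lookups must read a materialized copy
        // of the result, since re-running the transformer against current
        // state is not guaranteed to reproduce the forwarded value.
        return new KTableMaterializedValueGetterSupplier<>(queryableStoreName);
    }
    // Stateless: safe to delegate to the parent's getter and re-apply the
    // transformer on the fly.
    return delegateToParentGetters();  // hypothetical helper
}
{code}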

> KTable#transformValue might lead to incorrect result in joins
> -
>
> Key: KAFKA-8377
> URL: https://issues.apache.org/jira/browse/KAFKA-8377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If 
> a non-materialized KTable is input to a join, the lookup into the table 
> results in a lookup into the parent table plus a call to the operator. For 
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed 
> as a lookup into materializedTable plus applying the filter().
> For stateless operations like filter(), this is safe. However, 
> #transformValues() might have an attached state store. Hence, when an input 
> record r is processed by #transformValues() with current state S, it might 
> produce an output record r' (that is not materialized). When the join later 
> does a lookup to get r from the parent table, there is no guarantee that 
> #transformValues() again produces r', because its state might not be the same 
> any longer.
> Hence, it seems to be required to always materialize the result of a 
> KTable#transformValues() operation if there is state. Note that if there 
> were a consecutive filter() after transformValues(), it would also be ok to 
> materialize the filter() result instead. Furthermore, if there is no downstream 
> join(), materialization is also not required.
> Basically, it seems to be unsafe to apply a `KTableValueGetter` to a stateful 
> `#transformValues()` operator.





[jira] [Commented] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852473#comment-16852473
 ] 

ASF GitHub Bot commented on KAFKA-8448:
---

jolshan commented on pull request #6847: KAFKA-8448: Too many kafka.log.Log 
instances (Memory Leak)
URL: https://github.com/apache/kafka/pull/6847
 
 
   Added a method to cancel the PeriodicProducerExpirationCheck task that kept 
a reference to the log.
   Includes a test to show that the task exists and then is removed when the 
log is closed.
   
 



> Too many kafka.log.Log instances (Memory Leak)
> --
>
> Key: KAFKA-8448
> URL: https://issues.apache.org/jira/browse/KAFKA-8448
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
> Environment: Red Hat 4.4.7-16, java version "1.8.0_152", 
> kafka_2.12-2.2.0
>Reporter: Juan Olivares
>Assignee: Justine Olshan
>Priority: Major
>
> We have a custom Kafka health check which creates a topic, adds some ACLs 
> (read/write topic and group), produces & consumes a single message and then 
> quickly removes it and all the related ACLs created. We close the consumer 
> involved, but not the producer.
> We have observed that the number of instances of {{kafka.log.Log}} keeps 
> growing, while there's no evidence of topics being leaked, neither running 
> {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} , 
> nor looking at the disk directory where topics are stored.
> After looking at the heapdump we've observed the following
>  - None of the {{kafka.log.Log}} references ({{currentLogs}} and 
> {{logsToBeDeleted}}) in {{kafka.log.LogManager}} are 
> holding the large number of {{kafka.log.Log}} instances.
>  - The only reference preventing {{kafka.log.Log}} from being garbage 
> collected seems to be 
> {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
> contains scheduled tasks created with the name 
> {{PeriodicProducerExpirationCheck}}.
> I can see in the code that for every {{kafka.log.Log}} a task with this name 
> is scheduled.
> {code:java}
> scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
>   lock synchronized {
>     producerStateManager.removeExpiredProducers(time.milliseconds)
>   }
> }, period = producerIdExpirationCheckIntervalMs, delay = 
> producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
> {code}
> However, it seems those tasks are never unscheduled/cancelled.
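> A generic illustration of the fix pattern, using a plain 
> ScheduledExecutorService rather than Kafka's internal scheduler (whose API 
> this sketch does not claim to match): keep the ScheduledFuture and cancel it 
> when the log is closed.
> {code:java}
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.ScheduledFuture;
> import java.util.concurrent.TimeUnit;
>
> // Sketch: retain the handle of the periodic task so close() can cancel it;
> // otherwise the executor's queue keeps the task (and the Log it references)
> // reachable forever.
> class LogLike implements AutoCloseable {
>     private final ScheduledFuture<?> producerExpireCheck;
>
>     LogLike(final ScheduledExecutorService scheduler) {
>         this.producerExpireCheck = scheduler.scheduleAtFixedRate(
>             () -> { /* producerStateManager.removeExpiredProducers(...) */ },
>             10, 10, TimeUnit.MINUTES);
>     }
>
>     @Override
>     public void close() {
>         producerExpireCheck.cancel(false);  // drop the queue's reference
>     }
> }
> {code}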





[jira] [Updated] (KAFKA-8446) Kafka Streams restoration crashes with NPE when the record value is null

2019-05-30 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8446:
---
Summary: Kafka Streams restoration crashes with NPE when the record value 
is null  (was: KStream restoration will crash with NPE when the record value is 
null)

> Kafka Streams restoration crashes with NPE when the record value is null
> 
>
> Key: KAFKA-8446
> URL: https://issues.apache.org/jira/browse/KAFKA-8446
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
>






[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

2019-05-30 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852453#comment-16852453
 ] 

Matthias J. Sax commented on KAFKA-8377:


If we decide to materialize the upstream store for this case (and I cannot 
think of any other fix atm \cc [~guozhang], [~vvcephei]) it should not be too 
hard. We need to detect the case in which we need to materialize, and force a 
materialization. From my current understanding, the fix would go into the 
translation/optimization layer that translates the logical topology into the 
physical Topology (\cc [~bbejeck] to confirm if this makes sense).

> KTable#transformValue might lead to incorrect result in joins
> -
>
> Key: KAFKA-8377
> URL: https://issues.apache.org/jira/browse/KAFKA-8377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If 
> a non-materialized KTable is input to a join, the lookup into the table 
> results in a lookup into the parent table plus a call to the operator. For 
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed 
> as a lookup into materializedTable plus applying the filter().
> For stateless operations like filter(), this is safe. However, 
> #transformValues() might have an attached state store. Hence, when an input 
> record r is processed by #transformValues() with current state S, it might 
> produce an output record r' (that is not materialized). When the join later 
> does a lookup to get r from the parent table, there is no guarantee that 
> #transformValues() again produces r', because its state might not be the same 
> any longer.
> Hence, it seems to be required to always materialize the result of a 
> KTable#transformValues() operation if there is state. Note that if there 
> were a consecutive filter() after transformValues(), it would also be ok to 
> materialize the filter() result instead. Furthermore, if there is no downstream 
> join(), materialization is also not required.
> Basically, it seems to be unsafe to apply a `KTableValueGetter` to a stateful 
> `#transformValues()` operator.





[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2019-05-30 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852428#comment-16852428
 ] 

Richard Yu commented on KAFKA-3539:
---

[~radai] I don't know if this problem is resolved or not. Issue 6705 was closed 
because it was thought that any changes needed to fix this behavioral issue 
would be too complex.

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Oleg Zhurakousky
>Priority: Critical
>  Labels: needs-discussion, needs-kip
>
> You can get more details from us...@kafka.apache.org by searching for the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, since 
> that essentially violates the Future contract: it was specifically designed to 
> return immediately, passing control back to the user to check for completion, 
> cancel, etc.
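> For illustration, a small sketch of the contract vs. the observed behavior; 
> whether send() blocks internally is exactly what this ticket disputes, and 
> the comments below mark that assumption:
> {code:java}
> Properties props = new Properties();  // bootstrap.servers, serializers, ...
> KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> ProducerRecord<String, String> record = new ProducerRecord<>("topic", "k", "v");
>
> Future<RecordMetadata> future = producer.send(record); // should return immediately...
> // ...but may block internally, e.g. on a metadata fetch for an unknown topic
> // or while waiting for buffer space (bounded by max.block.ms).
> RecordMetadata metadata = future.get(); // the only expected blocking point
> {code}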





[jira] [Comment Edited] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

2019-05-30 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851300#comment-16851300
 ] 

Richard Yu edited comment on KAFKA-8377 at 5/30/19 10:45 PM:
-

cc [~mjsax] How hard would it be to fix this bug?


was (Author: yohan123):
cc [~mjsax] How hard would this bug be when tackled?

> KTable#transformValue might lead to incorrect result in joins
> -
>
> Key: KAFKA-8377
> URL: https://issues.apache.org/jira/browse/KAFKA-8377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If 
> a non-materialized KTable is input to a join, the lookup into the table 
> results in a lookup into the parent table plus a call to the operator. For 
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed 
> as a lookup into materializedTable plus applying the filter().
> For stateless operations like filter(), this is safe. However, 
> #transformValues() might have an attached state store. Hence, when an input 
> record r is processed by #transformValues() with current state S, it might 
> produce an output record r' (that is not materialized). When the join later 
> does a lookup to get r from the parent table, there is no guarantee that 
> #transformValues() again produces r', because its state might not be the same 
> any longer.
> Hence, it seems to be required to always materialize the result of a 
> KTable#transformValues() operation if there is state. Note that if there 
> were a consecutive filter() after transformValues(), it would also be ok to 
> materialize the filter() result instead. Furthermore, if there is no downstream 
> join(), materialization is also not required.
> Basically, it seems to be unsafe to apply a `KTableValueGetter` to a stateful 
> `#transformValues()` operator.





[jira] [Commented] (KAFKA-8199) ClassCastException when trying to groupBy after suppress

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852422#comment-16852422
 ] 

ASF GitHub Bot commented on KAFKA-8199:
---

guozhangwang commented on pull request #6781: KAFKA-8199: Implement ValueGetter 
for Suppress
URL: https://github.com/apache/kafka/pull/6781
 
 
   
 



> ClassCastException when trying to groupBy after suppress
> 
>
> Key: KAFKA-8199
> URL: https://issues.apache.org/jira/browse/KAFKA-8199
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Jose Lopez
>Priority: Major
> Fix For: 2.3.0
>
>
> A topology with a groupBy after a suppress operation results in a 
> ClassCastException
>  The following sample topology
> {noformat}
> Properties properties = new Properties();
> properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid");
> properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream("topic")
>     .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(30))).count()
>     .suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), BufferConfig.unbounded()))
>     .groupBy((k, v) -> KeyValue.pair(k, v)).count().toStream();
> builder.build(properties);
> {noformat}
> results in this exception:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.kafka.streams.kstream.internals.KTableImpl$$Lambda$4/2084435065 
> cannot be cast to 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier{noformat}
>  
>  





[jira] [Created] (KAFKA-8450) Augment processed in MockProcessor as KeyValueAndTimestamp

2019-05-30 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8450:


 Summary: Augment processed in MockProcessor as KeyValueAndTimestamp
 Key: KAFKA-8450
 URL: https://issues.apache.org/jira/browse/KAFKA-8450
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Guozhang Wang


Today the book-keeping list of `processed` records in MockProcessor is in the 
form of Strings: we just call the key / value types' toString functions in 
order to book-keep. This loses the type information, and no timestamp is 
associated with the record.

It's better to translate its type to `KeyValueAndTimestamp` and refactor the 
impacted unit tests.
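
A rough sketch of the typed book-keeping this suggests; the record class is 
written out for illustration and is not claimed to match the eventual 
`KeyValueAndTimestamp` API:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch: book-keep processed records with full type information plus the
// record timestamp, instead of "key:value" strings.
public class TypedMockProcessor<K, V> {
    public static final class KeyValueAndTimestamp<K, V> {
        public final K key;
        public final V value;
        public final long timestamp;

        public KeyValueAndTimestamp(final K key, final V value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    public final List<KeyValueAndTimestamp<K, V>> processed = new ArrayList<>();

    public void process(final K key, final V value, final long timestamp) {
        processed.add(new KeyValueAndTimestamp<>(key, value, timestamp));
    }
}
{code}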





[jira] [Commented] (KAFKA-8389) Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852397#comment-16852397
 ] 

ASF GitHub Bot commented on KAFKA-8389:
---

guozhangwang commented on pull request #6761: KAFKA-8389: Remove redundant 
bookkeeping from MockProcessor
URL: https://github.com/apache/kafka/pull/6761
 
 
   
 



> Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest
> --
>
> Key: KAFKA-8389
> URL: https://issues.apache.org/jira/browse/KAFKA-8389
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> We have stand-alone MockProcessorSupplier / MockProcessor classes, yet we 
> have duplicates of them in TopologyTestDriverTest as well. We should consider 
> removing the duplicated ones.





[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852379#comment-16852379
 ] 

ASF GitHub Bot commented on KAFKA-5998:
---

bbejeck commented on pull request #6846: KAFKA-5998: Create state dir and 
checkpoint file if it doesn't exist when checkpointing
URL: https://github.com/apache/kafka/pull/6846
 
 
   There can exist a condition where the state directory for a task gets 
deleted by the cleanup thread, but the directory is still needed for the given 
task for checkpointing.  
   
   This PR will catch the `FileNotFoundException` and create the required task 
directory so the checkpoint file can be created.
   
   Tests have been updated to validate the task directory and checkpoint file 
are created when a `FileNotFoundException` is thrown.
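
   A hedged sketch of the retry-on-missing-directory behavior; the helper 
shape here is an assumption for illustration, not the PR's actual code:
{code:java}
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

// Sketch: if the cleanup thread removed the task directory, re-create it
// and retry the checkpoint write instead of failing the commit.
static void writeCheckpointSafely(final OffsetCheckpoint checkpoint,
                                  final File taskDir,
                                  final Map<TopicPartition, Long> offsets) throws IOException {
    try {
        checkpoint.write(offsets);
    } catch (final FileNotFoundException e) {
        Files.createDirectories(taskDir.toPath());  // restore the directory
        checkpoint.write(offsets);                  // retry once
    }
}
{code}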
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: Bill Bejeck
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions. I have 
> 16 partitions; only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 

[jira] [Resolved] (KAFKA-8441) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-05-30 Thread Bruno Cadonna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-8441.
--
Resolution: Invalid

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> 
>
> Key: KAFKA-8441
> URL: https://issues.apache.org/jira/browse/KAFKA-8441
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Topics not 
> deleted after 3 milli seconds.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:265)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAndRecreateTopics(EmbeddedKafkaCluster.java:288)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.setUp(RegexSourceIntegrationTest.java:118)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>   at 

[jira] [Resolved] (KAFKA-8440) Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows

2019-05-30 Thread Bruno Cadonna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-8440.
--
Resolution: Invalid

> Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows
> ---
>
> Key: KAFKA-8440
> URL: https://issues.apache.org/jira/browse/KAFKA-8440
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {noformat}
> java.lang.AssertionError: 
> Expected: 
>  but: was null
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldReduceSessionWindows(KStreamAggregationIntegrationTest.java:663)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> 

[jira] [Commented] (KAFKA-8155) Update Streams system tests for 2.2.0 and 2.1.1 releases

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852283#comment-16852283
 ] 

ASF GitHub Bot commented on KAFKA-8155:
---

mjsax commented on pull request #6596:  KAFKA-8155: Add 2.1.1 release to system 
tests
URL: https://github.com/apache/kafka/pull/6596
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update Streams system tests for 2.2.0 and 2.1.1 releases
> 
>
> Key: KAFKA-8155
> URL: https://issues.apache.org/jira/browse/KAFKA-8155
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: John Roesler
>Assignee: Matthias J. Sax
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-05-30 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852279#comment-16852279
 ] 

Andrew Olson commented on KAFKA-8421:
-

Agreed, this would be quite a beneficial enhancement.

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return messages 
> that belong to their owned partitions even while they are within a 
> rebalance, because we know it is safe that no one else will claim those 
> partitions during this rebalance, and we can still commit offsets if, after 
> the rebalance, the partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout: while a 
> consumer is processing those records it may not call the next poll() in time 
> (think: Kafka Streams num.iterations mechanism), which may lead to the 
> consumer dropping out of the group during the rebalance.
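A minimal sketch of the proposed relaxation, using only public client types (the helper and its wiring are illustrative, not the actual consumer internals): instead of returning {{ConsumerRecords.empty()}} mid-rebalance, keep the already-fetched data for partitions the consumer still owns.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Illustrative helper only: retain fetched data for still-owned partitions
// during a rebalance instead of dropping everything.
final class OwnedPartitionFilter {
    static <K, V> ConsumerRecords<K, V> retainOwned(final ConsumerRecords<K, V> fetched,
                                                    final Set<TopicPartition> stillOwned) {
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> kept = new HashMap<>();
        for (final TopicPartition tp : fetched.partitions()) {
            if (stillOwned.contains(tp))
                kept.put(tp, fetched.records(tp));
        }
        return new ConsumerRecords<>(kept);
    }
}
{code}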



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-30 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-5998:
--

Assignee: Bill Bejeck

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: Bill Bejeck
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. It has been 
> running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the exception below for some of the partitions; of my 
> 16 partitions, only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  
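For context, the failing call is the checkpoint writer's write-to-temp-then-rename step. A simplified sketch of that pattern (not the actual {{OffsetCheckpoint}} code) shows why a deleted task directory surfaces as a {{FileNotFoundException}} on the {{.tmp}} file:

{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Simplified write-then-rename checkpointing: opening the ".tmp" file throws
// FileNotFoundException when the task directory has been removed underneath
// the writer (e.g. by the state-directory cleaner).
final class CheckpointWriteSketch {
    static void write(final File checkpoint, final byte[] payload) throws IOException {
        final File tmp = new File(checkpoint.getAbsolutePath() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) { // fails if the directory is gone
            out.write(payload);
            out.getFD().sync();
        }
        Files.move(tmp.toPath(), checkpoint.toPath(), StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}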

[jira] [Resolved] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-30 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8418.
--
Resolution: Fixed
  Reviewer: Randall Hauch

Merged and backported all the way to the `1.0` branch.

> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> I am getting an error while running Kafka tests:
> {code}
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
>     return self.test_context.function(self.test)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api
>     assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins
>     return self._rest('/connector-plugins/', node=node)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest
>     raise ConnectRestError(resp.status_code, resp.text, resp.url)
> ConnectRestError
> {code}
> From the logs, I see two messages:
> {code}
> [2019-05-23 16:09:59,373] INFO REST server listening at 
> http://172.31.39.205:8083/, advertising URL http://worker1:8083/ 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {code}
> and {code}
> [2019-05-23 16:10:00,738] INFO REST resources initialized; server is started 
> and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {code}
> It takes 1365 ms to actually load the REST resources, but the test only 
> waits for the port to be listening.
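A minimal readiness probe along the lines of the fix (illustrative Java, not the actual ducktape change): poll a REST resource until it answers, rather than only checking that the port is open.

{code:java}
import java.net.HttpURLConnection;
import java.net.URL;

// Poll GET /connectors until it returns 200: the port is bound ~1.4 s before
// the REST resources are registered, so a port check alone races the server.
final class ConnectReadiness {
    static boolean ready(final String baseUrl) {
        try {
            final HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + "/connectors").openConnection();
            conn.setConnectTimeout(1000);
            conn.setReadTimeout(1000);
            return conn.getResponseCode() == 200;
        } catch (final Exception e) {
            return false; // not listening yet, or resources not registered
        }
    }
}
{code}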



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-30 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-8418:
-
Fix Version/s: 2.2.1
   2.1.2
   2.0.2
   1.1.2
   1.0.3

> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> I am getting an error while running Kafka tests:
> {code}
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
>     return self.test_context.function(self.test)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api
>     assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins
>     return self._rest('/connector-plugins/', node=node)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest
>     raise ConnectRestError(resp.status_code, resp.text, resp.url)
> ConnectRestError
> {code}
> From the logs, I see two messages:
> {code}
> [2019-05-23 16:09:59,373] INFO REST server listening at 
> http://172.31.39.205:8083/, advertising URL http://worker1:8083/ 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {code}
> and {code}
> [2019-05-23 16:10:00,738] INFO REST resources initialized; server is started 
> and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {code}
> It takes 1365 ms to actually load the REST resources, but the test only 
> waits for the port to be listening.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852247#comment-16852247
 ] 

ASF GitHub Bot commented on KAFKA-8418:
---

rhauch commented on pull request #6840: KAFKA-8418: Wait until REST resources 
are loaded when starting a Connect Worker.
URL: https://github.com/apache/kafka/pull/6840
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.3.0
>
>
> I am getting an error while running Kafka tests:
> {code}
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
>     return self.test_context.function(self.test)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api
>     assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins
>     return self._rest('/connector-plugins/', node=node)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest
>     raise ConnectRestError(resp.status_code, resp.text, resp.url)
> ConnectRestError
> {code}
> From the logs, I see two messages:
> {code}
> [2019-05-23 16:09:59,373] INFO REST server listening at 
> http://172.31.39.205:8083/, advertising URL http://worker1:8083/ 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {code}
> and {code}
> [2019-05-23 16:10:00,738] INFO REST resources initialized; server is started 
> and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {code}
> It takes 1365 ms to actually load the REST resources, but the test only 
> waits for the port to be listening.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8293) Messages undelivered when small quotas applied

2019-05-30 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852160#comment-16852160
 ] 

Rens Groothuijsen commented on KAFKA-8293:
--

I haven't been able to replicate this issue with a local single-node 
configuration. Are you sending these messages over a network, or is it entirely 
local?

> Messages undelivered when small quotas applied 
> ---
>
> Key: KAFKA-8293
> URL: https://issues.apache.org/jira/browse/KAFKA-8293
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: Kirill Kulikov
>Priority: Major
>  Labels: quotas
>
> I observe strange Kafka behavior when using small quotas.
> For example, I set quotas for the consumer like
>  
> {code:java}
> kafka-configs --zookeeper zookeeper:2181 --entity-type users --entity-name 
> kafka --alter --add-config 'producer_byte_rate=2048000, 
> consumer_byte_rate=256'{code}
>  
> If I send a small batch of messages as
>  
> {code:java}
> kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties 
> --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 
> --num-records 10 --record-size 20 --throughput 1000 --print-metrics --topic 
> test 
> {code}
> they go through without problems.
> But when the batch is bigger
>  
> {code:java}
> kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties 
> --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 
> --num-records 100 --record-size 20 --throughput 1000 --print-metrics --topic 
> test
> {code}
> ... I do not get any messages on the consumer side *at all*.
> On the other hand, if `kafka-producer-perf-test` throughput is limited like
>  
> {code:java}
> kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties 
> --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 
> --num-records 1000 --record-size 10 --throughput 10 --print-metrics --topic 
> test
> {code}
> I can see only the first 20-30 messages in `kafka-console-consumer`. But then 
> it gets stuck (throttled perhaps) and other queued messages never come 
> through.
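Back-of-the-envelope arithmetic for the settings above (plain Java, not Kafka code) makes heavy throttling plausible: even the 100-record batch needs roughly eight seconds of quota budget at 256 B/s.

{code:java}
public class QuotaMath {
    public static void main(final String[] args) {
        final long records = 100, recordSize = 20, quotaBytesPerSec = 256;
        final double secondsOfBudget = (double) (records * recordSize) / quotaBytesPerSec;
        // prints: batch=2000 bytes, quota budget needed=7.8 s
        System.out.printf("batch=%d bytes, quota budget needed=%.1f s%n",
                          records * recordSize, secondsOfBudget);
    }
}
{code}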



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-30 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852127#comment-16852127
 ] 

Guozhang Wang commented on KAFKA-8367:
--

[~pavelsavov] Could you share the code of the transformer / processor that 
accesses the `(stores: [performance_windowed_store])` as well?

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect RocksDB to be the source, as the leak 
> only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  
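For reference, the reported settings map onto a {{RocksDBConfigSetter}} roughly like this (class name and wiring are illustrative, not the reporter's actual code); such a class is plugged in via the {{rocksdb.config.setter}} Streams config.

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Illustrative RocksDBConfigSetter mirroring the values listed above.
public class ReportedRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(4 * 1024 * 1024L);   // 4 MiB block cache
        tableConfig.setBlockSize(16 * 1024L);              // 16 KiB blocks
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(2 * 1024 * 1024L);      // 2 MiB write buffer
        options.setMaxWriteBufferNumber(3);
        options.setManifestPreallocationSize(64 * 1024L);  // 64 KiB
        options.setMaxOpenFiles(6144);
    }
}
{code}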



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-30 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8187.

Resolution: Fixed

Merged to trunk and cherry-picked to 2.3, 2.2 and 2.1.

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the acquisition is not retried because all of the 
> active StreamTasks for that StreamThread are running. If the StandbyTask does 
> not acquire the lock before the StreamThread enters the RUNNING state, then 
> the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time, before acquiring the lock, will be written back to the state store 
> directory; this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the previous 
> reassignment.
> If this host is not assigned 1_0, then the state directory will get cleaned 
> up by the stateDirCleaner thread 10 to 20 minutes later and the record loss 
> issue will be hidden.
> [0] 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
> [1] 
> 
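A condensed paraphrase of the flawed condition at [4] (names are illustrative, not the actual TaskManager code): standby-task progress is never consulted before the thread transitions to RUNNING.

{code:java}
// A standby task that hit a LockException during initialization does not
// block the RUNNING transition, because only the active tasks are checked.
final class TaskManagerSketch {
    interface Tasks { void initializeNewTasks(); boolean allTasksRunning(); }

    private final Tasks active;
    private final Tasks standby;

    TaskManagerSketch(final Tasks active, final Tasks standby) {
        this.active = active;
        this.standby = standby;
    }

    boolean updateNewAndRestoringTasks() {
        active.initializeNewTasks();
        standby.initializeNewTasks();    // may swallow a LockException and defer
        return active.allTasksRunning(); // [4]: standby state is ignored here
    }
}
{code}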

[jira] [Updated] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-30 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-8187:
---
Fix Version/s: 2.2.1
   2.1.2
   2.3.0

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the acquisition is not retried because all of the 
> active StreamTasks for that StreamThread are running. If the StandbyTask does 
> not acquire the lock before the StreamThread enters the RUNNING state, then 
> the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time, before acquiring the lock, will be written back to the state store 
> directory; this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the previous 
> reassignment.
> If this host is not assigned 1_0, then the state directory will get cleaned 
> up by the stateDirCleaner thread 10 to 20 minutes later and the record loss 
> issue will be hidden.
> [0] 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
> [1] 
> 

[jira] [Commented] (KAFKA-6455) Improve timestamp propagation at DSL level

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852060#comment-16852060
 ] 

ASF GitHub Bot commented on KAFKA-6455:
---

mjsax commented on pull request #6751: KAFKA-6455: Update integration tests to 
verify result timestamps
URL: https://github.com/apache/kafka/pull/6751
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve timestamp propagation at DSL level
> --
>
> Key: KAFKA-6455
> URL: https://issues.apache.org/jira/browse/KAFKA-6455
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.3.0
>
>
> At the DSL level, we inherit the timestamp propagation "contract" from the 
> Processor API. This contract is not optimal at the DSL level, and we should 
> define a DSL-level contract that matches the semantics of the corresponding 
> DSL operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)

2019-05-30 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852041#comment-16852041
 ] 

Ismael Juma commented on KAFKA-8448:


Nice find, thanks for the report.

> Too many kafka.log.Log instances (Memory Leak)
> --
>
> Key: KAFKA-8448
> URL: https://issues.apache.org/jira/browse/KAFKA-8448
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
> Environment: Red Hat 4.4.7-16, java version "1.8.0_152", 
> kafka_2.12-2.2.0
>Reporter: Juan Olivares
>Assignee: Justine Olshan
>Priority: Major
>
> We have a custom Kafka health check which creates a topic, adds some ACLs 
> (read/write topic and group), produces & consumes a single message and then 
> quickly removes it and all the related ACLs created. We close the consumer 
> involved, but not the producer.
> We have observed that the # of instances of {{kafka.log.Log}} keeps growing, 
> while there's no evidence of topics being leaked, neither when running 
> {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}}, 
> nor when looking at the disk directory where topics are stored.
> After looking at the heapdump we've observed the following:
>  - None of the {{kafka.log.Log}} references ({{currentLogs}}, 
> {{logsToBeDeleted }} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is 
> holding the large number of {{kafka.log.Log}} instances.
>  - The only reference preventing {{kafka.log.Log}} from being garbage 
> collected seems to be 
> {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
> contains scheduled tasks created with the name 
> {{PeriodicProducerExpirationCheck}}.
> I can see in the code that for every {{kafka.log.Log}} a task with this name 
> is scheduled.
> {code:java}
>   scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
> lock synchronized {
>   producerStateManager.removeExpiredProducers(time.milliseconds)
> }
>   }, period = producerIdExpirationCheckIntervalMs, delay = 
> producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
> {code}
> However, it seems those tasks are never unscheduled/cancelled.
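A sketch of the usual remedy for this pattern (illustrative, not the actual Kafka patch): keep the future returned when scheduling the periodic check and cancel it when the {{Log}} is closed or deleted, so the entry can leave the executor's {{DelayedWorkQueue}} and the {{Log}} becomes collectable.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Cancel-on-close lifecycle for a periodic task; without cancel(), the
// executor's queue keeps a strong reference to the task (and everything it
// captures) indefinitely.
final class PeriodicCheckLifecycle {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> producerExpireCheck;

    void open(final Runnable removeExpiredProducers, final long intervalMs) {
        producerExpireCheck = scheduler.scheduleAtFixedRate(
            removeExpiredProducers, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void close() {
        if (producerExpireCheck != null)
            producerExpireCheck.cancel(false); // releases the queue's reference
    }
}
{code}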



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)

2019-05-30 Thread Justine Olshan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-8448:
-

Assignee: Justine Olshan

> Too many kafka.log.Log instances (Memory Leak)
> --
>
> Key: KAFKA-8448
> URL: https://issues.apache.org/jira/browse/KAFKA-8448
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
> Environment: Red Hat 4.4.7-16, java version "1.8.0_152", 
> kafka_2.12-2.2.0
>Reporter: Juan Olivares
>Assignee: Justine Olshan
>Priority: Major
>
> We have a custom Kafka health check which creates a topic, adds some ACLs 
> (read/write topic and group), produces & consumes a single message and then 
> quickly removes it and all the related ACLs created. We close the consumer 
> involved, but not the producer.
> We have observed that the # of instances of {{kafka.log.Log}} keeps growing, 
> while there's no evidence of topics being leaked, neither when running 
> {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}}, 
> nor when looking at the disk directory where topics are stored.
> After looking at the heapdump we've observed the following:
>  - None of the {{kafka.log.Log}} references ({{currentLogs}}, 
> {{logsToBeDeleted }} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is 
> holding the large number of {{kafka.log.Log}} instances.
>  - The only reference preventing {{kafka.log.Log}} from being garbage 
> collected seems to be 
> {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
> contains scheduled tasks created with the name 
> {{PeriodicProducerExpirationCheck}}.
> I can see in the code that for every {{kafka.log.Log}} a task with this name 
> is scheduled.
> {code:java}
>   scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
> lock synchronized {
>   producerStateManager.removeExpiredProducers(time.milliseconds)
> }
>   }, period = producerIdExpirationCheckIntervalMs, delay = 
> producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
> {code}
> However, it seems those tasks are never unscheduled/cancelled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8429) Consumer should handle offset change while OffsetForLeaderEpoch is inflight

2019-05-30 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8429.

   Resolution: Fixed
Fix Version/s: (was: 2.3.0)

> Consumer should handle offset change while OffsetForLeaderEpoch is inflight
> ---
>
> Key: KAFKA-8429
> URL: https://issues.apache.org/jira/browse/KAFKA-8429
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> It is possible for the offset of a partition to be changed while we are in 
> the middle of validation. If the OffsetForLeaderEpoch request is in-flight 
> and the offset changes, we need to redo the validation after it returns.
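A sketch of the guard described above (names and types are illustrative, not the actual patch): capture the position the request was issued for, and ignore the response if the position moved while the request was in flight.

{code:java}
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// If the partition's position changed while OffsetForLeaderEpoch was in
// flight, the response no longer applies and validation must be redone.
final class ValidationGuard<P> {
    void onResponse(final P positionAtSend,
                    final CompletableFuture<Long> endOffsetForEpoch,
                    final Supplier<P> currentPosition,
                    final LongConsumer maybeTruncate) {
        endOffsetForEpoch.thenAccept(endOffset -> {
            if (!Objects.equals(positionAtSend, currentPosition.get()))
                return; // position moved concurrently; leave validation pending
            maybeTruncate.accept(endOffset);
        });
    }
}
{code}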



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8429) Consumer should handle offset change while OffsetForLeaderEpoch is inflight

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852015#comment-16852015
 ] 

ASF GitHub Bot commented on KAFKA-8429:
---

hachikuji commented on pull request #6811: KAFKA-8429; Handle offset change 
when OffsetForLeaderEpoch inflight
URL: https://github.com/apache/kafka/pull/6811
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer should handle offset change while OffsetForLeaderEpoch is inflight
> ---
>
> Key: KAFKA-8429
> URL: https://issues.apache.org/jira/browse/KAFKA-8429
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> It is possible for the offset of a partition to be changed while we are in 
> the middle of validation. If the OffsetForLeaderEpoch request is in-flight 
> and the offset changes, we need to redo the validation after it returns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851996#comment-16851996
 ] 

ASF GitHub Bot commented on KAFKA-8187:
---

bbejeck commented on pull request #6818: KAFKA-8187: Add wait time for other 
thread in the same jvm to free the locks
URL: https://github.com/apache/kafka/pull/6818
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the acquisition is not retried because all of the 
> active StreamTasks for that StreamThread are running. If the StandbyTask does 
> not acquire the lock before the StreamThread enters the RUNNING state, then 
> the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time, before acquiring the lock, will be written back to the state store 
> directory; this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the 

[jira] [Created] (KAFKA-8449) Restart task on reconfiguration under incremental cooperative rebalancing

2019-05-30 Thread Konstantine Karantasis (JIRA)
Konstantine Karantasis created KAFKA-8449:
-

 Summary: Restart task on reconfiguration under incremental 
cooperative rebalancing
 Key: KAFKA-8449
 URL: https://issues.apache.org/jira/browse/KAFKA-8449
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Konstantine Karantasis
Assignee: Konstantine Karantasis
 Fix For: 2.3.0


Tasks that are already running and are not redistributed are not currently 
restarted under incremental cooperative rebalancing when their configuration 
changes. With eager rebalancing, the restart was triggered by, and therefore 
implied by, the rebalance itself. But now existing tasks will not read the new 
configuration unless they are restarted via the REST API. 
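Until that lands, the restart has to be requested explicitly, for example via the standard Connect REST endpoints (connector name and worker URL below are placeholders):

{code}
curl -X POST http://localhost:8083/connectors/my-connector/restart
curl -X POST http://localhost:8083/connectors/my-connector/tasks/0/restart
{code}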



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)

2019-05-30 Thread Juan Olivares (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juan Olivares updated KAFKA-8448:
-
Description: 
We have a custom Kafka health check which creates a topic, add some ACLs 
(read/write topic and group), produce & consume a single message and then 
quickly remove it and all the related ACLs created. We close the consumer 
involved, but no the producer.

We have observed that # of instances of {{kafka.log.Log}} keep growing, while 
there's no evidence of topics being leaked, neither running 
{{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} , nor 
looking at the disk directory where topics are stored.

After looking at the heapdump we've observed the following
 - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{logsToBeDeleted 
}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the big 
amount of {{kafka.log.Log}} instances.
 - The only reference preventing {{kafka.log.Log}} to be Garbage collected 
seems to be 
{{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}} which 
contains schedule tasks created with the name 
{{PeriodicProducerExpirationCheck}}.

I can see in the code that for every {{kafka.log.Log}} a task with this name is 
scheduled.
{code:java}
  scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
lock synchronized {
  producerStateManager.removeExpiredProducers(time.milliseconds)
}
  }, period = producerIdExpirationCheckIntervalMs, delay = 
producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
{code}

However it seems those tasks are never unscheduled/cancelled

  was:
We have a custom Kafka health check which creates a topic, add some ACLs 
(read/write topic and group), produce & consume a single message and then 
quickly remove it and all the related ACLs created.

We have observed that # of instances of {{kafka.log.Log}} keep growing, while 
there's no evidence of topics being leaked, neither running 
{{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} , nor 
looking at the disk directory where topics are stored.

After looking at the heapdump we've observed the following
 - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{logsToBeDeleted 
}} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the big 
amount of {{kafka.log.Log}} instances.
 - The only reference preventing {{kafka.log.Log}} to be Garbage collected 
seems to be 
{{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}} which 
contains schedule tasks created with the name 
{{PeriodicProducerExpirationCheck}}.

I can see in the code that for every {{kafka.log.Log}} a task with this name is 
scheduled.
{code:java}
  scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
lock synchronized {
  producerStateManager.removeExpiredProducers(time.milliseconds)
}
  }, period = producerIdExpirationCheckIntervalMs, delay = 
producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
{code}

However it seems those tasks are never unscheduled/cancelled


> Too many kafka.log.Log instances (Memory Leak)
> --
>
> Key: KAFKA-8448
> URL: https://issues.apache.org/jira/browse/KAFKA-8448
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
> Environment: Red Hat 4.4.7-16, java version "1.8.0_152", 
> kafka_2.12-2.2.0
>Reporter: Juan Olivares
>Priority: Major
>
> We have a custom Kafka health check which creates a topic, adds some ACLs 
> (read/write topic and group), produces & consumes a single message and then 
> quickly removes it and all the related ACLs created. We close the consumer 
> involved, but not the producer.
> We have observed that the # of instances of {{kafka.log.Log}} keeps growing, 
> while there's no evidence of topics being leaked, neither when running 
> {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}}, 
> nor when looking at the disk directory where topics are stored.
> After looking at the heapdump we've observed the following:
>  - None of the {{kafka.log.Log}} references ({{currentLogs}}, 
> {{logsToBeDeleted }} and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is 
> holding the large number of {{kafka.log.Log}} instances.
>  - The only reference preventing {{kafka.log.Log}} from being garbage 
> collected seems to be 
> {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
> contains scheduled tasks created with the name 
> {{PeriodicProducerExpirationCheck}}.
> I can see in the code that for every {{kafka.log.Log}} a task with this name 
> is scheduled.
> {code:java}
> scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
>   lock synchronized {
>     producerStateManager.removeExpiredProducers(time.milliseconds)
>   }
> }, period = producerIdExpirationCheckIntervalMs,
>    delay = producerIdExpirationCheckIntervalMs,
>    unit = TimeUnit.MILLISECONDS)
> {code}
> However, it seems those tasks are never unscheduled/cancelled.

[jira] [Updated] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)

2019-05-30 Thread Juan Olivares (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juan Olivares updated KAFKA-8448:
-
Description: 
We have a custom Kafka health check which creates a topic, adds some ACLs 
(read/write on the topic and group), produces & consumes a single message, and 
then quickly removes it and all the related ACLs created.

We have observed that the number of {{kafka.log.Log}} instances keeps growing, 
while there's no evidence of topics being leaked, either when running 
{{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} or 
when looking at the disk directory where topics are stored.

After looking at the heap dump we've observed the following:
 - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{futureLogs}} 
and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the large 
number of {{kafka.log.Log}} instances.
 - The only reference preventing {{kafka.log.Log}} from being garbage 
collected seems to be 
{{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
contains scheduled tasks created with the name 
{{PeriodicProducerExpirationCheck}}.

I can see in the code that for every {{kafka.log.Log}} a task with this name is 
scheduled.
{code:java}
scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
  lock synchronized {
    producerStateManager.removeExpiredProducers(time.milliseconds)
  }
}, period = producerIdExpirationCheckIntervalMs,
   delay = producerIdExpirationCheckIntervalMs,
   unit = TimeUnit.MILLISECONDS)
{code}

However, it seems those tasks are never unscheduled/cancelled.

  was:
We have a custom Kafka health check which creates a topic, adds some ACLs 
(read/write on the topic and group), produces & consumes a single message, and 
then quickly removes it and all the related ACLs created.

We have observed that the number of {{kafka.log.Log}} instances keeps growing, 
while there's no evidence of topics being leaked, either when running 
{{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} or 
when looking at the disk directory where topics are stored.

After looking at the heap dump we've observed the following:
 - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{futureLogs}} 
and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the large 
number of {{kafka.log.Log}} instances.
 - The only reference preventing {{kafka.log.Log}} from being garbage 
collected seems to be 
{{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
contains scheduled tasks created with the name 
{{PeriodicProducerExpirationCheck}}.

I can see in the code that for every {{kafka.log.Log}} a task with this name is 
scheduled.
{code:java}
scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
  lock synchronized {
    producerStateManager.removeExpiredProducers(time.milliseconds)
  }
}, period = producerIdExpirationCheckIntervalMs,
   delay = producerIdExpirationCheckIntervalMs,
   unit = TimeUnit.MILLISECONDS)
{code}

However, it seems those tasks are never unscheduled/cancelled.


> Too many kafka.log.Log instances (Memory Leak)
> --
>
> Key: KAFKA-8448
> URL: https://issues.apache.org/jira/browse/KAFKA-8448
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
> Environment: Red Hat 4.4.7-16, java version "1.8.0_152", 
> kafka_2.12-2.2.0
>Reporter: Juan Olivares
>Priority: Major
>
> We have a custom Kafka health check which creates a topic, adds some ACLs 
> (read/write on the topic and group), produces & consumes a single message, 
> and then quickly removes it and all the related ACLs created.
> We have observed that the number of {{kafka.log.Log}} instances keeps 
> growing, while there's no evidence of topics being leaked, either when 
> running {{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 
> --describe}} or when looking at the disk directory where topics are stored.
> After looking at the heap dump we've observed the following:
>  - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{futureLogs}} 
> and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the large 
> number of {{kafka.log.Log}} instances.
>  - The only reference preventing {{kafka.log.Log}} from being garbage 
> collected seems to be 
> {{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
> contains scheduled tasks created with the name 
> {{PeriodicProducerExpirationCheck}}.
> I can see in the code that for every {{kafka.log.Log}} a task with this name 
> is scheduled.
> {code:java}
> scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
>   lock synchronized {
>     producerStateManager.removeExpiredProducers(time.milliseconds)
>   }
> }, period = producerIdExpirationCheckIntervalMs,
>    delay = producerIdExpirationCheckIntervalMs,
>    unit = TimeUnit.MILLISECONDS)
> {code}
> However, it seems those tasks are never unscheduled/cancelled.

[jira] [Created] (KAFKA-8448) Too many kafka.log.Log instances (Memory Leak)

2019-05-30 Thread Juan Olivares (JIRA)
Juan Olivares created KAFKA-8448:


 Summary: Too many kafka.log.Log instances (Memory Leak)
 Key: KAFKA-8448
 URL: https://issues.apache.org/jira/browse/KAFKA-8448
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.0
 Environment: Red Hat 4.4.7-16, java version "1.8.0_152", 
kafka_2.12-2.2.0
Reporter: Juan Olivares


We have a custom Kafka health check which creates a topic, adds some ACLs 
(read/write on the topic and group), produces & consumes a single message, and 
then quickly removes it and all the related ACLs created.

We have observed that the number of {{kafka.log.Log}} instances keeps growing, 
while there's no evidence of topics being leaked, either when running 
{{/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe}} or 
when looking at the disk directory where topics are stored.

After looking at the heap dump we've observed the following:
 - None of the {{kafka.log.Log}} references ({{currentLogs}}, {{futureLogs}} 
and {{logsToBeDeleted}}) in {{kafka.log.LogManager}} is holding the large 
number of {{kafka.log.Log}} instances.
 - The only reference preventing {{kafka.log.Log}} from being garbage 
collected seems to be 
{{java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue}}, which 
contains scheduled tasks created with the name 
{{PeriodicProducerExpirationCheck}}.

I can see in the code that for every {{kafka.log.Log}} a task with this name is 
scheduled.
{code:java}
scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
  lock synchronized {
    producerStateManager.removeExpiredProducers(time.milliseconds)
  }
}, period = producerIdExpirationCheckIntervalMs,
   delay = producerIdExpirationCheckIntervalMs,
   unit = TimeUnit.MILLISECONDS)
{code}

However, it seems those tasks are never unscheduled/cancelled.
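
One possible direction for a fix, sketched below as hypothetical Java rather 
than the actual Kafka code: keep the {{ScheduledFuture}} returned when the 
check is scheduled, and cancel it when the log is closed or deleted.
{code:java}
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancellableExpirationCheck {

    private final ScheduledFuture<?> expirationCheck;

    CancellableExpirationCheck(ScheduledThreadPoolExecutor scheduler) {
        // Without this policy a cancelled periodic task lingers in the
        // DelayedWorkQueue until its next scheduled run time.
        scheduler.setRemoveOnCancelPolicy(true);
        this.expirationCheck = scheduler.scheduleAtFixedRate(
            this::removeExpiredProducers, 10, 10, TimeUnit.MINUTES);
    }

    private void removeExpiredProducers() {
        // placeholder for producerStateManager.removeExpiredProducers(...)
    }

    // To be invoked when the log is closed or deleted, so the scheduler
    // drops its reference to this object and it can be garbage collected.
    void close() {
        expirationCheck.cancel(false);
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        CancellableExpirationCheck check = new CancellableExpirationCheck(scheduler);
        check.close();
        System.out.println("queued tasks after close: " + scheduler.getQueue().size()); // 0
        scheduler.shutdown();
    }
}
{code}
With {{setRemoveOnCancelPolicy(true)}}, cancelling also removes the task from 
the queue immediately, so the log object becomes unreachable right away 
instead of at the task's next run time.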



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer

2019-05-30 Thread SuryaTeja Duggi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851913#comment-16851913
 ] 

SuryaTeja Duggi commented on KAFKA-7995:


[~sagarrao] Can I get your mail, so that I have someone to chat and work with?

> Augment singleton protocol type to list for Kafka Consumer  
> 
>
> Key: KAFKA-7995
> URL: https://issues.apache.org/jira/browse/KAFKA-7995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Reporter: Boyang Chen
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
>
> Right now the Kafka consumer protocol uses a singleton marker to distinguish 
> a Kafka Connect worker from a normal consumer. This is not an 
> upgrade-friendly approach, since the protocol type could potentially change 
> over time. A better approach is to support multiple candidates, so that a 
> no-downtime protocol type switch can be achieved.
> For example, if we are trying to upgrade a Kafka Streams application towards 
> a protocol type called "stream", right now there is no way to do this without 
> downtime, since the broker will reject changing the protocol type to a 
> different one unless the group is back to empty. If we allow a new member to 
> provide a list of protocol types instead ("consumer", "stream"), there would 
> be no compatibility issue (sketched below, after this description).
> An alternative approach is to invent an admin API to change a group's 
> protocol type at runtime. However, the burden introduced on the administrator 
> is not trivial, since we need to guarantee that the series of operations is 
> correct; otherwise we will see a limping upgrade experience at the midpoint, 
> for example if an unexpected rebalance happens while we are changing the 
> protocol type and causes old members to fail joining.
> 
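
To make the list-of-protocol-types idea concrete, here is a speculative 
sketch; the class and field names below are invented for illustration and do 
not match Kafka's actual JoinGroup request schema.
{code:java}
import java.util.Arrays;
import java.util.List;

public class MultiProtocolTypeSketch {

    // Hypothetical request shape if the singleton protocol type became a list.
    static final class JoinGroupRequest {
        final String groupId;
        final List<String> protocolTypes; // ordered by preference

        JoinGroupRequest(String groupId, List<String> protocolTypes) {
            this.groupId = groupId;
            this.protocolTypes = protocolTypes;
        }
    }

    public static void main(String[] args) {
        // During a rolling upgrade a member advertises both types; the broker
        // could keep the group on "consumer" until every member also lists
        // "stream", then switch without the group ever going empty.
        JoinGroupRequest request =
            new JoinGroupRequest("my-group", Arrays.asList("consumer", "stream"));
        System.out.println(request.groupId + " -> " + request.protocolTypes);
    }
}
{code}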



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4815) Idempotent/transactional Producer (KIP-98)

2019-05-30 Thread wenxuanguan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851627#comment-16851627
 ] 

wenxuanguan commented on KAFKA-4815:


How do transactions support multiple producer instances?
When multiple producers share the same txn id, the following exception is 
thrown (a usage sketch follows the trace):
org.apache.kafka.common.KafkaException: Cannot execute transactional method 
because we are in an error state
at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
at 
org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at 
com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
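
For what it's worth, the exception above is the fencing mechanism working as 
designed: two live producer instances must not share a {{transactional.id}}; 
the newer instance fences the older one. Below is a minimal sketch of 
single-instance transactional usage that treats {{ProducerFencedException}} as 
fatal; the broker address, topic, and id are placeholders.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Each concurrently running producer instance needs its own id;
        // reusing one fences the older instance (ProducerFencedException).
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-producer-1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Fatal for this instance: another producer took over the id.
            // Do not retry; close this producer.
            producer.close();
        }
    }
}
{code}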

> Idempotent/transactional Producer (KIP-98)
> --
>
> Key: KAFKA-4815
> URL: https://issues.apache.org/jira/browse/KAFKA-4815
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This issue tracks implementation progress for KIP-98: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8447) New Metric to Measure Number of Tasks on a Connector

2019-05-30 Thread Cyrus Vafadari (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cyrus Vafadari updated KAFKA-8447:
--
Summary: New Metric to Measure Number of Tasks on a Connector  (was: Add 
Connector-level metric to count tasks)

> New Metric to Measure Number of Tasks on a Connector
> 
>
> Key: KAFKA-8447
> URL: https://issues.apache.org/jira/browse/KAFKA-8447
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cyrus Vafadari
>Priority: Major
>
> KIP-475
> Worker-level metrics expose the number of tasks on a worker, but for many 
> applications it is useful to have metrics on how many tasks each connector 
> has.
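
Once such a metric exists, it would presumably be readable over JMX like other 
Connect metrics. A speculative sketch follows; the MBean and attribute names 
are assumptions pending what KIP-475 finalizes, and the connector name is a 
placeholder.
{code:java}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConnectorTaskCountProbe {
    public static void main(String[] args) throws Exception {
        // Works in-process; for a remote worker, connect via a JMXConnector instead.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed names, for illustration only.
        ObjectName name = new ObjectName(
            "kafka.connect:type=connect-worker-metrics,connector=my-connector");
        Object taskCount = server.getAttribute(name, "connector-total-task-count");
        System.out.println("tasks for my-connector: " + taskCount);
    }
}
{code}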



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)