[jira] [Commented] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2021-04-09 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17318130#comment-17318130
 ] 

Di Campo commented on KAFKA-8165:
-

Thanks A. Sophie. 
Yes, it was OK to close the ticket. 


> Streams task causes Out Of Memory after connection issues and store 
> restoration
> ---
>
> Key: KAFKA-8165
> URL: https://issues.apache.org/jira/browse/KAFKA-8165
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
> Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window 
> store, 4 KV stores. 
> Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 
> application instance, 2 threads per instance.
> Kafka 2.1, Kafka Streams 2.1
> Amazon Linux.
> Scala application, on Docker based on openJdk9. 
>Reporter: Di Campo
>Priority: Major
>
> Having a Kafka Streams 2.1 application: when the Kafka brokers are stable, the 
> (largely stateful) application has been consuming ~160 messages per second at 
> a sustained rate for several hours. 
> However, it then started having connection issues to the brokers. 
> {code:java}
> Connection to node 3 (/172.31.36.118:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> Also it began showing a lot of these errors: 
> {code:java}
> WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
>  groupId=stream-processor] 1 partitions have leader brokers without a 
> matching listener, including [broker-2-health-check-0] 
> (org.apache.kafka.clients.NetworkClient){code}
> In fact, the _health-check_ topic exists on the broker but is not consumed by 
> this topology or used in any way by the Streams application (it is just a 
> broker healthcheck). It does not complain about topics that are actually 
> consumed by the topology. 
> Some time after these errors (which appear at a rate of ~24 per second for 
> about 5 minutes), the following logs appear: 
> {code:java}
> [2019-03-27 15:14:47,709] WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
>  groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> In between the "Connection could not be established" error messages (6 lines, 
> then 3 more), 3 of these slipped in: 
> {code:java}
> [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
> partition 15 total records to be restored 17 
> (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code}
>  
>  ... one for each different KV store I have (I still have another KV store 
> that does not appear, and a WindowStore that also does not appear). 
>  Then I finally see "Restoration Complete" messages (using a logging 
> ConsoleGlobalRestoreListener, as in the docs) for all of my stores. So it 
> seems it should be fine now to resume processing.
> Three minutes later, some events get processed, and I see an OOM error:  
> {code:java}
> java.lang.OutOfMemoryError: GC overhead limit exceeded{code}
>  
> ... so given that it usually processes for hours under the same 
> circumstances, I'm wondering whether there is some memory leak in the 
> connection resources or somewhere in the handling of this scenario.
> Kafka and KafkaStreams 2.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-08 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153365#comment-17153365
 ] 

Di Campo edited comment on KAFKA-4273 at 7/8/20, 8:56 AM:
--

Just tried it, and WindowStoreIterator.remove is an unsupported operation. 

So, using a WindowStore, the solution could be as suggested: iterate all of the 
events to get the last one and check whether it is equal to the new value. This 
trades a nice self-removal mechanism for some inefficient querying. If you expect 
a low update frequency, that may just be your game; but with a high number of 
updates on duplicates, performance may be affected (note here: only the changes 
need to actually be stored).

To implement some kind of TTL in a KV store, I suppose you can use punctuation 
and perform the store deletions inside the scheduled punctuation. However, that 
could make for a potentially long removal pass at a certain point in time. (BTW, 
does Punctuator execution block the record processing? In that case it would be 
dangerous to wait too long between deletion runs.)
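
For illustration, here is a minimal sketch of such a punctuation-based cleanup 
(not from the original comment; the store name, the timestamped value layout and 
the punctuation interval are assumptions made only for the example):

{code:scala}
// Hypothetical sketch of punctuation-based TTL cleanup for a KV store.
// Assumes values are stored together with their insertion timestamp.
import java.time.Duration
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType, Punctuator}
import org.apache.kafka.streams.state.KeyValueStore

class TtlTransformer[K, V](ttlMs: Long) extends Transformer[K, (Long, V), KeyValue[K, V]] {
  private var store: KeyValueStore[K, (Long, V)] = _

  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore("ttl-kv-store").asInstanceOf[KeyValueStore[K, (Long, V)]]
    // Periodically scan the store and delete entries older than the TTL.
    context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(now: Long): Unit = {
        val expired = scala.collection.mutable.ListBuffer.empty[K]
        val it = store.all()
        try {
          while (it.hasNext) {
            val entry = it.next()
            if (now - entry.value._1 > ttlMs) expired += entry.key
          }
        } finally it.close()
        // delete after closing the iterator
        expired.foreach(store.delete)
      }
    })
  }

  override def transform(key: K, value: (Long, V)): KeyValue[K, V] =
    KeyValue.pair(key, value._2)

  override def close(): Unit = ()
}
{code}

The store would have to be added to the topology and connected to the 
transformer (e.g. via StreamsBuilder#addStateStore); scanning the whole store in 
one punctuation is exactly the potentially long removal pass mentioned above.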

All in all, TTL in KV stores would be a good if not perfect fit for this case :)


was (Author: xmar):
Just tried it, and WindowStoreIterator.remove is an unsupported operation. 

So, using a WindowStore, the solution could be as suggested: iterate all of the 
events to get the last one and check whether it is equal to the new value. This 
trades a nice self-removal mechanism for some inefficient querying. If you expect 
a low update frequency, that may just be your game; but with a high number of 
updates on duplicates, performance may be affected (note here: only the changes 
need to actually be stored).

TTL in KV stores would be a good if not perfect fit for this case :)

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212
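
For illustration only (not part of the original report): in later Kafka Streams 
releases (2.1+), retention for *windowed* state stores can be declared via 
Materialized#withRetention, although plain key-value stores still have no 
built-in TTL, which is what this ticket asks for. The store name below is a 
placeholder.

{code:scala}
// Hypothetical sketch: declaring a 48-hour retention for a windowed store
// via Materialized#withRetention (available since Kafka Streams 2.1).
import java.time.Duration
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.WindowStore

object RetentionExample {
  val materialized: Materialized[String, java.lang.Long, WindowStore[Bytes, Array[Byte]]] =
    Materialized
      .as[String, java.lang.Long, WindowStore[Bytes, Array[Byte]]]("messages-prices-join-store")
      .withRetention(Duration.ofHours(48)) // data older than 48 hours may be dropped
}
{code}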



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-08 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153365#comment-17153365
 ] 

Di Campo commented on KAFKA-4273:
-

Just tried it, and WindowStoreIterator.remove is an unsupported operation. 

So, using a WindowStore, the solution could be as suggested: iterate all of the 
events to get the last one and check whether it is equal to the new value. This 
trades a nice self-removal mechanism for some inefficient querying. If you expect 
a low update frequency, that may just be your game; but with a high number of 
updates on duplicates, performance may be affected (note here: only the changes 
need to actually be stored).

TTL in KV stores would be a good if not perfect fit for this case :)

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:28 PM:
--

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element only. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 


{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // fetch whatever is stored for this key around the event time
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      val exists = timeIterator.hasNext()
      if (exists) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged: emit nothing
          return null.asInstanceOf[V]
        } else {
          // changed: drop the old entry, then store and emit the new value
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // first value for this key: store and emit it
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}




was (Author: xmar):
Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element only. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // fetch whatever is stored for this key around the event time
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      val exists = timeIterator.hasNext()
      if (exists) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged: emit nothing
          return null.asInstanceOf[V]
        } else {
          // changed: drop the old entry, then store and emit the new value
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // first value for this key: store and emit it
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could 

[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:27 PM:
--

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element only. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // fetch whatever is stored for this key around the event time
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      val exists = timeIterator.hasNext()
      if (exists) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged: emit nothing
          return null.asInstanceOf[V]
        } else {
          // changed: drop the old entry, then store and emit the new value
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // first value for this key: store and emit it
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}




was (Author: xmar):
Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element only. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // fetch whatever is stored for this key around the event time
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      val exists = timeIterator.hasNext()
      if (exists) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged: emit nothing
          return null.asInstanceOf[V]
        } else {
          // changed: drop the old entry, then store and emit the new value
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // first value for this key: store and emit it
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could 

[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152866#comment-17152866
 ] 

Di Campo commented on KAFKA-4273:
-


Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element only. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // fetch whatever is stored for this key around the event time
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      val exists = timeIterator.hasNext()
      if (exists) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged: emit nothing
          return null.asInstanceOf[V]
        } else {
          // changed: drop the old entry, then store and emit the new value
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // first value for this key: store and emit it
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:27 PM:
--

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element only. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // fetch whatever is stored for this key around the event time
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      val exists = timeIterator.hasNext()
      if (exists) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged: emit nothing
          return null.asInstanceOf[V]
        } else {
          // changed: drop the old entry, then store and emit the new value
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // first value for this key: store and emit it
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}




was (Author: xmar):

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element only. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // fetch whatever is stored for this key around the event time
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      val exists = timeIterator.hasNext()
      if (exists) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged: emit nothing
          return null.asInstanceOf[V]
        } else {
          // changed: drop the old entry, then store and emit the new value
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // first value for this key: store and emit it
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". 

[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-06-30 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148801#comment-17148801
 ] 

Di Campo edited comment on KAFKA-4273 at 6/30/20, 4:12 PM:
---

Any news or an improved workaround? I am also interested in this. 
 In my case, I think my preferred workaround should be the one explained 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]


was (Author: xmar):
Any news or an improved workaround? I am also interested in this. 
In my case, I think my preferred workaround should be the one explained 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-06-30 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148801#comment-17148801
 ] 

Di Campo commented on KAFKA-4273:
-

Any news or an improved workaround? I am also interested in this. 
In my case, I think my preferred workaround should be the one explained 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2019-09-30 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16940802#comment-16940802
 ] 

Di Campo commented on KAFKA-8770:
-

This comes from the following use case:

Relations between pageviews and sessions are sent down to a separate topic 
(i.e. which session a pageview corresponds to, as a {{(pageviewId, sessionId)}} 
pair). As more pageviews are added to the session, they are sent as well (in a 
{{sessionStore.toStream.flatMap}} fashion, where the pageview ids are kept in 
the aggregator).
The relation of a pageview with its session may change, whether because of a 
session merge, a session cut by custom logic, etc.
But most of the time it is the same sessionId value, so I want to emit a first 
value right away to get near-real-time associations, but reduce the traffic 
sent by unnecessary updates.
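
As an illustration only (not part of the original comment), a minimal sketch of 
suppressing those unnecessary updates with a transformer that remembers the last 
emitted sessionId per pageview; the store name and the String types are 
assumptions:

{code:scala}
// Hypothetical sketch: drop (pageviewId, sessionId) updates whose sessionId
// has not changed since the last emitted value for that pageview.
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

class EmitOnChangeTransformer extends Transformer[String, String, KeyValue[String, String]] {
  private var lastEmitted: KeyValueStore[String, String] = _

  override def init(context: ProcessorContext): Unit =
    lastEmitted = context.getStateStore("last-emitted-session")
      .asInstanceOf[KeyValueStore[String, String]]

  // key = pageviewId, value = sessionId
  override def transform(pageviewId: String, sessionId: String): KeyValue[String, String] = {
    if (sessionId == lastEmitted.get(pageviewId)) {
      null // relation unchanged: emit nothing downstream
    } else {
      lastEmitted.put(pageviewId, sessionId)
      KeyValue.pair(pageviewId, sessionId)
    }
  }

  override def close(): Unit = ()
}
{code}

Returning null from transform forwards nothing, so only changed relations reach 
the downstream topic.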

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2019-07-24 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891715#comment-16891715
 ] 

Di Campo commented on KAFKA-8608:
-

Hi Lillian,

I'm afraid I can't. If it happens again, which context do you need? Full broker 
logs for that amount of time? 

> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Running partition reassignment. 
> On each run, I can see the following WARN messages, but when the partition 
> reassignment process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following message types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
> -> This relates to cache, so I suppose it's pretty safe.
> {code:java}
> [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
> record follower 3's position 47981 since the replica is not recognized to be 
> one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
> records will be returned for this partition. 
> (kafka.server.ReplicaManager){code}
> -> This is scary. I'm not sure about the severity of this, but it looks like 
> it may be missing records? 
> {code:java}
> [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
> replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
> (kafka.server.ReplicaManager){code}
> -> Here, these partitions are created. 
> First of all - am I supposed to be missing data here? I am assuming I am not, 
> and so far I don't see traces of losing anything.
> If so, I'm not sure what these messages are trying to say here. Should they 
> really be at WARN level? If so - should the messages better clarify the 
> different risks involved? 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8612) Broker removes consumers from CG, Streams app gets stuck

2019-07-24 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891712#comment-16891712
 ] 

Di Campo commented on KAFKA-8612:
-

I've upgraded to 2.3 in the testing environment and for a week it hasn't happened. 
It was not something that happened often either - maybe once every 2 weeks. 
Will update with results.

> Broker removes consumers from CG, Streams app gets stuck
> 
>
> Key: KAFKA-8612
> URL: https://issues.apache.org/jira/browse/KAFKA-8612
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.1.1
>Reporter: Di Campo
>Priority: Major
>  Labels: broker, streams, timeout
> Attachments: full-thread-dump-kafka-streams-stuck.log
>
>
> Cluster of 5 brokers, `Kafka 2.1.1`. m5.large (2 CPU, 8GB RAM) instances. 
> Kafka Streams application (`stream-processor`) cluster of 3 instances, 2 
> threads each. `2.1.0` 
> Consumer Store consumer group (ClickHouse Kafka Engine from `ClickHouse 
> 19.5.3.8`), with several tables consuming from a different topic each.
> The `stream-processor` is consuming from a source topic and running a 
> topology of 26 topics (64 partitions each) with 5 state stores, 1 of them 
> a session store, 4 key-value.
> Infra running on docker on AWS ECS. 
> Consuming at a rate of 300-1000 events per second. Each event generates an 
> avg of ~20 extra messages.
> Application has uncaughtExceptionHandler set.
> Timestamps are kept for better analysis.
> `stream-processor` tasks at some point fail to produce to any partition due 
> to timeouts:
> 
> {noformat}
> [2019-06-28 10:04:21,113] ERROR task [1_48] Error sending record (...) to 
> topic (...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 
> 44 record(s) for (...)-48:120002 ms has passed since batch creation; No more 
> records will be sent and no more offsets will be recorded for this task.
> {noformat}
> and "Offset commit failed" errors, in all partitions:
> {noformat}
> [2019-06-28 10:04:27,705] ERROR [Consumer 
> clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-1-consumer,
>  groupId=stream-processor-0.0.1] Offset commit failed on partition 
> events-raw-63 at offset 4858803: The request timed out. 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> {noformat}
> _At this point we begin seeing error messages in one of the brokers (see 
> below, Broker logs section)._
> More error messages are shown on the `stream-processor`: 
> {noformat}
> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before successfully committing offsets 
> {(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}}
> {noformat}
> then hundreds of messages of the following types (one per topic-partition), 
> intertwined: 
> {noformat}
> [2019-06-28 10:05:23,608] WARN [Producer 
> clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
>  Got error produce response with correlation id 39946 on topic-partition 
> (topic)-63, retrying (2 attempts left). Error: NETWORK_EXCEPTION 
> (org.apache.kafka.clients.producer.internals.Sender)
> {noformat}
> {noformat}
> [2019-06-28 10:05:23,609] WARN [Producer 
> clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
>  Received invalid metadata error in produce request on partition (topic)1-59 
> due to org.apache.kafka.common.errors.NetworkException: The server 
> disconnected before a response was received.. Going to request metadata 
> update now (org.apache.kafka.clients.producer.internals.Sender)
> {noformat}
> And then: 
> {noformat}
> [2019-06-28 10:05:47,986] ERROR stream-thread 
> [stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4] 
> Failed to commit stream task 1_57 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
> 2019-06-28 10:05:47org.apache.kafka.streams.errors.StreamsException: task 
> [1_57] Abort sending since an error caught with a previous record (...) to 
> topic (...) due to org.apache.kafka.common.errors.NetworkException: The 
> server disconnected before a response was received.
> 2019-06-28 10:05:47You can increase producer parameter `retries` and 
> `retry.backoff.ms` to avoid this error.
> 2019-06-28 10:05:47 at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
> {noformat}
> ...and eventually we get to the error messages: 
> {noformat}
> [2019-06-28 10:05:51,198] ERROR [Producer 
> clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
>  Uncaught error in kafka producer I/O thread: 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-07-08 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880482#comment-16880482
 ] 

Di Campo commented on KAFKA-5998:
-

Thanks Patrik.
So, if for some reason an application is stopped for longer than 10 minutes, 
does it mean that on restart, this cleaner may clean that previous state, or is 
this prevented?

Also - if a task is moved to another instance - is the state copied from the 
instance where that task was (if there was any), or is it rebuilt by reading from 
the Kafka topics?

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: Bill Bejeck
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-07-04 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878674#comment-16878674
 ] 

Di Campo commented on KAFKA-5998:
-

My topology contains stateless subtopologies, so that may be the case. 

One question: 

> The "trigger" for the cleanup is the absence of any traffic on a topic 
> partition which will allow the directory to be deleted

 Can you elaborate on this trigger? What will be cleaned in this cleanup 
process? How long does it wait by default for traffic to determine absence?

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: Bill Bejeck
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> 

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-07-04 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878674#comment-16878674
 ] 

Di Campo edited comment on KAFKA-5998 at 7/4/19 1:55 PM:
-

My topology contains stateless as well as stateful subtopologies, so that may be the case. 

One question: 

> The "trigger" for the cleanup is the absence of any traffic on a topic 
> partition which will allow the directory to be deleted

 Can you elaborate on this trigger? What will be cleaned in this cleanup 
process? How long does it wait by default for traffic to determine absence?


was (Author: xmar):
My topology contains stateless subtopologies, so it may be that case. 

One question: 

> The "trigger" for the cleanup is the absence of any traffic on a topic 
> partition which will allow the directory to be deleted

 Can you elaborate on this trigger? What will be cleaned in this cleanup 
process? How long does it wait by default for traffic to determine absence?

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: Bill Bejeck
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one kafka broker and one kafka stream running... I am running its 
> since two days under load of around 2500 msgs per second.. On third day am 
> getting below exception for some of the partitions, I have 16 partitions only 
> 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> 

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-07-02 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877136#comment-16877136
 ] 

Di Campo edited comment on KAFKA-5998 at 7/2/19 4:52 PM:
-

Just in case it helps: I ran into it again today on 2.1.1 (I commented here some months ago).
 5-broker cluster, 3 Kafka Streams instances (2 `num.stream.threads` each). Amazon Linux, Docker on ECS.

Before the task dies, it prints the following WARN messages from one task.

Please note that of the 64 partitions only a few fail, starting at 13:17, and the same batch of partitions starts failing again at 13:42.
 Why are the same partitions failing? Does it match your findings?

 

{{[2019-07-02 13:17:01,101] WARN task [2_31] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_31/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:17:01,118] WARN task [2_47] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_47/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:17:01,156] WARN task [2_27] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_27/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:20:12,360] WARN task [2_63] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_63/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:20:12,579] WARN task [2_35] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_35/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:20:13,001] WARN task [2_23] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_23/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:23:18,421] WARN task [2_39] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_39/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:23:18,613] WARN task [2_55] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_55/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}

{{[2019-07-02 13:42:46,366] WARN task [2_31] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_31/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
 {{[2019-07-02 13:42:46,473] WARN task [2_47] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_47/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
 {{[2019-07-02 13:42:46,639] WARN task [2_27] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_27/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
 {{[2019-07-02 13:46:19,888] WARN task [2_63] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_63/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
 {{[2019-07-02 13:46:20,042] WARN task [2_35] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_35/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
 {{[2019-07-02 13:46:20,380] WARN task [2_55] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_55/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
 {{[2019-07-02 13:46:20,384] WARN task [2_23] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_23/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
 {{[2019-07-02 13:48:07,011] WARN task [2_39] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_39/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}

 

The application died some minutes later, at 13:59:13. In case it is related, it was killed due to OOM.

 


was (Author: xmar):
Just in case it helps. I just found it today on 2.1.1 (again, I commented here 
some months ago). 
 5 brokers cluster, 3 Kafka Streams instances (2 `num.streams.threads` each). 
AMZN Linux. Docker on ECS.

I've seen that, before the task dies, it prints the following WARNs from one 
task.

Please note that from the 64 partitions, only a few of them fail starting at 
13:17. And the same batch of the same partitions start failing again at 13:42. 
Why are the 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-07-02 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877136#comment-16877136
 ] 

Di Campo commented on KAFKA-5998:
-

Just in case it helps: I ran into it again today on 2.1.1 (I commented here some months ago).
 5-broker cluster, 3 Kafka Streams instances (2 `num.stream.threads` each). Amazon Linux, Docker on ECS.

Before the task dies, it prints the following WARN messages from one task.

Please note that of the 64 partitions only a few fail, starting at 13:17, and the same batch of partitions starts failing again at 13:42. Why are the same partitions failing? Does it match your findings?

{{ [2019-07-02 13:17:01,101] WARN task [2_31] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_31/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{ [2019-07-02 13:17:01,118] WARN task [2_47] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_47/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{ [2019-07-02 13:17:01,156] WARN task [2_27] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_27/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:20:12,360] WARN task [2_63] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_63/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:20:12,579] WARN task [2_35] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_35/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:20:13,001] WARN task [2_23] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_23/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:23:18,421] WARN task [2_39] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_39/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:23:18,613] WARN task [2_55] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_55/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}


{{[2019-07-02 13:42:46,366] WARN task [2_31] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_31/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:42:46,473] WARN task [2_47] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_47/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:42:46,639] WARN task [2_27] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_27/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:46:19,888] WARN task [2_63] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_63/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:46:20,042] WARN task [2_35] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_35/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:46:20,380] WARN task [2_55] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_55/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:46:20,384] WARN task [2_23] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_23/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}
{{[2019-07-02 13:48:07,011] WARN task [2_39] Failed to write offset checkpoint 
file to /data/kafka-streams/stream-processor-0.0.1/2_39/.checkpoint: {} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)}}

 

The application died some minutes later, at 13:59:13. In case it is related, it was killed due to OOM.

 

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: Bill Bejeck
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have 

[jira] [Updated] (KAFKA-8612) Broker removes consumers from CG, Streams app gets stuck

2019-06-28 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8612:

Description: 
Cluster of 5 brokers, `Kafka 2.1.1`, on m5.large (2 CPU, 8GB RAM) instances. 
Kafka Streams application (`stream-processor`, Kafka Streams `2.1.0`): a cluster of 3 instances, 2 threads each. 
A store consumer group (ClickHouse Kafka Engine from `ClickHouse 19.5.3.8`), with several tables, each consuming from a different topic.
The `stream-processor` consumes from a source topic and runs a topology spanning 26 topics (64 partitions each) with 5 state stores: 1 session store and 4 key-value stores.
Infrastructure runs on Docker on AWS ECS. 
Consuming at a rate of 300-1000 events per second; each event generates an average of ~20 extra messages.
The application has an uncaughtExceptionHandler set (see the sketch below).
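
A minimal sketch of how such a handler is registered against the Kafka Streams 2.1 API; the topic name, properties and handler body here are placeholders, not our real configuration:

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

// Sketch only: Kafka Streams 2.1 takes a plain Thread.UncaughtExceptionHandler.
public class HandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processor-0.0.1"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");         // placeholder

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events-raw").foreach((k, v) -> { });                      // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Log the dying thread; the real handler in our application does more than this.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
        streams.start();
    }
}
{code}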

Timestamps are kept for better analysis.

`stream-processor` tasks at some point fail to produce to any partition due to 
timeouts:


{noformat}
[2019-06-28 10:04:21,113] ERROR task [1_48] Error sending record (...) to topic 
(...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44 
record(s) for (...)-48:120002 ms has passed since batch creation; No more 
records will be sent and no more offsets will be recorded for this task.
{noformat}


and "Offset commit failed" errors, in all partitions:

{noformat}
[2019-06-28 10:04:27,705] ERROR [Consumer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-1-consumer,
 groupId=stream-processor-0.0.1] Offset commit failed on partition 
events-raw-63 at offset 4858803: The request timed out. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{noformat}

_At this point we begin seeing error messages in one of the brokers (see below, 
Broker logs section)._

More error messages are shown on the `stream-processor`: 

{noformat}
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before successfully committing offsets 
{(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}}
{noformat}

then hundreds of messages of the following types (one per topic-partition), intertwined: 

{noformat}
[2019-06-28 10:05:23,608] WARN [Producer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
 Got error produce response with correlation id 39946 on topic-partition 
(topic)-63, retrying (2 attempts left). Error: NETWORK_EXCEPTION 
(org.apache.kafka.clients.producer.internals.Sender)
{noformat}

{noformat}
[2019-06-28 10:05:23,609] WARN [Producer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
 Received invalid metadata error in produce request on partition (topic)1-59 
due to org.apache.kafka.common.errors.NetworkException: The server disconnected 
before a response was received.. Going to request metadata update now 
(org.apache.kafka.clients.producer.internals.Sender)
{noformat}


And then: 


{noformat}
[2019-06-28 10:05:47,986] ERROR stream-thread 
[stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4] 
Failed to commit stream task 1_57 due to the following error: 
(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
2019-06-28 10:05:47org.apache.kafka.streams.errors.StreamsException: task 
[1_57] Abort sending since an error caught with a previous record (...) to 
topic (...) due to org.apache.kafka.common.errors.NetworkException: The server 
disconnected before a response was received.
2019-06-28 10:05:47You can increase producer parameter `retries` and 
`retry.backoff.ms` to avoid this error.
2019-06-28 10:05:47 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
{noformat}
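
(As an aside, passing those producer-level overrides through the Streams configuration would look roughly like the sketch below; the helper class and values are placeholders, not settings we have validated.)

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Sketch: producer-level overrides go through StreamsConfig.producerPrefix.
// The retry values below are illustrative placeholders only.
public class ProducerRetrySketch {
    public static Properties withRetries(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
        return props;
    }
}
{code}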


...and eventually we get to the error messages: 

{noformat}
[2019-06-28 10:05:51,198] ERROR [Producer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
 Uncaught error in kafka producer I/O thread: 
(org.apache.kafka.clients.producer.internals.Sender)
2019-06-28 10:05:51java.util.ConcurrentModificationException
2019-06-28 10:05:51 at 
java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
{noformat}

{noformat}
[2019-06-28 10:07:18,735] ERROR task [1_63] Failed to flush state store 
orderStore: 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager) 
org.apache.kafka.streams.errors.StreamsException: task [1_63] Abort sending 
since an error caught with a previous record (...) timestamp 1561664080389) to 
topic (...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44 
record(s) for pageview-sessions-0.0.1-63:120007 ms has passed since batch 
creation
{noformat}


...and eventually, after many messages like the above, the KafkaStreams instance is closed and the task dies; you can see when it finally dies in our piece of 

[jira] [Created] (KAFKA-8612) Broker removes consumers from CG, Streams app gets stuck

2019-06-28 Thread Di Campo (JIRA)
Di Campo created KAFKA-8612:
---

 Summary: Broker removes consumers from CG, Streams app gets stuck
 Key: KAFKA-8612
 URL: https://issues.apache.org/jira/browse/KAFKA-8612
 Project: Kafka
  Issue Type: Bug
  Components: clients, streams
Affects Versions: 2.1.1
Reporter: Di Campo
 Attachments: full-thread-dump-kafka-streams-stuck.log

Cluster of 5 brokers, `Kafka 2.1.1`, on m5.large (2 CPU, 8GB RAM) instances. 
Kafka Streams application (`stream-processor`, Kafka Streams `2.1.0`): a cluster of 3 instances, 2 threads each. 
A store consumer group (ClickHouse Kafka Engine from `ClickHouse 19.5.3.8`), with several tables, each consuming from a different topic.
The `stream-processor` consumes from a source topic and runs a topology spanning 26 topics (64 partitions each) with 5 state stores: 1 session store and 4 key-value stores.
Infrastructure runs on Docker on AWS ECS. 
Consuming at a rate of 300-1000 events per second; each event generates an average of ~20 extra messages.

Timestamps are kept for better analysis.

`stream-processor` tasks at some point fail to produce to any partition due to 
timeouts:


{noformat}
[2019-06-28 10:04:21,113] ERROR task [1_48] Error sending record (...) to topic 
(...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44 
record(s) for (...)-48:120002 ms has passed since batch creation; No more 
records will be sent and no more offsets will be recorded for this task.
{noformat}


and "Offset commit failed" errors, in all partitions:

{noformat}
[2019-06-28 10:04:27,705] ERROR [Consumer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-1-consumer,
 groupId=stream-processor-0.0.1] Offset commit failed on partition 
events-raw-63 at offset 4858803: The request timed out. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{noformat}

_At this point we begin seeing error messages in one of the brokers (see below, 
Broker logs section)._

More error messages are shown on the `stream-processor`: 

{noformat}
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before successfully committing offsets 
{(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}}
{noformat}

then hundreds of messages of the following types (one per topic-partition), intertwined: 

{noformat}
[2019-06-28 10:05:23,608] WARN [Producer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
 Got error produce response with correlation id 39946 on topic-partition 
(topic)-63, retrying (2 attempts left). Error: NETWORK_EXCEPTION 
(org.apache.kafka.clients.producer.internals.Sender)
{noformat}

{noformat}
[2019-06-28 10:05:23,609] WARN [Producer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
 Received invalid metadata error in produce request on partition (topic)1-59 
due to org.apache.kafka.common.errors.NetworkException: The server disconnected 
before a response was received.. Going to request metadata update now 
(org.apache.kafka.clients.producer.internals.Sender)
{noformat}


And then: 


{noformat}
[2019-06-28 10:05:47,986] ERROR stream-thread 
[stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4] 
Failed to commit stream task 1_57 due to the following error: 
(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
2019-06-28 10:05:47org.apache.kafka.streams.errors.StreamsException: task 
[1_57] Abort sending since an error caught with a previous record (...) to 
topic (...) due to org.apache.kafka.common.errors.NetworkException: The server 
disconnected before a response was received.
2019-06-28 10:05:47You can increase producer parameter `retries` and 
`retry.backoff.ms` to avoid this error.
2019-06-28 10:05:47 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
{noformat}


...and eventually we get to the error messages: 

{noformat}
[2019-06-28 10:05:51,198] ERROR [Producer 
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
 Uncaught error in kafka producer I/O thread: 
(org.apache.kafka.clients.producer.internals.Sender)
2019-06-28 10:05:51java.util.ConcurrentModificationException
2019-06-28 10:05:51 at 
java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
{noformat}

{noformat}
[2019-06-28 10:07:18,735] ERROR task [1_63] Failed to flush state store 
orderStore: 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager) 
org.apache.kafka.streams.errors.StreamsException: task [1_63] Abort sending 
since an error caught with a previous record (...) timestamp 1561664080389) to 
topic (...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44 
record(s) for 

[jira] [Updated] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2019-06-28 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8608:

Description: 
I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster with 32 topics, 64 partitions each, replication factor 3.

I then ran a partition reassignment. 

On each run I see the following WARN messages, but when the partition reassignment process finishes everything seems OK. ISR is OK (count is 3 for every partition).

But I get the following message types, one per partition:

 
{code:java}
[2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
entry EpochEntry(epoch=24, startOffset=51540) caused truncation of conflicting 
entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). Cache now contains 
5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
-> This relates to cache, so I suppose it's pretty safe.
{code:java}
[2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
record follower 3's position 47981 since the replica is not recognized to be 
one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
records will be returned for this partition. (kafka.server.ReplicaManager){code}
-> This is scary. I'm not sure how severe it is, but it looks like records may be missing? 
{code:java}
[2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
(kafka.server.ReplicaManager){code}
-> Here, these partitions are in fact created. 


First of all: am I supposed to be missing data here? I assume I am not, and so far I see no trace of losing anything.

If I am not missing data, I'm not sure what these messages are trying to say. Should they really be at WARN level? And if so, should the messages better clarify the different risks involved? 

 

 

  was:
I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
there were 32 topics and 64 partitions on each, replication 3.

Running reassigning partitions. 

On each run, I can see the following WARN messages, but when the reassignment 
partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
partition).

But I get the following messages types, one per partition:

 
{code:java}
[2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
entry EpochEntry(epoch=24, startOffset=51540) caused truncation of conflicting 
entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). Cache now contains 
5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
-> This relates to cache, so I suppose it's pretty safe.
{code:java}
[2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
record follower 3's position 47981 since the replica is not recognized to be 
one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
records will be returned for this partition. (kafka.server.ReplicaManager){code}
-> This is scary. I'm not sure about the severity of this, but it looks like it 
may be missing records? 
{code:java}
[2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
(kafka.server.ReplicaManager){code}
-> Here, these partitions are created. 


First of all - am I supposed to be missing data here? I am assuming I don't, 
and so far I don't see traces of losing anything.

If so, I'm not sure what these messages are trying to say here. Should they 
really be at WARN level? If so - what are they warning about? 

 

 


> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Running reassigning partitions. 
> On each run, I can see the following WARN messages, but when the reassignment 
> partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following messages types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. 

[jira] [Updated] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2019-06-28 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8608:

Summary: Broker shows WARN on reassignment partitions on new brokers: 
Replica LEO, follower position & Cache truncation  (was: Broker shows WARN on 
reassignment partitions on new brokers: Replica LEO & Cache truncation)

> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Running reassigning partitions. 
> On each run, I can see the following WARN messages, but when the reassignment 
> partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following messages types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
> -> This relates to cache, so I suppose it's pretty safe.
> {code:java}
> [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
> record follower 3's position 47981 since the replica is not recognized to be 
> one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
> records will be returned for this partition. 
> (kafka.server.ReplicaManager){code}
> -> This is scary. I'm not sure about the severity of this, but it looks like 
> it may be missing records? 
> {code:java}
> [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
> replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
> (kafka.server.ReplicaManager){code}
> -> Here, these partitions are created. 
> First of all - am I supposed to be missing data here? I am assuming I don't, 
> and so far I don't see traces of losing anything.
> If so, I'm not sure what these messages are trying to say here. Should they 
> really be at WARN level? If so - what are they warning about? 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO & Cache truncation

2019-06-27 Thread Di Campo (JIRA)
Di Campo created KAFKA-8608:
---

 Summary: Broker shows WARN on reassignment partitions on new 
brokers: Replica LEO & Cache truncation
 Key: KAFKA-8608
 URL: https://issues.apache.org/jira/browse/KAFKA-8608
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.1.1
 Environment: Kafka 2.1.1

Reporter: Di Campo


I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster with 32 topics, 64 partitions each, replication factor 3.

I then ran a partition reassignment. 

On each run I see the following WARN messages, but when the partition reassignment process finishes everything seems OK. ISR is OK (count is 3 for every partition).

But I get the following message types, one per partition:

 
{code:java}
[2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
entry EpochEntry(epoch=24, startOffset=51540) caused truncation of conflicting 
entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). Cache now contains 
5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
-> This relates to cache, so I suppose it's pretty safe.
{code:java}
[2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
record follower 3's position 47981 since the replica is not recognized to be 
one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
records will be returned for this partition. (kafka.server.ReplicaManager){code}
-> This is scary. I'm not sure how severe it is, but it looks like records may be missing? 
{code:java}
[2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
(kafka.server.ReplicaManager){code}
-> Here, these partitions are in fact created. 


First of all: am I supposed to be missing data here? I assume I am not, and so far I see no trace of losing anything.

If I am not missing data, I'm not sure what these messages are trying to say. Should they really be at WARN level? And if so, what are they warning about? 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2019-05-29 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850739#comment-16850739
 ] 

Di Campo commented on KAFKA-8165:
-

Some things we did that helped: 
- Allocating more memory.
- Reducing the default memory allocation for the RocksDB buffers with a RocksDBConfigSetter (see the sketch below).
- Using container-friendly JVM settings (still on Java 8), setting the heap to a 1/2 fraction and leaving the rest for off-heap memory. 

So far this has greatly reduced the occurrence of these connection issues. 
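
For reference, the RocksDB tuning was along the lines of the following sketch; the class name and sizes are illustrative placeholders, not our production values. It is registered via `StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG`. The heap fraction came from the Java 8 container flags (something like `-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2`), though the exact flags may differ in our setup.

{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Sketch of a RocksDBConfigSetter that shrinks the default block cache and
// memtable sizes per store. The numbers are placeholders, not our real values.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(8 * 1024 * 1024L);   // smaller block cache per store
        tableConfig.setBlockSize(4 * 1024L);
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(4 * 1024 * 1024L);      // smaller memtables
        options.setMaxWriteBufferNumber(2);                // fewer memtables kept in memory
    }
}
{code}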

> Streams task causes Out Of Memory after connection issues and store 
> restoration
> ---
>
> Key: KAFKA-8165
> URL: https://issues.apache.org/jira/browse/KAFKA-8165
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
> Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window 
> store, 4 KV stores. 
> Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 
> application instance, 2 threads per instance.
> Kafka 2.1, Kafka Streams 2.1
> Amazon Linux.
> Scala application, on Docker based on openJdk9. 
>Reporter: Di Campo
>Priority: Major
>
> Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
> (largely stateful) application has been consuming ~160 messages per second at 
> a sustained rate for several hours. 
> However it started having connection issues to the brokers. 
> {code:java}
> Connection to node 3 (/172.31.36.118:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> Also it began showing a lot of these errors: 
> {code:java}
> WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
>  groupId=stream-processor] 1 partitions have leader brokers without a 
> matching listener, including [broker-2-health-check-0] 
> (org.apache.kafka.clients.NetworkClient){code}
> In fact, the _health-check_ topic is in the broker but not consumed by this 
> topology or used in any way by the Streams application (it is just broker 
> healthcheck). It does not complain about topics that are actually consumed by 
> the topology. 
> Some time after these errors (that appear at a rate of 24 appearances per 
> second during ~5 minutes), then the following logs appear: 
> {code:java}
> [2019-03-27 15:14:47,709] WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
>  groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> In between 6 and then 3 lines of "Connection could not be established" error 
> messages, 3 of these ones slipped in: 
> {code:java}
> [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
> partition 15 total records to be restored 17 
> (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code}
>  
>  ... one for each different KV store I have (I still have another KV that 
> does not appear, and a WindowedStore store that also does not appear). 
>  Then I finally see "Restoration Complete" (using a logging 
> ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
> seems it may be fine now to restart the processing.
> Three minutes later, some events get processed, and I see an OOM error:  
> {code:java}
> java.lang.OutOfMemoryError: GC overhead limit exceeded{code}
>  
> ... so given that it usually allows to process during hours under same 
> circumstances, I'm wondering whether there is some memory leak in the 
> connection resources or somewhere in the handling of this scenario.
> Kafka and KafkaStreams 2.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-10 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814233#comment-16814233
 ] 

Di Campo edited comment on KAFKA-5998 at 4/10/19 9:13 AM:
--

It also happens to me on kafka_2.12-2.1.0, running in AWS in Docker (based on the wurstmeister image) on Amazon Linux.

 

{{cat /proc/version}}
{{Linux version 4.14.77-69.57.amzn1.x86_64 (mockbuild@gobi-build-60003) (gcc 
version 7.2.1 20170915 (Red Hat 7.2.1-2) (GCC)) #1 SMP Tue Nov 6 21:32:55 UTC 
2018}}


was (Author: xmar):
It also happens to me on kafka_2.12-2.1.0. Run in AWS in docker (based in 
wurstmeister image) over Amazon Linux.

 

{{$ cat /proc/version }}
{{Linux version 4.14.77-69.57.amzn1.x86_64 (mockbuild@gobi-build-60003) (gcc 
version 7.2.1 20170915 (Red Hat 7.2.1-2) (GCC)) #1 SMP Tue Nov 6 21:32:55 UTC 
2018}}

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one kafka broker and one kafka stream running... I am running its 
> since two days under load of around 2500 msgs per second.. On third day am 
> getting below exception for some of the partitions, I have 16 partitions only 
> 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> 
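
For context, the checkpoint writer in the trace above uses a write-to-temp-then-rename pattern, roughly like the simplified sketch below (illustrative only, not the actual OffsetCheckpoint source). A FileNotFoundException when opening the .tmp file typically means the task directory itself was removed between commits.
{code:java}
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Simplified write-temp-then-rename checkpoint, in the spirit of Kafka Streams'
// OffsetCheckpoint (not the real implementation).
public class CheckpointSketch {

    public static void write(final File checkpointFile, final Map<String, Long> offsets) throws IOException {
        final File tmp = new File(checkpointFile.getAbsolutePath() + ".tmp");
        // Opening the .tmp file throws FileNotFoundException if the parent
        // task directory has been removed in the meantime.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(tmp))) {
            for (final Map.Entry<String, Long> entry : offsets.entrySet()) {
                writer.write(entry.getKey() + " " + entry.getValue());
                writer.newLine();
            }
            writer.flush();
        }
        // Rename into place so readers never observe a half-written checkpoint.
        Files.move(tmp.toPath(), checkpointFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }
}
{code}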

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-10 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814233#comment-16814233
 ] 

Di Campo commented on KAFKA-5998:
-

It also happens to me on kafka_2.12-2.1.0, running in AWS in Docker (based on 
the wurstmeister image) on Amazon Linux.

 

{{$ cat /proc/version }}
{{Linux version 4.14.77-69.57.amzn1.x86_64 (mockbuild@gobi-build-60003) (gcc 
version 7.2.1 20170915 (Red Hat 7.2.1-2) (GCC)) #1 SMP Tue Nov 6 21:32:55 UTC 
2018}}

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the exception below for some of the partitions; I have 16 
> partitions, and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2019-03-28 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8165:

Description: 
Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
(largely stateful) application has been consuming ~160 messages per second at a 
sustained rate for several hours. 

However it started having connection issues to the brokers. 
{code:java}
Connection to node 3 (/172.31.36.118:9092) could not be established. Broker may 
not be available. (org.apache.kafka.clients.NetworkClient){code}
Also it began showing a lot of these errors: 
{code:java}
WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
 groupId=stream-processor] 1 partitions have leader brokers without a matching 
listener, including [broker-2-health-check-0] 
(org.apache.kafka.clients.NetworkClient){code}
In fact, the _health-check_ topic is in the broker but not consumed by this 
topology or used in any way by the Streams application (it is just broker 
healthcheck). It does not complain about topics that are actually consumed by 
the topology. 

Some time after these errors (which appear at a rate of 24 per second 
for ~5 minutes), the following logs appear: 
{code:java}
[2019-03-27 15:14:47,709] WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
 groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient){code}
In between 6 and then 3 lines of "Connection could not be established" error 
messages, 3 of these slipped in: 
{code:java}
[2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
partition 15 total records to be restored 17 
(com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code}
 
 ... one for each different KV store I have (though one other KV store and the 
WindowedStore do not appear). 
 Then I finally see "Restoration Complete" (using a logging 
ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
seems it may be fine now to restart the processing.

Three minutes later, some events get processed, and I see an OOM error:  
{code:java}
java.lang.OutOfMemoryError: GC overhead limit exceeded{code}
 

... so given that it usually processes for hours under the same 
circumstances, I'm wondering whether there is some memory leak in the 
connection resources or somewhere in the handling of this scenario.

Kafka and KafkaStreams 2.1
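
For anyone chasing a similar suspicion, one simple cross-check is to log heap usage periodically from inside the application and see whether it climbs steadily across reconnect/restore cycles or only spikes during restoration. A minimal, Kafka-agnostic sketch (illustrative only):
{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Periodically logs JVM heap usage; a "used" value that keeps growing between
// full GCs is what a genuine leak would look like.
public class HeapWatch {

    public static ScheduledExecutorService start(final long periodSeconds) {
        final MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            final MemoryUsage heap = memory.getHeapMemoryUsage();
            System.out.println("heap used=" + heap.getUsed() / (1024 * 1024) + "MB"
                    + " committed=" + heap.getCommitted() / (1024 * 1024) + "MB"
                    + " max=" + heap.getMax() / (1024 * 1024) + "MB");
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }
}
{code}
Starting the JVM with -XX:+HeapDumpOnOutOfMemoryError and inspecting the resulting dump would then show where the retained memory actually lives.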

  was:
Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
(largely stateful) application has been consuming ~160 messages per second at a 
sustained rate for several hours. 

However it started having connection issues to the brokers. 


{code:java}
Connection to node 3 (/172.31.36.118:9092) could not be established. Broker may 
not be available. (org.apache.kafka.clients.NetworkClient){code}

Also it began showing a lot of these errors: 


{code:java}
WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
 groupId=stream-processor] 1 partitions have leader brokers without a matching 
listener, including [broker-2-health-check-0] 
(org.apache.kafka.clients.NetworkClient){code}
In fact, the _health-check_ topic is in the broker but not consumed by this 
topology or used in any way by the Streams application (it is just broker 
healthcheck). It does not complain about topics that are actually consumed by 
the topology. 

Some time after these errors (that appear at a rate of 24 appearances per 
second during ~5 minutes), then the following logs appear: 


{code:java}
[2019-03-27 15:14:47,709] WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
 groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient){code}
In between 6 and then 3 lines of "Connection could not be established" error 
messages, 3 of these ones slipped in: 


[2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
partition 15 total records to be restored 17 
(com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener)
 
... one for each different KV store I have (I still have another KV that does 
not appear, and a WindowedStore store that also does not appear). 
Then I finally see "Restoration Complete" (using a logging 
ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
seems it may be fine now to restart the processing.

Three minutes later, some events get processed, and I see an OOM error:  


java.lang.OutOfMemoryError: GC overhead limit exceeded
 

... so given that it usually allows to process during hours under 

[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2019-03-28 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8165:

Environment: 
3 nodes, 22 topics, 16 partitions per topic, 1 window store, 4 KV stores. 
Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 
application instance, 2 threads per instance.
Kafka 2.1, Kafka Streams 2.1
Amazon Linux.
Scala application, on Docker based on openJdk9. 

  was:
Kafka 2.1, Kafka Streams 2.1
Amazon Linux, on Docker based on wurstmeister/kafka image


> Streams task causes Out Of Memory after connection issues and store 
> restoration
> ---
>
> Key: KAFKA-8165
> URL: https://issues.apache.org/jira/browse/KAFKA-8165
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
> Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window 
> store, 4 KV stores. 
> Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 
> application instance, 2 threads per instance.
> Kafka 2.1, Kafka Streams 2.1
> Amazon Linux.
> Scala application, on Docker based on openJdk9. 
>Reporter: Di Campo
>Priority: Major
>
> Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
> (largely stateful) application has been consuming ~160 messages per second at 
> a sustained rate for several hours. 
> However it started having connection issues to the brokers. 
> {code:java}
> Connection to node 3 (/172.31.36.118:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> Also it began showing a lot of these errors: 
> {code:java}
> WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
>  groupId=stream-processor] 1 partitions have leader brokers without a 
> matching listener, including [broker-2-health-check-0] 
> (org.apache.kafka.clients.NetworkClient){code}
> In fact, the _health-check_ topic is in the broker but not consumed by this 
> topology or used in any way by the Streams application (it is just broker 
> healthcheck). It does not complain about topics that are actually consumed by 
> the topology. 
> Some time after these errors (which appear at a rate of 24 per second 
> for ~5 minutes), the following logs appear: 
> {code:java}
> [2019-03-27 15:14:47,709] WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
>  groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> In between 6 and then 3 lines of "Connection could not be established" error 
> messages, 3 of these slipped in: 
> [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
> partition 15 total records to be restored 17 
> (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener)
>  
> ... one for each different KV store I have (though one other KV store and the 
> WindowedStore do not appear). 
> Then I finally see "Restoration Complete" (using a logging 
> ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
> seems it may be fine now to restart the processing.
> Three minutes later, some events get processed, and I see an OOM error:  
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>  
> ... so given that it usually processes for hours under the same 
> circumstances, I'm wondering whether there is some memory leak in the 
> connection resources or somewhere in the handling of this scenario.
> Kafka and KafkaStreams 2.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection and store restoration

2019-03-27 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8165:

Environment: 
Kafka 2.1, Kafka Streams 2.1
Amazon Linux, on Docker based on wurstmeister/kafka image

  was:Amazon Linux container, on Docker based on wurstmeister/kafka image.


> Streams task causes Out Of Memory after connection and store restoration
> 
>
> Key: KAFKA-8165
> URL: https://issues.apache.org/jira/browse/KAFKA-8165
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
> Environment: Kafka 2.1, Kafka Streams 2.1
> Amazon Linux, on Docker based on wurstmeister/kafka image
>Reporter: Di Campo
>Priority: Major
>
> Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
> (largely stateful) application has been consuming ~160 messages per second at 
> a sustained rate for several hours. 
> However it started having connection issues to the brokers. 
> {code:java}
> Connection to node 3 (/172.31.36.118:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> Also it began showing a lot of these errors: 
> {code:java}
> WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
>  groupId=stream-processor] 1 partitions have leader brokers without a 
> matching listener, including [broker-2-health-check-0] 
> (org.apache.kafka.clients.NetworkClient){code}
> In fact, the _health-check_ topic is in the broker but not consumed by this 
> topology or used in any way by the Streams application (it is just broker 
> healthcheck). It does not complain about topics that are actually consumed by 
> the topology. 
> Some time after these errors (which appear at a rate of 24 per second 
> for ~5 minutes), the following logs appear: 
> {code:java}
> [2019-03-27 15:14:47,709] WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
>  groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> In between 6 and then 3 lines of "Connection could not be established" error 
> messages, 3 of these slipped in: 
> [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
> partition 15 total records to be restored 17 
> (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener)
>  
> ... one for each different KV store I have (though one other KV store and the 
> WindowedStore do not appear). 
> Then I finally see "Restoration Complete" (using a logging 
> ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
> seems it may be fine now to restart the processing.
> Three minutes later, some events get processed, and I see an OOM error:  
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>  
> ... so given that it usually processes for hours under the same 
> circumstances, I'm wondering whether there is some memory leak in the 
> connection resources or somewhere in the handling of this scenario.
> Kafka and KafkaStreams 2.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2019-03-27 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8165:

Summary: Streams task causes Out Of Memory after connection issues and 
store restoration  (was: Streams task causes Out Of Memory after connection and 
store restoration)

> Streams task causes Out Of Memory after connection issues and store 
> restoration
> ---
>
> Key: KAFKA-8165
> URL: https://issues.apache.org/jira/browse/KAFKA-8165
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
> Environment: Kafka 2.1, Kafka Streams 2.1
> Amazon Linux, on Docker based on wurstmeister/kafka image
>Reporter: Di Campo
>Priority: Major
>
> Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
> (largely stateful) application has been consuming ~160 messages per second at 
> a sustained rate for several hours. 
> However it started having connection issues to the brokers. 
> {code:java}
> Connection to node 3 (/172.31.36.118:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> Also it began showing a lot of these errors: 
> {code:java}
> WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
>  groupId=stream-processor] 1 partitions have leader brokers without a 
> matching listener, including [broker-2-health-check-0] 
> (org.apache.kafka.clients.NetworkClient){code}
> In fact, the _health-check_ topic is in the broker but not consumed by this 
> topology or used in any way by the Streams application (it is just broker 
> healthcheck). It does not complain about topics that are actually consumed by 
> the topology. 
> Some time after these errors (which appear at a rate of 24 per second 
> for ~5 minutes), the following logs appear: 
> {code:java}
> [2019-03-27 15:14:47,709] WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
>  groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> In between 6 and then 3 lines of "Connection could not be established" error 
> messages, 3 of these slipped in: 
> [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
> partition 15 total records to be restored 17 
> (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener)
>  
> ... one for each different KV store I have (though one other KV store and the 
> WindowedStore do not appear). 
> Then I finally see "Restoration Complete" (using a logging 
> ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
> seems it may be fine now to restart the processing.
> Three minutes later, some events get processed, and I see an OOM error:  
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>  
> ... so given that it usually processes for hours under the same 
> circumstances, I'm wondering whether there is some memory leak in the 
> connection resources or somewhere in the handling of this scenario.
> Kafka and KafkaStreams 2.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8165) Streams task causes Out Of Memory after connection and store restoration

2019-03-27 Thread Di Campo (JIRA)
Di Campo created KAFKA-8165:
---

 Summary: Streams task causes Out Of Memory after connection and 
store restoration
 Key: KAFKA-8165
 URL: https://issues.apache.org/jira/browse/KAFKA-8165
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
 Environment: Amazon Linux container, on Docker based on 
wurstmeister/kafka image.
Reporter: Di Campo


Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
(largely stateful) application has been consuming ~160 messages per second at a 
sustained rate for several hours. 

However it started having connection issues to the brokers. 


{code:java}
Connection to node 3 (/172.31.36.118:9092) could not be established. Broker may 
not be available. (org.apache.kafka.clients.NetworkClient){code}

Also it began showing a lot of these errors: 


{code:java}
WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
 groupId=stream-processor] 1 partitions have leader brokers without a matching 
listener, including [broker-2-health-check-0] 
(org.apache.kafka.clients.NetworkClient){code}
In fact, the _health-check_ topic is in the broker but not consumed by this 
topology or used in any way by the Streams application (it is just broker 
healthcheck). It does not complain about topics that are actually consumed by 
the topology. 

Some time after these errors (which appear at a rate of 24 per second 
for ~5 minutes), the following logs appear: 


{code:java}
[2019-03-27 15:14:47,709] WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
 groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient){code}
In between 6 and then 3 lines of "Connection could not be established" error 
messages, 3 of these slipped in: 


{code:java}
[2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
partition 15 total records to be restored 17 
(com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code}
 
... one for each different KV store I have (though one other KV store and the 
WindowedStore do not appear). 
Then I finally see "Restoration Complete" (using a logging 
ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
seems it may be fine now to restart the processing.

Three minutes later, some events get processed, and I see an OOM error:  


{code:java}
java.lang.OutOfMemoryError: GC overhead limit exceeded{code}
 

... so given that it usually processes for hours under the same 
circumstances, I'm wondering whether there is some memory leak in the 
connection resources or somewhere in the handling of this scenario.

Kafka and KafkaStreams 2.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-13 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-7622:

Description: 
When you create a session store from the DSL and get a 
{{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in a 
{{SessionStore}}, even if the key type is a {{Windowed}}. So you would have 
to iterate through it to find the time-related entries, which is likely less 
efficient than querying by time.

So the purpose of this ticket is to be able to query the store with (key, time).

The proposal is to add {{SessionStore}}-style {{findSessions}} methods (i.e. 
time-bound access) to {{ReadOnlySessionStore}}.
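
As a rough illustration of the shape such an addition could take (a sketch only, mirroring the existing {{SessionStore#findSessions}} signatures; not a final API):
{code:java}
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlySessionStore;

// Hypothetical read-only session store with time-bounded access, mirroring the
// findSessions methods that SessionStore already provides.
public interface TimeBoundReadOnlySessionStore<K, AGG> extends ReadOnlySessionStore<K, AGG> {

    // Sessions of the given key whose end is at or after earliestSessionEndTime
    // and whose start is at or before latestSessionStartTime.
    KeyValueIterator<Windowed<K>, AGG> findSessions(K key,
                                                    long earliestSessionEndTime,
                                                    long latestSessionStartTime);

    // Same, over a range of keys.
    KeyValueIterator<Windowed<K>, AGG> findSessions(K keyFrom,
                                                    K keyTo,
                                                    long earliestSessionEndTime,
                                                    long latestSessionStartTime);
}
{code}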
  

  was:
When creating a session store from the DSL, and you get a 
{{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in a 
{{WindowStore}}, even of the key type is a {{Windowed}}. So you would have 
to iterate through it to find the time-related entries, which should be less 
efficient than querying by time.

So the purpose of this ticket is to be able to query the store with (key, time).

Proposal is to add {{SessionStore's findSessions}}-like methods (i.e. 
time-bound access) to {{ReadOnlySessionStore.}}
 


> Add findSessions functionality to ReadOnlySessionStore
> --
>
> Key: KAFKA-7622
> URL: https://issues.apache.org/jira/browse/KAFKA-7622
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Di Campo
>Priority: Major
>
> When you create a session store from the DSL and get a 
> {{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in 
> a {{SessionStore}}, even if the key type is a {{Windowed}}. So you would 
> have to iterate through it to find the time-related entries, which is likely 
> less efficient than querying by time.
> So the purpose of this ticket is to be able to query the store with (key, 
> time).
> The proposal is to add {{SessionStore}}-style {{findSessions}} methods (i.e. 
> time-bound access) to {{ReadOnlySessionStore}}.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-13 Thread Di Campo (JIRA)
Di Campo created KAFKA-7622:
---

 Summary: Add findSessions functionality to ReadOnlySessionStore
 Key: KAFKA-7622
 URL: https://issues.apache.org/jira/browse/KAFKA-7622
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Di Campo


When you create a session store from the DSL and get a 
{{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in a 
{{WindowStore}}, even if the key type is a {{Windowed}}. So you would have 
to iterate through it to find the time-related entries, which is likely less 
efficient than querying by time.

So the purpose of this ticket is to be able to query the store with (key, time).

The proposal is to add {{SessionStore}}-style {{findSessions}} methods (i.e. 
time-bound access) to {{ReadOnlySessionStore}}.
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-06-02 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494982#comment-16494982
 ] 

Di Campo edited comment on KAFKA-6188 at 6/2/18 6:18 PM:
-

Hi,

I faced the same issue as [~chubao] when deleting a Kafka Streams-produced topic 
(in my case, the failing topic belongs to a state store managed through the 
Processor API, but it is curious that we both got the error on an internally 
created, state-store-backing changelog topic). 
 I am on Amazon EFS too. Kafka 1.1.0, Java 9, 3-broker cluster.

Deletion of a state store's topic failed. It failed with around 100M, and again 
with 40M, events of <1k each. 
{quote}{{[2018-05-30 08:56:36,193] INFO [ReplicaFetcher replicaId=2, 
leaderId=1, fetcherId=0] Error sending fetch request (sessionId=1854198522, 
epoch=2329887) to node 1: java.io.IOException: Connection to 1 was disconnected 
before the response was read. (org.apache.kafka.clients.FetchSessionHandler)}}
 {{[2018-05-30 08:56:36,195] INFO Deleted offset index 
/kafka/kafka-logs-2/stream-processor-lastSessionByChannelStoreName-changelog-13.b9486f127f05418787972d5823506db4-delete/02914722.index.
 (kafka.log.LogSegment)}}
 {{[2018-05-30 08:56:36,203] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1854198522, 
epoch=2329887)) (kafka.server.ReplicaFetcherThread)}}
 {{java.io.IOException: Connection to 1 was disconnected before the response 
was read}}
 \{{ at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)}}
{quote}
The first time I found this there were about 100M events. Then I tried with 10M 
events, and it was deleted OK, no error. Then I tried again with 40M, and it 
failed.

We have read that there 
[are|https://www.slideshare.net/HadoopSummit/apache-kafka-best-practices] 
[issues|https://github.com/strimzi/strimzi/issues/441] with [memory 
mapped|https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026] 
files, which Kafka uses for indexes, on networked file systems (such as EFS). 
Might this be related? 
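
For context, a Kafka index segment is memory-mapped roughly like the simplified sketch below (illustrative only, not the actual kafka.log index code); the pre-allocation, mapping and later rename/unmap steps are the kinds of operations where NFS-style file systems such as EFS are reported to behave differently from local disks.
{code:java}
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Simplified illustration of how an index file is pre-allocated and memory-mapped
// (not the real kafka.log.AbstractIndex implementation).
public class IndexMmapSketch {

    public static MappedByteBuffer mapIndex(final String path, final int maxIndexSizeBytes) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
            raf.setLength(maxIndexSizeBytes);  // pre-allocate the full index file up front
            return raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, maxIndexSizeBytes);
        }
    }
}
{code}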

 


was (Author: xmar):
Hi ,

I faced same issue as [~chubao] when deleting a KafkaStreams produced topic (In 
my case, the failing topic is a State Store managed through Processor API, but 
it is curious we both got the error in an internally-created StateStore-backing 
changelog topic). 
I am on Amazon EFS too. Kafka 1.1.0. Java 9. 3-broker cluster.

Deletion of topic of a state store failed. It failed with around 100M over 40M 
<1k events. 
{quote}{{[2018-05-30 08:56:36,193] INFO [ReplicaFetcher replicaId=2, 
leaderId=1, fetcherId=0] Error sending fetch request (sessionId=1854198522, 
epoch=2329887) to node 1: java.io.IOException: Connection to 1 was disconnected 
before the response was read. (org.apache.kafka.clients.FetchSessionHandler)}}
{{[2018-05-30 08:56:36,195] INFO Deleted offset index 
/kafka/kafka-logs-2/stream-processor-lastSessionByChannelStoreName-changelog-13.b9486f127f05418787972d5823506db4-delete/02914722.index.
 (kafka.log.LogSegment)}}
{{[2018-05-30 08:56:36,203] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1854198522, 
epoch=2329887)) (kafka.server.ReplicaFetcherThread)}}
{{java.io.IOException: Connection to 1 was disconnected before the response was 
read}}
{{ at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)}}{quote}

First time I found this there were about 100M events. Then I tried with 10M 
events, and it was deleted OK, no error. Then I tried again with 40M, and it 
failed.

We have read that there 
[are|https://www.slideshare.net/HadoopSummit/apache-kafka-best-practices] 
[issues|https://github.com/strimzi/strimzi/issues/441] on [memory 
mapped|https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026] 
files, which Kafka uses for indexes, and that there are issues with those in 
Networked File Systems (such as EFS). May this be related? 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> 

[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-30 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494982#comment-16494982
 ] 

Di Campo commented on KAFKA-6188:
-

Hi,

I faced the same issue as [~chubao] when deleting a Kafka Streams-produced topic 
(in my case, the failing topic belongs to a state store managed through the 
Processor API, but it is curious that we both got the error on an internally 
created, state-store-backing changelog topic). 
I am on Amazon EFS too. Kafka 1.1.0, Java 9, 3-broker cluster.

Deletion of a state store's topic failed. It failed with around 100M, and again 
with 40M, events of <1k each. 
{quote}{{[2018-05-30 08:56:36,193] INFO [ReplicaFetcher replicaId=2, 
leaderId=1, fetcherId=0] Error sending fetch request (sessionId=1854198522, 
epoch=2329887) to node 1: java.io.IOException: Connection to 1 was disconnected 
before the response was read. (org.apache.kafka.clients.FetchSessionHandler)}}
{{[2018-05-30 08:56:36,195] INFO Deleted offset index 
/kafka/kafka-logs-2/stream-processor-lastSessionByChannelStoreName-changelog-13.b9486f127f05418787972d5823506db4-delete/02914722.index.
 (kafka.log.LogSegment)}}
{{[2018-05-30 08:56:36,203] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1854198522, 
epoch=2329887)) (kafka.server.ReplicaFetcherThread)}}
{{java.io.IOException: Connection to 1 was disconnected before the response was 
read}}
{{ at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)}}{quote}

The first time I found this there were about 100M events. Then I tried with 10M 
events, and it was deleted OK, no error. Then I tried again with 40M, and it 
failed.

We have read that there 
[are|https://www.slideshare.net/HadoopSummit/apache-kafka-best-practices] 
[issues|https://github.com/strimzi/strimzi/issues/441] with [memory 
mapped|https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026] 
files, which Kafka uses for indexes, on networked file systems (such as EFS). 
Might this be related? 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine; standalone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
>