[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-16 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011966#comment-16011966
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/16/17 8:37 AM:
--

[~mjsax] 

I can't give you a definitive answer to question one: I don't have the logs 
from the other nodes, so I can't check this particular scenario. However, the 
nature of our stream is such that we might indeed not receive a single message 
for a particular key or partition for some time. I think the log I shared with 
you is actually from a test system, where we might not get any messages for 
several hours.

I already answered your second question: we run 8 parallel nodes of this app 
all the time, with a single Kafka Streams instance on each node.

We don't use pattern subscription.

Hope it helps!






> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>Assignee: Matthias J. Sax
> Attachments: clio_afa596e9b809.gz, clio_reduced.gz, clio.txt.gz
>
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T18:04:29,401 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-16 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011982#comment-16011982
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/16/17 8:33 AM:
--

[~guozhang] full logs attached. (clio_afa596e9b809.gz)

I'll apply the patch and come back to you if the issue occurs again.

Thanks!

L.


was (Author: lukas gemela):
[~guozhang] full logs attached. 

> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>Assignee: Matthias J. Sax
> Attachments: clio_afa596e9b809.gz, clio_reduced.gz, clio.txt.gz
>
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T18:04:29,401 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T18:05:10,877 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-05-01T00:01:55,707 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-05-01T00:01:59,027 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-05-01T00:01:59,031 ERROR StreamThread-1 
> org.apache.kafka.streams.processor.internals.StreamThread.run() @376 - 
> stream-thread [StreamThread-1] Streams application error during processing:
>  java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619)
>  ~[kafka-streams-0.10.2.0.jar!/:?]
>   at 
> 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011331#comment-16011331
 ] 

Matthias J. Sax edited comment on KAFKA-5154 at 5/15/17 9:10 PM:
-

Thanks for sharing the logs. We will get back to you if we need more input. We 
see "Ignoring fetched records" right before the error. This seems to be 
related, but we don't know yet.
{noformat}
2017-05-08T22:45:40,224 DEBUG StreamThread-1 
org.apache.kafka.clients.consumer.internals.Fetcher.drainRecords() @526 - 
Ignoring fetched records for poseidonIncidentFeed-38 at offset 21353 since the 
current position is 21354
2017-05-08T22:45:40,224 DEBUG StreamThread-1 
org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches() @180 - 
Sending fetch for partitions [poseidonIncidentFeed-38] to broker 
10.210.200.144:9092 (id: 3 rack: null)
2017-05-08T22:45:40,227 ERROR StreamThread-1 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop() @620 - 
Unexpected error: fetched partition poseidonIncidentFeed-38 does not belong to 
the active task partitions.
 tasksByPartition: {}
 assignedPartitions: [poseidonIncidentFeed-21, poseidonIncidentFeed-6, 
poseidonIncidentFeed-38, poseidonIncidentFeed-12]
{noformat}

To reason about the logs better, one more question: could it be that partition 
38 of topic {{poseidonIncidentFeed}} does not get any data to process for some 
time? It seems that there is no data at first; when new data is written to the 
partition, the error hits; and after Streams somehow "progresses" over the 
burst of data, the error disappears again (as no data is fetched anymore). 
Could this be the case? Or do you constantly write new data to partition 38, 
so that Streams constantly processes data but suddenly fails?

Another follow-up question: in KAFKA-5242 you mention that you run with a 
single thread. Does this imply that your whole Streams application is 
single-threaded (i.e., you use only one JVM), or do you start multiple JVMs and 
scale your app that way?

Last question: do you use pattern subscription by any chance?
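
For readers following the log excerpt above: the ERROR line reports an empty 
{{tasksByPartition}} map while poseidonIncidentFeed-38 is still listed among 
the assigned partitions. The following is only a minimal, self-contained model 
of how a lookup in an empty map can end in a NullPointerException. It is not 
the actual StreamThread code; the {{Task}} class, the {{route}} helper and the 
use of plain strings as partition keys are hypothetical stand-ins.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmptyTaskMapSketch {
    /** Hypothetical stand-in for a stream task; not the real Kafka Streams class. */
    static final class Task {
        int buffered = 0;

        void addRecords(List<String> records) {
            buffered += records.size();
        }
    }

    /**
     * Routes fetched records to the task registered for the partition.
     * Returns false, instead of dereferencing null, when no task is registered.
     */
    static boolean route(Map<String, Task> tasksByPartition, String partition, List<String> records) {
        Task task = tasksByPartition.get(partition);
        if (task == null) {
            return false;
        }
        task.addRecords(records);
        return true;
    }

    public static void main(String[] args) {
        // Mirrors "tasksByPartition: {}" from the log: no task is registered at all.
        Map<String, Task> tasksByPartition = new HashMap<>();
        List<String> records = Collections.singletonList("record for partition 38");

        // Unguarded lookup: get() returns null for the unknown key, so the call throws.
        try {
            tasksByPartition.get("poseidonIncidentFeed-38").addRecords(records);
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup threw NullPointerException");
        }

        // Guarded lookup: the missing task is reported to the caller instead.
        boolean routed = route(tasksByPartition, "poseidonIncidentFeed-38", records);
        System.out.println("guarded lookup routed records: " + routed);
    }
}
```

The guard only makes the mismatch visible; it does not explain why the task 
map and the consumer assignment disagree in the first place.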



> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>Assignee: Matthias J. Sax
> Attachments: clio_reduced.gz, clio.txt.gz
>
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009460#comment-16009460
 ] 

Matthias J. Sax edited comment on KAFKA-5154 at 5/13/17 6:42 PM:
-

[~Lukas Gemela] Thanks. This helps already. Before the error occurs, we log 
the assignment:
{noformat}
log.info("{} Updating suspended tasks to contain active tasks {}", logPrefix, activeTasks.keySet());

log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment);
...
log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id, partitions);
...
log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
{noformat}

and similar. Can you share those logs, too? Maybe you can just dump the whole 
log and attach the file to this Jira?


was (Author: mjsax):
[~Lukas Gemela] Thanks. This helps already. Before the error occurs, we log 
the assignment:
{noquote}
log.info("{} Updating suspended tasks to contain active tasks {}", logPrefix, 
activeTasks.keySet());

log.info("{} at state {}: new partitions {} assigned at the end of consumer 
rebalance.", logPrefix, state, assignment);
...
log.info("{} Creating active task {} with assigned partitions {}", logPrefix, 
id, partitions);
...
log.info("{} Creating new standby task {} with assigned partitions {}", 
logPrefix, id, partitions);
{noquote}

and similar. Can you share those logs, too? Maybe you can just dump the whole 
log and attach the file to this Jira?

> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
> Attachments: clio.txt.gz
>
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T18:04:29,401 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-04 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996995#comment-15996995
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/4/17 4:26 PM:
-

I haven't been able to reproduce the issue yet, but I just found in the source 
code the exception that we were getting on the other nodes when that NPE 
occurred:

throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
        logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));


from ProcessorStateManager.
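
To illustrate what a check with this message guards against, here is a small 
stand-alone sketch. It assumes (this is not confirmed in this thread) that the 
end offset of the changelog partition is captured before restoration and that 
the exception is raised once the restore position has moved past it. The 
method name, the comparison and the sample values are hypothetical.

```java
public class RestoreOffsetCheckSketch {
    /**
     * Simplified model of the consistency check: endOffset is the value captured
     * before restoration started, position is where the restore consumer is now.
     * Assumption: a position beyond the captured end offset is treated as an error.
     */
    static void checkRestorePosition(String logPrefix, String storePartition, long endOffset, long position) {
        if (position > endOffset) {
            throw new IllegalStateException(String.format(
                "%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
                logPrefix, storePartition, endOffset, position));
        }
    }

    public static void main(String[] args) {
        // Restoring exactly up to the captured end offset passes the check.
        checkRestorePosition("task [0_1]", "changelog-1", 100L, 100L);

        // If something appended to the changelog in the meantime, the position overshoots.
        try {
            checkRestorePosition("task [0_1]", "changelog-1", 100L, 105L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under that assumption, the exception would indicate that something was still 
writing to the changelog partition while another instance was restoring from 
it.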

Hope it helps!

Lukas



was (Author: lukas gemela):
I haven't been able to reproduce the issue yet, but I just found in the source 
code the exception that we were getting on the other nodes:

throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
        logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));


from ProcessorStateManager.

Hope it helps!

Lukas


> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T18:04:29,401 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T18:05:10,877 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-05-01T00:01:55,707 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-02 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993902#comment-15993902
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 11:10 PM:
--

Hi,

we have a Kafka Streams app that consumes from one topic, aggregates data on 
another internal topic (using an in-memory logged store), and then outputs 
messages to the out topic.  
The input topic has, I believe, 40 partitions, and therefore the internal topic 
has 40 partitions as well.
We are running 8 parallel nodes of this app all the time.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are 
lost now. I just saw a couple of IllegalStateExceptions on other nodes, coming 
from the Kafka Streams stack, but I'm not even sure whether they were thrown 
around the same time as the NPE happened on this node.


___
Probably not important, but it might be related to the issue:
Please also note that in our exception handler we try to stop the Kafka Streams 
app with our custom timeout and schedule a Spring task to start it again. As 
far as I can tell, calling 

boolean isClosed = streams.close(10, TimeUnit.SECONDS)

never actually succeeds, and we always get isClosed = false.
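
One possible explanation, offered here only as a hypothesis: a Java uncaught 
exception handler runs on the thread that just failed. If close() waits for 
that same thread to terminate (an assumption about Kafka Streams internals 
that is not confirmed in this thread), the wait cannot finish before the 
handler returns, so a timed close would report false. The plain-Java sketch 
below reproduces just that thread behaviour without any Kafka classes; all 
names in it are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseFromHandlerSketch {
    /**
     * Starts a worker that fails immediately and lets its uncaught-exception handler
     * wait for the worker to terminate. Returns whether the worker had terminated
     * when that timed wait ended.
     */
    static boolean joinedFromOwnHandler(long timeoutMillis) {
        AtomicBoolean joined = new AtomicBoolean(false);
        CountDownLatch handlerDone = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated processing error");
        }, "worker-1");

        worker.setUncaughtExceptionHandler((thread, error) -> {
            try {
                // The handler executes on the failing thread itself, so the thread
                // is still alive while we wait here and the join can only time out.
                thread.join(timeoutMillis);
                joined.set(!thread.isAlive());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                handlerDone.countDown();
            }
        });

        worker.start();
        try {
            handlerDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return joined.get();
    }

    public static void main(String[] args) {
        System.out.println("joined from own handler: " + joinedFromOwnHandler(500));
    }
}
```

If this is what happens, handing the close call to a separate thread (for 
example the Spring task that restarts the app) would avoid waiting on the 
thread that is running the handler.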

Hope it helps!

Lukas



> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-02 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993902#comment-15993902
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:56 PM:
--

Hi,

we have a Kafka Streams app that consumes from one topic, aggregates data on 
another internal topic (using an in-memory logged store), and then outputs 
messages to the out topic.  
The input topic has, I believe, 40 partitions, and therefore the internal topic 
has 40 partitions as well.
We are running 8 parallel nodes of this app all the time.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are 
lost now. I just saw a couple of IllegalStateExceptions on other nodes, coming 
from the Kafka Streams stack, but I'm not even sure whether they were thrown 
around the same time as the NPE happened on this node.


___
Probably not important, but it might be related to the issue:
Please also note that in our exception handler we try to stop the Kafka Streams 
app with our custom timeout and schedule a Spring task to start it again. As 
far as I can tell, calling 

boolean isClosed = streams.close(10, TimeUnit.SECONDS)

never actually succeeds, and we always get isClosed = false.

Hope it helps!

Lukas


was (Author: lukas gemela):
Hi,

we have a Kafka Streams app that consumes from one topic, aggregates data on 
another internal topic (using an in-memory logged store), and then outputs 
them to the out topic.  
The input topic has, I believe, 40 partitions, and therefore the internal topic 
has 40 partitions as well.
We are running 8 parallel nodes of this app all the time.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are 
lost now. I just saw a couple of IllegalStateExceptions on other nodes, coming 
from the Kafka Streams stack, but I'm not even sure whether they were thrown 
around the same time as the NPE happened on this node.


___
Probably not important, but it might be related to the issue:
Please also note that in our exception handler we try to stop the Kafka Streams 
app with our custom timeout and schedule a Spring task to start it again. As 
far as I can tell, calling 

boolean isClosed = streams.close(10, TimeUnit.SECONDS)

never actually succeeds, and we always get isClosed = false.

Hope it helps!

Lukas

> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-02 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993902#comment-15993902
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:35 PM:
--

Hi,

we have a Kafka Streams app consuming from one topic, aggregating data on
another internal topic (using an in-memory logged store) and then outputting
it to an output topic.
The input topic has, I believe, 40 partitions, and therefore the internal topic
has 40 partitions as well.
We are running 8 parallel nodes of this app all the time.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are
lost now. I only saw a couple of IllegalStateExceptions on other nodes, coming
from the Kafka Streams stack, but I'm not even sure whether they were thrown
around the same time as the NPE happened on this node.


___
Probably not important, but it might be related to the issue:
Please also note that in our exception handler we try to stop the Kafka Streams
app with our custom timeout and schedule a Spring task to start it again. As far
as I can tell, calling

boolean isClosed = streams.close(10, TimeUnit.SECONDS)

never actually succeeds, and we always get isClosed = false.

Hope it helps!

Lukas


> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-02 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993902#comment-15993902
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:34 PM:
--

Hi,

we have a Kafka Streams app consuming from one topic, aggregating data on
another internal topic (using an in-memory logged store) and then outputting
it to an output topic.
The input topic has, I believe, 40 partitions, and therefore the internal topic
has 40 partitions as well.
We are running 8 parallel nodes of this app at all times.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are
lost now. I only saw a couple of IllegalStateExceptions on other nodes, coming
from the Kafka Streams stack, but I'm not even sure whether they were thrown
around the same time as the NPE happened on this node.


___
Probably not important, but it might be related to the issue:
Please also note that in our exception handler we try to stop the Kafka Streams
app with our custom timeout and schedule a Spring task to start it again. As far
as I can tell, calling

boolean isClosed = streams.close(10, TimeUnit.SECONDS)

never actually succeeds, and we always get isClosed = false.

Hope it helps!

Lukas


> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-02 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993902#comment-15993902
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:33 PM:
--

Hi,

we have a Kafka Streams app consuming from one topic, aggregating data on
another internal topic (using an in-memory logged store) and then outputting
it to an output topic.
The input topic has, I believe, 40 partitions, and therefore the internal topic
has 40 partitions as well.
We are running 8 parallel nodes of this app at all times.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are
lost now. I only saw a couple of IllegalStateExceptions on other nodes, coming
from the Kafka Streams stack, but I'm not even sure whether they were thrown
around the same time as the NPE happened on this node.


___
Probably not important, but it might be related to the issue:
Please also note that in our exception handler we try to stop the Kafka Streams
app with our custom timeout and schedule a Spring task to start it again. As far
as I can tell, calling

boolean isClosed = streams.close(10, TimeUnit.SECONDS)

never actually succeeds, and we always get isClosed = false.

Hope it helps!

Lukas


> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-02 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993902#comment-15993902
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:33 PM:
--

Hi,

we have a Kafka Streams app consuming from one topic, aggregating data on
another internal topic (using an in-memory logged store) and then outputting
it to an output topic.
The input topic has, I believe, 40 partitions, and therefore the internal topic
has 40 partitions as well.
We are running 8 parallel nodes of this app at all times.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are
lost now. I only saw a couple of IllegalStateExceptions on other nodes, coming
from the Kafka Streams stack, but I'm not even sure whether they were thrown
around the same time as the NPE happened on this node.

Please also note that in our exception handler we try to stop the Kafka Streams
app with our custom timeout and schedule a Spring task to start it again. As far
as I can tell, calling

boolean isClosed = streams.close(10, TimeUnit.SECONDS)

never actually succeeds, and we always get isClosed = false.

Hope it helps!

Lukas


> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered 

[jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance

2017-05-02 Thread Lukas Gemela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993902#comment-15993902
 ] 

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:27 PM:
--

Hi,

we have a Kafka Streams app consuming from one topic, aggregating data on
another internal topic (using an in-memory logged store) and then outputting
it to an output topic.
The input topic has, I believe, 40 partitions, and therefore the internal topic
has 40 partitions as well.
We are running 8 parallel nodes of this app at all times.
We are using the low-level Kafka Streams API.
None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are
lost now. I only saw a couple of IllegalStateExceptions on other nodes, coming
from the Kafka Streams stack, but I'm not even sure whether they were thrown
around the same time as the NPE happened on this node.

Hope it helps!


was (Author: lukas gemela):
Hi,

we have a Kafka Streams app consuming from one topic, aggregating data on
another internal topic (using an in-memory logged store) and then outputting
it to an output topic.
The input topic has, I believe, 40 partitions, and therefore the internal topic
has 40 partitions as well.
We are running 8 parallel nodes of this app at all times.
We are using the low-level Kafka Streams API.
None of our calls is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are
lost now. I only saw a couple of IllegalStateExceptions on other nodes, coming
from the Kafka Streams stack, but I'm not even sure whether they were thrown
around the same time as the NPE happened on this node.

Hope it helps!

> Kafka Streams throws NPE during rebalance
> -
>
> Key: KAFKA-5154
> URL: https://issues.apache.org/jira/browse/KAFKA-5154
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during 
> rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, 
> poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] 
> for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest()
>  @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete()
>  @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, 
> poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, 
> poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead()
>  @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: 
> null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() 
> @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) 
> for group hades.
> 2017-04-30T18:04:29,401 INFO  StreamThread-1 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare()
>  @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, 
>