[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-08-28 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091115#comment-16091115
 ] 

Yogesh BG edited comment on KAFKA-5545 at 8/29/17 5:00 AM:
---

OK. Thanks for looking into the issue; I will wait for the 0.10.2.2 release.



> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: Yogesh BG
> Priority: Critical
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: kafkastreams.log
>
>
> Hi,
> I have one Kafka broker and one Kafka Streams application. Initially the
> Streams application connects and starts processing data. Then I restart the
> broker; when the broker restarts, it is assigned a new IP.
> In the Streams application I have a thread that runs at a 5-minute interval
> and checks whether the broker IP has changed. If it has, we clean up the
> stream, rebuild the topology (I also tried reusing the topology) and start
> the stream again. I end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Creating active task 0_5 with assigned partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Creating active task 0_1 with assigned partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Creating active task 0_7 with assigned partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Creating active task 0_3 with assigned partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Creating active task 0_0 with assigned partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Creating active task 0_4 with assigned partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Creating active task 0_6 with assigned partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Creating active task 0_2 with assigned partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory for task 0_5
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075945#comment-16075945
 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/6/17 5:16 AM:
--

Hey,

setupDiscovery is a scheduled thread that checks whether the broker's IP has 
changed. As you can see in the code, I call close(), which internally calls 
stream.close(). You can also see in the logs that the close was triggered; if 
it had not been called, how would the shutdown have been initiated?

{quote}
But from your attached logs it does seem the thread was notified to shut down 
but never exited the main loop:
{quote}

You should check why the shutdown didn't happen. Why are some threads that 
were part of the previous stream instance still alive after close has been 
invoked? Is there any way I can shut down the stream completely without 
restarting the app?

BTW, restarting the application has its own problem: when I restart with the 
new broker IP, the threads hang and never come back to process the data.
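
The concern above (threads from the previous instance still alive after close) suggests guarding the restart on the result of the timed close. The following is a minimal sketch of that idea, not the reporter's code. It assumes the timed close reports whether all threads stopped, as {{KafkaStreams#close(long, TimeUnit)}} is documented to do. {{StreamsHandle}} and {{SafeRestarter}} are hypothetical names introduced only for illustration, so that the sketch compiles without the Kafka library.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the two KafkaStreams calls this sketch relies on. */
interface StreamsHandle {
    void start();

    /** Returns true only if every internal thread stopped within the timeout. */
    boolean close(long timeout, TimeUnit unit);
}

/** Hypothetical helper: swaps in a new instance only after the old one has fully stopped. */
final class SafeRestarter {
    private StreamsHandle current;

    SafeRestarter(StreamsHandle initial) {
        this.current = initial;
    }

    /**
     * Closes the running instance and starts the replacement only if the close
     * completed in time. Returns true if the replacement was started.
     */
    boolean restartWith(StreamsHandle replacement, long timeout, TimeUnit unit) {
        if (!current.close(timeout, unit)) {
            // Old threads are still alive. Starting a second instance now could
            // leave both instances competing for the same state directories.
            return false;
        }
        current = replacement;
        current.start();
        return true;
    }

    StreamsHandle current() {
        return current;
    }
}
```

With a wrapper like this, the discovery task could log the failed close and retry on its next run instead of starting a second instance while the first is still shutting down.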





[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075541#comment-16075541
 ] 

Guozhang Wang edited comment on KAFKA-5545 at 7/5/17 10:34 PM:
---

[~yogeshbelur] In your code snippet it seems you never close the previous 
instance before creating the new instance and calling {{cleanUp}}. Or are the 
{{close()}} and {{start()}} calls meant for the previous instance? (It is hard 
to tell how {{setupDiscovery}} is triggered.)

{code}
close();
streams = new KafkaStreams(buildTopology(config), config);
logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
streams.cleanUp();
start();
{code}

Anyway, if {{KafkaStreams#close()}} is indeed called, the producer will be 
closed in that function, and its inner {{Sender}} thread will be terminated 
and will no longer try to connect to the broker. But from your attached logs 
it does seem the thread was notified to shut down but never exited the main 
loop:

{code}
10:02:33.981 [pool-1-thread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [ks_0_inst] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.987 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down
10:02:33.987 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] Informed thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] Informed thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] Informed thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] Informed thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] Informed thread to shut down
10:02:33.990 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] Informed thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] Informed thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] Informed thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] Informed thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] Informed thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.995 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] Informed thread to shut down
10:02:33.995 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.995 

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072228#comment-16072228
 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/3/17 10:27 AM:
---

I found that if the stream is closed successfully, it is able to re-establish 
the connection with the broker's new IP and continue processing data.

But sometimes the previous stream is not closed properly, because some threads 
keep trying to re-establish the connection to the broker's old IP, which is no 
longer available, and keep logging DEBUG exceptions. I have attached the debug 
log. In this situation the stream does not process any further data.

Here is the logic used to re-establish the connection. The close timeout is 
60 seconds.


{code:java}
private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort,
                        useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
                logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers
                        + ", old bootstrap server are " + oldBootstrapServerString);
                boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString);
                if (isChanged) {
                    String bootstrapServerString = bootstrapServersStr(bootstrapServers);
                    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString);
                    logger.info(
                            "Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    close();
                    streams = new KafkaStreams(buildTopology(config), config);
                    logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    streams.cleanUp();
                    start();
                    logger.info("Completed restart of kafka streams connection to new broker with configuration "
                            + config);
                }
            } catch (Throwable ex) {
                logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage()
                        + ", will retry again", ex);
            }
        }

    }, 0, refreshInterval, TimeUnit.MINUTES);
}
{code}
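
The snippet above calls {{checkForChangeInBootstrapServers}} and {{bootstrapServersStr}} without showing their bodies. As a rough illustration only, one way such helpers could compare and format server lists is sketched below. The class and method names ({{BootstrapServers}}, {{parse}}, {{hasChanged}}, {{join}}) are hypothetical, and the order-insensitive comparison is an assumption rather than the reporter's actual logic.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helpers for comparing discovered brokers with the configured bootstrap.servers value. */
final class BootstrapServers {
    private BootstrapServers() {}

    /** Splits "host1:9092, host2:9092" into a sorted set of trimmed, non-empty entries. */
    static Set<String> parse(String commaSeparated) {
        Set<String> servers = new TreeSet<>();
        if (commaSeparated == null) {
            return servers;
        }
        for (String entry : commaSeparated.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    /** True if the discovered servers differ from the configured ones, ignoring order and whitespace. */
    static boolean hasChanged(Collection<String> discovered, String configured) {
        Set<String> fresh = new TreeSet<>();
        for (String server : discovered) {
            String trimmed = server.trim();
            if (!trimmed.isEmpty()) {
                fresh.add(trimmed);
            }
        }
        return !fresh.equals(parse(configured));
    }

    /** Joins the servers, sorted, into the comma-separated form used for bootstrap.servers. */
    static String join(Collection<String> servers) {
        Set<String> sorted = new TreeSet<>(servers);
        return String.join(",", sorted);
    }
}
```

Comparing sets rather than raw strings avoids a restart when the registry merely returns the same brokers in a different order.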




[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-06-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070474#comment-16070474
 ] 

Matthias J. Sax edited comment on KAFKA-5545 at 6/30/17 5:54 PM:
-

Hi. The messages with the stack trace

{noformat}
11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could not create task 0_5. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory for task 0_5
{noformat}

are WARN-level messages. They are expected during a rebalance and should 
resolve automatically. It seems that they did resolve automatically, as you 
get 

{noformat}
11:04:13.642 [StreamThread-44] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-44] Committing all tasks because the commit interval 1ms has elapsed
{noformat}

later on. The logs don't show any ERROR messages, so I am not sure what actual 
issue you observe. Does your app not work properly after the WARN logs about 
the lock go away?

One more question:

{quote}
we cleanup the stream, rebuild topology(tried with reusing topology) and start 
the stream again
{quote}

What exact steps are you doing here? Do you send a `kill` signal? Do you call 
`KafkaStreams#close()`? Btw: you should be able to reuse the topology; you 
just need to create a new KafkaStreams instance with the new config. If this 
does not work for you, we should have a look into this too.
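
The stack trace in the issue description passes through a {{retryWithBackoff}} frame, which fits the explanation above that the lock WARN is retried and normally clears by itself. The sketch below illustrates the general retry-with-backoff pattern only. It is not the Kafka Streams implementation, and {{LockRetry}}, its parameters and the doubling backoff policy are assumptions made for illustration.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical illustration of retrying a lock attempt with an increasing pause. */
final class LockRetry {
    private LockRetry() {}

    /**
     * Calls tryAcquire until it returns true or maxAttempts is reached, sleeping
     * between attempts and doubling the pause each time.
     * Returns the number of attempts used, or -1 if the lock was never acquired.
     */
    static int retryWithBackoff(BooleanSupplier tryAcquire, int maxAttempts, long initialBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryAcquire.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up.
                    Thread.currentThread().interrupt();
                    return -1;
                }
                backoffMs *= 2;
            }
        }
        return -1;
    }
}
```

In this pattern a failed attempt is only worth a warning; it becomes a real problem when the holder of the lock never releases it, for example because the threads of a previous instance never exited.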


