Roger Hoover created KAFKA-3752:
-----------------------------------

             Summary: Provide a way for KStreams to recover from unclean shutdown
                 Key: KAFKA-3752
                 URL: https://issues.apache.org/jira/browse/KAFKA-3752
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Roger Hoover
            Assignee: Guozhang Wang


If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
Killer), it may leave behind lock files and fail to recover.

It would be useful to have an option (say --force) that tells KStreams to
proceed even if it finds old LOCK files.
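The ticket does not say how a "stale" lock should be told apart from a live one. The sketch below shows one approach under stated assumptions, not Kafka Streams' actual code. It assumes each task directory (e.g. `.../kafka-streams/test-2/0_0`) holds a lock file named `.lock` that is guarded by an OS-level lock taken with `java.nio.channels.FileChannel.tryLock()`. The class `TaskDirLock` and the method `tryLockTaskDir` are hypothetical names. On POSIX systems the OS releases such a lock when the owning process exits, including on SIGKILL. A leftover lock file with no live holder can therefore be reclaimed, while a lock still held by a running process is refused.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TaskDirLock {
    // Hypothetical lock file name inside each task directory (assumption).
    static final String LOCK_FILE_NAME = ".lock";

    /**
     * Tries to take an OS-level lock on the task directory's lock file.
     * Returns the held lock, or null if the lock is currently held elsewhere.
     * A file left behind by a killed process is NOT held: the OS released its
     * lock when the process died, so tryLock() succeeds and the file is reused.
     */
    static FileLock tryLockTaskDir(Path taskDir) throws IOException {
        Files.createDirectories(taskDir);
        Path lockFile = taskDir.resolve(LOCK_FILE_NAME);
        FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock = null;
        try {
            // Returns null if another process holds an overlapping lock.
            lock = channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // Already locked by this JVM (e.g. another thread still owns the task).
            lock = null;
        } finally {
            // Do not leak the channel when we failed to get the lock.
            if (lock == null) {
                channel.close();
            }
        }
        return lock;
    }

    public static void main(String[] args) throws IOException {
        Path taskDir = Files.createTempDirectory("kafka-streams-test").resolve("0_0");
        Files.createDirectories(taskDir);
        // Simulate a lock file left behind by a SIGKILLed process:
        // the file exists, but no process holds a lock on it.
        Files.createFile(taskDir.resolve(LOCK_FILE_NAME));

        FileLock first = tryLockTaskDir(taskDir);
        System.out.println("stale lock file reclaimed: " + (first != null));   // true

        FileLock second = tryLockTaskDir(taskDir);
        System.out.println("second attempt while held: " + (second != null)); // false

        first.channel().close(); // closing the channel releases the lock
    }
}
```

With this design a "--force" flag would only be needed for the case where the lock is genuinely held. Deleting a lock file that a live process still holds does not release that process's lock, and could let two instances write the same state directory, so such a flag would need care.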

{noformat}
[2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread:583)
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /data/test/2/kafka-streams/test-2/0_0
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
        ... 32 more
{noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)