Roger Hoover created KAFKA-3752:
-----------------------------------

             Summary: Provide a way for KStreams to recover from unclean shutdown
                 Key: KAFKA-3752
                 URL: https://issues.apache.org/jira/browse/KAFKA-3752
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Roger Hoover
            Assignee: Guozhang Wang

If a KStream application gets killed with SIGKILL (e.g., by the Linux OOM Killer), it may leave behind lock files and fail to recover on restart.

It would be useful to have an option (say, --force) to tell KStreams to proceed even if it finds old LOCK files.

{noformat}
[2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:583)
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /data/test/2/kafka-streams/test-2/0_0
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
	... 32 more
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
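
To make the requested behavior concrete, here is a minimal sketch of what a "force" option might do. It is not taken from the Kafka Streams code base: the class `StateDirLock`, its methods, and the marker-file scheme (a file named `LOCK` that is created atomically to take the lock) are all assumptions made for illustration, and the actual locking in `ProcessorStateManager` may work differently.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for illustration only; not part of Kafka Streams. */
public class StateDirLock {
    // Assumed marker file name; the real file name may differ.
    static final String LOCK_FILE = "LOCK";

    /**
     * Tries to take the lock by atomically creating the marker file.
     * With force=true, a leftover marker file is deleted first.
     * Returns true if the lock was acquired.
     */
    public static boolean acquire(Path stateDir, boolean force) throws IOException {
        Files.createDirectories(stateDir);
        Path lock = stateDir.resolve(LOCK_FILE);
        if (force) {
            // Assumes the previous owner is dead (e.g. killed with SIGKILL).
            Files.deleteIfExists(lock);
        }
        try {
            Files.createFile(lock); // fails if the file already exists
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        }
    }

    /** Releases the lock on clean shutdown by removing the marker file. */
    public static void release(Path stateDir) throws IOException {
        Files.deleteIfExists(stateDir.resolve(LOCK_FILE));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("kstreams-state");
        System.out.println(acquire(dir, false)); // true: first start takes the lock
        // Simulate SIGKILL: the process dies without calling release().
        System.out.println(acquire(dir, false)); // false: stale LOCK blocks the restart
        System.out.println(acquire(dir, true));  // true: force clears the stale LOCK
        release(dir);
    }
}
```

In this sketch, forcing is only safe when no other live instance is using the same state directory, which is presumably why it would have to be an explicit opt-in rather than the default.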