[ https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16737510#comment-16737510 ]
Patrik Kleindl commented on KAFKA-6767: --------------------------------------- [~guozhang] Not sure if we are hitting the same issue here in 2.0.0-cp1 but I got here because of the log message during investigation of a possible data loss similar to the one described in https://issues.apache.org/jira/browse/KAFKA-7672 I tried to isolate the logs for one stream thread: {code:java} 2019-01-03 09:53:38,068 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - stream-thread [service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11] partition assignment took 150 ms. current active tasks: [9_7, 10_6, 8_9, 7_11, 4_2, 6_3, 4_6, 9_2, 10_1, 8_3, 6_7, 4_11] current standby tasks: [] previous active tasks: [6_0, 4_2, 8_1, 6_4, 4_6, 9_2, 10_2, 8_5, 6_8, 9_7] 2019-01-03 09:53:38,601 INFO [org.apache.kafka.clients.FetchSessionHandler] (service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - [Consumer clientId=service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11-restore-consumer, groupId=] Error sending fetch request (sessionId=1513832101, epoch=22) to node 2: org.apache.kafka.common.errors.DisconnectException. 2019-01-03 09:53:38,602 INFO [org.apache.kafka.clients.FetchSessionHandler] (service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - [Consumer clientId=service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11-restore-consumer, groupId=] Error sending fetch request (sessionId=1465062084, epoch=24) to node 3: org.apache.kafka.common.errors.DisconnectException. 2019-01-03 09:53:38,602 INFO [org.apache.kafka.clients.FetchSessionHandler] (service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - [Consumer clientId=service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11-restore-consumer, groupId=] Error sending fetch request (sessionId=1411569981, epoch=28) to node 1: org.apache.kafka.common.errors.DisconnectException. 2019-01-03 09:53:38,765 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - stream-thread [service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11] State transition from PARTITIONS_ASSIGNED to RUNNING 2019-01-03 09:53:38,788 WARN [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - task [9_7] Failed to write offset checkpoint file to path/9_7/.checkpoint: {}: java.io.FileNotFoundException: path/9_7/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at java.io.FileOutputStream.<init>(FileOutputStream.java:162) at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:315) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:383) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368) at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362) at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352) at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) {code} The last warning has been repeated every 30 seconds ever since. > OffsetCheckpoint write assumes parent directory exists > ------------------------------------------------------ > > Key: KAFKA-6767 > URL: https://issues.apache.org/jira/browse/KAFKA-6767 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0 > Reporter: Steven Schlansker > Priority: Minor > > We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an > instance dies it is created from scratch, rather than reusing the existing > RocksDB.) > We routinely see: > {code:java} > 2018-04-09T19:14:35.004Z WARN <> > [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] > o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset > checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {} > java.io.FileNotFoundException: > /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.<init>(FileOutputStream.java:213) > at java.io.FileOutputStream.<init>(FileOutputStream.java:162) > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) > at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code} > Inspecting the state store directory, I can indeed see that {{chat/0_11}} > does not exist (although many other partitions do). > > Looking at the OffsetCheckpoint write method, it seems to try to open a new > checkpoint file without first ensuring that the parent directory exists. > > {code:java} > public void write(final Map<TopicPartition, Long> offsets) throws > IOException { > // if there is no offsets, skip writing the file to save disk IOs > if (offsets.isEmpty()) { > return; > } > synchronized (lock) { > // write to temp file and then swap with the existing file > final File temp = new File(file.getAbsolutePath() + ".tmp");{code} > > Either the OffsetCheckpoint class should initialize the directories if > needed, or some precondition of it being called should ensure that is the > case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)