[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511726#comment-16511726
 ] 

ASF GitHub Bot commented on KAFKA-6860:
---------------------------------------

mjsax closed pull request #5187: KAFKA-6860: Fix NPE in Kafka Streams with EOS 
enabled
URL: https://github.com/apache/kafka/pull/5187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index b270e03f2e0..66ddec950c8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -36,17 +36,18 @@
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
     final File baseDir;
-    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
-
+    private final boolean eosEnabled;
     OffsetCheckpoint checkpoint;
 
+    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
     final Map<String, StateStore> stores = new LinkedHashMap<>();
     final Map<String, StateStore> globalStores = new LinkedHashMap<>();
 
-    AbstractStateManager(final File baseDir) {
+    AbstractStateManager(final File baseDir,
+                         final boolean eosEnabled) {
         this.baseDir = baseDir;
+        this.eosEnabled = eosEnabled;
         this.checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
-
     }
 
     public void reinitializeStateStoresForPartitions(final Logger log,
@@ -62,11 +63,14 @@ public void reinitializeStateStoresForPartitions(final 
Logger log,
             checkpointableOffsets.remove(topicPartition);
             
storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
         }
-        try {
-            checkpoint.write(checkpointableOffsets);
-        } catch (final IOException fatalException) {
-            log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}", checkpoint, stateStores, fatalException);
-            throw new StreamsException("Failed to reinitialize global store.", 
fatalException);
+
+        if (!eosEnabled) {
+            try {
+                checkpoint.write(checkpointableOffsets);
+            } catch (final IOException fatalException) {
+                log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}", checkpoint, stateStores, fatalException);
+                throw new StreamsException("Failed to reinitialize global 
store.", fatalException);
+            }
         }
 
         for (final Map.Entry<String, StateStore> entry : 
storesCopy.entrySet()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 79088d98806..78c4a363f29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -69,7 +69,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener 
stateRestoreListener,
                                   final StreamsConfig config) {
-        super(stateDirectory.globalStateDir());
+        super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
@@ -92,16 +92,16 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext processorCo
             if (!stateDirectory.lockGlobalState()) {
                 throw new LockException(String.format("Failed to lock the 
global state directory: %s", baseDir));
             }
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new LockException(String.format("Failed to lock the global 
state directory: %s", baseDir));
         }
 
         try {
             this.checkpointableOffsets.putAll(checkpoint.read());
-        } catch (IOException e) {
+        } catch (final IOException e) {
             try {
                 stateDirectory.unlockGlobalState();
-            } catch (IOException e1) {
+            } catch (final IOException e1) {
                 log.error("Failed to unlock the global state directory", e);
             }
             throw new StreamsException("Failed to read checkpoints for global 
state globalStores", e);
@@ -232,7 +232,7 @@ public void register(final StateStore store,
         }
 
         final List<TopicPartition> topicPartitions = new ArrayList<>();
-        for (PartitionInfo partition : partitionInfos) {
+        for (final PartitionInfo partition : partitionInfos) {
             topicPartitions.add(new TopicPartition(partition.topic(), 
partition.partition()));
         }
         return topicPartitions;
@@ -253,8 +253,7 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            BatchingStateRestoreCallback
-                stateRestoreAdapter =
+            final BatchingStateRestoreCallback stateRestoreAdapter =
                 (BatchingStateRestoreCallback) ((stateRestoreCallback 
instanceof
                                                      
BatchingStateRestoreCallback)
                                                 ? stateRestoreCallback
@@ -267,7 +266,7 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
                 try {
                     final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollTime);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>();
-                    for (ConsumerRecord<byte[], byte[]> record : records) {
+                    for (final ConsumerRecord<byte[], byte[]> record : 
records) {
                         if (record.key() != null) {
                             restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
                         }
@@ -294,11 +293,11 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
     @Override
     public void flush() {
         log.debug("Flushing all global globalStores registered in the state 
manager");
-        for (StateStore store : this.globalStores.values()) {
+        for (final StateStore store : this.globalStores.values()) {
             try {
                 log.trace("Flushing global store={}", store.name());
                 store.flush();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 throw new ProcessorStateException(String.format("Failed to 
flush global state store %s", store.name()), e);
             }
         }
@@ -316,7 +315,7 @@ public void close(final Map<TopicPartition, Long> offsets) 
throws IOException {
                 log.debug("Closing global storage engine {}", entry.getKey());
                 try {
                     entry.getValue().close();
-                } catch (Exception e) {
+                } catch (final Exception e) {
                     log.error("Failed to close global state store {}", 
entry.getKey(), e);
                     closeFailed.append("Failed to close global state store:")
                             .append(entry.getKey())
@@ -341,7 +340,7 @@ public void checkpoint(final Map<TopicPartition, Long> 
offsets) {
         if (!checkpointableOffsets.isEmpty()) {
             try {
                 checkpoint.write(checkpointableOffsets);
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 log.warn("Failed to write offset checkpoint file to {} for 
global stores: {}", checkpoint, e);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 054333b7a8f..afb56c1ac1b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -67,7 +67,7 @@ public ProcessorStateManager(final TaskId taskId,
                                  final ChangelogReader changelogReader,
                                  final boolean eosEnabled,
                                  final LogContext logContext) throws 
IOException {
-        super(stateDirectory.directoryForTask(taskId));
+        super(stateDirectory.directoryForTask(taskId), eosEnabled);
 
         this.log = logContext.logger(ProcessorStateManager.class);
         this.taskId = taskId;
@@ -81,12 +81,11 @@ public ProcessorStateManager(final TaskId taskId,
         offsetLimits = new HashMap<>();
         standbyRestoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
-        restoreCallbacks = isStandby ? new HashMap<String, 
StateRestoreCallback>() : null;
+        restoreCallbacks = isStandby ? new HashMap<>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
 
         // load the checkpoint information
         checkpointableOffsets.putAll(checkpoint.read());
-
         if (eosEnabled) {
             // delete the checkpoint file after finish loading its stored 
offsets
             checkpoint.delete();
@@ -169,11 +168,7 @@ public void reinitializeStateStoresForPartitions(final 
Collection<TopicPartition
             final int partition = getPartition(topicName);
             final TopicPartition storePartition = new 
TopicPartition(topicName, partition);
 
-            if (checkpointableOffsets.containsKey(storePartition)) {
-                partitionsAndOffsets.put(storePartition, 
checkpointableOffsets.get(storePartition));
-            } else {
-                partitionsAndOffsets.put(storePartition, -1L);
-            }
+            partitionsAndOffsets.put(storePartition, 
checkpointableOffsets.getOrDefault(storePartition, -1L));
         }
         return partitionsAndOffsets;
     }
@@ -340,7 +335,7 @@ public StateStore getGlobalStore(final String name) {
         return globalStores.get(name);
     }
 
-    private BatchingStateRestoreCallback 
getBatchingRestoreCallback(StateRestoreCallback callback) {
+    private BatchingStateRestoreCallback getBatchingRestoreCallback(final 
StateRestoreCallback callback) {
         if (callback instanceof BatchingStateRestoreCallback) {
             return (BatchingStateRestoreCallback) callback;
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 6a20cd92b6b..1b03cd4f294 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -123,7 +123,7 @@ public void 
shouldRestoreStoreWithBatchingRestoreSpecification() throws Exceptio
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), 
is(1));
             
assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -141,7 +141,7 @@ public void 
shouldRestoreStoreWithSinglePutRestoreSpecification() throws Excepti
             assertThat(persistentStore.keys.size(), is(1));
             assertTrue(persistentStore.keys.contains(intKey));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -169,7 +169,7 @@ public void testRegisterPersistentStore() throws 
IOException {
             stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new 
TopicPartition(persistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -196,7 +196,7 @@ public void testRegisterNonPersistentStore() throws 
IOException {
             stateMgr.register(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new 
TopicPartition(nonPersistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -257,7 +257,7 @@ public void testChangeLogOffsets() throws IOException {
             assertEquals(-1L, (long) changeLogOffsets.get(partition3));
 
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -269,7 +269,7 @@ public void testGetStore() throws IOException {
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -280,13 +280,13 @@ public void testGetStore() throws IOException {
             assertEquals(mockStateStore, 
stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
     @Test
     public void testFlushAndClose() throws IOException {
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.emptyMap());
 
         // set up ack'ed offsets
         final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
@@ -339,7 +339,7 @@ public void 
shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throw
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -358,7 +358,7 @@ public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() 
throws IOException {
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -408,7 +408,7 @@ public void shouldWriteCheckpointForStandbyReplica() throws 
IOException {
                                                                   bytes,
                                                                   bytes)));
 
-        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+        stateMgr.checkpoint(Collections.emptyMap());
 
         final Map<TopicPartition, Long> read = checkpoint.read();
         assertThat(read, 
equalTo(Collections.singletonMap(persistentStorePartition, 889L)));
@@ -433,7 +433,7 @@ public void shouldNotWriteCheckpointForNonPersistent() 
throws IOException {
         stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
 
         final Map<TopicPartition, Long> read = checkpoint.read();
-        assertThat(read, equalTo(Collections.<TopicPartition, 
Long>emptyMap()));
+        assertThat(read, equalTo(Collections.emptyMap()));
     }
 
     @Test
@@ -443,7 +443,7 @@ public void 
shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOEx
             noPartitions,
             true, // standby
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -453,10 +453,9 @@ public void 
shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOEx
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 
987L));
 
         final Map<TopicPartition, Long> read = checkpoint.read();
-        assertThat(read, equalTo(Collections.<TopicPartition, 
Long>emptyMap()));
+        assertThat(read, equalTo(Collections.emptyMap()));
     }
 
-
     @Test
     public void 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() 
throws IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
@@ -464,7 +463,7 @@ public void 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFile
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -484,7 +483,7 @@ public void 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeen
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -551,7 +550,7 @@ public void close() {
         stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+            stateManager.close(Collections.emptyMap());
             fail("Should throw ProcessorStateException if store close throws 
exception");
         } catch (final ProcessorStateException e) {
             // pass
@@ -623,7 +622,7 @@ public void close() {
         stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+            stateManager.close(Collections.emptyMap());
         } catch (final ProcessorStateException expected) { /* ignode */ }
         Assert.assertTrue(closedStore.get());
     }
@@ -640,7 +639,7 @@ public void 
shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOExceptio
                 noPartitions,
                 false,
                 stateDirectory,
-                Collections.<String, String>emptyMap(),
+                Collections.emptyMap(),
                 changelogReader,
                 true,
                 logContext);
@@ -653,28 +652,36 @@ public void 
shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOExceptio
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldSuccessfullyReInitializeStateStores() throws IOException 
{
+    public void shouldSuccessfullyReInitializeStateStoresWithEosDisable() 
throws Exception {
+        shouldSuccessfullyReInitializeStateStores(false);
+    }
+
+    @Test
+    public void shouldSuccessfullyReInitializeStateStoresWithEosEnable() 
throws Exception {
+        shouldSuccessfullyReInitializeStateStores(true);
+    }
+
+    private void shouldSuccessfullyReInitializeStateStores(final boolean 
eosEnabled) throws Exception {
         final String store2Name = "store2";
         final String store2Changelog = "store2-changelog";
         final TopicPartition store2Partition = new 
TopicPartition(store2Changelog, 0);
         final List<TopicPartition> changelogPartitions = 
Arrays.asList(changelogTopicPartition, store2Partition);
-        Map<String, String> storeToChangelog = new HashMap() {
+        final Map<String, String> storeToChangelog = new HashMap<String, 
String>() {
             {
                 put(storeName, changelogTopic);
                 put(store2Name, store2Changelog);
             }
         };
         final ProcessorStateManager stateManager = new ProcessorStateManager(
-                taskId,
-                changelogPartitions,
-                false,
-                stateDirectory,
-                storeToChangelog,
-                changelogReader,
-                false,
-                logContext);
+            taskId,
+            changelogPartitions,
+            false,
+            stateDirectory,
+            storeToChangelog,
+            changelogReader,
+            eosEnabled,
+            logContext);
 
         final MockStateStore stateStore = new MockStateStore(storeName, true);
         final MockStateStore stateStore2 = new MockStateStore(store2Name, 
true);
@@ -696,7 +703,7 @@ public void register(final StateStore store, final 
StateRestoreCallback stateRes
         assertTrue(stateStore2.initialized);
     }
 
-    private ProcessorStateManager getStandByStateManager(TaskId taskId) throws 
IOException {
+    private ProcessorStateManager getStandByStateManager(final TaskId taskId) 
throws IOException {
         return new ProcessorStateManager(
             taskId,
             noPartitions,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NPE when reinitializeStateStores with eos enabled
> -------------------------------------------------
>
>                 Key: KAFKA-6860
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6860
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>         Environment: mac, kafka1.1
>            Reporter: ko byoung kwon
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 2.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing stateStores gets an NPE because checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
>         at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=30000,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two tasks [0_0], [0_1]; two Spring Boot instances (assignment was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs (AbstractStateManager#66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
>         topicConfiguration.putIfAbsent("cleanup.policy", "delete");
>         topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
>         topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>                Consumed.with(Serdes.Long(), Serdes.String()))
>        .groupByKey()
>        .count(Materialized
>                   .<Long, Long, KeyValueStore<Bytes, 
> byte[]>>as(ORDER_STORE_NAME)
>                   .withKeySerde(Serdes.Long())
>                   .withValueSerde(Serdes.Long())
>                   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint, 
>  as follows:
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
>     checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
>     checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
>     log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize global store.", 
> fatalException);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to