[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511726#comment-16511726
 ] 

ASF GitHub Bot commented on KAFKA-6860:
---

mjsax closed pull request #5187: KAFKA-6860: Fix NPE in Kafka Streams with EOS 
enabled
URL: https://github.com/apache/kafka/pull/5187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index b270e03f2e0..66ddec950c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -36,17 +36,18 @@
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
     final File baseDir;
-    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
-
+    private final boolean eosEnabled;
     OffsetCheckpoint checkpoint;
 
+    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
     final Map<String, StateStore> stores = new LinkedHashMap<>();
     final Map<String, StateStore> globalStores = new LinkedHashMap<>();
 
-    AbstractStateManager(final File baseDir) {
+    AbstractStateManager(final File baseDir,
+                         final boolean eosEnabled) {
         this.baseDir = baseDir;
+        this.eosEnabled = eosEnabled;
         this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-
     }
 
     public void reinitializeStateStoresForPartitions(final Logger log,
@@ -62,11 +63,14 @@ public void reinitializeStateStoresForPartitions(final Logger log,
             checkpointableOffsets.remove(topicPartition);
             storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
         }
-        try {
-            checkpoint.write(checkpointableOffsets);
-        } catch (final IOException fatalException) {
-            log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
-            throw new StreamsException("Failed to reinitialize global store.", fatalException);
+
+        if (!eosEnabled) {
+            try {
+                checkpoint.write(checkpointableOffsets);
+            } catch (final IOException fatalException) {
+                log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
+                throw new StreamsException("Failed to reinitialize global store.", fatalException);
+            }
         }
 
         for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 79088d98806..78c4a363f29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -69,7 +69,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
                                   final StreamsConfig config) {
-        super(stateDirectory.globalStateDir());
+        super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
@@ -92,16 +92,16 @@ public void setGlobalProcessorContext(final InternalProcessorContext processorCo
             if (!stateDirectory.lockGlobalState()) {
                 throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
             }
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
         }
 
         try {
             this.checkpointableOffsets.putAll(checkpoint.read());
-        } catch (IOException e) {
+        } catch (final IOException e) {
             try {
                 stateDirectory.unlockGlobalState();
-            } catch (IOException e1) {
+            } catch (final IOException e1) {
                 log.error("Failed to unlock the global state directory", e);
             }
             throw new 

[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508424#comment-16508424
 ] 

ASF GitHub Bot commented on KAFKA-6860:
---

mjsax opened a new pull request #5187: KAFKA-6860: Fix NPE in Kafka Streams 
with EOS enabled
URL: https://github.com/apache/kafka/pull/5187
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 2.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing the state stores throws an NPE because
> `checkpoint` is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] Encountered the following error during processing:
> java.lang.NullPointerException: null
>     at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) ~[kafka-streams-1.1.0.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) ~[kafka-streams-1.1.0.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) ~[kafka-streams-1.1.0.jar:na]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *Configuration*
>  - changelog topic with a short `retention.ms` and the `delete` cleanup policy
> (just to make the symptom easy to reproduce),
>  e.g. retention.ms=3,cleanup.policy=delete
>  - exactly-once semantics enabled
>  - no cleanup
> *Steps*
>  - two tasks, [0_0] and [0_1], on two Spring Boot instances (was#1 is assigned
> task [0_0], was#2 is assigned task [0_1])
>  - write some data to each state store (the changelog topic soon deletes those
> messages because of the short "retention.ms")
>  - when was#2 is killed, was#1 restores task [0_1]'s data into its own
> RocksDB
>  - in the process, it reaches the checkpoint write and the error
> occurs (AbstractStateManager.java:66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>                Consumed.with(Serdes.Long(), Serdes.String()))
>        .groupByKey()
>        .count(Materialized
>                .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>                .withKeySerde(Serdes.Long())
>                .withValueSerde(Serdes.Long())
>                .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, `checkpoint` is null.
>  I think we need to add some code that creates the checkpoint,
>  as follows:
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
>     checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
>     checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
>     log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
>     throw new StreamsException("Failed to reinitialize global store.", fatalException);
> }
> {code}

[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508317#comment-16508317
 ] 

Guozhang Wang commented on KAFKA-6860:
--

I see. Thanks for the explanation [~mjsax]. Your proposed fix makes sense
to me. I think a more general solution would also fix the double
checkpointing in the non-EOS case: today we checkpoint in `suspend` if EOS is not
turned on, and always in `closeSuspended`. So with EOS we only checkpoint in
`closeSuspended`, while without EOS we checkpoint in both, and hence unnecessarily
write the checkpoint twice when closing. But this general fix is probably better
done together with some refactoring of the ProcessorStateManager code, so it
does not need to be included in this JIRA.
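To make the double write concrete, here is a toy model (not Kafka Streams code; the class `TaskCloseModel` and its methods are hypothetical) of the close path described above, assuming that closing a task always runs `suspend` followed by `closeSuspended`. It simply records where the checkpoint gets written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the task close path described in the comment.
// Names are illustrative only; this is not the Kafka Streams API.
class TaskCloseModel {
    private final boolean eosEnabled;
    // Records every step that wrote the checkpoint file, in order.
    final List<String> checkpointWrites = new ArrayList<>();

    TaskCloseModel(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    void suspend() {
        // Non-EOS tasks write a checkpoint while suspending.
        if (!eosEnabled) {
            checkpointWrites.add("suspend");
        }
    }

    void closeSuspended() {
        // Every task writes a checkpoint again when the suspended task is closed.
        checkpointWrites.add("closeSuspended");
    }

    // Assumption: a clean close always suspends first, then closes the suspended task.
    void close() {
        suspend();
        closeSuspended();
    }

    public static void main(final String[] args) {
        final TaskCloseModel atLeastOnce = new TaskCloseModel(false);
        atLeastOnce.close();
        final TaskCloseModel exactlyOnce = new TaskCloseModel(true);
        exactlyOnce.close();
        System.out.println("non-EOS writes: " + atLeastOnce.checkpointWrites);
        System.out.println("EOS writes:     " + exactlyOnce.checkpointWrites);
    }
}
```

Running `main` should show two writes for the non-EOS task and one for the EOS task, which is the redundancy a later refactoring could remove.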




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507604#comment-16507604
 ] 

Matthias J. Sax commented on KAFKA-6860:


[~guozhang] Note that `checkpoint` is not `final`, and indeed
`ProcessorStateManager` sets `checkpoint = null` in its own constructor when
EOS is enabled. This is by design: the checkpoint file is deleted to make sure
we do a full restore in case of an error. The checkpoint file is only recreated
on a clean shutdown.

Thus, a proper fix would be to check whether EOS is enabled and to write the
checkpoint file only if it is disabled. Let me know what you think. If you
agree that this is a proper fix, we can include it in the 2.0 release. But we
should not rush it; EOS is sensitive. Also, the described scenario is a corner
case that only happens if the offsets of a changelog topic become invalid while
we try to restore it.
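The proposed fix can be sketched as a self-contained model. Everything here is hypothetical: `CountingCheckpoint` stands in for `OffsetCheckpoint`, the state-manager classes are simplified stand-ins for `AbstractStateManager`/`ProcessorStateManager`, and `UncheckedIOException` replaces `StreamsException` so the sketch has no Kafka dependency. The EOS check compares the `processing.guarantee` property against `exactly_once`, the string values behind `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.EXACTLY_ONCE`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for OffsetCheckpoint: only counts how often it is written.
class CountingCheckpoint {
    int writes = 0;

    void write(final Map<String, Long> offsets) throws IOException {
        writes++;
    }
}

// Simplified model of the base state manager with the EOS guard (not Kafka code).
abstract class GuardedStateManager {
    private final boolean eosEnabled;
    CountingCheckpoint checkpoint = new CountingCheckpoint();
    final Map<String, Long> checkpointableOffsets = new HashMap<>();

    GuardedStateManager(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    void reinitializeStateStoresForPartitions(final String changelogPartition) {
        checkpointableOffsets.remove(changelogPartition);
        // Under EOS the checkpoint file was deliberately deleted, so skip the
        // write instead of dereferencing a null `checkpoint`.
        if (!eosEnabled) {
            try {
                checkpoint.write(checkpointableOffsets);
            } catch (final IOException fatalException) {
                throw new UncheckedIOException(fatalException);
            }
        }
    }
}

// Models a subclass that clears `checkpoint` in its constructor when EOS is enabled.
class GuardedTaskStateManager extends GuardedStateManager {
    GuardedTaskStateManager(final boolean eosEnabled) {
        super(eosEnabled);
        if (eosEnabled) {
            checkpoint = null;
        }
    }
}

class EosGuardDemo {
    // Same comparison as in the patch, done on plain Properties with the
    // default guarantee "at_least_once".
    static boolean isEosEnabled(final Properties props) {
        return "exactly_once".equals(props.getProperty("processing.guarantee", "at_least_once"));
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty("processing.guarantee", "exactly_once");

        final GuardedTaskStateManager eos = new GuardedTaskStateManager(isEosEnabled(props));
        eos.reinitializeStateStoresForPartitions("store-changelog-1"); // no NPE: write is skipped

        final GuardedTaskStateManager atLeastOnce = new GuardedTaskStateManager(false);
        atLeastOnce.reinitializeStateStoresForPartitions("store-changelog-1");
        System.out.println("non-EOS checkpoint writes: " + atLeastOnce.checkpoint.writes);
    }
}
```

The guard keeps the EOS invariant intact: no checkpoint file is recreated mid-run, so a crash still forces a full restore, whereas lazily recreating the checkpoint would silently undo that.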



[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-01 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498733#comment-16498733
 ] 

Guozhang Wang commented on KAFKA-6860:
--

Thanks for the detailed explanation of the issue. I looked at the code of 1.1, 
and the `checkpoint` object is always initialized in the constructor of 
`ProcessorStateManager`. So it is still not clear to me why EOS would hit this 
issue. Could you share more of your findings?
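Why the field can still be null even though it is initialized during construction comes down to Java's initialization order: a base-class field initializer runs before the subclass constructor body, so a subclass can overwrite the field afterwards. The sketch below is a hypothetical toy model of the 1.1.0 code path (the classes are illustrative, and a `StringBuilder` stands in for `OffsetCheckpoint`), with the subclass clearing the field under EOS as described in Matthias J. Sax's reply.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the pre-fix code path; names are illustrative, not Kafka's API.
abstract class PreFixStateManager {
    // Stand-in for OffsetCheckpoint, initialized while the base class is constructed.
    StringBuilder checkpoint = new StringBuilder();
    final Map<String, Long> checkpointableOffsets = new HashMap<>();

    void reinitializeStateStoresForPartitions(final String changelogPartition) {
        checkpointableOffsets.remove(changelogPartition);
        // Unconditional "write", mirroring the checkpoint.write(...) call at line 66.
        checkpoint.append(checkpointableOffsets);
    }
}

class PreFixProcessorStateManager extends PreFixStateManager {
    PreFixProcessorStateManager(final boolean eosEnabled) {
        // The base-class field initializer has already run at this point...
        if (eosEnabled) {
            // ...but under EOS the subclass clears the field again.
            checkpoint = null;
        }
    }
}

class NpeDemo {
    // Returns true if reinitializing the stores throws a NullPointerException.
    static boolean throwsNpe(final boolean eosEnabled) {
        try {
            new PreFixProcessorStateManager(eosEnabled)
                    .reinitializeStateStoresForPartitions("changelog-0");
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println("at_least_once -> NPE? " + throwsNpe(false));
        System.out.println("exactly_once  -> NPE? " + throwsNpe(true));
    }
}
```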
