mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460302576



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##########
@@ -234,24 +234,18 @@ void initialize() {
         }
 
         void pollAndUpdate() {
-            try {
-                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime);
-                for (final ConsumerRecord<byte[], byte[]> record : received) {
-                    stateMaintainer.update(record);
-                }
-                final long now = time.milliseconds();
-                if (now >= lastFlush + flushInterval) {
-                    stateMaintainer.flushState();
-                    lastFlush = now;
-                }
-            } catch (final InvalidOffsetException recoverableException) {

Review comment:
       We now simply let the original exception bubble up so that the store 
can be wiped out. This is only a side "improvement": we could also just die and 
let users clean up the state directory manually. However, wiping it out 
directly seems better.
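
For illustration only, here is a minimal sketch of the idea, not the code in this PR. It assumes the caller of the poll step is the one that wipes the state directory and then rethrows, so the thread still dies. The class and method names (`WipeOnInvalidOffsetSketch`, `runStep`, `wipe`) and the stand-in exception type are hypothetical; the sketch does not use the real Kafka Streams classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class WipeOnInvalidOffsetSketch {

    // Hypothetical stand-in for Kafka's InvalidOffsetException.
    public static class InvalidOffsetSketchException extends RuntimeException {
        public InvalidOffsetSketchException(final String message) {
            super(message);
        }
    }

    // Recursively deletes the directory, children before parents.
    public static void wipe(final Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The poll step itself no longer catches the exception; it bubbles up to
    // this caller, which wipes the local state and then rethrows it.
    // (Assumption: rethrowing after the wipe is the intended behavior.)
    public static void runStep(final Path stateDir, final Runnable pollAndUpdate) {
        try {
            pollAndUpdate.run();
        } catch (final InvalidOffsetSketchException fatal) {
            wipe(stateDir);
            throw fatal;
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path stateDir = Files.createTempDirectory("global-state-sketch");
        Files.createFile(stateDir.resolve("store.bin"));
        try {
            runStep(stateDir, () -> {
                throw new InvalidOffsetSketchException("offset out of range");
            });
        } catch (final InvalidOffsetSketchException expected) {
            System.out.println("state dir still exists: " + Files.exists(stateDir));
        }
    }
}
```

The alternative mentioned above (just die and leave cleanup to the user) would correspond to dropping the `wipe(stateDir)` call and keeping only the rethrow.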





