[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531936



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
  */
 void completeRestoration();
 
-void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-boolean commitNeeded();
-
-/**
- * @throws StreamsException fatal error, should close the thread
- */
-Map prepareCommit();
-
-void postCommit();
-
 void suspend();
 
 /**
- *
  * @throws StreamsException fatal error, should close the thread
  */
 void resume();
 
-/**
- * Must be idempotent.
- */
+void closeDirty();
+
 void closeClean();
 
-/**
- * Must be idempotent.
- */
-void closeDirty();
+
+// non-idempotent life-cycle methods
 
 /**
- * Updates input partitions and topology after rebalance
+ * Revive a closed task to a created one; should never throw an exception
  */
-void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+void revive();
 
 /**
  * Attempt a clean close but do not close the underlying state
  */
 void closeAndRecycleState();
 
-/**
- * Revive a closed task to a created one; should never throw an exception
- */
-void revive();
-
-StateStore getStore(final String name);
-
-Set inputPartitions();
+void markChangelogAsCorrupted(final Collection partitions);
 
-/**
- * @return any changelog partitions associated with this task
- */
-Collection changelogPartitions();
 
-/**
- * @return the offsets of all the changelog partitions associated with this task,
- * indicating the current positions of the logged state stores of the task.
- */
-Map changelogOffsets();
+// runtime methods (using in RUNNING state)
 
-void markChangelogAsCorrupted(final Collection partitions);
+void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-default Map purgeableOffsets() {
-return Collections.emptyMap();
+default boolean process(final long wallClockTime) {
+return false;
 }
 
 default void recordProcessBatchTime(final long processBatchTime) {}
 
 default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-default boolean process(final long wallClockTime) {
+default boolean maybePunctuateStreamTime() {
 return false;
 }
 
-default boolean commitRequested() {
+default boolean maybePunctuateSystemTime() {
 return false;
 }
 
-default boolean maybePunctuateStreamTime() {
+boolean commitNeeded();
+
+default boolean commitRequested() {
 return false;
 }
 
-default boolean maybePunctuateSystemTime() {
-return false;
+/**
+ * @throws StreamsException fatal error, should close the thread
+ */
+Map prepareCommit();
+
+void postCommit();
+
+default Map purgeableOffsets() {
+return Collections.emptyMap();
 }
 
+
+// task status inquiry
+
+TaskId id();
+
+State state();
+
+boolean isActive();
+
+/**
+ * Updates input partitions after a rebalance
+ */
+void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);

Review comment:
   Renamed from `update` to `updateInputPartitions`.
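
   For context on the grouping in this hunk: the removed Javadoc on `closeClean` and `closeDirty` read "Must be idempotent", and both methods now sit ahead of the `// non-idempotent life-cycle methods` marker. A minimal sketch of what idempotence means for a close method follows. `SketchTask`, its two-value `State` enum, and `releaseCount` are hypothetical names used only for illustration; this is not the Kafka Streams implementation.

```java
// Hypothetical sketch; SketchTask is not part of Kafka Streams.
final class SketchTask {
    enum State { CREATED, CLOSED }

    private State state = State.CREATED;
    private int releaseCount = 0; // how many times resources were actually released

    // Idempotent: a second call sees CLOSED and returns without side effects.
    void closeClean() {
        if (state == State.CLOSED) {
            return;
        }
        releaseCount++;          // stand-in for flushing and releasing resources
        state = State.CLOSED;
    }

    State state() {
        return state;
    }

    int releaseCount() {
        return releaseCount;
    }
}
```

   Calling `closeClean()` twice on the same `SketchTask` leaves it in `CLOSED` and releases resources only once, which is the property the old Javadoc asked for.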





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531698



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
 
-TaskId id();
 
-State state();
 
-boolean isActive();
-
-boolean isClosed();

Review comment:
   This method is unused and has been removed. That is the only actual change.









[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531571



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
 
-TaskId id();

Review comment:
   Group and order methods (compare the inline comments). Subclasses inherit this ordering.
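
   A rough sketch of the convention, assuming a trimmed-down interface: `GroupedTask` and `SimpleGroupedTask` are hypothetical names, the method set is a small subset of what the diff shows, and the "idempotent life-cycle methods" group label is an assumption (only the non-idempotent, runtime, and status-inquiry markers appear in the hunk). The point is only that the implementing class repeats the interface's groups in the same order.

```java
// Hypothetical, trimmed-down interface; GroupedTask is not Kafka's Task.
interface GroupedTask {
    // idempotent life-cycle methods (group name assumed)
    void suspend();
    void closeClean();

    // non-idempotent life-cycle methods
    void revive();

    // runtime methods
    boolean commitNeeded();

    // task status inquiry
    String id();
    boolean isActive();
}

// The implementation repeats the interface's grouping and order.
final class SimpleGroupedTask implements GroupedTask {
    private final String id;
    private boolean suspended = false;
    private boolean closed = false;

    SimpleGroupedTask(final String id) {
        this.id = id;
    }

    // idempotent life-cycle methods
    @Override
    public void suspend() {
        suspended = true;
    }

    @Override
    public void closeClean() {
        closed = true;
    }

    // non-idempotent life-cycle methods
    @Override
    public void revive() {
        if (!closed) {
            throw new IllegalStateException("only a closed task can be revived");
        }
        closed = false;
        suspended = false;
    }

    // runtime methods
    @Override
    public boolean commitNeeded() {
        return false; // this sketch never processes records
    }

    // task status inquiry
    @Override
    public String id() {
        return id;
    }

    @Override
    public boolean isActive() {
        return true;
    }
}
```

   With the groups mirrored like this, a reader can diff the interface against any implementation by section instead of searching for each method.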









[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531045



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
 private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
 
 protected final TaskId id;
+protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
 
+protected Set inputPartitions;

Review comment:
   Align members with the constructor parameter order, and group `final` and mutable members separately.

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
 private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
 
 protected final TaskId id;
+protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
 
+protected Set inputPartitions;
+
 AbstractTask(final TaskId id,
  final ProcessorTopology topology,
  final StateDirectory stateDirectory,
  final ProcessorStateManager stateMgr,
  final Set inputPartitions) {
 this.id = id;
-this.stateMgr = stateMgr;
 this.topology = topology;
-this.inputPartitions = inputPartitions;
+this.stateMgr = stateMgr;
 this.stateDirectory = stateDirectory;
+this.inputPartitions = inputPartitions;

Review comment:
   Align assignment to parameter order.
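
   As an illustration of the two ordering comments on this file, here is a small sketch under stated assumptions: `OrderedMembersSketch` is a hypothetical class, and its `String` fields merely stand in for the real Kafka types. Final members are declared in constructor-parameter order, the mutable member follows in its own group, and the assignments repeat the parameter order.

```java
import java.util.Set;

// Hypothetical class; the String-typed fields stand in for the real Kafka types.
class OrderedMembersSketch {
    // final members, in constructor-parameter order
    protected final String id;
    protected final String topology;
    protected final String stateDirectory;
    protected final String stateMgr;

    // mutable members, grouped after the final ones
    protected Set<String> inputPartitions;

    OrderedMembersSketch(final String id,
                         final String topology,
                         final String stateDirectory,
                         final String stateMgr,
                         final Set<String> inputPartitions) {
        // assignments follow the parameter order
        this.id = id;
        this.topology = topology;
        this.stateDirectory = stateDirectory;
        this.stateMgr = stateMgr;
        this.inputPartitions = inputPartitions;
    }
}
```

   Keeping declarations, parameters, and assignments in one order makes a missing or swapped assignment easy to spot when scanning the constructor.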




