mjsax commented on a change in pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#discussion_r459581717



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a deterministic way.

Review comment:
       Nit: insert `<p>` tag to actually get the new paragraph rendered.
   
   Nit: `Topology` -> `{@link Topology}`
   
   It's not really clear what "deterministic" means. We should elaborate more.
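   One possible shape for the class Javadoc once these nits are addressed. The wording is only a suggestion, not text from the PR, and how much to say about "deterministic" is still open:

   ```java
   /**
    * {@code StreamsBuilder} provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.
    *
    * <p>
    * It is a requirement that the processing logic ({@link Topology}) be defined in a deterministic way:
    * every {@link KafkaStreams} instance of the same application must build the same {@link Topology},
    * that is, the same operators, added in the same order.
    */
   ```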

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a deterministic way.
+ * If different instances build different runtime code logic the resulting behavior may be unexpected.

Review comment:
       "Different" for sure, but this implies that one instance might have an operator that the other does not. The observed issue is that even if both contain the same operators, they might be added to the `Topology` in a different order (and thus be named differently), so we should stress that order matters.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1984,6 +1985,24 @@ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
         task.initializeStateStores();
     }
 
+    @Test(expected = TopologyException.class)

Review comment:
       We should not use this annotation, but rather use `assertThrows` (some code still does not use `assertThrows`, but we are migrating our tests lazily, as it provides a better test pattern).
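   A sketch of why `assertThrows` is the better pattern. In JUnit 4.13+ the real method is `org.junit.Assert.assertThrows(Class<T>, ThrowingRunnable)`; to keep the example self-contained, it re-implements a minimal version. `TopologyExceptionStub` (a stand-in for `TopologyException`) and `checkSourceTopic` are hypothetical names for illustration:

   ```java
   import java.util.Set;

   public class AssertThrowsSketch {

       // Hypothetical stand-in for org.apache.kafka.streams.errors.TopologyException.
       public static class TopologyExceptionStub extends RuntimeException {
           public TopologyExceptionStub(final String message) {
               super(message);
           }
       }

       @FunctionalInterface
       public interface ThrowingRunnable {
           void run() throws Throwable;
       }

       // Minimal assertThrows: fails unless the runnable throws an instance of
       // `expected`, and returns that exception so the test can inspect it.
       public static <T extends Throwable> T assertThrows(final Class<T> expected,
                                                         final ThrowingRunnable runnable) {
           try {
               runnable.run();
           } catch (final Throwable actual) {
               if (expected.isInstance(actual)) {
                   return expected.cast(actual);
               }
               throw new AssertionError("Expected " + expected.getName()
                   + " but caught " + actual.getClass().getName(), actual);
           }
           throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
       }

       // Hypothetical code under test: rejects a topic the topology does not know.
       public static void checkSourceTopic(final Set<String> sourceTopics, final String topic) {
           if (!sourceTopics.contains(topic)) {
               throw new TopologyExceptionStub("Unknown topic " + topic);
           }
       }

       public static void main(final String[] args) {
           // Setup runs outside the lambda: an exception thrown here fails the
           // test instead of silently satisfying @Test(expected = ...).
           final Set<String> sourceTopics = Set.of("input");

           // Only the statement inside the lambda is expected to throw.
           final TopologyExceptionStub e = assertThrows(
               TopologyExceptionStub.class,
               () -> checkSourceTopic(sourceTopics, "unknown"));

           // The exception is returned, so its message can be verified too.
           System.out.println(e.getMessage()); // prints "Unknown topic unknown"
       }
   }
   ```

   The two advantages over `@Test(expected = ...)` are that the expectation is scoped to one statement and that the returned exception can be asserted on.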

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -181,8 +182,16 @@ public StreamTask(final TaskId id,
 
         final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
+        final Set<String> sourceTopics = topology.sourceTopics();
         for (final TopicPartition partition : partitions) {
-            final SourceNode source = topology.source(partition.topic());
+            final String topicName = partition.topic();
+            if (!sourceTopics.contains(topicName)) {
+                throw new TopologyException(
+                        "Topic not found " + topicName + ". Is the Streams Topology built in a deterministic way?"

Review comment:
       `Topic not found` sounds as if the topic was not found in the cluster; however, what actually happened is that we received a record whose topic is unknown to the sub-topology.
   
   Similar to above, "deterministic" is not really easy to understand. I would also not phrase it as a question, but as a statement:
   ```
   ... This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order.
   ```
   
   Or similar.
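   A self-contained sketch of the check with the message phrased as a statement. The prefix before "This may happen..." is a hypothetical wording for the part the comment leaves as "...", `TopologyExceptionStub` stands in for `TopologyException`, and `partitionTopics` stands in for the `partition.topic()` values from the diff:

   ```java
   import java.util.List;
   import java.util.Set;

   public class SourceTopicCheckSketch {

       // Hypothetical stand-in for org.apache.kafka.streams.errors.TopologyException.
       public static class TopologyExceptionStub extends RuntimeException {
           public TopologyExceptionStub(final String message) {
               super(message);
           }
       }

       // Fails if any assigned topic is not a source topic of this task's sub-topology.
       public static void validateAssignedTopics(final Set<String> sourceTopics,
                                                 final List<String> partitionTopics) {
           for (final String topicName : partitionTopics) {
               if (!sourceTopics.contains(topicName)) {
                   throw new TopologyExceptionStub(
                       "Topic " + topicName + " is unknown to the sub-topology of this task. "
                           + "This may happen if different KafkaStreams instances of the same "
                           + "application execute different Topologies. Note that Topologies "
                           + "are only identical if all operators are added in the same order.");
               }
           }
       }

       public static void main(final String[] args) {
           try {
               validateAssignedTopics(Set.of("input"), List.of("input", "other-topic"));
           } catch (final TopologyExceptionStub e) {
               // Prints the statement-style message for "other-topic".
               System.out.println(e.getMessage());
           }
       }
   }
   ```

   Building the `Set` of source topics once before the loop, as the diff does, keeps each per-partition lookup constant-time.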




----------------------------------------------------------------
This is an automated message from the Apache Git Service.