mjsax commented on a change in pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#discussion_r459581717
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a deterministic way.

Review comment:
Nit: insert a `<p>` tag to actually get the new paragraph rendered.

Nit: `Topology` -> `{@link Topology}`

It's not really clear what "deterministic" means. We should elaborate more.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a deterministic way.
+ * If different instances build different runtime code logic the resulting behavior may be unexpected.

Review comment:
"Different" for sure, but this implies that one instance might have an operator the other does not. The observed issue is that even if both contain the same operators, they might be added to the `Topology` in a different order (and thus be named differently), so we should stress that order matters.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1984,6 +1985,24 @@ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
         task.initializeStateStores();
     }
+    @Test(expected = TopologyException.class)

Review comment:
We should not use this annotation but rather `assertThrows`. (We still have some code that does not use `assertThrows`, but we are lazily migrating our tests because it provides a better test pattern.)
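To make the "order matters" point from the second comment concrete, here is a stdlib-only simulation. It assumes default DSL names are formed roughly as a type prefix plus a zero-padded counter that is incremented per builder (e.g. `KSTREAM-FILTER-0000000001`). `NameGenerator` and `build` are hypothetical helpers, not Kafka Streams API. The sketch shows that two instances adding the same operators in a different order end up with different names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stdlib-only model of counter-based default operator naming.
// This is NOT the Kafka Streams API; it only simulates the naming idea.
public class OperatorOrderDemo {

    // Hands out names as prefix + zero-padded value of a per-builder counter.
    static final class NameGenerator {
        private int index = 0;

        String newName(final String prefix) {
            return prefix + String.format("%010d", index++);
        }
    }

    // Generates one name per operator prefix, in the order the operators are "added".
    static List<String> build(final String... operatorPrefixes) {
        final NameGenerator generator = new NameGenerator();
        final List<String> names = new ArrayList<>();
        for (final String prefix : operatorPrefixes) {
            names.add(generator.newName(prefix));
        }
        return names;
    }

    public static void main(final String[] args) {
        // Instance A adds filter before mapValues; instance B adds them the other way round.
        final List<String> a = build("KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-MAPVALUES-");
        final List<String> b = build("KSTREAM-SOURCE-", "KSTREAM-MAPVALUES-", "KSTREAM-FILTER-");
        System.out.println(a);
        System.out.println(b);
        // Same set of operators, yet the generated names differ once the order differs.
        System.out.println("identical: " + a.equals(b));
    }
}
```

In this model, instance A names its filter `KSTREAM-FILTER-0000000001` while instance B names it `KSTREAM-FILTER-0000000002`, so the two topologies disagree even though neither has an operator the other lacks.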
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -181,8 +182,16 @@ public StreamTask(final TaskId id,
         final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
+        final Set<String> sourceTopics = topology.sourceTopics();
         for (final TopicPartition partition : partitions) {
-            final SourceNode source = topology.source(partition.topic());
+            final String topicName = partition.topic();
+            if (!sourceTopics.contains(topicName)) {
+                throw new TopologyException(
+                    "Topic not found " + topicName + ". Is the Streams Topology built in a deterministic way?"

Review comment:
`Topic not found` sounds as if the topic was not found in the cluster; however, what actually happened is that we received a record whose topic is unknown to the sub-topology.

Similar to above, "deterministic" is not really easy to understand. I would also not phrase it as a question, but as a statement:
```
... This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order.
```
Or similar.
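The two suggestions (a statement-style exception message, and `assertThrows` instead of `@Test(expected = ...)`) can be combined in one sketch. This is stdlib-only and hypothetical: `TopologyException` and `assertThrows` below are local stand-ins, not the Kafka Streams or JUnit classes, and the message prefix "Topic is unknown to the topology" is assumed wording. The stand-in `assertThrows` follows roughly the shape of JUnit 4.13's `Assert.assertThrows(Class, ThrowingRunnable)`. Its advantage over the annotation is that only the statement under test can satisfy the assertion, and the returned exception can be inspected.

```java
import java.util.List;
import java.util.Set;

// Stdlib-only sketch; TopologyException and assertThrows are local stand-ins,
// not the Kafka Streams or JUnit classes.
public class SourceTopicCheckDemo {

    // Stand-in for Kafka Streams' TopologyException.
    static final class TopologyException extends RuntimeException {
        TopologyException(final String message) {
            super(message);
        }
    }

    // Hypothetical helper: fails if a partition's topic is not a source topic of this sub-topology.
    static void checkSourceTopics(final Set<String> sourceTopics, final List<String> partitionTopics) {
        for (final String topicName : partitionTopics) {
            if (!sourceTopics.contains(topicName)) {
                throw new TopologyException(
                    "Topic is unknown to the topology: " + topicName + ". "
                        + "This may happen if different KafkaStreams instances of the same application "
                        + "execute different Topologies. Note that Topologies are only identical "
                        + "if all operators are added in the same order.");
            }
        }
    }

    // Minimal stand-in, similar in shape to JUnit 4.13's Assert.assertThrows.
    static <T extends Throwable> T assertThrows(final Class<T> expected, final Runnable runnable) {
        try {
            runnable.run();
        } catch (final Throwable actual) {
            if (expected.isInstance(actual)) {
                return expected.cast(actual);
            }
            throw new AssertionError("Unexpected exception type: " + actual, actual);
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " to be thrown");
    }

    public static void main(final String[] args) {
        // Only the call inside the lambda can satisfy the assertion, and the message can be checked.
        final TopologyException e = assertThrows(
            TopologyException.class,
            () -> checkSourceTopics(Set.of("input"), List.of("input", "other-topic")));
        System.out.println(e.getMessage());
    }
}
```

With the annotation, any statement in the test method that happens to throw a `TopologyException` would make the test pass. The lambda form narrows that to the single call being tested.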