cadonna commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r787544943



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -515,11 +518,23 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
                 throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+                for (final Map.Entry<String, Set<String>> topology : repartitionTopics.missingUserInputTopicsPerTopology().entrySet()) {
+                    final String topologyName = topology.getKey();
+                    final StreamsException exception = new StreamsException(
+                        new MissingSourceTopicException(String.format(
+                            "Missing source topics %s for topology %s",
+                            topology.getValue(),
+                            topologyName)),
+                        getDummyTaskIdForTopology(topologyName));

Review comment:
       Why do you need this dummy task ID?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -67,6 +67,9 @@
 
     private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
 
+    // Handler for recoverable StreamsExceptions which don't require killing/replacing the thread
+    private java.util.function.Consumer<Throwable> recoverableStreamsExceptionHandler;
+

Review comment:
       This is not used anywhere.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -600,9 +614,18 @@ public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInpu
             topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
             topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
 
+            assertThat(handler.nextError(TOPOLOGY_2), nullValue());
+
             streams.addNamedTopology(topology2Builder.build());
             streams2.addNamedTopology(topology2Builder2.build());
 
+            // verify that the missing source topics were noticed and the handler invoked
+            retryOnExceptionWithTimeout(() -> {
+                final Throwable error = handler.nextError(TOPOLOGY_2);
+                assertThat(error, notNullValue());
+                assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
+            });
+

Review comment:
       Could you also verify that the stream thread was not replaced? You could 
use `KafkaStreams#metadataForLocalThreads()` for that.
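
One possible shape for such a check, as a minimal sketch rather than the code the PR should necessarily use: snapshot the local stream thread names before adding the topology and compare them with a snapshot taken after the handler fired. The helper `ThreadReplacementCheck` and the thread names below are hypothetical. In the real test the snapshots would presumably be built from `streams.metadataForLocalThreads()` by mapping each `ThreadMetadata` to its `threadName()`, and the check assumes a replacement thread would show up under a different name.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of the PR or of Kafka Streams.
class ThreadReplacementCheck {

    // Returns the thread names that appear in only one of the two snapshots.
    // An empty result means the set of local stream threads did not change.
    static Set<String> changedThreads(final Set<String> before, final Set<String> after) {
        final Set<String> changed = new TreeSet<>(before);
        changed.addAll(after);
        final Set<String> common = new TreeSet<>(before);
        common.retainAll(after);
        changed.removeAll(common);
        return changed;
    }

    public static void main(final String[] args) {
        // Made-up thread names; in the integration test these would come from
        // the KafkaStreams instance instead of being hard-coded.
        final Set<String> before = Set.of("app-StreamThread-1");
        final Set<String> afterSame = Set.of("app-StreamThread-1");
        final Set<String> afterReplaced = Set.of("app-StreamThread-2");

        System.out.println(changedThreads(before, afterSame).isEmpty());
        System.out.println(changedThreads(before, afterReplaced));
    }
}
```

In the test itself, the assertion would then be that the result is empty once the `MissingSourceTopicException` has reached the handler.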



