cadonna commented on a change in pull request #11752:
URL: https://github.com/apache/kafka/pull/11752#discussion_r807751843



##########
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -1094,29 +1095,37 @@ private void throwIfBuiltInStore(final StateStore 
stateStore) {
      * Close the driver, its topology, and all processors.
      */
     public void close() {
-        if (task != null) {
-            task.suspend();
-            task.prepareCommit();
-            task.postCommit(true);
-            task.closeClean();
-        }
-        if (globalStateTask != null) {
-            try {
-                globalStateTask.close(false);
-            } catch (final IOException e) {
-                // ignore
+        try {
+            if (task != null) {
+                task.suspend();
+                task.prepareCommit();
+                task.postCommit(true);
+                task.closeClean();
+            }
+            if (globalStateTask != null) {
+                try {
+                    globalStateTask.close(false);
+                } catch (final IOException e) {
+                    // ignore
+                }
+            }
+            completeAllProcessableWork();
+            if (task != null && task.hasRecordsQueued()) {
+                log.warn("Found some records that cannot be processed due to 
the" +
+                                " {} configuration during 
TopologyTestDriver#close().",
+                        StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+            }
+            if (processingMode == AT_LEAST_ONCE) {
+                producer.close();
+            }
+            stateDirectory.clean();
+        } catch (final RuntimeException rex) {
+            if (OperatingSystem.IS_WINDOWS) {
+                log.warn("Ignoring exception for windows, issue may be similar 
to resolved issue: https://issues.apache.org/jira/browse/KAFKA-6647";, rex);
+            } else {

Review comment:
       I think we can remove this since most of the tests that use the 
`TopologyTestDriver` use the `try-with-resource` statement which close the 
`TopologyTestDriver` without swallowing the exception, e.g., 
https://github.com/apache/kafka/blob/1c7cd7979228a97c5c5674f6527a813ae298fd2d/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java#L99.
 
   So we already did not swallow the exception for some tests. 
   
   I think it is enough to remove the swallowing from the tests and not change 
the `TopologyTestDriver`.
   
   WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to