This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ad2367c55aeb [MINOR][PYTHON][SS][TESTS] Drop the tables after being used at `test_streaming_foreach_batch`
ad2367c55aeb is described below

commit ad2367c55aebf417183eda13e56c55364276f145
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Mon Apr 8 11:00:10 2024 +0900

    [MINOR][PYTHON][SS][TESTS] Drop the tables after being used at `test_streaming_foreach_batch`
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to drop the tables after tests finished.
    
    ### Why are the changes needed?
    
    - To clean up resources properly.
    - It can affect other test cases when only one session is being used across 
other tests.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, test-only.
    
    ### How was this patch tested?
    
    Tested in https://github.com/apache/spark/pull/45870
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45920 from HyukjinKwon/minor-cleanup-table.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../streaming/test_streaming_foreach_batch.py      | 140 +++++++++++----------
 1 file changed, 72 insertions(+), 68 deletions(-)

diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py 
b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
index 5d2c1bbbf62c..ef286115a303 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
@@ -97,46 +97,48 @@ class StreamingTestsForeachBatchMixin:
 
     def test_streaming_foreach_batch_spark_session(self):
         table_name = "testTable_foreach_batch"
+        with self.table(table_name):
 
-        def func(df: DataFrame, batch_id: int):
-            if batch_id > 0:  # only process once
-                return
-            spark = df.sparkSession
-            df1 = spark.createDataFrame([("structured",), ("streaming",)])
-            df1.union(df).write.mode("append").saveAsTable(table_name)
+            def func(df: DataFrame, batch_id: int):
+                if batch_id > 0:  # only process once
+                    return
+                spark = df.sparkSession
+                df1 = spark.createDataFrame([("structured",), ("streaming",)])
+                df1.union(df).write.mode("append").saveAsTable(table_name)
 
-        df = 
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
-        q = df.writeStream.foreachBatch(func).start()
-        q.processAllAvailable()
-        q.stop()
+            df = 
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+            q = df.writeStream.foreachBatch(func).start()
+            q.processAllAvailable()
+            q.stop()
 
-        actual = self.spark.read.table(table_name)
-        df = (
-            self.spark.read.format("text")
-            .load(path="python/test_support/sql/streaming/")
-            .union(self.spark.createDataFrame([("structured",), 
("streaming",)]))
-        )
-        self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
+            actual = self.spark.read.table(table_name)
+            df = (
+                self.spark.read.format("text")
+                .load(path="python/test_support/sql/streaming/")
+                .union(self.spark.createDataFrame([("structured",), 
("streaming",)]))
+            )
+            self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
 
     def test_streaming_foreach_batch_path_access(self):
         table_name = "testTable_foreach_batch_path"
+        with self.table(table_name):
 
-        def func(df: DataFrame, batch_id: int):
-            if batch_id > 0:  # only process once
-                return
-            spark = df.sparkSession
-            df1 = 
spark.read.format("text").load("python/test_support/sql/streaming")
-            df1.union(df).write.mode("append").saveAsTable(table_name)
+            def func(df: DataFrame, batch_id: int):
+                if batch_id > 0:  # only process once
+                    return
+                spark = df.sparkSession
+                df1 = 
spark.read.format("text").load("python/test_support/sql/streaming")
+                df1.union(df).write.mode("append").saveAsTable(table_name)
 
-        df = 
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
-        q = df.writeStream.foreachBatch(func).start()
-        q.processAllAvailable()
-        q.stop()
+            df = 
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+            q = df.writeStream.foreachBatch(func).start()
+            q.processAllAvailable()
+            q.stop()
 
-        actual = self.spark.read.table(table_name)
-        df = 
self.spark.read.format("text").load(path="python/test_support/sql/streaming/")
-        df = df.union(df)
-        self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
+            actual = self.spark.read.table(table_name)
+            df = 
self.spark.read.format("text").load(path="python/test_support/sql/streaming/")
+            df = df.union(df)
+            self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
 
     @staticmethod
     def my_test_function_2():
@@ -147,56 +149,58 @@ class StreamingTestsForeachBatchMixin:
             return 3
 
         table_name = "testTable_foreach_batch_function"
+        with self.table(table_name):
+
+            def func(df: DataFrame, batch_id: int):
+                if batch_id > 0:  # only process once
+                    return
+                spark = df.sparkSession
+                df1 = spark.createDataFrame(
+                    [
+                        (my_test_function_1(),),
+                        
(StreamingTestsForeachBatchMixin.my_test_function_2(),),
+                        (my_test_function_3(),),
+                    ]
+                )
+                df1.write.mode("append").saveAsTable(table_name)
+
+            df = self.spark.readStream.format("rate").load()
+            q = df.writeStream.foreachBatch(func).start()
+            q.processAllAvailable()
+            q.stop()
 
-        def func(df: DataFrame, batch_id: int):
-            if batch_id > 0:  # only process once
-                return
-            spark = df.sparkSession
-            df1 = spark.createDataFrame(
+            actual = self.spark.read.table(table_name)
+            df = self.spark.createDataFrame(
                 [
                     (my_test_function_1(),),
                     (StreamingTestsForeachBatchMixin.my_test_function_2(),),
                     (my_test_function_3(),),
                 ]
             )
-            df1.write.mode("append").saveAsTable(table_name)
-
-        df = self.spark.readStream.format("rate").load()
-        q = df.writeStream.foreachBatch(func).start()
-        q.processAllAvailable()
-        q.stop()
-
-        actual = self.spark.read.table(table_name)
-        df = self.spark.createDataFrame(
-            [
-                (my_test_function_1(),),
-                (StreamingTestsForeachBatchMixin.my_test_function_2(),),
-                (my_test_function_3(),),
-            ]
-        )
-        self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
+            self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
 
     def test_streaming_foreach_batch_import(self):
         import time  # not imported in foreach_batch_worker
 
         table_name = "testTable_foreach_batch_import"
+        with self.table(table_name):
+
+            def func(df: DataFrame, batch_id: int):
+                if batch_id > 0:  # only process once
+                    return
+                time.sleep(1)
+                spark = df.sparkSession
+                df1 = 
spark.read.format("text").load("python/test_support/sql/streaming")
+                df1.write.mode("append").saveAsTable(table_name)
+
+            df = self.spark.readStream.format("rate").load()
+            q = df.writeStream.foreachBatch(func).start()
+            q.processAllAvailable()
+            q.stop()
 
-        def func(df: DataFrame, batch_id: int):
-            if batch_id > 0:  # only process once
-                return
-            time.sleep(1)
-            spark = df.sparkSession
-            df1 = 
spark.read.format("text").load("python/test_support/sql/streaming")
-            df1.write.mode("append").saveAsTable(table_name)
-
-        df = self.spark.readStream.format("rate").load()
-        q = df.writeStream.foreachBatch(func).start()
-        q.processAllAvailable()
-        q.stop()
-
-        actual = self.spark.read.table(table_name)
-        df = 
self.spark.read.format("text").load("python/test_support/sql/streaming")
-        self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
+            actual = self.spark.read.table(table_name)
+            df = 
self.spark.read.format("text").load("python/test_support/sql/streaming")
+            self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
 
 
 class StreamingTestsForeachBatch(StreamingTestsForeachBatchMixin, 
ReusedSQLTestCase):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to