viirya commented on code in PR #53415:
URL: https://github.com/apache/spark/pull/53415#discussion_r2767789213
##########
python/pyspark/testing/sqlutils.py:
##########
@@ -57,12 +57,109 @@ def search_jar(project_relative_path, sbt_jar_name_prefix,
mvn_jar_name_prefix):
if not jars:
return None
- elif len(jars) > 1:
+ elif len(jars) > 1 and not return_first:
raise RuntimeError("Found multiple JARs: %s; please remove all but
one" % (", ".join(jars)))
else:
return jars[0]
+def get_sbt_runtime_classpath(project_relative_path, project_name_map):
+ """
+ Get the runtime classpath for a project using SBT.
+ This is used as a fallback when classpath.txt doesn't exist (SBT builds).
+ Args:
+ project_relative_path: Relative path from SPARK_HOME to the project
directory
+ project_name_map: Dict mapping project paths to SBT project names
+ (e.g., {"connector/kafka-0-10-sql": "sql-kafka-0-10"})
+ Returns:
+ Comma-separated string of JAR paths, or None if SBT command fails
+ """
+ import subprocess
+ import re
+
+ sbt_project = project_name_map.get(project_relative_path)
+ if not sbt_project:
+ return None
+
+ try:
+ # Run SBT command to get runtime classpath
+ sbt_cmd = os.path.join(SPARK_HOME, "build", "sbt")
+ result = subprocess.run(
+ [sbt_cmd, f"show {sbt_project}/Runtime/dependencyClasspath"],
+ cwd=SPARK_HOME,
+ capture_output=True,
+ text=True,
+ timeout=180,
+ )
+
+ if result.returncode != 0:
+ return None
+
+ # Parse the output to extract JAR paths
+ # Look for lines like: [info] * Attributed(/path/to/file.jar)
+ jar_paths = []
+ for line in result.stdout.splitlines():
+ match = re.search(r"Attributed\(([^)]+\.jar)\)", line)
Review Comment:
Hmm, is this reliable? Will it change across SBT versions?
##########
python/pyspark/sql/tests/streaming/kafka_utils.py:
##########
@@ -0,0 +1,371 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utilities for running PySpark tests against a Kafka cluster using Docker
containers.
+
+This module provides KafkaUtils class that launches a single-broker Kafka
cluster
+via Docker using testcontainers-python library. It's designed to be used with
+Python unittest-based PySpark tests.
+
+Example usage:
+ class MyKafkaTest(ReusedSQLTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.kafka_utils = KafkaUtils()
+ cls.kafka_utils.setup()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.kafka_utils.teardown()
+ super().tearDownClass()
+
+ def test_kafka_streaming(self):
+ topic = "test-topic"
+ self.kafka_utils.create_topics([topic])
+ # ... use self.kafka_utils.broker for bootstrap.servers
+"""
+
+import time
+from typing import List, Dict, Callable, Any
+
+
+class KafkaUtils:
+ """
+ Utility class for managing a Kafka test cluster using Docker containers.
+
+ This class provides methods to:
+ - Start/stop a single-broker Kafka cluster in a Docker container
+ - Create and delete topics
+ - Send messages to topics
+ - Query topic data using Spark
+ - Helper methods for testing (assert_eventually, wait_for_query_alive)
+
+ Attributes:
+ broker (str): The bootstrap server address (e.g., "localhost:9093")
+ initialized (bool): Whether the Kafka cluster has been started
+ """
+
+ def __init__(self, kafka_version: str = "7.4.0"):
+ """
+ Initialize KafkaUtils.
+
+ Args:
+ kafka_version: Version of Confluent Kafka to use (default: 7.4.0
for stability)
+ """
+ self.kafka_version = kafka_version
+ self.initialized = False
+ self._kafka_container = None
+ self._admin_client = None
+ self._producer = None
+ self.broker = None
+
+ def setup(self) -> None:
+ """
+ Start the Kafka container and initialize admin client and producer.
+
+ This method:
+ 1. Starts a Kafka container using testcontainers
+ 2. Creates an admin client for topic management
+ 3. Creates a producer for sending test messages
+
+ Raises:
+ ImportError: If required dependencies (testcontainers,
kafka-python) are not installed
+ RuntimeError: If Kafka container fails to start
+ """
+ if self.initialized:
+ return
+
+ try:
+ from testcontainers.kafka import KafkaContainer
+ except ImportError as e:
+ raise ImportError(
+ "testcontainers is required for Kafka tests. "
+ "Install it with: pip install testcontainers[kafka]"
+ ) from e
+
+ try:
+ from kafka import KafkaProducer
+ from kafka.admin import KafkaAdminClient
+ except ImportError as e:
+ raise ImportError(
+ "kafka-python is required for Kafka tests. "
+ "Install it with: pip install kafka-python"
+ ) from e
+
+ # Start Kafka container with specific version for test stability
+ self._kafka_container =
KafkaContainer(f"confluentinc/cp-kafka:{self.kafka_version}")
+ self._kafka_container.start()
+
+ # Get bootstrap server address
+ self.broker = self._kafka_container.get_bootstrap_server()
+
+ # Initialize admin client for topic management
+ self._admin_client = KafkaAdminClient(
+ bootstrap_servers=self.broker,
+ request_timeout_ms=10000,
+ api_version_auto_timeout_ms=10000,
+ )
+
+ # Initialize producer for sending test messages
+ self._producer = KafkaProducer(
+ bootstrap_servers=self.broker,
+ key_serializer=lambda k: str(k).encode("utf-8") if k is not None
else None,
+ value_serializer=lambda v: str(v).encode("utf-8") if v is not None
else None,
+ request_timeout_ms=10000,
+ max_block_ms=10000,
+ )
+
+ self.initialized = True
+
+ def teardown(self) -> None:
+ """
+ Stop the Kafka container and clean up resources.
+
+ This method closes the admin client, producer, and stops the Kafka
container.
+ It's safe to call multiple times.
+ """
+ if not self.initialized:
+ return
+
+ # Close admin client
+ if self._admin_client is not None:
+ try:
+ self._admin_client.close()
+ except Exception:
+ pass
+ self._admin_client = None
+
+ # Close producer
+ if self._producer is not None:
+ try:
+ self._producer.close(timeout=5)
+ except Exception:
+ pass
+ self._producer = None
+
+ # Stop Kafka container
+ if self._kafka_container is not None:
+ try:
+ self._kafka_container.stop()
+ except Exception:
+ pass
+ self._kafka_container = None
+
+ self.broker = None
+ self.initialized = False
+
+ def _assert_initialized(self) -> None:
+ """Check if KafkaUtils has been initialized, raise error if not."""
+ if not self.initialized:
+ raise RuntimeError("KafkaUtils has not been initialized. Call
setup() first.")
+
+ def create_topics(
+ self, topic_names: List[str], num_partitions: int = 1,
replication_factor: int = 1
+ ) -> None:
+ """
+ Create Kafka topics.
+
+ Args:
+ topic_names: List of topic names to create
+ num_partitions: Number of partitions per topic (default: 1)
+ replication_factor: Replication factor (default: 1, max: 1 for
single broker)
+
+ Note:
+ If a topic already exists, it will be silently ignored.
+ """
+ self._assert_initialized()
+
+ from kafka.admin import NewTopic
+ from kafka.errors import TopicAlreadyExistsError
+
+ topics = [
+ NewTopic(
+ name=name, num_partitions=num_partitions,
replication_factor=replication_factor
+ )
+ for name in topic_names
+ ]
+
+ try:
+ self._admin_client.create_topics(new_topics=topics,
validate_only=False)
+ except TopicAlreadyExistsError:
+ # Topic already exists, ignore
+ pass
+
+ def delete_topics(self, topic_names: List[str]) -> None:
+ """
+ Delete Kafka topics.
+
+ Args:
+ topic_names: List of topic names to delete
+
+ Note:
+ If a topic doesn't exist, it will be silently ignored.
+ """
+ self._assert_initialized()
+
+ from kafka.errors import UnknownTopicOrPartitionError
+
+ try:
+ self._admin_client.delete_topics(topics=topic_names)
+ except UnknownTopicOrPartitionError:
+ # Topic doesn't exist, ignore
+ pass
+
+ def send_messages(self, topic: str, messages: List[tuple]) -> None:
+ """
+ Send messages to a Kafka topic.
+
+ Args:
+ topic: Topic name to send messages to
+ messages: List of (key, value) tuples to send
+
+ Example:
+ kafka_utils.send_messages("test-topic", [
+ ("key1", "value1"),
+ ("key2", "value2"),
+ ])
+ """
+ self._assert_initialized()
+
+ for key, value in messages:
+ future = self._producer.send(topic, key=key, value=value)
+ future.get(timeout=10) # Wait for send to complete
Review Comment:
So each message waits up to 10 seconds before timing out? Maybe batch this as an
optimization: collect all the futures first, then wait for all messages at once
with a single timeout?
##########
python/pyspark/sql/tests/streaming/kafka_utils.py:
##########
@@ -0,0 +1,371 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utilities for running PySpark tests against a Kafka cluster using Docker
containers.
+
+This module provides KafkaUtils class that launches a single-broker Kafka
cluster
+via Docker using testcontainers-python library. It's designed to be used with
+Python unittest-based PySpark tests.
+
+Example usage:
+ class MyKafkaTest(ReusedSQLTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.kafka_utils = KafkaUtils()
+ cls.kafka_utils.setup()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.kafka_utils.teardown()
+ super().tearDownClass()
+
+ def test_kafka_streaming(self):
+ topic = "test-topic"
+ self.kafka_utils.create_topics([topic])
+ # ... use self.kafka_utils.broker for bootstrap.servers
+"""
+
+import time
+from typing import List, Dict, Callable, Any
+
+
+class KafkaUtils:
+ """
+ Utility class for managing a Kafka test cluster using Docker containers.
+
+ This class provides methods to:
+ - Start/stop a single-broker Kafka cluster in a Docker container
+ - Create and delete topics
+ - Send messages to topics
+ - Query topic data using Spark
+ - Helper methods for testing (assert_eventually, wait_for_query_alive)
+
+ Attributes:
+ broker (str): The bootstrap server address (e.g., "localhost:9093")
+ initialized (bool): Whether the Kafka cluster has been started
+ """
+
+ def __init__(self, kafka_version: str = "7.4.0"):
+ """
+ Initialize KafkaUtils.
+
+ Args:
+ kafka_version: Version of Confluent Kafka to use (default: 7.4.0
for stability)
+ """
+ self.kafka_version = kafka_version
+ self.initialized = False
+ self._kafka_container = None
+ self._admin_client = None
+ self._producer = None
+ self.broker = None
+
+ def setup(self) -> None:
+ """
+ Start the Kafka container and initialize admin client and producer.
+
+ This method:
+ 1. Starts a Kafka container using testcontainers
+ 2. Creates an admin client for topic management
+ 3. Creates a producer for sending test messages
+
+ Raises:
+ ImportError: If required dependencies (testcontainers,
kafka-python) are not installed
+ RuntimeError: If Kafka container fails to start
+ """
+ if self.initialized:
+ return
+
+ try:
+ from testcontainers.kafka import KafkaContainer
+ except ImportError as e:
+ raise ImportError(
+ "testcontainers is required for Kafka tests. "
+ "Install it with: pip install testcontainers[kafka]"
+ ) from e
+
+ try:
+ from kafka import KafkaProducer
+ from kafka.admin import KafkaAdminClient
+ except ImportError as e:
+ raise ImportError(
+ "kafka-python is required for Kafka tests. "
+ "Install it with: pip install kafka-python"
+ ) from e
+
+ # Start Kafka container with specific version for test stability
+ self._kafka_container =
KafkaContainer(f"confluentinc/cp-kafka:{self.kafka_version}")
+ self._kafka_container.start()
+
+ # Get bootstrap server address
+ self.broker = self._kafka_container.get_bootstrap_server()
+
+ # Initialize admin client for topic management
+ self._admin_client = KafkaAdminClient(
+ bootstrap_servers=self.broker,
+ request_timeout_ms=10000,
+ api_version_auto_timeout_ms=10000,
+ )
+
+ # Initialize producer for sending test messages
+ self._producer = KafkaProducer(
+ bootstrap_servers=self.broker,
+ key_serializer=lambda k: str(k).encode("utf-8") if k is not None
else None,
+ value_serializer=lambda v: str(v).encode("utf-8") if v is not None
else None,
+ request_timeout_ms=10000,
+ max_block_ms=10000,
+ )
+
+ self.initialized = True
+
+ def teardown(self) -> None:
+ """
+ Stop the Kafka container and clean up resources.
+
+ This method closes the admin client, producer, and stops the Kafka
container.
+ It's safe to call multiple times.
+ """
+ if not self.initialized:
+ return
+
+ # Close admin client
+ if self._admin_client is not None:
+ try:
+ self._admin_client.close()
+ except Exception:
+ pass
+ self._admin_client = None
+
+ # Close producer
+ if self._producer is not None:
+ try:
+ self._producer.close(timeout=5)
+ except Exception:
+ pass
+ self._producer = None
+
+ # Stop Kafka container
+ if self._kafka_container is not None:
+ try:
+ self._kafka_container.stop()
+ except Exception:
+ pass
+ self._kafka_container = None
+
+ self.broker = None
+ self.initialized = False
+
+ def _assert_initialized(self) -> None:
+ """Check if KafkaUtils has been initialized, raise error if not."""
+ if not self.initialized:
+ raise RuntimeError("KafkaUtils has not been initialized. Call
setup() first.")
+
+ def create_topics(
+ self, topic_names: List[str], num_partitions: int = 1,
replication_factor: int = 1
+ ) -> None:
+ """
+ Create Kafka topics.
+
+ Args:
+ topic_names: List of topic names to create
+ num_partitions: Number of partitions per topic (default: 1)
+ replication_factor: Replication factor (default: 1, max: 1 for
single broker)
+
+ Note:
+ If a topic already exists, it will be silently ignored.
Review Comment:
Is it intentional to ignore the error? Should we do logging or fail?
##########
python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py:
##########
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PySpark tests for Kafka streaming integration using Docker test containers.
+
+These tests demonstrate how to use KafkaUtils to test Spark streaming with
Kafka.
+Tests require Docker to be running and the following Python packages:
+- testcontainers[kafka]
+- kafka-python
+"""
+
+import os
+import tempfile
+import unittest
+import uuid
+
+from pyspark.testing.sqlutils import ReusedSQLTestCase, search_jar,
read_classpath
+
+# Setup Kafka JAR on classpath before SparkSession is created
+# This follows the same pattern as streamingutils.py for Kinesis
+kafka_sql_jar = search_jar(
+ "connector/kafka-0-10-sql",
+ "spark-sql-kafka-0-10_",
+ "spark-sql-kafka-0-10_",
+ return_first=True,
+)
+
+if kafka_sql_jar is None:
+ raise RuntimeError(
+ "Kafka SQL connector JAR was not found. "
+ "To run these tests, you need to build Spark with "
+ "'build/mvn package' or 'build/sbt Test/package' "
+ "before running this test."
+ )
+
+# Read the full classpath including all dependencies
+# This works for both Maven builds (reads classpath.txt) and SBT builds
(queries SBT)
+# Define the project name mapping for SBT builds
+kafka_project_name_map = {
+ "connector/kafka-0-10-sql": "sql-kafka-0-10",
+ "connector/kafka-0-10": "kafka-0-10",
+}
+kafka_classpath = read_classpath("connector/kafka-0-10-sql",
kafka_project_name_map)
+all_jars = f"{kafka_sql_jar},{kafka_classpath}"
+
+# Add Kafka JAR to PYSPARK_SUBMIT_ARGS before SparkSession is created
+existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+jars_args = "--jars %s" % all_jars
+
+os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils
+
+
+# Check if required Python dependencies are available
+try:
+ import testcontainers # noqa: F401
+ import kafka # noqa: F401
+except ImportError as e:
+ raise ImportError(
+ "Kafka test dependencies not available. "
+ "Install with: pip install testcontainers[kafka] kafka-python"
+ ) from e
+
+
+class StreamingKafkaTestsMixin:
+ """
+ Base mixin for Kafka streaming tests that provides KafkaUtils
setup/teardown
+ and topic management.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ # Start Kafka container - this may take 10-30 seconds on first run
+ cls.kafka_utils = KafkaUtils()
+ cls.kafka_utils.setup()
+
+ @classmethod
+ def tearDownClass(cls):
+ # Stop Kafka container and clean up resources
+ if hasattr(cls, "kafka_utils"):
+ cls.kafka_utils.teardown()
+ super().tearDownClass()
+
+ def setUp(self):
+ super().setUp()
+ # Create unique topics for each test to avoid interference
+ self.source_topic = f"source-{uuid.uuid4().hex}"
+ self.sink_topic = f"sink-{uuid.uuid4().hex}"
+ self.kafka_utils.create_topics([self.source_topic, self.sink_topic])
+
+ def tearDown(self):
+ # Clean up topics after each test
+ self.kafka_utils.delete_topics([self.source_topic, self.sink_topic])
+ super().tearDown()
+
+
+class StreamingKafkaTests(StreamingKafkaTestsMixin, ReusedSQLTestCase):
+ """
+ Tests for Kafka streaming integration with PySpark.
+ """
+
+ def test_streaming_stateless(self):
+ """
+ Test stateless rtm query with earliest offset.
+ """
+
+ # produce test data to source_topic
+ for i in range(10):
+ self.kafka_utils.producer.send(
+ self.source_topic,
+ key=i,
+ value=i,
+ ).get(timeout=10)
+ self.kafka_utils.producer.flush()
+
+ # Build streaming query for Kafka to Kafka.
+ kafka_source = (
+ self.spark.readStream.format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .option("subscribe", self.source_topic)
+ .option("startingOffsets", "earliest")
+ .load()
+ )
+
+ checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint")
Review Comment:
Should we clean up the temp directory after test?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]