viirya commented on code in PR #53415: URL: https://github.com/apache/spark/pull/53415#discussion_r2771262817
########## python/pyspark/sql/tests/streaming/kafka_utils.py: ########## @@ -0,0 +1,371 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Utilities for running PySpark tests against a Kafka cluster using Docker containers. + +This module provides KafkaUtils class that launches a single-broker Kafka cluster +via Docker using testcontainers-python library. It's designed to be used with +Python unittest-based PySpark tests. + +Example usage: + class MyKafkaTest(ReusedSQLTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.kafka_utils = KafkaUtils() + cls.kafka_utils.setup() + + @classmethod + def tearDownClass(cls): + cls.kafka_utils.teardown() + super().tearDownClass() + + def test_kafka_streaming(self): + topic = "test-topic" + self.kafka_utils.create_topics([topic]) + # ... use self.kafka_utils.broker for bootstrap.servers +""" + +import time +from typing import List, Dict, Callable, Any + + +class KafkaUtils: + """ + Utility class for managing a Kafka test cluster using Docker containers. 
+ + This class provides methods to: + - Start/stop a single-broker Kafka cluster in a Docker container + - Create and delete topics + - Send messages to topics + - Query topic data using Spark + - Helper methods for testing (assert_eventually, wait_for_query_alive) + + Attributes: + broker (str): The bootstrap server address (e.g., "localhost:9093") + initialized (bool): Whether the Kafka cluster has been started + """ + + def __init__(self, kafka_version: str = "7.4.0"): + """ + Initialize KafkaUtils. + + Args: + kafka_version: Version of Confluent Kafka to use (default: 7.4.0 for stability) + """ + self.kafka_version = kafka_version + self.initialized = False + self._kafka_container = None + self._admin_client = None + self._producer = None + self.broker = None + + def setup(self) -> None: + """ + Start the Kafka container and initialize admin client and producer. + + This method: + 1. Starts a Kafka container using testcontainers + 2. Creates an admin client for topic management + 3. Creates a producer for sending test messages + + Raises: + ImportError: If required dependencies (testcontainers, kafka-python) are not installed + RuntimeError: If Kafka container fails to start + """ + if self.initialized: + return + + try: + from testcontainers.kafka import KafkaContainer + except ImportError as e: + raise ImportError( + "testcontainers is required for Kafka tests. " + "Install it with: pip install testcontainers[kafka]" + ) from e + + try: + from kafka import KafkaProducer + from kafka.admin import KafkaAdminClient + except ImportError as e: + raise ImportError( + "kafka-python is required for Kafka tests. 
" + "Install it with: pip install kafka-python" + ) from e + + # Start Kafka container with specific version for test stability + self._kafka_container = KafkaContainer(f"confluentinc/cp-kafka:{self.kafka_version}") + self._kafka_container.start() + + # Get bootstrap server address + self.broker = self._kafka_container.get_bootstrap_server() + + # Initialize admin client for topic management + self._admin_client = KafkaAdminClient( + bootstrap_servers=self.broker, + request_timeout_ms=10000, + api_version_auto_timeout_ms=10000, + ) + + # Initialize producer for sending test messages + self._producer = KafkaProducer( + bootstrap_servers=self.broker, + key_serializer=lambda k: str(k).encode("utf-8") if k is not None else None, + value_serializer=lambda v: str(v).encode("utf-8") if v is not None else None, + request_timeout_ms=10000, + max_block_ms=10000, + ) + + self.initialized = True + + def teardown(self) -> None: + """ + Stop the Kafka container and clean up resources. + + This method closes the admin client, producer, and stops the Kafka container. + It's safe to call multiple times. + """ + if not self.initialized: + return + + # Close admin client + if self._admin_client is not None: + try: + self._admin_client.close() + except Exception: + pass Review Comment: Can we add logging for the exception case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
