Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r160559942
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala ---
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Locale
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.time.SpanSugar._
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.{BinaryType, DataType}
+import org.apache.spark.util.Utils
+
+/**
+ * This is a temporary port of KafkaSinkSuite, since we do not yet have a V2 memory stream.
+ * Once we have one, this will be changed to a specialization of KafkaSinkSuite and we won't have
+ * to duplicate all the code.
+ */
+class KafkaContinuousSinkSuite extends KafkaContinuousTest {
+ import testImplicits._
+
+ override val streamingTimeout = 30.seconds
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils(
+ withBrokerProps = Map("auto.create.topics.enable" -> "false"))
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ super.afterAll()
+ }
+
+ test("streaming - write to kafka with topic field") {
+ val inputTopic = newTopic()
+ testUtils.createTopic(inputTopic, partitions = 1)
+
+ val input = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", inputTopic)
+ .option("startingOffsets", "earliest")
+ .load()
+
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val writer = createKafkaWriter(
+ input.toDF(),
+ withTopic = None,
+ withOutputMode = Some(OutputMode.Append))(
+ withSelectExpr = s"'$topic' as topic", "value")
+
+ val reader = createKafkaReader(topic)
+ .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+ .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+ .as[(Int, Int)]
+ .map(_._2)
+
+ try {
+ testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+ testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ } finally {
+ writer.stop()
+ }
+ }
+
+ test("streaming - write data with bad schema") {
--- End diff ---

Missing tests for "w/o topic field, with topic option" and "topic field and topic option",
and also a test for the case when the topic field is null.
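
For the first missing case ("w/o topic field, with topic option"), a test could be shaped
roughly like the one already in the diff. This is only a sketch: it reuses the helper names
visible in this suite (createKafkaWriter, createKafkaReader, checkDatasetUnorderly, newTopic)
and assumes that withTopic = Some(...) is forwarded to the sink's topic option; the exact
wiring would need to follow the actual helper implementation.

  test("streaming - write to kafka with topic option (sketch)") {
    val inputTopic = newTopic()
    testUtils.createTopic(inputTopic, partitions = 1)

    val input = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .option("startingOffsets", "earliest")
      .load()

    val topic = newTopic()
    testUtils.createTopic(topic)

    // Topic is supplied via the writer option only; no topic field is projected.
    val writer = createKafkaWriter(
      input.toDF(),
      withTopic = Some(topic),
      withOutputMode = Some(OutputMode.Append))(
      withSelectExpr = "value")

    val reader = createKafkaReader(topic)
      .selectExpr("CAST(value as STRING) value")
      .selectExpr("CAST(value as INT) value")
      .as[Int]

    try {
      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
      failAfter(streamingTimeout) {
        writer.processAllAvailable()
      }
      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
    } finally {
      writer.stop()
    }
  }

The "topic field and topic option" and null-topic-field cases would follow the same pattern,
varying the select expression and asserting the expected precedence or error behavior.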