Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20382#discussion_r170759995
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{IOException, OutputStreamWriter}
+import java.net.ServerSocket
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
+
+  override def afterEach() {
+    sqlContext.streams.active.foreach(_.stop())
+    if (serverThread != null) {
+      serverThread.interrupt()
+      serverThread.join()
+      serverThread = null
+    }
+    if (batchReader != null) {
+      batchReader.stop()
+      batchReader = null
+    }
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+ test("V2 basic usage") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ val provider = new TextSocketSourceProvider
+ val options = new DataSourceOptions(
+ Map("host" -> "localhost", "port" ->
serverThread.port.toString).asJava)
+ batchReader = provider.createMicroBatchReader(Optional.empty(), "",
options)
+
+ val schema = batchReader.readSchema()
+ assert(schema === StructType(StructField("value", StringType) :: Nil))
+
+ failAfter(streamingTimeout) {
+ serverThread.enqueue("hello")
+      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
+      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
+        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
+        Thread.sleep(10)
+      }
+      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
+        val offset1 = batchReader.getEndOffset
+        val batch1 = new ListBuffer[Row]
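+        // Drain every partition reader into the buffer for this batch.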
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            batch1.append(r.get())
+          }
+        }
+        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
+
+        serverThread.enqueue("world")
+        while (batchReader.getEndOffset === offset1) {
+          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
+          Thread.sleep(10)
+        }
+        val offset2 = batchReader.getEndOffset
+        val batch2 = new ListBuffer[Row]
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            batch2.append(r.get())
+          }
+        }
+        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
+
+        batchReader.setOffsetRange(Optional.empty(), Optional.of(offset2))
+        val both = new ListBuffer[Row]
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            both.append(r.get())
+          }
+        }
+        assert(both.map(_.getAs[String](0)) === Seq("hello", "world"))
+      }
+
+      // Try stopping the source to make sure this does not block forever.
+      batchReader.stop()
+      batchReader = null
+    }
+  }
+
+ test("timestamped usage") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ val provider = new TextSocketSourceProvider
+ val options = new DataSourceOptions(Map("host" -> "localhost",
+ "port" -> serverThread.port.toString, "includeTimestamp" ->
"true").asJava)
+ batchReader = provider.createMicroBatchReader(Optional.empty(), "",
options)
+
+ val schema = batchReader.readSchema()
+ assert(schema === StructType(StructField("value", StringType) ::
+ StructField("timestamp", TimestampType) :: Nil))
+
+ failAfter(streamingTimeout) {
+ serverThread.enqueue("hello")
+ batchReader.setOffsetRange(Optional.empty(), Optional.empty())
+ while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset ==
-1L) {
+ batchReader.setOffsetRange(Optional.empty(), Optional.empty())
+ Thread.sleep(10)
+ }
+ withSQLConf("spark.sql.streaming.unsupportedOperationCheck" ->
"false") {
+        val offset1 = batchReader.getEndOffset
+        val batch1 = new ListBuffer[Row]
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            batch1.append(r.get())
+          }
+        }
+        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
+        val batch1Stamp = batch1.map(_.getAs[Timestamp](1)).head
+
+        serverThread.enqueue("world")
+        while (batchReader.getEndOffset === offset1) {
+          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
+          Thread.sleep(10)
+        }
+        val offset2 = batchReader.getEndOffset
+        val batch2 = new ListBuffer[Row]
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            batch2.append(r.get())
+          }
+        }
+        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
+        val batch2Stamp = batch2.map(_.getAs[Timestamp](1)).head
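+        // Timestamps are assigned as lines arrive, so the second batch's
+        // stamp must not precede the first's.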
+        assert(!batch2Stamp.before(batch1Stamp))
+      }
+
+      // Try stopping the source to make sure this does not block forever.
+      batchReader.stop()
+      batchReader = null
+    }
+  }
+
+ test("params not given") {
+ val provider = new TextSocketSourceProvider
+ intercept[AnalysisException] {
+ provider.createMicroBatchReader(Optional.empty(), "",
+ new DataSourceOptions(Map.empty[String, String].asJava))
+ }
+ intercept[AnalysisException] {
+ provider.createMicroBatchReader(Optional.empty(), "",
+ new DataSourceOptions(Map("host" -> "localhost").asJava))
+ }
+ intercept[AnalysisException] {
+ provider.createMicroBatchReader(Optional.empty(), "",
+ new DataSourceOptions(Map("port" -> "1234").asJava))
+ }
+ }
+
+ test("non-boolean includeTimestamp") {
+ val provider = new TextSocketSourceProvider
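+    // "fasle" is deliberately not a parseable boolean, so reader creation
+    // should be rejected.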
+ val params = Map("host" -> "localhost", "port" -> "1234",
"includeTimestamp" -> "fasle")
+ intercept[AnalysisException] {
+ val a = new DataSourceOptions(params.asJava)
+ provider.createMicroBatchReader(Optional.empty(), "", a)
+ }
+ }
+
+ test("user-specified schema given") {
+ val provider = new TextSocketSourceProvider
+ val userSpecifiedSchema = StructType(
+ StructField("name", StringType) ::
+ StructField("area", StringType) :: Nil)
+ val params = Map("host" -> "localhost", "port" -> "1234")
+ val exception = intercept[AnalysisException] {
+ provider.createMicroBatchReader(
+ Optional.of(userSpecifiedSchema), "", new
DataSourceOptions(params.asJava))
+ }
+ assert(exception.getMessage.contains(
+ "socket source does not support a user-specified schema"))
+ }
+
+ test("no server up") {
+ val provider = new TextSocketSourceProvider
+ val parameters = Map("host" -> "localhost", "port" -> "0")
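+    // Nothing is listening on port 0, so the connection attempt inside
+    // createMicroBatchReader should fail with an IOException.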
+    intercept[IOException] {
+      batchReader = provider.createMicroBatchReader(
+        Optional.empty(), "", new DataSourceOptions(parameters.asJava))
+    }
+  }
+
--- End diff --
The test below seems like a good replacement to me.
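
(The replacement test itself is outside this diff context. Purely to illustrate the StreamTest style being suggested, here is a minimal sketch, not code from this PR; AddSocketData is an assumed helper action that enqueues a line on the test server and waits for the end offset to advance:)

    // Hypothetical sketch only -- drives the socket source through the
    // StreamTest harness instead of polling the reader by hand.
    val socketDF = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", serverThread.port.toString)
      .load()

    testStream(socketDF)(
      AddSocketData("hello"),   // assumed action: enqueue + wait for offset
      CheckAnswer("hello"),
      AddSocketData("world"),
      CheckAnswer("hello", "world"),
      StopStream
    )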