Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20382#discussion_r168138470
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{IOException, OutputStreamWriter}
+import java.net.ServerSocket
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
+
+  override def afterEach() {
+    sqlContext.streams.active.foreach(_.stop())
+    if (serverThread != null) {
+      serverThread.interrupt()
+      serverThread.join()
+      serverThread = null
+    }
+    if (batchReader != null) {
+      batchReader.stop()
+      batchReader = null
+    }
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+  test("V2 basic usage") {
--- End diff --
These updated tests are getting more complicated because they call the
low-level data source APIs directly. Can you convert them to higher-level
tests, like the Kafka ones?
If it gets too complicated to make this work with `testStream`, you can
simply use `query.processAllAvailable`. Then we won't have to worry about
changing APIs any more.
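
A rough sketch of the `query.processAllAvailable` route, for illustration only. It assumes the public `socket` source (with its `host` and `port` options) and the `memory` sink. The object name, the table name `socket_lines`, and the 30-second timeouts are made up for the example. Because the socket source reads lines on its own thread, the sketch polls instead of assuming a single `processAllAvailable` call is enough.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

import org.apache.spark.sql.SparkSession

// Hypothetical standalone sketch; in the suite this would live inside a test body.
object SocketSourceHighLevelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("socket-source-sketch")
      .getOrCreate()

    // Listen on an ephemeral port; the socket source connects to it as a client.
    val server = new ServerSocket(0)
    server.setSoTimeout(30000) // assumed timeout so accept() cannot hang forever

    // Only public DataStreamReader / DataStreamWriter calls, no reader internals.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", server.getLocalPort.toString)
      .load()

    val query = lines.writeStream
      .format("memory")
      .queryName("socket_lines") // hypothetical in-memory table name
      .outputMode("append")
      .start()

    try {
      // Accept the source's connection and push two lines through it.
      val client = server.accept()
      val out = new PrintWriter(client.getOutputStream, true)
      Seq("hello", "world").foreach(line => out.println(line))

      // Poll until both rows reach the sink or the assumed deadline passes.
      val deadline = System.currentTimeMillis() + 30000L
      var values = Set.empty[String]
      while (values.size < 2 && System.currentTimeMillis() < deadline) {
        query.processAllAvailable()
        values = spark.table("socket_lines").collect().map(_.getString(0)).toSet
        if (values.size < 2) Thread.sleep(50)
      }
      client.close()
      assert(values == Set("hello", "world"), s"unexpected rows: $values")
    } finally {
      query.stop()
      server.close()
      spark.stop()
    }
  }
}
```

Nothing here touches `MicroBatchReader` or offsets, so a test written this way should keep working if the low-level reader interfaces change again.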
---