GitHub user mccheah commented on a diff in the pull request:
https://github.com/apache/spark/pull/22639#discussion_r223518338
--- Diff:
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import io.fabric8.kubernetes.api.model.Service
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+ k8sSuite: KubernetesSuite =>
+
+ private def startSocketServer(): (String, Int, ServerSocket) = {
+ val hostname = util.Utils.localHostName()
+ val hostAddress: String =
InetAddress.getByName(hostname).getHostAddress
+ val serverSocket = new ServerSocket()
+ serverSocket.bind(new InetSocketAddress(hostAddress, 0))
+ val host = serverSocket.getInetAddress.getHostAddress
+ val port = serverSocket.getLocalPort
+ logInfo(s"Started test server socket at $host:$port")
+ Future {
+ while (!serverSocket.isClosed) {
+ val socket: Socket = serverSocket.accept()
+ logInfo(s"Received connection on $socket")
+ for (i <- 1 to 10 ) {
+ if (socket.isConnected && !serverSocket.isClosed) {
+ socket.getOutputStream.write("spark-streaming-kube
test.\n".getBytes())
--- End diff --
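Specify the encoding explicitly as UTF-8, e.g. `getBytes(StandardCharsets.UTF_8)`; the no-argument `getBytes()` uses the platform default charset.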
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]