Github user ScrapCodes commented on a diff in the pull request:
https://github.com/apache/spark/pull/22639#discussion_r229218791
--- Diff:
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+ k8sSuite: KubernetesSuite =>
+
+ import StreamingCompatibilitySuite._
+
+ test("Run spark streaming in client mode.", k8sTestTag) {
+ val (host, port, serverSocket) = startSocketServer()
+ val driverService = driverServiceSetup
+ try {
+ setupSparkStreamingPod(driverService.getMetadata.getName)
+ .addToArgs("streaming.NetworkWordCount")
+ .addToArgs(host, port.toString)
+ .endContainer()
+ .endSpec()
+ .done()
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.")
+ }
+ } finally {
+ // Have to delete the service manually since it doesn't have an owner reference
+ kubernetesTestComponents
+ .kubernetesClient
+ .services()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .delete(driverService)
+ serverSocket.close()
+ }
+ }
+
+ test("Run spark streaming in cluster mode.", k8sTestTag) {
+ val (host, port, serverSocket) = startSocketServer()
+ try {
+ runSparkJVMCheckAndVerifyCompletion(
+ mainClass = "org.apache.spark.examples.streaming.NetworkWordCount",
+ appArgs = Array[String](host, port.toString),
+ expectedJVMValue = Seq("spark-streaming-kube"))
+ } finally {
+ serverSocket.close()
+ }
+ }
+
+ test("Run spark structured streaming in cluster mode.", k8sTestTag) {
+ val (host, port, serverSocket) = startSocketServer()
+ try {
+ runSparkJVMCheckAndVerifyCompletion(
+ mainClass = "org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount",
+ appArgs = Array[String](host, port.toString),
+ expectedJVMValue = Seq("spark-streaming-kube"))
+ } finally {
+ serverSocket.close()
+ }
+ }
+
+ test("Run spark structured streaming in client mode.", k8sTestTag) {
+ val (host, port, serverSocket) = startSocketServer()
+ val driverService = driverServiceSetup
+ try {
+ setupSparkStreamingPod(driverService.getMetadata.getName)
+ .addToArgs("sql.streaming.StructuredNetworkWordCount")
+ .addToArgs(host, port.toString)
+ .endContainer()
+ .endSpec()
+ .done()
+
+ val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes))
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ assert(getRunLog.contains("spark-streaming-kube"),
+ "The application did not complete.")
+ }
+ }
+ finally {
+ // Have to delete the service manually since it doesn't have an owner reference
+ kubernetesTestComponents
+ .kubernetesClient
+ .services()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .delete(driverService)
+ serverSocket.close()
+ }
+ }
+
+ private def getRunLog: String = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withName(driverPodName)
+ .getLog
+
+ private def setupSparkStreamingPod(driverServiceName: String) = {
+ val labels = Map("spark-app-selector" -> driverPodName)
+ testBackend
+ .getKubernetesClient
+ .pods()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .createNew()
+ .withNewMetadata()
+ .withName(driverPodName)
+ .withLabels(labels.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withServiceAccountName(kubernetesTestComponents.serviceAccountName)
--- End diff --
I have run these tests on my own minikube setup and am unable to reproduce
the failure that occurred on Jenkins. It is possible that it is related to how
minikube is set up on Jenkins.
---