Dynamic executor scaling for Spark on Kubernetes
Hello,

Is dynamic executor scaling for Spark on Kubernetes available in the latest release of Spark? I mean scaling the executors based on the workload, as opposed to preallocating a fixed number of executors for a Spark job.

Thanks,
Purna
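For context, a sketch of what preallocation looks like in this era: on Spark 2.3.x the Kubernetes scheduler backend does not support dynamic allocation (it depends on an external shuffle service that the K8s backend does not yet provide), so the executor count is fixed up front via spark.executor.instances. The master URL, image name, class, and jar path below are illustrative placeholders, not values from the thread.

```shell
# Spark 2.3.x on Kubernetes: no workload-based scaling; the executor
# count is fixed at submission time via spark.executor.instances.
# All names/URLs below are illustrative placeholders.
spark-submit \
  --master k8s://https://my-apiserver:6443 \
  --deploy-mode cluster \
  --name my-job \
  --conf spark.kubernetes.container.image=my-registry/spark:2.3.2 \
  --conf spark.executor.instances=5 \
  --class com.example.MyJob \
  local:///opt/spark/jars/my-job.jar
```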
Re: [ANNOUNCE] Announcing Apache Spark 2.4.0
Thanks, this is great news. Can you please let me know if dynamic resource allocation is available in Spark 2.4? I'm using Spark 2.3.2 on Kubernetes. Do I still need to provide executor memory options as part of the spark-submit command, or will Spark manage the required executor memory based on the Spark job size?

On Thu, Nov 8, 2018 at 2:18 PM Marcelo Vanzin wrote:
> +user@
>
> ---------- Forwarded message ----------
> From: Wenchen Fan
> Date: Thu, Nov 8, 2018 at 10:55 PM
> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
> To: Spark dev list
>
> Hi all,
>
> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release adds Barrier Execution Mode for better integration with deep learning frameworks, introduces 30+ built-in and higher-order functions to deal with complex data types more easily, and improves the K8s integration, along with experimental Scala 2.12 support. Other major updates include the built-in Avro data source, image data source, flexible streaming sinks, elimination of the 2GB block size limitation during transfer, and Pandas UDF improvements. In addition, this release continues to focus on usability, stability, and polish while resolving around 1100 tickets.
>
> We'd like to thank our contributors and users for their contributions and early feedback to this release. This release would not have been possible without you.
>
> To download Spark 2.4.0, head over to the download page: http://spark.apache.org/downloads.html
>
> To view the release notes: https://spark.apache.org/releases/spark-release-2-4-0.html
>
> Thanks,
> Wenchen
>
> PS: If you see any issues with the release notes, webpage or published artifacts, please contact me directly off-list.

--
Marcelo
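On the executor-memory question in this message: Spark does not size executors from the job; executor resources come from configuration, with defaults (e.g. 1g for spark.executor.memory) applied when unset, so on 2.3.2/K8s they still need to be set explicitly on spark-submit. A hedged sketch; all values, names, and paths are illustrative:

```shell
# Executor resources must still be configured explicitly on K8s;
# Spark does not infer them from the job's data size.
# Values and names below are placeholders.
spark-submit \
  --master k8s://https://my-apiserver:6443 \
  --deploy-mode cluster \
  --conf spark.driver.memory=2g \
  --conf spark.executor.memory=4g \
  --conf spark.executor.cores=2 \
  --conf spark.executor.instances=4 \
  --class com.example.MyJob \
  local:///opt/spark/jars/my-job.jar
```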
Spark 2.3.1: k8s driver pods stuck in Initializing state
Hello,

We're running Spark 2.3.1 on Kubernetes v1.11.0 and our driver pods are getting stuck in the Initializing state, like so:

NAME                                            READY   STATUS     RESTARTS   AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver  0/1     Init:0/1   0          18h

And from kubectl describe pod:

Warning  FailedMount  9m (x128 over 4h)   kubelet, 10.47.96.167  Unable to mount volumes for pod "my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)": timeout expired waiting for volumes to attach or mount for pod "spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted volumes=[spark-init-properties]. list of unattached volumes=[spark-init-properties download-jars-volume download-files-volume spark-token-tfpvp]
Warning  FailedMount  4m (x153 over 4h)   kubelet, 10.47.96.167  MountVolume.SetUp failed for volume "spark-init-properties" : configmaps "my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in kubectl get configmap, the init config map for the driver pod isn't there. Am I correct in assuming that since the configmap isn't being created, the driver pod will never start (hence stuck in init)? Where does the init config map come from? Why would it not be created?

Please suggest.

Thanks,
Purna
Spark 2.3.1: k8s driver pods stuck in Initializing state
We're running Spark 2.3.1 on Kubernetes v1.11.0 and our driver pods are getting stuck in the Initializing state, like so:

NAME                                            READY   STATUS     RESTARTS   AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver  0/1     Init:0/1   0          18h

And from kubectl describe pod:

Warning  FailedMount  9m (x128 over 4h)   kubelet, 10.47.96.167  Unable to mount volumes for pod "my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)": timeout expired waiting for volumes to attach or mount for pod "spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted volumes=[spark-init-properties]. list of unattached volumes=[spark-init-properties download-jars-volume download-files-volume spark-token-tfpvp]
Warning  FailedMount  4m (x153 over 4h)   kubelet, 10.47.96.167  MountVolume.SetUp failed for volume "spark-init-properties" : configmaps "my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in kubectl get configmap, the init config map for the driver pod isn't there. Am I correct in assuming that since the configmap isn't being created, the driver pod will never start (hence stuck in init)? Where does the init config map come from? Why would it not be created?

Thanks,
Christopher Carney

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
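Not an authoritative diagnosis, but the symptom above (driver init blocked on a missing *-init-config configmap) can be narrowed down with a few kubectl checks; in Spark 2.3.x that configmap is created by spark-submit's Kubernetes client at submission time, so an interrupted submission can leave the pod orphaned without it. The namespace and pod name below are taken from the report; the init container name `spark-init` is the 2.3.x default and is an assumption here:

```shell
# Is the driver's init configmap present in the namespace at all?
kubectl get configmap -n spark | grep init-config

# Pod events show the FailedMount details and which volume is missing
kubectl describe pod -n spark my-pod-fd79926b819d3b34b05250e23347d0e7-driver

# Once the configmap exists, the init container's own logs become visible
# ("spark-init" is the assumed default init container name in 2.3.x)
kubectl logs -n spark my-pod-fd79926b819d3b34b05250e23347d0e7-driver -c spark-init
```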
Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
Resurfacing the question to get more attention.

> Hello,
>
> I'm running a Spark 2.3 job on a Kubernetes cluster.
>
> kubectl version
> Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z", GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
> Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}
>
> When I run spark-submit against the k8s master, the driver pod is stuck in the Waiting: PodInitializing state. I had to manually kill the driver pod and submit a new job; then it works. How can this be handled in production?

This happens with executor pods as well:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128

> This happens if I submit the jobs almost in parallel, i.e. I submit 5 jobs one after the other simultaneously.
> [quoted kubectl describe node output trimmed; see the original message below]
Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
Hello,

> I'm running a Spark 2.3 job on a Kubernetes cluster.
>
> kubectl version
> Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z", GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
> Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}
>
> When I run spark-submit against the k8s master, the driver pod is stuck in the Waiting: PodInitializing state. I had to manually kill the driver pod and submit a new job; then it works. How can this be handled in production?

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128

> This happens if I submit the jobs almost in parallel, i.e. I submit 5 jobs one after the other simultaneously.
> [quoted kubectl describe node output trimmed; see the original message below]
spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
I'm running a Spark 2.3 job on a Kubernetes cluster.

kubectl version
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z", GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}

When I run spark-submit against the k8s master, the driver pod is stuck in the Waiting: PodInitializing state. I had to manually kill the driver pod and submit a new job; then it works. This happens if I submit the jobs almost in parallel, i.e. I submit 5 jobs one after the other simultaneously.

I'm running Spark jobs on 20 nodes, each with the configuration below. I tried kubectl describe node on the node where the driver pod is running; this is what I got. I do see there is overcommit on resources, but I expected the Kubernetes scheduler not to schedule if resources on the node are overcommitted or the node is in Not Ready state. In this case the node is in Ready state, but I observe the same behaviour if the node is in "Not Ready" state.

Name:               **
Roles:              worker
Labels:             beta.kubernetes.io/arch=amd64
                    beta.kubernetes.io/os=linux
                    kubernetes.io/hostname=
                    node-role.kubernetes.io/worker=true
Annotations:        node.alpha.kubernetes.io/ttl=0
                    volumes.kubernetes.io/controller-managed-attach-detach=true
Taints:
CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
Conditions:
  Type            Status  LastHeartbeatTime                LastTransitionTime               Reason                      Message
  ----            ------  -----------------                ------------------               ------                      -------
  OutOfDisk       False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasSufficientDisk    kubelet has sufficient disk space available
  MemoryPressure  False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasSufficientMemory  kubelet has sufficient memory available
  DiskPressure    False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasNoDiskPressure    kubelet has no disk pressure
  Ready           True    Tue, 14 Aug 2018 09:31:20 -0400  Sat, 11 Aug 2018 00:41:27 -0400  KubeletReady                kubelet is posting ready status. AppArmor enabled
Addresses:
  InternalIP:  *
  Hostname:    **
Capacity:
  cpu:     16
  memory:  125827288Ki
  pods:    110
Allocatable:
  cpu:     16
  memory:  125724888Ki
  pods:    110
System Info:
  Machine ID:                 *
  System UUID:                **
  Boot ID:                    1493028d-0a80-4f2f-b0f1-48d9b8910e9f
  Kernel Version:             4.4.0-1062-aws
  OS Image:                   Ubuntu 16.04.4 LTS
  Operating System:           linux
  Architecture:               amd64
  Container Runtime Version:  docker://Unknown
  Kubelet Version:            v1.8.3
  Kube-Proxy Version:         v1.8.3
PodCIDR:     **
ExternalID:  **
Non-terminated Pods: (11 in total)
  Namespace    Name                                                          CPU Requests  CPU Limits  Memory Requests  Memory Limits
  ---------    ----                                                          ------------  ----------  ---------------  -------------
  kube-system  calico-node-gj5mb                                             250m (1%)     0 (0%)      0 (0%)           0 (0%)
  kube-system  kube-proxy-                                                   100m (0%)     0 (0%)      0 (0%)           0 (0%)
  kube-system  prometheus-prometheus-node-exporter-9cntq                     100m (0%)     200m (1%)   30Mi (0%)        50Mi (0%)
  logging      elasticsearch-elasticsearch-data-69df997486-gqcwg             400m (2%)     1 (6%)      8Gi (6%)         16Gi (13%)
  logging      fluentd-fluentd-elasticsearch-tj7nd                           200m (1%)     0 (0%)      612Mi (0%)       0 (0%)
  rook         rook-agent-6jtzm                                              0 (0%)        0 (0%)      0 (0%)           0 (0%)
  rook         rook-ceph-osd-10-6-42-250.accel.aws-cardda.cb4good.com-gwb8j  0 (0%)        0 (0%)      0 (0%)           0 (0%)
  spark        accelerate-test-5-a3bfb8a597e83d459193a183e17f13b5-exec-1     2 (12%)       0 (0%)      10Gi (8%)        12Gi (10%)
  spark        accelerate-testing-1-8ed0482f3bfb3c0a83da30bb7d433dff-exec-5  2 (12%)       0 (0%)      10Gi (8%)        12Gi (1
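One clarification the report above touches on: the Kubernetes scheduler admits pods based on declared resource *requests*, not limits or live usage, so a node whose limits are overcommitted (as in the listing) is still schedulable as long as the requests fit. The Spark 2.3 settings that feed the executor pod's requests and limits are roughly these; a sketch only, with illustrative values:

```shell
# spark.executor.cores/memory become the executor pod's resource
# *requests*; the CPU limit can be pinned separately.
# Values below are placeholders, passed as spark-submit --conf flags.
--conf spark.executor.cores=2
--conf spark.executor.memory=8g
--conf spark.kubernetes.executor.limit.cores=2
```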
Re: Executor lost for unknown reasons error Spark 2.3 on kubernetes
$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)
2018-07-30 19:58:42 INFO BlockManagerMasterEndpoint:54 - Trying to remove executor 7 from BlockManagerMaster.
2018-07-30 19:58:42 WARN BlockManagerMasterEndpoint:66 - No more replicas available for rdd_9_37 !
2018-07-30 19:58:42 INFO BlockManagerMasterEndpoint:54 - Removing block manager BlockManagerId(7, 10.*.*.*.*, 43888, None)
2018-07-30 19:58:42 INFO BlockManagerMaster:54 - Removed 7 successfully in removeExecutor
2018-07-30 19:58:42 INFO DAGScheduler:54 - Shuffle files lost for executor: 7 (epoch 1)
2018-07-30 19:58:42 ERROR ContextCleaner:91 - Error cleaning broadcast 11
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:155)
	at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:321)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
	at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
	at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:238)
	at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:194)
	at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:185)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:185)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
	at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
Caused by: java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:

On Tue, Jul 31, 2018 at 8:32 AM purna pradeep wrote:

> Hello,
>
> I'm getting the below error in the Spark driver pod logs, and executor pods are getting killed midway while the job is running; even the driver pod terminates with the below intermittent error. This happens if I run multiple jobs in parallel.
>
> I'm not able to see the executor logs, as the executor pods are killed.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor lost for unknown reasons.
>
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
> 	at org.apache.spark.scheduler.DAGScheduler$
Executor lost for unknown reasons error Spark 2.3 on kubernetes
Hello,

I'm getting the below error in the Spark driver pod logs, and executor pods are getting killed midway while the job is running; even the driver pod terminates with the below intermittent error. This happens if I run multiple jobs in parallel.

I'm not able to see the executor logs, as the executor pods are killed.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor lost for unknown reasons.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
	... 42 more

Thanks,
Purna
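A tentative debugging route for "Executor lost for unknown reasons" when the executor pods are already gone: the pod's last state often survives in the API server and in namespace events, and exit code 137 typically points at an OOM kill or eviction by Kubernetes rather than a Spark-level failure. The pod name and namespace below are placeholders:

```shell
# Last state / exit code of a dead executor pod (137 suggests OOMKilled/evicted)
kubectl describe pod my-job-exec-1 -n spark

# Recent namespace events, newest last
kubectl get events -n spark --sort-by=.lastTimestamp

# If the losses turn out to be heartbeat timeouts under load, these Spark
# confs are commonly raised on spark-submit (values illustrative):
#   --conf spark.network.timeout=300s
#   --conf spark.executor.heartbeatInterval=30s
```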
Spark 2.3 Kubernetes error
Hello,

When I try to set the options below on the spark-submit command against the k8s master, I get the error below in the spark-driver pod logs:

--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

But when I set these extraJavaOptions as system properties inside the Spark application jar, everything works fine.

2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.
org.apache.spark.SparkException: External scheduler cannot be instantiated
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [test-657e2f715ada3f91ae32c588aa178f63-driver] in namespace: [test] failed.
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
        at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.<init>(KubernetesClusterSchedulerBackend.scala:70)
        at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
        ... 12 more
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
        at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
        at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
        at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
        at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
        at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
        at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
        at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
        at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
        at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
        at okhttp3.internal.http.BridgeInterceptor.intercept(Br
Spark 2.3 driver pod stuck in Running state — Kubernetes
Hello,

When I run spark-submit on the k8s cluster, the driver pod gets stuck in the Running state, and when I pulled the driver pod logs I saw the messages below. I understand this warning is likely due to a lack of CPU/memory, but I would expect the driver pod to be in the “Pending” state rather than “Running”, since it is not actually running. So I had to kill the driver pod and resubmit the job.

Please suggest!

2018-06-08 14:38:01 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:16 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:31 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:46 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:39:01 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
spark partitionBy with partitioned column in json output
I’m reading the JSON below in Spark:

{"bucket": "B01", "actionType": "A1", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B02", "actionType": "A2", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B03", "actionType": "A3", "preaction": "NULL", "postaction": "NULL"}

val df = spark.read.json("actions.json").toDF()

Now I’m writing the same data back out as JSON:

df.write.
  format("json").
  mode("append").
  partitionBy("bucket", "actionType").
  save("output.json")

and output.json contains:

{"preaction":"NULL","postaction":"NULL"}

The bucket and actionType columns are missing from the JSON output; I need the partitionBy columns in the output as well.
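This is expected with Hive-style partitioned output: Spark moves the partitionBy column values into the directory path (e.g. output.json/bucket=B01/actionType=A1/part-….json) instead of the file contents, and restores them when the partitioned tree is read back with spark.read.json("output.json"). A minimal pure-Python sketch of that mechanism (the helper names and file layout here are illustrative, not Spark’s actual writer):

```python
import json
import os

def write_partitioned(records, root, partition_cols):
    """Hive-style write: partition-column values go into the directory
    path and are dropped from the JSON file contents."""
    for rec in records:
        rel = os.path.join(*("%s=%s" % (c, rec[c]) for c in partition_cols))
        out_dir = os.path.join(root, rel)
        os.makedirs(out_dir, exist_ok=True)
        rest = {k: v for k, v in rec.items() if k not in partition_cols}
        with open(os.path.join(out_dir, "part-0000.json"), "a") as f:
            f.write(json.dumps(rest) + "\n")

def read_partitioned(root):
    """Read back, restoring partition columns from the directory names."""
    out = []
    for dirpath, _, files in os.walk(root):
        # Parse "col=value" segments out of the relative path.
        parts = dict(seg.split("=", 1)
                     for seg in os.path.relpath(dirpath, root).split(os.sep)
                     if "=" in seg)
        for name in files:
            with open(os.path.join(dirpath, name)) as f:
                for line in f:
                    rec = json.loads(line)
                    rec.update(parts)
                    out.append(rec)
    return out
```

If the columns must also appear inside the JSON files themselves, one common workaround (an assumption, not something this thread confirms) is to duplicate them and partition on the copies, e.g. df.withColumn("bucket_p", col("bucket")).write.partitionBy("bucket_p", …) — the "_p" names are just an illustrative convention.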
Re: Spark 2.3 error on Kubernetes
Anirudh,

Thanks for your response. I’m running a k8s cluster on AWS and the kube-dns pods are running fine. Also, as I mentioned, only 1 executor pod is running though I requested 5; the other 4 were killed with the error below, and I do have enough resources available.

On Tue, May 29, 2018 at 6:28 PM Anirudh Ramanathan wrote:
> This looks to me like a kube-dns error that's causing the driver DNS address to not resolve.
> It would be worth double checking that kube-dns is indeed running (in the kube-system namespace).
> Often, with environments like minikube, kube-dns may exit/crashloop due to lack of resource.
>
> On Tue, May 29, 2018 at 3:18 PM, purna pradeep wrote:
>> Hello,
>>
>> I’m getting the error below when I spark-submit a Spark 2.3 app on Kubernetes v1.8.3; some of the executor pods were killed with the error below as soon as they came up:
>>
>> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
>>         at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
>>         at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
>>         at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
>> Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
>>         at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>>         at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
>>         at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>         ... 4 more
>> Caused by: java.io.IOException: Failed to connect to spark-1527629824987-driver-svc.spark.svc:7078
>>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
>>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>>         at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
>>         at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
>>         at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.net.UnknownHostException: spark-1527629824987-driver-svc.spark.svc
>>         at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
>>         at java.net.InetAddress.getAllByName(InetAddress.java:1192)
>>         at java.net.InetAddress.getAllByName(InetAddress.java:1126)
>>         at java.net.InetAddress.getByName(InetAddress.java:1076)
>>         at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
>>         at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
>>         at java.security.AccessController.doPrivileged(Native
Spark 2.3 error on Kubernetes
Hello,

I’m getting the error below when I spark-submit a Spark 2.3 app on Kubernetes *v1.8.3*; some of the executor pods were killed with the error below as soon as they came up:

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        ... 4 more
Caused by: java.io.IOException: Failed to connect to spark-1527629824987-driver-svc.spark.svc:7078
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
        at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: spark-1527629824987-driver-svc.spark.svc
        at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
        at java.net.InetAddress.getAllByName(InetAddress.java:1192)
        at java.net.InetAddress.getAllByName(InetAddress.java:1126)
        at java.net.InetAddress.getByName(InetAddress.java:1076)
        at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
        at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
        at java.security.AccessController.doPrivileged(Native Method)
        at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
        at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
        at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
        at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
        at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
        at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
        at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
        at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
        at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
        at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
        at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
        at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.ja
Spark 2.3 error on kubernetes
Hello,

I’m getting the intermittent error below when I spark-submit a Spark 2.3 app on Kubernetes v1.8.3; some of the executor pods were killed with the error below as soon as they came up:

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        ... 4 more
Caused by: java.io.IOException: Failed to connect to spark-1527629824987-driver-svc.spark.svc:7078
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
        at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: spark-1527629824987-driver-svc.spark.svc
        at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
        at java.net.InetAddress.getAllByName(InetAddress.java:1192)
        at java.net.InetAddress.getAllByName(InetAddress.java:1126)
        at java.net.InetAddress.getByName(InetAddress.java:1076)
        at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
        at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
        at java.security.AccessController.doPrivileged(Native Method)
        at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
        at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
        at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
        at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
        at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
        at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
        at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
        at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
        at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
        at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
        at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
        at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.
Spark driver pod garbage collection
Hello,

Currently I observe that dead pods are not getting garbage collected (i.e., Spark driver pods which have completed execution). So pods can sit in the namespace for weeks, which makes listing, parsing, and reading pods slower, as well as leaving junk on the cluster.

I believe the minimum-container-ttl-duration kubelet flag defaults to 0 minutes, but I don’t see the completed Spark driver pods being garbage collected. Do I need to set any flag explicitly at the kubelet level?
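One thing to note: the kubelet’s minimum-container-ttl-duration governs dead containers on a node, not completed pod objects in the API server, so completed driver pods generally have to be cleaned up explicitly (by the submitter, an operator, or a periodic job that deletes Succeeded/Failed pods older than some TTL). A minimal sketch of that TTL selection policy in Python follows; the pod dicts and field names here are illustrative assumptions, not the Kubernetes API:

```python
from datetime import timedelta

def pods_to_reap(pods, ttl, now):
    """Return names of finished pods that exceeded the retention TTL.

    Each pod is a dict with 'name', 'phase', and 'finished_at';
    a real cleanup job would list pods via the k8s API and then
    delete the names this returns.
    """
    done = {"Succeeded", "Failed"}
    return [p["name"] for p in pods
            if p["phase"] in done and now - p["finished_at"] >= ttl]
```

A cron-style cleanup would call this on the current pod list and delete the returned pods, leaving Running and recently finished pods alone.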
Spark driver pod eviction Kubernetes
Hi,

What would be the recommended approach to wait for the Spark driver pod to complete the currently running job before it gets evicted to a new node, while maintenance on the current node (kernel upgrade, hardware maintenance, etc.) is going on via the drain command?

I don’t think I can use a PodDisruptionBudget, as the Spark pods’ deployment YAML(s) are handled by Kubernetes.

Please suggest!
Oozie with spark 2.3 in Kubernetes
Hello,

I would like to know if anyone has tried Oozie with Spark 2.3 actions on Kubernetes for scheduling Spark jobs.

Thanks,
Purna
Re: Scala program to spark-submit on k8 cluster
Yes, a “REST application that submits a Spark job to a k8s cluster by running spark-submit programmatically”. I would also like to expose it as a Kubernetes service so that clients can access it like any other REST API.

On Wed, Apr 4, 2018 at 12:25 PM Yinan Li wrote:
> Hi Kittu,
>
> What do you mean by "a Scala program"? Do you mean a program that submits a Spark job to a k8s cluster by running spark-submit programmatically, or some example Scala application that is to run on the cluster?
>
> On Wed, Apr 4, 2018 at 4:45 AM, Kittu M wrote:
>> Hi,
>>
>> I’m looking for a Scala program to spark-submit a Scala application (Spark 2.3 job) on a k8s cluster.
>>
>> Any help would be much appreciated. Thanks
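For the “running spark-submit programmatically” part, the core of such a service is just assembling the spark-submit argv and handing it to a subprocess. A minimal sketch follows; the flags match Spark 2.3’s documented cluster-mode submission against a k8s master, but the jar path, class name, and image name are illustrative assumptions:

```python
import subprocess

def build_spark_submit_cmd(app_jar, main_class, k8s_master, image, conf=None):
    """Assemble a spark-submit command line for a Kubernetes master.

    Returns the argv list; the caller (e.g. a REST handler) decides
    when and how to execute it.
    """
    cmd = [
        "spark-submit",
        "--master", "k8s://%s" % k8s_master,
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--conf", "spark.kubernetes.container.image=%s" % image,
    ]
    # Any extra Spark configuration is passed through as --conf pairs.
    for key, value in (conf or {}).items():
        cmd += ["--conf", "%s=%s" % (key, value)]
    cmd.append(app_jar)
    return cmd

def submit(cmd):
    # Run spark-submit and capture output so the REST caller can
    # surface failures; raises nothing itself, inspect returncode.
    return subprocess.run(cmd, capture_output=True, text=True)
```

A REST endpoint would build the command from the request body and call submit(cmd), returning the captured stdout/stderr and exit code to the client.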
unsubscribe
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster
Thanks Yinan. It looks like the operator is still in alpha. I would like to know if there is any REST interface for Spark 2.3 job submission, similar to Spark 2.2, as I need to submit Spark applications to the k8s master based on different events (cron or S3-file-based triggers).

On Tue, Mar 20, 2018 at 11:50 PM Yinan Li wrote:
> One option is the Spark Operator <https://github.com/GoogleCloudPlatform/spark-on-k8s-operator>. It allows specifying and running Spark applications on Kubernetes using Kubernetes custom resource objects. It takes SparkApplication CRD objects and automatically submits the applications to run on a Kubernetes cluster.
>
> Yinan
>
> On Tue, Mar 20, 2018 at 7:47 PM, purna pradeep wrote:
>> Im using kubernetes cluster on AWS to run spark jobs, im using spark 2.3, now i want to run spark-submit from AWS lambda function to k8s master, would like to know if there is any REST interface to run Spark submit on k8s Master
Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster
I’m using a Kubernetes cluster on AWS to run Spark jobs, with Spark 2.3. Now I want to run spark-submit from an AWS Lambda function against the k8s master, and would like to know if there is any REST interface to run spark-submit on the k8s master.
Re: Spark 2.3 submit on Kubernetes error
Thanks Yinan, I’m able to get kube-dns endpoints when I ran this command kubectl get ep kube-dns —namespace=kube-system Do I need to deploy under kube-system instead of default namespace And please lemme know if you have any insights on Error1 ? On Sun, Mar 11, 2018 at 8:26 PM Yinan Li wrote: > Spark on Kubernetes requires the presence of the kube-dns add-on properly > configured. The executors connect to the driver through a headless > Kubernetes service using the DNS name of the service. Can you check if you > have the add-on installed in your cluster? This issue > https://github.com/apache-spark-on-k8s/spark/issues/558 might help. > > > On Sun, Mar 11, 2018 at 5:01 PM, purna pradeep > wrote: > >> Getting below errors when I’m trying to run spark-submit on k8 cluster >> >> >> *Error 1*:This looks like a warning it doesn’t interrupt the app running >> inside executor pod but keeps on getting this warning >> >> >> *2018-03-09 11:15:21 WARN WatchConnectionManager:192 - Exec Failure* >> *java.io.EOFException* >> * at >> okio.RealBufferedSource.require(RealBufferedSource.java:60)* >> * at >> okio.RealBufferedSource.readByte(RealBufferedSource.java:73)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.WebSocketReader.readHeader(WebSocketReader.java:113)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.WebSocketReader.processNextFrame(WebSocketReader.java:97)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.RealWebSocket.loopReader(RealWebSocket.java:262)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.RealWebSocket$2.onResponse(RealWebSocket.java:201)* >> * at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)* >> * at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)* >> * at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)* >> * at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)* >> * at java.lang.Thread.run(Thread.java:748)* >> >> >> >> 
>> Error 2: This is an intermittent error which prevents the executor pod from running.
>>
>> org.apache.spark.SparkException: External scheduler cannot be instantiated
>>     at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
>>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
>>     at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
>>     at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
>>     at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
>>     at scala.Option.getOrElse(Option.scala:121)
>>     at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
>>     at com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)
>>     at com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)
>>     at com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)
>>     at com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)
>>     at com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)
>> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver] in namespace: [default] failed.
>>     at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
>>     at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
>>     at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
>>     at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
>>     at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.<init>(KubernetesClusterSchedulerBackend.scala:70)
>>     at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
>>     at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
>>     ...
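Since the reply above hinges on kube-dns working, a couple of kubectl checks can confirm DNS from both outside and inside the cluster. A hedged sketch; the throwaway pod name and image are illustrative, not from the thread:

```shell
# 1. Confirm the kube-dns add-on has live endpoints:
kubectl get ep kube-dns --namespace=kube-system

# 2. Resolve the API server's service name from inside a throwaway pod,
#    which is what the fabric8 client in the driver/executor has to do:
kubectl run dns-test --rm -i --restart=Never --image=busybox -- \
  nslookup kubernetes.default.svc
```

If step 2 fails while step 1 succeeds, the problem is usually pod-level DNS configuration rather than the add-on itself.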
Spark 2.3 submit on Kubernetes error
Getting the below errors when I'm trying to run spark-submit on a k8s cluster.

Error 1: This looks like a warning; it doesn't interrupt the app running inside the executor pod, but the warning keeps recurring.

2018-03-09 11:15:21 WARN WatchConnectionManager:192 - Exec Failure
java.io.EOFException
    at okio.RealBufferedSource.require(RealBufferedSource.java:60)
    at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)
    at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)
    at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)
    at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)
    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Error 2: This is an intermittent error which prevents the executor pod from running.

org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
    at com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)
    at com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)
    at com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)
    at com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)
    at com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver] in namespace: [default] failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.<init>(KubernetesClusterSchedulerBackend.scala:70)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
    ... 11 more
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again
    at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
    at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
    at java.net.InetAddress.getAllByName(InetAddress.java:1192)
    at java.net.InetAddress.getAllByName(InetAddress.java:1126)
    at okhttp3.Dns$1.lookup(Dns.java:39)
    at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
    at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
    at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
    at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
    at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
    at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
handling Remote dependencies for spark-submit in spark 2.3 with kubernetes
I'm trying to run spark-submit against a Kubernetes cluster with the Spark 2.3 Docker container image. The challenge I'm facing is that the application has a mainapplication.jar plus other dependency files & jars located remotely, e.g. in AWS S3. Per the Spark 2.3 documentation there is a Kubernetes init-container to download remote dependencies, but in this case I'm not creating any Pod spec in which to include init-containers; per the documentation, Spark 2.3 on Kubernetes creates the pods (driver, executor) internally. So I'm not sure how I can use the init-container for spark-submit when there are remote dependencies. https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-remote-dependencies Please suggest.
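For what it's worth, with Spark 2.3 on Kubernetes you do not write the init-container Pod spec yourself: listing remote URIs in --jars/--files lets the submission client wire the download step into the pods it creates. A hedged sketch; every host, image name, and path below is a placeholder:

```shell
# Sketch of a Spark 2.3 submission with remote dependencies; placeholders throughout.
bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:443 \
  --deploy-mode cluster \
  --name myapp \
  --class com.example.MainApp \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=<spark-2.3-image> \
  --jars https://<host>/deps/dependency.jar \
  --files https://<host>/conf/app.conf \
  https://<host>/jars/mainapplication.jar
```

Whether an s3a:// URI works in place of https:// depends on the Hadoop AWS libraries being available to the container image; plain http/https URIs are the simplest case.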
Executor not getting added SparkUI & Spark Eventlog in deploymode:cluster
Hi all, I'm performing a Spark submit using the Spark REST API POST operation on port 6066 with the below config:

> Launch Command: "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.el7_3.x86_64/jre/bin/java"
> "-cp" "/usr/local/spark/conf/:/usr/local/spark/jars/*" "-Xmx4096M"
> "-Dspark.eventLog.enabled=true" "-Dspark.app.name=WorkflowApp"
> "-Dspark.submit.deployMode=cluster" "-Dspark.local.dir=/data0,/data1,/data2,/data3"
> "-Dspark.executor.cores=2" "-Dspark.master=spark://:7077"
> "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer"
> "-Dspark.jars=s3a://<***>.jar" "-Dspark.driver.supervise=false"
> "-Dspark.history.fs.logDirectory=s3a://<*>/"
> "-Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256"
> "-Dspark.driver.memory=4G" "-Dspark.executor.memory=4G"
> "-Dspark.eventLog.dir=s3a://<*>/"
> "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@<***>"
> "/usr/local/spark/work/driver-<***>.jar" "MyApp" "-c" "s3a://<***>"

When I looked into the Spark event log, below is what I observed:

{"Event":"SparkListenerExecutorAdded","Timestamp":1510633498623,"Executor ID":"driver","Executor Info":{"Host":"localhost","Total Cores":2,"Log Urls":{}}}
"spark.master":"local[*]"

Though I ran in deploy mode cluster, the slave IP is not shown in the Host field and spark.master is shown as local[*]. Because of this the job runs only on the driver, and when the job is submitted it does not show up at http://:8080 under Running and Completed Applications; it shows only under Running Drivers & Completed Drivers. Please suggest.
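Given the event log shows spark.master=local[*] despite the cluster launch command, it is worth verifying that nothing in the application hard-codes the master (e.g. a SparkConf.setMaster("local[*]") left in the code), and that the REST payload itself carries the cluster master. A hedged sketch of a standalone-cluster REST submission; all hosts, versions, and paths are placeholders:

```shell
# Sketch of a submission to the standalone cluster REST endpoint (port 6066);
# field names follow the REST submission protocol spark-submit uses in cluster mode.
curl -X POST http://<master-host>:6066/v1/submissions/create \
  --header "Content-Type: application/json" \
  --data '{
    "action": "CreateSubmissionRequest",
    "appResource": "s3a://<bucket>/myapp.jar",
    "mainClass": "MyApp",
    "appArgs": ["-c", "s3a://<bucket>/config"],
    "clientSparkVersion": "<spark-version>",
    "environmentVariables": {"SPARK_ENV_LOADED": "1"},
    "sparkProperties": {
      "spark.master": "spark://<master-host>:7077",
      "spark.submit.deployMode": "cluster",
      "spark.app.name": "WorkflowApp",
      "spark.jars": "s3a://<bucket>/myapp.jar",
      "spark.driver.memory": "4G",
      "spark.executor.memory": "4G",
      "spark.executor.cores": "2",
      "spark.eventLog.enabled": "true"
    }
  }'
```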
Spark http: Not showing completed apps
Hi, I'm using Spark standalone on AWS EC2, and I'm using the Spark REST API at http://:8080/json to get completed apps, but the JSON shows completed apps as an empty array even though the job ran successfully.
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@Andres I need the latest, but it should be less than 10 months old based on the income_age column, and I don't want to use SQL here.

On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi wrote:
> Hi, if you need the last value from income in a window function you can use last_value.
> Not tested, but maybe, with @ayan's SQL:
>
> spark.sql("select *, row_number(), last_value(income) over (partition by id order by income_age_ts desc) r from t")
>
> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep wrote:
>> @ayan,
>>
>> Thanks for your response.
>>
>> I would like to have functions in this case, e.g. calculateIncome, and the reason why I need a function is to reuse it in other parts of the application. That's why I'm planning on mapGroups with a function as argument which takes a row iterator, but I'm not sure if this is the best way to implement it, as my initial dataframe is very large.
>>
>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha wrote:
>>> Hi
>>>
>>> The tool you are looking for is a window function. Example:
>>>
>>> df.show()
>>> +--------+----+---+------+-------------+
>>> |JoinDate|dept| id|income|income_age_ts|
>>> +--------+----+---+------+-------------+
>>> | 4/20/13|  ES|101| 19000|      4/20/17|
>>> | 4/20/13|  OS|101|     1|      10/3/15|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|
>>> | 4/20/12|  CS|102| 12000|       5/8/17|
>>> | 4/20/10|  EQ|103|     1|       5/9/17|
>>> | 4/20/10|  MD|103|  9000|       5/8/17|
>>> +--------+----+---+------+-------------+
>>>
>>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t")
>>> res.show()
>>> +--------+----+---+------+-------------+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +--------+----+---+------+-------------+---+
>>> | 4/20/10|  EQ|103|     1|       5/9/17|  1|
>>> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>> | 4/20/13|  OS|101|     1|      10/3/15|  2|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
>>> +--------+----+---+------+-------------+---+
>>>
>>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1")
>>> res.show()
>>> +--------+----+---+------+-------------+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +--------+----+---+------+-------------+---+
>>> | 4/20/10|  EQ|103|     1|       5/9/17|  1|
>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>> +--------+----+---+------+-------------+---+
>>>
>>> This should be better because it uses all the built-in optimizations in Spark.
>>>
>>> Best,
>>> Ayan
>>>
>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep wrote:
>>>> Please click on the unnamed text/html link for a better view.
>>>>
>>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep wrote:
>>>>> ---------- Forwarded message ----------
>>>>> From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
>>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>>> Subject: Spark question
>>>>> To: purna pradeep
>>>>>
>>>>> Below is the input Dataframe (in reality this is a very large Dataframe):
>>>>>
>>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>>> 101 | 19000 | 4/20/17 | 4/20/13 | ES
>>>>> 101 | 1 | 10/3/15 | 4/20/13 | OS
>>>>> 102 | 13000 | ...
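Since the asker wants a reusable function rather than SQL, the per-group selection that mapGroups would apply can be prototyped without Spark at all. A minimal Scala sketch: the field names mirror the example data, while the date format and the 10-month-cutoff handling are illustrative assumptions, not code from the thread.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Spark-free stand-in for one employee row from the example data.
case class Employee(id: Int, income: Int, incomeAgeTs: LocalDate, dept: String)

val fmt = DateTimeFormatter.ofPattern("M/d/yy")

// Pick the latest income per group, keeping only rows whose income_age_ts is
// within 10 months of `asOf` -- the logic a mapGroups function would run.
def latestRecentIncome(rows: Seq[Employee], asOf: LocalDate): Option[Employee] =
  rows
    .filter(r => r.incomeAgeTs.isAfter(asOf.minusMonths(10))) // income age < 10 months
    .sortBy(_.incomeAgeTs.toEpochDay)                         // latest timestamp last
    .lastOption

val rows = Seq(
  Employee(101, 19000, LocalDate.parse("4/20/17", fmt), "ES"),
  Employee(101, 1,     LocalDate.parse("10/3/15", fmt), "OS")
)
val picked = latestRecentIncome(rows, LocalDate.parse("5/10/17", fmt))
println(picked.map(_.income)) // prints Some(19000): only the 4/20/17 row is recent enough
```

In Spark this body would sit inside groupByKey(_.id).mapGroups(...); testing it on plain collections first makes the reuse the asker wants straightforward.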
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@ayan,

Thanks for your response.

I would like to have functions in this case, e.g. calculateIncome, and the reason why I need a function is to reuse it in other parts of the application. That's why I'm planning on mapGroups with a function as argument which takes a row iterator, but I'm not sure if this is the best way to implement it, as my initial dataframe is very large.

On Tue, Aug 29, 2017 at 10:24 PM ayan guha wrote:
> Hi
>
> The tool you are looking for is a window function. Example:
>
> >>> df.show()
> +--------+----+---+------+-------------+
> |JoinDate|dept| id|income|income_age_ts|
> +--------+----+---+------+-------------+
> | 4/20/13|  ES|101| 19000|      4/20/17|
> | 4/20/13|  OS|101|     1|      10/3/15|
> | 4/20/12|  DS|102| 13000|       5/9/17|
> | 4/20/12|  CS|102| 12000|       5/8/17|
> | 4/20/10|  EQ|103|     1|       5/9/17|
> | 4/20/10|  MD|103|  9000|       5/8/17|
> +--------+----+---+------+-------------+
>
> >>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t")
> >>> res.show()
> +--------+----+---+------+-------------+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +--------+----+---+------+-------------+---+
> | 4/20/10|  EQ|103|     1|       5/9/17|  1|
> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
> | 4/20/13|  OS|101|     1|      10/3/15|  2|
> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
> +--------+----+---+------+-------------+---+
>
> >>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1")
> >>> res.show()
> +--------+----+---+------+-------------+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +--------+----+---+------+-------------+---+
> | 4/20/10|  EQ|103|     1|       5/9/17|  1|
> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
> +--------+----+---+------+-------------+---+
>
> This should be better because it uses all the built-in optimizations in Spark.
>
> Best,
> Ayan
>
> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep wrote:
>> Please click on the unnamed text/html link for a better view.
>>
>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep wrote:
>>> ---------- Forwarded message ----------
>>> From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>> Subject: Spark question
>>> To: purna pradeep
>>>
>>> Below is the input Dataframe (in reality this is a very large Dataframe):
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> 101 | 19000 | 4/20/17 | 4/20/13 | ES
>>> 101 | 1 | 10/3/15 | 4/20/13 | OS
>>> 102 | 13000 | 5/9/17 | 4/20/12 | DS
>>> 102 | 12000 | 5/8/17 | 4/20/12 | CS
>>> 103 | 1 | 5/9/17 | 4/20/10 | EQ
>>> 103 | 9000 | 5/8/15 | 4/20/10 | MD
>>>
>>> Get the latest income of an employee which has income_age ts < 10 months.
>>>
>>> Expected output Dataframe:
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> 101 | 19000 | 4/20/17 | 4/20/13 | ES
>>> 102 | 13000 | 5/9/17 | 4/20/12 | DS
>>> 103 | 1 | 5/9/17 | 4/20/10 | EQ
Re: Select entire row based on a logic applied on 2 columns across multiple rows
Please click on the unnamed text/html link for a better view.

On Tue, Aug 29, 2017 at 8:11 PM purna pradeep wrote:
> ---------- Forwarded message ----------
> From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
> Date: Tue, Aug 29, 2017 at 8:08 PM
> Subject: Spark question
> To: purna pradeep
>
> Below is the input Dataframe (in reality this is a very large Dataframe):
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101 | 19000 | 4/20/17 | 4/20/13 | ES
> 101 | 1 | 10/3/15 | 4/20/13 | OS
> 102 | 13000 | 5/9/17 | 4/20/12 | DS
> 102 | 12000 | 5/8/17 | 4/20/12 | CS
> 103 | 1 | 5/9/17 | 4/20/10 | EQ
> 103 | 9000 | 5/8/15 | 4/20/10 | MD
>
> Get the latest income of an employee which has income_age ts < 10 months.
>
> Expected output Dataframe:
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101 | 19000 | 4/20/17 | 4/20/13 | ES
> 102 | 13000 | 5/9/17 | 4/20/12 | DS
> 103 | 1 | 5/9/17 | 4/20/10 | EQ
>
> Below is what I'm planning to implement:
>
> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date, JOINDATE: java.sql.Date, DEPT: String)
>
> val empSchema = new StructType().add("EmployeeID", "Int").add("INCOME", "Int")
>   .add("INCOMEAGE", "Date").add("JOINDATE", "Date").add("DEPT", "String")
>
> // Reading from the file
> import sparkSession.implicits._
> val readEmpFile = sparkSession.read
>   .option("sep", ",")
>   .schema(empSchema)
>   .csv(INPUT_DIRECTORY)
>
> // Create employee Dataset
> val custDf = readEmpFile.as[employee]
>
> // Group by employee
> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>
> val k = groupByDf.mapGroups((key, value) => performETL(value))
>
> def performETL(empData: Iterator[employee]): employee = {
>   val empList = empData.toList
>   // calculateIncome has logic to figure out the latest income for an account and returns the latest income
>   val income = calculateIncome(empList)
>   val row = empList.head
>   employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
> }
>
> Is this a better approach, or even the right approach, to implement this? If not, please suggest a better way to implement the same.
Re: use WithColumn with external function in a java jar
Thanks, I'll check it out.

On Mon, Aug 28, 2017 at 10:22 PM Praneeth Gayam wrote:
> You can create a UDF which will invoke your Java lib:
>
> def calculateExpense: UserDefinedFunction = udf((pexpense: String, cexpense: String) =>
>   new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble))
>
> On Tue, Aug 29, 2017 at 6:53 AM, purna pradeep wrote:
>> I have data in a DataFrame with the below columns:
>>
>> 1) File format is CSV
>> 2) All the below column datatypes are String
>>
>> employeeid, pexpense, cexpense
>>
>> Now I need to create a new DataFrame which has a new column called `expense`, calculated from columns `pexpense` and `cexpense`.
>>
>> The tricky part is that the calculation algorithm is not a UDF which I created, but an external function that needs to be imported from a Java library, which takes primitive types as arguments - in this case `pexpense`, `cexpense` - to calculate the value required for the new column.
>>
>> The external function signature:
>>
>> public class MyJava {
>>     public Double calculateExpense(Double pexpense, Double cexpense) {
>>         // calculation
>>     }
>> }
>>
>> So how can I invoke that external function to create a new calculated column? Can I register that external function as a UDF in my Spark application?
>>
>> Stackoverflow reference:
>> https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
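To make the wrapping step concrete, here is a Spark-free Scala sketch: MyJava stands in for the external library class (its body here is a placeholder sum, not the real calculation), and calc is the closure you would then hand to org.apache.spark.sql.functions.udf, as Praneeth suggests.

```scala
// Stand-in for the external Java class; the body is a placeholder.
class MyJava {
  def calculateExpense(pexpense: Double, cexpense: Double): Double =
    pexpense + cexpense // the real library performs the actual calculation
}

// The closure that parses the String columns and delegates to the library;
// in Spark this is exactly what gets wrapped by udf(...).
val calc: (String, String) => Double =
  (p, c) => new MyJava().calculateExpense(p.toDouble, c.toDouble)

println(calc("100.0", "50.0")) // prints 150.0
```

In a Spark job this becomes val expenseUdf = udf(calc) followed by df.withColumn("expense", expenseUdf(col("pexpense"), col("cexpense"))).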
Select entire row based on a logic applied on 2 columns across multiple rows
---------- Forwarded message ----------
From: Mamillapalli, Purna Pradeep
Date: Tue, Aug 29, 2017 at 8:08 PM
Subject: Spark question
To: purna pradeep

Below is the input Dataframe (in reality this is a very large Dataframe):

EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
101 | 19000 | 4/20/17 | 4/20/13 | ES
101 | 1 | 10/3/15 | 4/20/13 | OS
102 | 13000 | 5/9/17 | 4/20/12 | DS
102 | 12000 | 5/8/17 | 4/20/12 | CS
103 | 1 | 5/9/17 | 4/20/10 | EQ
103 | 9000 | 5/8/15 | 4/20/10 | MD

Get the latest income of an employee which has income_age ts < 10 months.

Expected output Dataframe:

EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
101 | 19000 | 4/20/17 | 4/20/13 | ES
102 | 13000 | 5/9/17 | 4/20/12 | DS
103 | 1 | 5/9/17 | 4/20/10 | EQ

Below is what I'm planning to implement:

case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date, JOINDATE: java.sql.Date, DEPT: String)

val empSchema = new StructType().add("EmployeeID", "Int").add("INCOME", "Int")
  .add("INCOMEAGE", "Date").add("JOINDATE", "Date").add("DEPT", "String")

// Reading from the file
import sparkSession.implicits._
val readEmpFile = sparkSession.read
  .option("sep", ",")
  .schema(empSchema)
  .csv(INPUT_DIRECTORY)

// Create employee Dataset
val custDf = readEmpFile.as[employee]

// Group by employee
val groupByDf = custDf.groupByKey(a => a.EmployeeID)

val k = groupByDf.mapGroups((key, value) => performETL(value))

def performETL(empData: Iterator[employee]): employee = {
  val empList = empData.toList
  // calculateIncome has logic to figure out the latest income for an account and returns the latest income
  val income = calculateIncome(empList)
  val row = empList.head
  employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
}

Is this a better approach, or even the right approach, to implement this? If not, please suggest a better way to implement the same.
use WithColumn with external function in a java jar
I have data in a DataFrame with the below columns:

1) File format is CSV
2) All the below column datatypes are String

employeeid, pexpense, cexpense

Now I need to create a new DataFrame which has a new column called `expense`, calculated from columns `pexpense` and `cexpense`.

The tricky part is that the calculation algorithm is not a UDF which I created, but an external function that needs to be imported from a Java library, which takes primitive types as arguments - in this case `pexpense`, `cexpense` - to calculate the value required for the new column.

The external function signature:

public class MyJava {
    public Double calculateExpense(Double pexpense, Double cexpense) {
        // calculation
    }
}

So how can I invoke that external function to create a new calculated column? Can I register that external function as a UDF in my Spark application?

Stackoverflow reference:
https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
Re: Restart streaming query spark 2.1 structured streaming
And also, is query.stop() a graceful stop operation? What happens to already received data? Will it be processed?

On Tue, Aug 15, 2017 at 7:21 PM purna pradeep wrote:
> Ok, thanks. A few more questions:
>
> 1. When I looked into the documentation it says onQueryProgress is not thread-safe. So is this method the right place to refresh the cache? And is there no need to restart the query if I choose a listener?
>
> "The methods are not thread-safe as they may be called from different threads."
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
>
> 2. If I use StreamingQueryListener's onQueryProgress, my understanding is the method will be executed only when the query is in progress, so if I refresh the data frame there without restarting the query, will it impact the application?
>
> 3. Should I use the blocking unpersist(Boolean) method or the async unpersist() method, given the data size is big?
>
> I feel your solution is better, as it stops the query --> refreshes the cache --> starts the query, if I compromise on a little downtime even though the cached dataframe is huge. I'm not sure how the listener behaves as it's asynchronous; correct me if I'm wrong.
>
> On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das wrote:
>> Both work. The asynchronous method with the listener will have less downtime, just that the first trigger/batch after the asynchronous unpersist+persist will probably take longer as it has to reload the data.
>>
>> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep wrote:
>>> Thanks Tathagata Das, actually I'm planning to do something like this:
>>>
>>> activeQuery.stop()
>>> // unpersist and persist cached data frame
>>> df.unpersist()
>>> // read the updated data; data size of df is around 100 GB
>>> df.persist()
>>> activeQuery = startQuery()
>>>
>>> The cached data frame size is around 100 GB, so the question is: is this the right place to refresh this huge cached data frame?
>>>
>>> I'm also trying to refresh the cached data frame in the onQueryProgress() method in a class which extends StreamingQueryListener.
>>>
>>> Would like to know which is the best place to refresh the cached data frame, and why.
>>>
>>> Thanks again for the below response.
>>>
>>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das wrote:
>>>> You can do something like this.
>>>>
>>>> def startQuery(): StreamingQuery = {
>>>>   // create your streaming dataframes
>>>>   // start the query with the same checkpoint directory
>>>> }
>>>>
>>>> // handle to the active query
>>>> var activeQuery: StreamingQuery = null
>>>> while (!stopped) {
>>>>   if (activeQuery == null) { // if query not active, start query
>>>>     activeQuery = startQuery()
>>>>   } else if (shouldRestartQuery()) { // check your condition and restart query
>>>>     activeQuery.stop()
>>>>     activeQuery = startQuery()
>>>>   }
>>>>   activeQuery.awaitTermination(100) // wait for 100 ms
>>>>   // if there is any error it will throw an exception and quit the loop
>>>>   // otherwise it will keep checking the condition every 100 ms
>>>> }
>>>>
>>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep wrote:
>>>>> Thanks Michael
>>>>>
>>>>> I guess my question is a little confusing; let me try again.
>>>>>
>>>>> I would like to restart a streaming query programmatically while my streaming application is running, based on a condition, and here is why I want to do this:
>>>>>
>>>>> I want to refresh a cached data frame based on a condition, and the best way to do this is to restart the streaming query, as suggested by TDas below for a similar problem:
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>>
>>>>> I do understand that checkpointing helps in recovery and failures, but I
Re: Restart streaming query spark 2.1 structured streaming
Ok thanks Few more 1.when I looked into the documentation it says onQueryprogress is not threadsafe ,So Is this method would be the right place to refresh cache?and no need to restart query if I choose listener ? The methods are not thread-safe as they may be called from different threads. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala 2.if I use streamingquerylistner onqueryprogress my understanding is method will be executed only when the query is in progress so if I refresh data frame here without restarting query will it impact application ? 3.should I use unpersist (Boolean) blocking method or async method unpersist() as the data size is big. I feel your solution is better as it stops query --> refresh cache --> starts query if I compromise on little downtime even cached dataframe is huge .I'm not sure how listener behaves as it's asynchronous, correct me if I'm wrong. On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das wrote: > Both works. The asynchronous method with listener will have less of down > time, just that the first trigger/batch after the asynchronous > unpersist+persist will probably take longer as it has to reload the data. > > > On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep > wrote: > >> Thanks tathagata das actually I'm planning to something like this >> >> activeQuery.stop() >> >> //unpersist and persist cached data frame >> >> df.unpersist() >> >> //read the updated data //data size of df is around 100gb >> >> df.persist() >> >> activeQuery = startQuery() >> >> >> the cached data frame size around 100gb ,so the question is this the >> right place to refresh this huge cached data frame ? 
>> I'm also trying to refresh the cached data frame in the onQueryProgress() method of a class which extends StreamingQueryListener.
>>
>> I would like to know which is the best place to refresh the cached data frame, and why.
>>
>> Thanks again for the below response.
>>
>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> You can do something like this.
>>>
>>> def startQuery(): StreamingQuery = {
>>>   // create your streaming dataframes
>>>   // start the query with the same checkpoint directory
>>> }
>>>
>>> // handle to the active query
>>> var activeQuery: StreamingQuery = null
>>> while (!stopped) {
>>>   if (activeQuery == null) { // if query not active, start query
>>>     activeQuery = startQuery()
>>>   } else if (shouldRestartQuery()) { // check your condition and restart query
>>>     activeQuery.stop()
>>>     activeQuery = startQuery()
>>>   }
>>>   activeQuery.awaitTermination(100) // wait for 100 ms
>>>   // if there is any error it will throw an exception and quit the loop
>>>   // otherwise it will keep checking the condition every 100 ms
>>> }
>>>
>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep wrote:
>>>
>>>> Thanks Michael
>>>>
>>>> I guess my question is a little confusing; let me try again.
>>>>
>>>> I would like to restart a streaming query programmatically while my streaming application is running, based on a condition. Why I want to do this: I want to refresh a cached data frame based on a condition, and the best way to do this is to restart the streaming query, as suggested by TDas below for a similar problem:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>
>>>> I do understand that checkpointing helps with recovery from failures, but I would like to know how to restart a streaming query programmatically without stopping my streaming application.
>>>>
>>>> In place of query.awaitTermination, should I have logic to restart the query? Please suggest.
>>>>
>>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com> wrote:
>>>>
>>>>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
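The thread-safety worry in question 1 can be reasoned about independently of Spark: guard the refresh with a lock and an expiry check so that listener callbacks arriving on different threads never trigger overlapping reloads. A minimal, Spark-free sketch in Python, where `load_lookup` is a hypothetical callable standing in for re-reading and re-persisting the lookup data:

```python
import threading

class CacheRefresher:
    """Guard a cache refresh that may be triggered from listener callbacks
    on different threads (as the StreamingQueryListener docs warn)."""

    def __init__(self, load_lookup, ttl_ms):
        self._load = load_lookup
        self._ttl = ttl_ms
        self._lock = threading.Lock()
        self._expires_at = 0  # epoch millis; 0 forces a refresh on first call
        self.cache = None

    def on_progress(self, now_ms):
        """Return True if this call performed a refresh, False otherwise."""
        # cheap check first; take the lock only when a refresh may be due
        if now_ms < self._expires_at:
            return False
        with self._lock:
            if now_ms < self._expires_at:  # double-checked: another thread won
                return False
            self.cache = self._load()
            self._expires_at = now_ms + self._ttl
            return True
```

In a real application the Spark-specific work (unpersist old frame, read, persist, count) would go where `self._load()` is called; the point of the sketch is only that at most one thread performs it per expiry window.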
Re: Restart streaming query spark 2.1 structured streaming
Thanks Tathagata Das, actually I'm planning to do something like this:

activeQuery.stop()

// unpersist and persist cached data frame
df.unpersist()

// read the updated data; data size of df is around 100gb
df.persist()

activeQuery = startQuery()

The cached data frame size is around 100gb, so the question is: is this the right place to refresh this huge cached data frame?

I'm also trying to refresh the cached data frame in the onQueryProgress() method of a class which extends StreamingQueryListener.

I would like to know which is the best place to refresh the cached data frame, and why.

Thanks again for the below response.

On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das wrote:

> You can do something like this.
>
> def startQuery(): StreamingQuery = {
>   // create your streaming dataframes
>   // start the query with the same checkpoint directory
> }
>
> // handle to the active query
> var activeQuery: StreamingQuery = null
> while (!stopped) {
>   if (activeQuery == null) { // if query not active, start query
>     activeQuery = startQuery()
>   } else if (shouldRestartQuery()) { // check your condition and restart query
>     activeQuery.stop()
>     activeQuery = startQuery()
>   }
>   activeQuery.awaitTermination(100) // wait for 100 ms
>   // if there is any error it will throw an exception and quit the loop
>   // otherwise it will keep checking the condition every 100 ms
> }
>
> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep wrote:
>
>> Thanks Michael
>>
>> I guess my question is a little confusing; let me try again.
>>
>> I would like to restart a streaming query programmatically while my streaming application is running, based on a condition. Why I want to do this: I want to refresh a cached data frame based on a condition, and the best way to do this is to restart the streaming query, as suggested by TDas below for a similar problem:
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>
>> I do understand that checkpointing helps with recovery from failures, but I would like to know how to restart a streaming query programmatically without stopping my streaming application.
>>
>> In place of query.awaitTermination, should I have logic to restart the query? Please suggest.
>>
>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust wrote:
>>
>>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>
>>> Though I think that this currently doesn't work with the console sink.
>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to restart a streaming query to refresh a cached data frame.
>>>>
>>>> Where and how should I restart the streaming query?
>>>>
>>>> val sparkSes = SparkSession
>>>>   .builder
>>>>   .config("spark.master", "local")
>>>>   .appName("StreamingCahcePoc")
>>>>   .getOrCreate()
>>>>
>>>> import sparkSes.implicits._
>>>>
>>>> val dataDF = sparkSes.readStream
>>>>   .schema(streamSchema)
>>>>   .csv("testData")
>>>>
>>>> val query = counts.writeStream
>>>>   .outputMode("complete")
>>>>   .format("console")
>>>>   .start()
>>>>
>>>> query.awaitTermination()
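The restart loop in Tathagata's reply is plain control flow, so it can be sketched (and sanity-checked) without Spark at all. A Python sketch with stubs: `StubQuery` is a hypothetical stand-in for a StreamingQuery handle, and each loop tick stands in for one `awaitTermination(100)` interval:

```python
class StubQuery:
    """Hypothetical stand-in for a StreamingQuery handle."""
    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True

def supervise(start_query, should_restart, ticks):
    """Mirror of the loop in the thread: start a query if none is active,
    otherwise stop and restart it when the caller's condition says so."""
    active = None
    restarts = 0
    for _ in range(ticks):
        if active is None:
            active = start_query()          # if query not active, start query
        elif should_restart():
            active.stop()                   # old query is stopped...
            active = start_query()          # ...and a fresh one takes its place
            restarts += 1
    return active, restarts
```

The key property of the pattern, which the stub preserves, is that the loop always ends holding a running (non-stopped) query handle, no matter how many restarts the condition triggered.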
Re: Restart streaming query spark 2.1 structured streaming
Thanks Michael

I guess my question is a little confusing; let me try again.

I would like to restart a streaming query programmatically while my streaming application is running, based on a condition. Why I want to do this: I want to refresh a cached data frame based on a condition, and the best way to do this is to restart the streaming query, as suggested by TDas below for a similar problem:

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpointing helps with recovery from failures, but I would like to know how to restart a streaming query programmatically without stopping my streaming application.

In place of query.awaitTermination, should I have logic to restart the query? Please suggest.

On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust wrote:

> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep wrote:
>
>> Hi,
>>
>> I'm trying to restart a streaming query to refresh a cached data frame.
>>
>> Where and how should I restart the streaming query?
>>
>> val sparkSes = SparkSession
>>   .builder
>>   .config("spark.master", "local")
>>   .appName("StreamingCahcePoc")
>>   .getOrCreate()
>>
>> import sparkSes.implicits._
>>
>> val dataDF = sparkSes.readStream
>>   .schema(streamSchema)
>>   .csv("testData")
>>
>> val query = counts.writeStream
>>   .outputMode("complete")
>>   .format("console")
>>   .start()
>>
>> query.awaitTermination()
Restart streaming query spark 2.1 structured streaming
Hi,

I'm trying to restart a streaming query to refresh a cached data frame.

Where and how should I restart the streaming query?

val sparkSes = SparkSession
  .builder
  .config("spark.master", "local")
  .appName("StreamingCahcePoc")
  .getOrCreate()

import sparkSes.implicits._

val dataDF = sparkSes.readStream
  .schema(streamSchema)
  .csv("testData")

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
StreamingQueryListener spark structured streaming
I'm working on a structured streaming application wherein I'm reading from Kafka as a stream, and for each batch of streams I need to perform an S3 lookup-file read (the file is nearly 200gb) to fetch some attributes. So I'm using df.persist() (basically caching the lookup), but I need to refresh the dataframe as the S3 lookup data changes frequently. I'm using the below code:

class RefreshcachedDF(sparkSession: SparkSession) extends StreamingQueryListener {

  override def onQueryStarted(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryTerminated(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val currTime = System.currentTimeMillis()
    if (currTime > latestRefreshTime) { // latestRefreshTime comes from a global temp view
      // oldDF is a cached Dataframe created from a GlobalTempView which is of size 150GB
      // I guess this is an async call; should I use unpersist(true), which is blocking? And is it safe?
      oldDF.unpersist()
      val inputDf: DataFrame = readFile(spec, sparkSession)
      val recreateddf = inputDf.persist()
      val count = recreateddf.count()
    }
  }
}

Is the above approach a good way to refresh the cached dataframe? The trigger for this refresh is that I will store the expiry date of the S3 cache in a global temp view.

Note: S3 is one lookup source, but I do have other sources which have data sizes of 20 to 30 GB.

- So the question is: is this the right place to refresh the cached df?
- If yes, should I use the blocking or non-blocking unpersist method, as the data is huge (15GB)?
- For a similar issue I see the below response from TDas, with subject "Re: Refreshing a persisted RDD":

"Yes, you will have to recreate the streaming Dataframe along with the static Dataframe, and restart the query. There isn't currently a feasible way to do this without a query restart. But restarting a query WITHOUT restarting the whole application + Spark cluster is reasonably fast. If your application can tolerate 10 second latencies, then stopping and restarting a query within the same Spark application is a reasonable solution."

[http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser]

So if that's the better solution, should I restart the query as below?

query.processAllAvailable()
query.stop()
df.unpersist()
val inputDf: DataFrame = readFile(spec, sparkSession) // read file from S3 or any other source
val recreateddf = inputDf.persist()
query.start()

When I looked into the Spark documentation of the above methods:

void processAllAvailable() // documentation says this method is intended for testing
Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended to a Source prior to invocation (i.e. getOffset must immediately reflect the addition).

stop()
Stops the execution of this query if it is running. This method blocks until the threads performing execution have stopped.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable()

Please suggest a better approach to refresh the cache.
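The drain -> stop -> unpersist -> reload -> restart ordering being asked about can also be expressed Spark-free with stubs, to make the sequencing explicit. Names like `reload_and_persist` and `start_query` are hypothetical, and note that in real Spark the final step would be a fresh `writeStream...start()` rather than calling anything on the stopped query handle:

```python
class StubQuery:
    """Hypothetical stand-in for a StreamingQuery; records calls in a log."""
    def __init__(self, log):
        self.log = log

    def process_all_available(self):
        self.log.append("drained")   # flagged "intended for testing" in the javadoc

    def stop(self):
        self.log.append("stopped")

class StubCache:
    """Hypothetical stand-in for a persisted DataFrame."""
    def __init__(self, log):
        self.log = log

    def unpersist(self, blocking):
        self.log.append(f"unpersist(blocking={blocking})")

def refresh_and_restart(query, cache, reload_and_persist, start_query):
    """Ordering from the thread: drain the sink, stop the query, drop the
    old cache, reload and persist the new data, then start a new query."""
    query.process_all_available()
    query.stop()
    cache.unpersist(blocking=True)   # block so the old copy is freed first
    new_cache = reload_and_persist()
    new_query = start_query()
    return new_query, new_cache
```

The sketch only pins down the order of operations; whether blocking unpersist is worth it for a 100gb cache is the trade-off discussed above (blocking lengthens the restart gap but guarantees the old copy is gone before the reload doubles memory pressure).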