Re: Profiling in YourKit
1. You have 4 CPU cores and 34 threads (system-wide you likely have many more, by the way). Think of it as having 4 espresso machines and 34 baristas. Does the fact that you have only 4 espresso machines mean you can only have 4 baristas? Of course not; there's plenty of work other than making espresso, like foaming the milk, talking to customers (IO), etc. The baristas just have to take turns on the espresso machines, which is managed by the OS.

2. Imagine you are making 100 cups and 10K cups of coffee, respectively. If you have 4 espresso machines, what's the most sensible thing to do? Probably just use the 4 machines in both cases.

On Sat, Feb 7, 2015 at 10:14 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

> Hi,
> I am using the YourKit tool to profile Spark jobs run on my single-node Spark cluster. In the YourKit UI Performance Charts, the thread count always remains at:
>   All threads: 34
>   Daemon threads: 32
> Here are my questions:
> 1. My system can run only 4 threads simultaneously, and obviously my system does not have 34 hardware threads. What could 34 threads mean?
> 2. I tried running the same job with four different datasets, two small and two relatively big, but in the UI the thread count increases by two irrespective of data size. Does this mean the framework does not vary the number of threads allocated to each job with data size?
> Thank You
Re: Profiling in YourKit
If you look at the threads, the other 30 are almost surely not Spark worker threads. They're the JVM finalizer, GC threads, Jetty listeners, etc. Nothing is wrong with this. Your OS has hundreds of threads running right now, most of which are idle, and up to 4 of which can be executing at any instant.

In a one-machine cluster, I don't think you would expect any difference in the number of running threads. More data does not mean more threads, no. Your executor probably takes as many threads as cores in both cases: 4.

On Sat, Feb 7, 2015 at 10:14 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

> Hi,
> I am using the YourKit tool to profile Spark jobs run on my single-node Spark cluster. In the YourKit UI Performance Charts, the thread count always remains at:
>   All threads: 34
>   Daemon threads: 32
> Here are my questions:
> 1. My system can run only 4 threads simultaneously, and obviously my system does not have 34 hardware threads. What could 34 threads mean?
> 2. I tried running the same job with four different datasets, two small and two relatively big, but in the UI the thread count increases by two irrespective of data size. Does this mean the framework does not vary the number of threads allocated to each job with data size?
> Thank You
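A quick way to see this for yourself is to dump every live thread in the driver JVM and look at the names; a minimal sketch, assuming it is run inside the JVM being profiled (for example pasted into spark-shell):

```scala
import scala.collection.JavaConverters._

// List every live JVM thread: most will be GC, finalizer, Jetty and other
// housekeeping threads rather than Spark task threads.
val threads = Thread.getAllStackTraces.keySet.asScala.toSeq.sortBy(_.getName)
threads.foreach { t =>
  println(f"${t.getName}%-45s daemon=${t.isDaemon} state=${t.getState}")
}
println(s"Total threads: ${threads.size}")
```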
Re: Is the pre-built version of spark 1.2.0 with --hive option?
See https://github.com/apache/spark/blob/master/dev/create-release/create-release.sh#L217

Yes, except for the 'without hive' version.

On Sat, Feb 7, 2015 at 3:45 PM, guxiaobo1982 guxiaobo1...@qq.com wrote:

> Hi,
> After various problems with the binaries I built myself, I want to try the pre-built binary, but I want to know whether it is built with the --hive option.
> Thanks.
getting error when submit spark with master as yarn
Hi,

When I try to execute my program as

  spark-submit --master yarn --class com.mytestpack.analysis.SparkTest sparktest-1.jar

I get the error below:

  java.lang.IllegalArgumentException: Required executor memory (1024+384 MB) is above the max threshold (1024 MB) of this cluster!
        at org.apache.spark.deploy.yarn.ClientBase$class.verifyClusterResources(ClientBase.scala:71)
        at org.apache.spark.deploy.yarn.Client.verifyClusterResources(Client.scala:35)
        at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:77)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)

I am new to the Hadoop environment. Please help: how/where do I need to set the memory or any other configuration? Thanks in advance.
Re: getting error when submit spark with master as yarn
Hi Sachin,

In your YARN configuration, either yarn.nodemanager.resource.memory-mb is 1024 on your nodes or yarn.scheduler.maximum-allocation-mb is set to 1024. If you have more than 1024 MB on each node, you should bump those properties. Otherwise, request fewer resources by setting --executor-memory and --driver-memory when you launch your Spark job.

-Sandy

On Sat, Feb 7, 2015 at 10:04 AM, sachin Singh sachin.sha...@gmail.com wrote:

> Hi,
> When I try to execute my program as
>   spark-submit --master yarn --class com.mytestpack.analysis.SparkTest sparktest-1.jar
> I get the error below:
>   java.lang.IllegalArgumentException: Required executor memory (1024+384 MB) is above the max threshold (1024 MB) of this cluster!
>         at org.apache.spark.deploy.yarn.ClientBase$class.verifyClusterResources(ClientBase.scala:71)
>         at org.apache.spark.deploy.yarn.Client.verifyClusterResources(Client.scala:35)
>         at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:77)
>         at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
>         at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
>         at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
>         at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> I am new to the Hadoop environment. Please help: how/where do I need to set the memory or any other configuration? Thanks in advance.
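The --executor-memory and --driver-memory flags map to the spark.executor.memory and spark.driver.memory properties; a minimal sketch of the second option, with a made-up app name, assuming the YARN cap stays at 1024 MB:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: request containers that fit under a 1024 MB YARN allocation cap.
// 512 MB plus the ~384 MB overhead from the error message stays below 1024 MB.
val conf = new SparkConf()
  .setAppName("SparkTest")
  .set("spark.executor.memory", "512m")
  .set("spark.driver.memory", "512m") // only effective if set before the driver JVM starts, e.g. via spark-submit or spark-defaults.conf
val sc = new SparkContext(conf)
```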
Re: Spark impersonation
https://issues.apache.org/jira/browse/SPARK-5493 currently tracks this.

-Sandy

On Mon, Feb 2, 2015 at 9:37 PM, Zhan Zhang zzh...@hortonworks.com wrote:

> I think you can configure Hadoop/Hive to do impersonation. There is no difference between a secure and an insecure Hadoop cluster when using kinit.
> Thanks.
> Zhan Zhang
>
> On Feb 2, 2015, at 9:32 PM, Koert Kuipers ko...@tresata.com wrote:
>
>> Yes, jobs run as the user that launched them. If you want to run jobs on a secure cluster, use YARN; Spark standalone does not support secure Hadoop.
>>
>> On Mon, Feb 2, 2015 at 5:37 PM, Jim Green openkbi...@gmail.com wrote:
>>
>>> Hi Team,
>>> Does Spark support impersonation? For example, when running Spark on YARN/Hive/HBase/etc., which user is used by default? The user who starts the Spark job?
>>> Any suggestions related to impersonation?
>>> --
>>> Thanks,
>>> www.openkb.info
>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: Can't access remote Hive table from spark
Hi Zhan Zhang,

With the pre-built version 1.2.0 of Spark against the YARN cluster installed by Ambari 1.7.0, I get the following errors:

[xiaobogu@lix1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10

Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at lix1.bh.com/192.168.100.3:8050
15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (4096 MB per container)
15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our AM
15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container
15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4221)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4194)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:813)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
        at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
        at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2555)
        at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2524)
        at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827)
        at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:823)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:823)
        at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:816)
Similar code in Java
Hi Guys,

How could I do in Java what the Scala code below does?

  val KafkaDStreams = (1 to numStreams) map { _ =>
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, storageLevel = StorageLevel.MEMORY_ONLY).map(_._2)
  }
  val unifiedStream = ssc.union(KafkaDStreams)
  val sparkProcessingParallelism = 1
  unifiedStream.repartition(sparkProcessingParallelism)

Thanks Guys

--
Privacy notice: http://www.unibs.it/node/8155
Re: Can't access remote Hive table from spark
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x

Looks like a permission issue. Can you give access to 'xiaobogu'?

Cheers

On Sat, Feb 7, 2015 at 8:15 AM, guxiaobo1982 guxiaobo1...@qq.com wrote:

> Hi Zhan Zhang,
> With the pre-built version 1.2.0 of Spark against the YARN cluster installed by Ambari 1.7.0, I get the following errors:
>
> [xiaobogu@lix1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10
> ...
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x
>         at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
>         ...
Re: Can't access remote Hive table from spark
Yes. You need to create xiaobogu under /user and give xiaobogu the right permissions on it.

Thanks.

Zhan Zhang

On Feb 7, 2015, at 8:15 AM, guxiaobo1982 guxiaobo1...@qq.com wrote:

> Hi Zhan Zhang,
> With the pre-built version 1.2.0 of Spark against the YARN cluster installed by Ambari 1.7.0, I get the following errors:
>
> [xiaobogu@lix1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10
> ...
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=xiaobogu, access=WRITE, inode="/user":hdfs:hdfs:drwxr-xr-x
>         at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
>         ...
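The usual way to do that is with the HDFS CLI as the hdfs superuser (hdfs dfs -mkdir /user/xiaobogu followed by hdfs dfs -chown xiaobogu /user/xiaobogu). A programmatic equivalent, sketched here with an assumed group name, would be:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Must run as the HDFS superuser (hdfs), since /user is owned by hdfs:hdfs.
// The group name below is an assumption; use whatever your site uses.
val fs = FileSystem.get(new Configuration())
val home = new Path("/user/xiaobogu")
fs.mkdirs(home)
fs.setOwner(home, "xiaobogu", "hdfs")
```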
Re: Profiling in YourKit
So, can I increase the number of threads by manually coding it in the Spark code?

On Sat, Feb 7, 2015 at 6:52 PM, Sean Owen so...@cloudera.com wrote:

> If you look at the threads, the other 30 are almost surely not Spark worker threads. They're the JVM finalizer, GC threads, Jetty listeners, etc. Nothing is wrong with this. Your OS has hundreds of threads running right now, most of which are idle, and up to 4 of which can be executing at any instant.
>
> In a one-machine cluster, I don't think you would expect any difference in the number of running threads. More data does not mean more threads, no. Your executor probably takes as many threads as cores in both cases: 4.
>
> On Sat, Feb 7, 2015 at 10:14 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:
>
>> Hi,
>> I am using the YourKit tool to profile Spark jobs run on my single-node Spark cluster. In the YourKit UI Performance Charts, the thread count always remains at:
>>   All threads: 34
>>   Daemon threads: 32
>> Here are my questions:
>> 1. My system can run only 4 threads simultaneously, and obviously my system does not have 34 hardware threads. What could 34 threads mean?
>> 2. I tried running the same job with four different datasets, two small and two relatively big, but in the UI the thread count increases by two irrespective of data size. Does this mean the framework does not vary the number of threads allocated to each job with data size?
>> Thank You
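For reference (not an answer given in the thread): the number of concurrent task threads is normally set by how many cores you give the executor rather than by application code. A minimal sketch for local mode, where the bracketed number is the task-thread count:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// local[8] runs tasks on 8 threads even on a 4-core machine (they time-share).
// On a real cluster the analogous knob is spark.executor.cores / --executor-cores.
val conf = new SparkConf()
  .setAppName("more-task-threads")
  .setMaster("local[8]")
val sc = new SparkContext(conf)
```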
no space left at worker node
Hi,

I submitted a Spark job to an EC2 cluster using spark-submit. At a worker node, there is a 'no space left on device' exception as follows:

15/02/08 01:53:38 ERROR logging.FileAppender: Error writing stream to file /root/spark/work/app-20150208014557-0003/0/stdout
java.io.IOException: No space left on device
        at java.io.FileOutputStream.writeBytes(Native Method)
        at java.io.FileOutputStream.write(FileOutputStream.java:345)
        at org.apache.spark.util.logging.FileAppender.appendToFile(FileAppender.scala:92)
        at org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:72)
        at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
        at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
        at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        at org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)

The command df showed the following on the worker node:

Filesystem     1K-blocks     Used  Available  Use%  Mounted on
/dev/xvda1       8256920  8256456          0  100%  /
tmpfs            7752012        0    7752012    0%  /dev/shm
/dev/xvdb       30963708  1729652   27661192    6%  /mnt

Does anybody know how to fix this? Thanks.

Ey-Chih Chow
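The df output shows the 8 GB root volume is completely full while /dev/xvdb, mounted at /mnt, still has about 27 GB free. One common remedy on EC2 clusters (an assumption here, not something confirmed in the thread) is to keep Spark's scratch space and the worker's work directory on the large volume; a minimal sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Send shuffle/spill scratch space to the big instance-store volume instead of /.
// The executor stdout/stderr files that filled the disk here live under the
// standalone worker's work dir, controlled by SPARK_WORKER_DIR in spark-env.sh
// on each worker (e.g. SPARK_WORKER_DIR=/mnt/spark-work), which must be changed there.
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.local.dir", "/mnt/spark")
val sc = new SparkContext(conf)
```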
When uses SparkFiles.get(GeoIP.dat), got exception in thread main java.io.FileNotFoundException
Hi there,

Spark version: 1.2

/home/hadoop/spark/bin/spark-submit --class com.litb.bi.CSLog2ES --master yarn --executor-memory 1G --jars /mnt/external/kafka/target/spark-streaming-kafka_2.10-1.2.0.jar,/mnt/external/kafka/target/zkclient-0.3.jar,/mnt/external/kafka/target/metrics-core-2.2.0.jar,/mnt/external/kafka/target/kafka_2.10-0.8.0.jar,elasticsearch-hadoop-2.1.0.Beta3.jar,geoip-api-1.2.13.jar --files /mnt/GeoIP.dat BILog-1.1-SNAPSHOT.jar 54.175.174.144 test test_ctrlitb 2

In my code, I want to use GeoIP.dat to parse the IP of the clickstream log:

  val Array(zkQuorum, group, topics, numThreads) = args
  val conf = new SparkConf().setAppName("Kafka CTRLog to ES")
  conf.set("spark.streaming.receiver.writeAheadLogs.enable", "true")
  conf.set("es.index.auto.create", "true")
  conf.set("es.nodes", "10.5.2.250")
  // conf.set("spark.serializer", classOf[KryoSerializer].getName)
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
  // geoip file on executor
  val geofile_path = SparkFiles.get("GeoIP.dat")
  val cl = new LookupService(geofile_path, LookupService.GEOIP_MEMORY_CACHE | LookupService.GEOIP_CHECK_CACHE)

I get the following exception:

2015-02-08 06:51:17,064 INFO [main] handler.ContextHandler (ContextHandler.java:startContext(737)) - started o.e.j.s.ServletContextHandler{/streaming,null}
2015-02-08 06:51:17,065 INFO [main] handler.ContextHandler (ContextHandler.java:startContext(737)) - started o.e.j.s.ServletContextHandler{/streaming/json,null}
Exception in thread "main" java.io.FileNotFoundException: /tmp/spark-d85f0f21-2e66-4ed7-ae31-47564c8dfefd/GeoIP.dat (No such file or directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
        at com.maxmind.geoip.LookupService.<init>(LookupService.java:282)
        at com.maxmind.geoip.LookupService.<init>(LookupService.java:264)
        at com.litb.bi.CSLog2ES$.main(CSLog2ES.scala:51)
        at com.litb.bi.CSLog2ES.main(CSLog2ES.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

--
Shen Zhun
Data Mining at LightInTheBox.com
Email: shenzhunal...@gmail.com
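For reference, a sketch of doing the lookup on the executors instead of the driver, under the assumption that the file is shipped with --files (or sc.addFile) so each executor has a local copy; building LookupService per partition also avoids having to serialize it. extractIp is a hypothetical placeholder for however the IP is pulled out of a log line:

```scala
import com.maxmind.geoip.LookupService
import org.apache.spark.SparkFiles

// Build the (non-serializable) LookupService once per partition on the
// executors, where SparkFiles.get points at the locally shipped copy.
val countries = kafkaStream.mapPartitions { lines =>
  val geo = new LookupService(SparkFiles.get("GeoIP.dat"),
    LookupService.GEOIP_MEMORY_CACHE | LookupService.GEOIP_CHECK_CACHE)
  lines.map(line => (line, geo.getCountry(extractIp(line)).getCode)) // extractIp is hypothetical
}
```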
ERROR EndpointWriter: AssociationError
Hello,

I'm new to Spark and tried to set up a Spark cluster of 1 master VM (SparkV1) and 1 worker VM (SparkV4); the error is the same if I have 2 workers. They are connected without a problem now. But when I submit a job (as in https://spark.apache.org/docs/latest/quick-start.html) at the master:

  spark-submit --master spark://SparkV1:7077 examples/src/main/python/pi.py

it seems to run OK and returns "Pi is roughly...", but the worker has the following error:

15/02/07 15:22:33 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@SparkV4:47986] -> [akka.tcp://sparkExecutor@SparkV4:46630]: Error [Shut down address: akka.tcp://sparkExecutor@SparkV4:46630] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@SparkV4:46630
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]

More about the setup: each VM has only 4 GB RAM, running Ubuntu, using spark-1.2.0 built for Hadoop 2.6.0.

I have struggled with this error for a few days. Could anyone please tell me what the problem is and how to fix it?

Thanks,
Lan
Re: Spark impersonation
Sorry for the many typos, as I was typing from my cell phone. Hope you can still get the idea.

On Sat, Feb 7, 2015 at 1:55 PM, Chester @work ches...@alpinenow.com wrote:

> I just implemented this in our application. The impersonation is done before the job is submitted. In Spark on YARN (we are using yarn-cluster mode), it just takes the current user from UserGroupInformation and submits to the YARN resource manager.
>
> If one uses kinit from the command line, the whole JVM needs to have the same principal, and you have to handle ticket expiration with a cron job. If this is an individual ad hoc CLI job, this might be OK. But if you intend to use an application to run Spark jobs with end users interacting with Spark, then you need to set up a service super user, use that user to log in to the Kerberos KDC (the kinit equivalent) programmatically, and then create a proxy user to impersonate the end user. You can handle ticket expiration in code as well, so there is no need for a cron job.
>
> Certainly one can move all this logic into Spark; one would need to create a Spark service user principal and keytab. As part of the Spark job submit, one can pass the principal and keytab location to Spark, and Spark can create a proxy user if the authentication is Kerberos, as well as add job delegation tokens.
>
> I would love to contribute this if we need it in Spark, as I just completed the Hadoop Kerberos authentication feature; it covers Pig, MapReduce, Spark, Sqoop as well as standard HDFS access. I will take a look at Sandy's JIRA.
>
> Chester
>
> On Feb 2, 2015, at 2:37 PM, Jim Green openkbi...@gmail.com wrote:
>
>> Hi Team,
>> Does Spark support impersonation? For example, when running Spark on YARN/Hive/HBase/etc., which user is used by default? The user who starts the Spark job?
>> Any suggestions related to impersonation?
>> --
>> Thanks,
>> www.openkb.info
>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
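A rough sketch of the programmatic login plus proxy-user pattern described above, with made-up principal, keytab path, and user names:

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// The service principal must also be whitelisted for impersonation via
// hadoop.proxyuser.<service>.hosts/groups in core-site.xml.
object ProxySubmit {
  def main(args: Array[String]): Unit = {
    // "kinit" programmatically as the long-running service user.
    val serviceUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      "sparkservice@EXAMPLE.COM", "/etc/security/keytabs/sparkservice.keytab")

    // Impersonate the end user who actually asked for the job.
    val proxyUgi = UserGroupInformation.createProxyUser("alice", serviceUgi)

    proxyUgi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        // Anything done here (HDFS access, submitting the YARN app, ...)
        // is performed as "alice".
      }
    })

    // Renew the service user's TGT in code instead of relying on a cron job.
    serviceUgi.checkTGTAndReloginFromKeytab()
  }
}
```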
Custom streaming receiver slow on YARN
Hello people,

I have an issue where my streaming receiver is laggy on YARN. Can anyone reply to my question on StackOverflow?

http://stackoverflow.com/questions/28370362/spark-streaming-receiver-particularly-slow-on-yarn

Thanks
Jong Wook
[GraphX] Excessive value recalculations during aggregateMessages cycles
I'm trying to set up a simple iterative message/update problem in GraphX (Spark 1.2.0), but I'm running into issues with the caching and re-calculation of data. I'm trying to follow the example found in the Pregel implementation of materializing and caching messages and graphs and then unpersisting them after the next cycle has been done.

It doesn't seem to be working: every cycle gets progressively slower, and it seems as if more and more of the values are being re-calculated despite my attempts to cache them.

The code:

```
var oldMessages: VertexRDD[List[Message]] = null
var oldGraph: Graph[MyVertex, MyEdge] = null

curGraph = curGraph.mapVertices((x, y) => y.init())

for (i <- 0 to cycle_count) {
  val curMessages = curGraph.aggregateMessages[List[Message]](x => {
    // send messages
    ...
  }, (x, y) => {
    // collect messages into lists
    val out = x ++ y
    out
  }).cache()
  curMessages.count()

  val ti = i
  oldGraph = curGraph
  curGraph = curGraph.outerJoinVertices(curMessages)(
    (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
  ).cache()
  curGraph.vertices.count()

  oldGraph.unpersistVertices(blocking = false)
  oldGraph.edges.unpersist(blocking = false)
  oldGraph = curGraph

  if (oldMessages != null) {
    oldMessages.unpersist(blocking = false)
  }
  oldMessages = curMessages
}
```

The MyVertex.process method takes the list of incoming messages, averages them, and returns a new MyVertex object. I've also set it up to append the cycle number (the second argument) to a log file named after the vertex. What ends up getting dumped into the log file for every vertex (in the exact same pattern) is:

```
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 3
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 4
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 3
Cycle: 5
```

Any ideas about what I might be doing wrong with the caching? And how can I avoid re-calculating so many of the values?

Kyle
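Not an answer from the thread, but one common remedy when iterative GraphX jobs show this kind of lineage re-computation is to checkpoint the graph every few cycles so later iterations cannot fall back to recomputing from cycle 0. A minimal sketch grafted onto the loop above, assuming a checkpoint directory has been configured and using an arbitrary interval:

```scala
// Assumes sc.setCheckpointDir("hdfs:///tmp/graphx-checkpoints") was called once at startup.
val checkpointInterval = 10  // hypothetical; tune as needed

for (i <- 0 to cycle_count) {
  // ... aggregateMessages and outerJoinVertices as above, producing the new curGraph ...

  if (i % checkpointInterval == 0) {
    // Mark the freshly built (and cached) graph for checkpointing *before*
    // forcing it; the counts below then write the checkpoint and truncate
    // the lineage.
    curGraph.vertices.checkpoint()
    curGraph.edges.checkpoint()
  }
  curGraph.vertices.count()
  curGraph.edges.count()

  // ... unpersist the previous graph and messages as above ...
}
```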