Re: Processing multiple requests in cluster
You can try Spark on Mesos or YARN, since they have a lot more support for scheduling. Thanks Best Regards
On Thu, Sep 25, 2014 at 4:50 AM, Subacini B subac...@gmail.com wrote: Hi all, How do I run multiple requests concurrently on the same cluster? I have a program using *spark streaming context* which reads *streaming data* and writes it to HBase. It works fine; the problem is that when multiple requests are submitted to the cluster, only the first request is processed, as the entire cluster is used for that request. The rest of the requests are in waiting mode. I have set spark.cores.max to 2 or less so that it can process another request, but if there is only one request the cluster is not utilized properly. Is there any way that the Spark cluster can process streaming requests concurrently while effectively utilizing the cluster, something like the Shark server? Thanks Subacini
Re: What is a pre built package of Apache Spark
Looks like pyspark was not able to find the Python binaries in the environment. You need to install Python https://docs.python.org/2/faq/windows.html (if not installed already). Thanks Best Regards
On Thu, Sep 25, 2014 at 9:00 AM, Denny Lee denny.g@gmail.com wrote: This seems similar to a related Windows issue concerning Python, where pyspark couldn't find Python because the PYTHONSTARTUP environment variable wasn't set - by any chance could this be related?
On Wed, Sep 24, 2014 at 7:51 PM, christy 760948...@qq.com wrote: Hi, I have installed standalone Spark on Win7 (src, not the pre-built version). After the sbt assembly, I can run spark-shell successfully, but the Python shell does not work:
$ ./bin/pyspark
./bin/pyspark: line 111: exec: python: not found
Have you solved your problem? Thanks, Christy
Logging HDFS block reads
Hello again, Spark users and developers! I have a standalone Spark cluster (1.1.0) with Spark SQL running on it. My cluster consists of 4 datanodes, and the replication factor of files is 3. I use the thrift server to access Spark SQL and have 1 table with 30+ partitions. When I run a query over the whole table (something simple like select count(*) from t), Spark produces a lot of network activity, filling all of the available 1 Gb link. It looks like Spark sends the data over the network instead of reading it locally. Is there any way to log which blocks were accessed locally and which were not? Thanks!
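One place this information surfaces, assuming the stock scheduler logging: when a task launches, TaskSetManager logs its data-locality level (PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL or ANY), and the same value appears in the Locality Level column of the stage detail page in the web UI. Making sure that logger is at INFO in conf/log4j.properties is enough to see it per task:

    # conf/log4j.properties - show where each task ran relative to its data
    log4j.logger.org.apache.spark.scheduler.TaskSetManager=INFO

Tasks reported as ANY (rather than NODE_LOCAL) are the ones reading their HDFS blocks over the network.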
Re: quick start guide: building a standalone scala program
I have encountered the same issue when I went through the tutorial's first standalone application. I tried to reinstall sbt, but that doesn't help. Then I followed this thread, created a workspace under the spark directory and executed ./sbt/sbt package; it says packaging succeeded. But how did this happen? How does sbt know which location to use? And though it went smoothly, I didn't see that any jar had been created. Please help. Thanks, Christy
Re: java.lang.OutOfMemoryError while running SVD MLLib example
A 7000x7000 matrix is not a tall-and-skinny matrix. Storing the dense matrix requires 784MB. The driver needs more storage for collecting the result from the executors, as well as for making a copy for LAPACK's dgesvd, so you need more memory. Do you need the full SVD? If not, try using a small k, e.g., 50. -Xiangrui
On Wed, Sep 24, 2014 at 3:01 PM, Shailesh Birari sbir...@wynyardgroup.com wrote: Note, the data is random numbers (double). Any suggestions/pointers will be highly appreciated. Thanks, Shailesh
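For reference, a minimal sketch of requesting only a truncated SVD in MLlib (the randomly generated rows here stand in for the original data):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // 7000 random rows of dimension 7000, mirroring the setup in the thread
    val rows = sc.parallelize(0 until 7000).map(_ => Vectors.dense(Array.fill(7000)(scala.util.Random.nextDouble())))
    val mat = new RowMatrix(rows)
    // Ask for only the top 50 singular values/vectors instead of the full decomposition
    val svd = mat.computeSVD(50, computeU = true)
    println(svd.s) // the 50 largest singular values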
Re: Out of memory exception in MLlib's naive baye's classification training
For the vectorizer, what's the output feature dimension, and are you creating sparse vectors or dense vectors? The model on the driver consists of numClasses * numFeatures doubles. However, the driver needs more memory in order to receive the task results (of the same size) from the executors. So you need to control the feature dimension (this is why people use the hashing trick) and reduce the number of partitions. -Xiangrui
On Wed, Sep 24, 2014 at 10:59 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I was able to get the training running in local mode with default settings; there was a problem with document labels, which were quite large (not 20 as suggested earlier). I am currently training 175,000 documents on a single node with 2GB of executor memory and 5GB of driver memory successfully. If I increase the number of documents, I get the OOM error. I wish to understand what the bottlenecks generally are for naive Bayes - is it the executor or the driver memory? Also, what are the things to keep in mind while training huge sets of data, so that I can have a bulletproof classification system? Slowing down in case of low memory is fine, but not exceptions. As a side note, is there any classification algorithm in MLlib which can just append new training data to an existing model? With naive Bayes, I need to have all the data available at once for training. Thanks, Jatin - Novice Big Data Programmer
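A sketch of the hashing trick with the TF transformer that shipped in 1.1, which caps the feature dimension regardless of vocabulary size (the 2^18 dimension is an arbitrary choice for illustration):

    import org.apache.spark.mllib.feature.HashingTF

    // Hash tokens into a fixed-size sparse vector so the dimension is bounded up front
    val tf = new HashingTF(1 << 18)
    val docs = sc.parallelize(Seq("some tokenized document", "another tokenized document"))
    val features = docs.map(doc => tf.transform(doc.split(" ").toSeq))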
Re: YARN ResourceManager and Hadoop NameNode Web UI not visible in port 8088, port 50070
Hi Raghuveer, This might be a better question for the cdh-user list or the Hadoop user list. The Hadoop web interfaces for both the NameNode and ResourceManager are enabled by default. Is it possible you have a firewall blocking those ports? -Sandy
On Wed, Sep 24, 2014 at 9:00 PM, Raghuveer Chanda raghuveer.cha...@gmail.com wrote: Hi, I'm running a Spark job in a YARN cluster, but I'm not able to see the web interface of the YARN ResourceManager or the Hadoop NameNode web UI on port 8088 and port 50070, nor the Spark stages. Only the Spark UI on port 18080 is visible. I got the URLs from Cloudera, but maybe the web interface is disabled due to some default security option. How can I enable the web interface? I.e., is there any option in Cloudera, or is the server firewall blocking it? Please help. -- Regards, Raghuveer Chanda, 4th year Undergraduate Student, Computer Science and Engineering, IIT Kharagpur
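A quick way to tell a disabled daemon apart from a blocked port, assuming shell access to the cluster nodes (rm-host/nn-host are placeholders for the actual hostnames):

    # Run these from the cluster node itself, then from your workstation.
    # Success locally but failure remotely points at a firewall, not the service.
    curl -sI http://rm-host:8088/ | head -n 1
    curl -sI http://nn-host:50070/ | head -n 1
    # Confirm the daemons are actually listening on those ports
    netstat -tln | grep -E ':(8088|50070)'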
Re: quick start guide: building a standalone scala program
I encountered exactly the same problem. How did you solve this? Thanks
Issue with Spark-1.1.0 and the start-thriftserver.sh script
Hi, We've just experienced an issue with the new Spark 1.1.0 and the start-thriftserver.sh script. We tried to launch start-thriftserver.sh with the --master yarn option and got the following error message:
Failed to load Hive Thrift server main class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2. You need to build Spark with -Phive.
In fact Spark was built with the -Phive option, but the real problem was this one:
Application appattempt_1411058337040_0118_01 submitted by user x to unknown queue: default
So the solution was to specify the queue, and it works:
/opt/spark/sbin/start-thriftserver.sh --master yarn --queue spark-batch
Hope this could help, as the error message is not really clear (and rather wrong). Helene -- Hélène Delanoeye, Software engineer / Search team, Kelkoo
Re: Processing multiple requests in cluster
There are two problems you may be facing: 1. your application is taking all the resources; 2. inside your application, task submission is not being scheduled properly. For 1, you can either configure your app to take fewer resources, or use a Mesos/YARN-type scheduler to dynamically juggle resources. For 2, you can use the fair scheduler so that application tasks are scheduled more fairly. Regards, Mayur -- Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi
On Thu, Sep 25, 2014 at 12:32 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can try Spark on Mesos or YARN, since they have a lot more support for scheduling. (Subacini's original question quoted in the first message of this thread.)
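A minimal sketch of both knobs for a standalone-mode app (the pool name is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Cap the cores this app grabs, and enable fair scheduling across jobs within it
    val conf = new SparkConf()
      .setAppName("streaming-app")
      .set("spark.cores.max", "2")         // leave cores free for other applications
      .set("spark.scheduler.mode", "FAIR") // fair scheduling between this app's jobs
    val sc = new SparkContext(conf)
    // Jobs submitted from this thread land in a named fair-scheduler pool
    sc.setLocalProperty("spark.scheduler.pool", "streaming")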
Re: Can not see any spark metrics on ganglia-web
Hi, I found the problem. By default, gmond is monitoring the multicast IP 239.2.11.71, while I had set *.sink.ganglia.host=localhost. The correct configuration in metrics.properties:

# Enable GangliaSink for all instances
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
#*.sink.ganglia.host=localhost
*.sink.ganglia.host=239.2.11.71
*.sink.ganglia.port=8653
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
*.sink.ganglia.ttl=1
*.sink.ganglia.mode=multicast
Memory used in Spark-0.9.0-incubating
ENV: Spark: 0.9.0-incubating, Hadoop: 2.3.0. I run Spark tasks on YARN, and I see this log in the NodeManager (the same line repeats every ~3 seconds from 17:43:34 to 17:43:46):

2014-09-25 17:43:34,141 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used

My task parameters are: --num-workers 4 --master-memory 2g --worker-memory 4g --worker-cores 4
In my opinion, with --worker-memory 4g, 4g should be the maximum memory for a container. So why does the log say 4.5 GB of 5 GB physical memory used? And where is the 5G maximum memory for the container configured? -- WangXiaoyu
Re: Memory used in Spark-0.9.0-incubating
My yarn-site.xml config:

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>16384</value>
</property>

(Original question quoted above: why does the NodeManager report 4.5 GB of 5 GB physical memory used when --worker-memory is 4g, and where is the 5 GB container maximum configured?) -- WangXiaoyu
Re: Re:
I had a similar problem writing to Cassandra using the Cassandra connector. I am not sure whether this will work for you, but I reduced the number of cores to 1 per machine and my job was stable. More explanation of my issue: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-Connector-Issue-and-performance-td15005.html
java.io.FileNotFoundException in usercache
I work with Spark on an unstable cluster with bad administration. A couple of days ago I started getting:
14/09/25 15:29:56 ERROR storage.DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /local/hd2/yarn/local/usercache/epahomov/appcache/application_1411219858924_15501/spark-local-20140925151931-a4c3/3a/shuffle_4_30_174
java.io.FileNotFoundException: /local/hd2/yarn/local/usercache/epahomov/appcache/application_1411219858924_15501/spark-local-20140925151931-a4c3/3a/shuffle_4_30_174 (No such file or directory)
After this error the Spark context shut down. I'm aware that there are some problems with the distributed cache on the cluster - some people add too much data to it. I don't fully understand what's going on, but I'm willing to understand it deeply. 1) Does Spark somehow rely on YARN's localization mechanism? 2) What is the usercache directory about? 3) Is there a quick way to work around the problem? 4) Isn't shutting down the Spark context an overreaction to this error? -- Sincerely yours, Egor Pakhomov, Developer, Yandex
SPARK 1.1.0 on yarn-cluster and external JARs
We build some Spark jobs with external jars. I compile jobs by including them in one assembly, but I am looking for an approach to put all the external jars into HDFS instead. We have already put the spark jar in an HDFS folder and set up the SPARK_JAR variable. What is the best way to do that for the other external jars (MongoDB, algebird and so on)? Thanks in advance
Re: Memory used in Spark-0.9.0-incubating
You should check the log of the ResourceManager when you submit this job to YARN. It records how many resources your Spark application actually asked the ResourceManager for, per container. Did you use the fair scheduler? There is a fair scheduler config parameter, yarn.scheduler.increment-allocation-mb (default 1024), which means that if you ask for 4097 MB of memory for a container, the ResourceManager will create a container which uses 5120 MB of memory. But I can't figure out where the 5 GB comes from. Maybe there is some code somewhere which mistakes 1024 for 1000? Best Regards, Yi Tian tianyi.asiai...@gmail.com
On Sep 25, 2014, at 18:41, 王晓雨 wangxiao...@jd.com wrote: (yarn-site.xml config and original question quoted above)
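(One possible source of the extra 1 GB, assuming the defaults of this era: Spark on YARN adds a fixed memory overhead - 384 MB by default - on top of the requested worker memory before asking the ResourceManager, which then rounds the request up to its allocation increment:

    4096 MB (--worker-memory) + 384 MB (overhead) = 4480 MB
    4480 MB rounded up to the next 1024 MB increment = 5120 MB = 5 GB

That would match the 5 GB container limit in the NodeManager log.)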
Update gcc version, still snappy error
I updated the Spark version from 1.0.2 to 1.1.0 and experienced a snappy version issue with the new Spark 1.1.0. After updating the glibc version, another issue occurred. I abstract the log as follows:

14/09/25 11:29:18 WARN [org.apache.hadoop.util.NativeCodeLoader---main]: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/25 11:29:19 WARN [org.apache.hadoop.hdfs.DomainSocketFactory---main]: The short-circuit local reads feature is disabled because libhadoop cannot be loaded.
WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-0]: Lost task 0.0 in stage 1.0 (TID 1, spark-dev134): org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236)
org.xerial.snappy.Snappy.<clinit>(Snappy.java:48)
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-3]: Lost task 4.0 in stage 1.0 (TID 4, spark-dev134): java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
14/09/25 11:29:24 ERROR [org.apache.spark.network.ConnectionManager---handle-read-write-executor-3]: Corresponding SendingConnection to ConnectionManagerId(spark-dev135,38649) not found
14/09/25 11:29:24 INFO [org.apache.spark.scheduler.DAGScheduler---main]: Failed to run count at SessionSVD2.scala:23
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 9, spark-dev135): ExecutorLostFailure (executor lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

1) I tried to print JAVA_LIBRARY_PATH; the native-hadoop library is not in the path. I set System.setProperty("JAVA_LIBRARY_PATH", "hadoop_home/lib/native/"), which only takes effect in System.getenv() but not in System.getProperty("JAVA_LIBRARY_PATH"). hadoop_home/lib/native/ contains the libhadoop and libsnappy.so files, which I want to include in the path.
2) I found that in /tmp there are many snappy-<uuid> files; each time I submit a job it creates a snappy-<uuid> file. Before I updated the glibc version, my colleague updated the snappy version; I think this is the reason why it can find the snappy file but not libhadoop. Are there any ideas? Thanks
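One thing that may help, assuming the native libraries live under a directory like /opt/hadoop/lib/native (a placeholder path): java.library.path cannot reliably be changed from inside a running JVM via System.setProperty, but Spark exposes launch-time options for the driver and executors:

    # spark-submit flags (or equivalent entries in conf/spark-defaults.conf)
    --driver-library-path /opt/hadoop/lib/native
    --conf spark.executor.extraLibraryPath=/opt/hadoop/lib/native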
Pregel messages serialized in local machine?
This is a question on using the Pregel function in GraphX. Does a message get serialized and then de-serialized in the scenario where both the source and the destination vertices are in the same compute node/machine? Thank you!
Systematic error when re-starting Spark stream unless I delete all checkpoints
I'm experiencing Spark Streaming restart issues similar to what is discussed in the two threads below (in which I failed to find a solution). Could anybody let me know if anything is wrong in the way I start/stop, or whether this could be a Spark bug?
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html
My stream reads a Kafka topic, does some processing involving an updateStateByKey and saves the result to HDFS. The context is (re)created at startup as follows: And the start-up and shutdown of the stream is handled as follows: When starting the stream for the first time (with spark-submit), the processing happens successfully, folders are created in the target HDFS folder and streaming stats are visible on http://sparkhost:4040/streaming. After letting the streaming work for several minutes and then stopping it (ctrl-c on the command line), the following info is visible in the checkpoint folder: (checkpoint clean-up seems to happen, since the stream ran for much more than 5 times 10 seconds) When re-starting the stream, the startup fails with the error below, http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is added in the target folder and no new checkpoints are created: Now if I delete all older checkpoints and keep only the most recent one: I end up with this (Kafka?) actor non-unique name error. If I delete the checkpoint folder, the stream starts successfully (but I lose my ongoing stream state, obviously). We're running Spark 1.1.0 on Mesos 0.20. Our Spark assembly is packaged with CDH 5.1.0 and Hive. Any comment or suggestion would be greatly appreciated.
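For what it's worth, the checkpoint-recovery pattern the streaming guide recommends is to build the whole pipeline inside a factory function and hand it to getOrCreate, so the DStream graph is only constructed fresh when no checkpoint exists - a minimal sketch, with illustrative names:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///user/me/checkpoints" // placeholder path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("kafka-stream")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      // build the Kafka input stream, updateStateByKey and HDFS output here
      ssc
    }

    // Clean start: createContext() runs. Restart: the context is rebuilt from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()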
Re: Spark Hive max key length is 767 bytes
Sorry for missing your original email - thanks for the catch, eh?!
On Thu, Sep 25, 2014 at 7:14 AM, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: Hi, Fixed the issue by downgrading Hive from 0.13.1 to 0.12.0; it works well now. Regards
On 31 Aug, 2014, at 7:28 am, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: Hi, Already done but still get the same error. (I use Hive 0.13.1, Spark 1.0.2, Hadoop 2.4.1.) Steps:
Step 1) mysql:
alter database hive character set latin1;
Step 2) Hive:
hive> create table test_datatype2 (testbigint bigint);
OK
Time taken: 0.708 seconds
hive> drop table test_datatype2;
OK
Time taken: 23.272 seconds
Step 3)
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/08/29 19:33:52 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@395c7b94
scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)")
res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive
scala> hiveContext.hql("drop table test_datatype3")
14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no possible candidates
Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception.
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception.
at org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Should I use Hive 0.12.0 instead of Hive 0.13.1? Regards, Arthur
On 31 Aug, 2014, at 6:01 am, Denny Lee denny.g@gmail.com wrote: Oh, you may be running into an issue with your MySQL setup actually. Try running
alter database metastore_db character set latin1
so that Hive (and the Spark HiveContext) can execute properly against the metastore.
On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com (arthur.hk.c...@gmail.com) wrote: Hi, Tried the same thing in Hive directly without issue, then tried again in Spark and hit the same DataNucleus error (same transcript and "Specified key was too long; max key length is 767 bytes" trace as above), ending in: Caused by:
Re: SPARK 1.1.0 on yarn-cluster and external JARs
SparkContext.addJar()? Why didn't you like the fat-jar approach?
2014-09-25 16:25 GMT+04:00 rzykov rzy...@gmail.com: We build some Spark jobs with external jars... What is the best way to do that for other external jars (MongoDB, algebird and so on)? Thanks in advance
-- Sincerely yours, Egor Pakhomov, Scala Developer, Yandex
how to run spark job on yarn with jni lib?
Hi all, I tried to run my Spark job on YARN. In my application, I need to call third-party JNI libraries in a Spark job. However, I can't find a way to make the Spark job load my native libraries. Does anyone know how to solve this problem? Thanks. Ziv Huang
Re: how to run spark job on yarn with jni lib?
Hmmm, you might be suffering from SPARK-1719. Not sure what the proper workaround is, but it sounds like your native libs are not in any of the standard lib directories; one workaround might be to copy them there, or add their location to /etc/ld.so.conf (I'm assuming Linux).
On Thu, Sep 25, 2014 at 8:34 AM, taqilabon g945...@gmail.com wrote: Hi all, I tried to run my Spark job on YARN. In my application, I need to call third-party JNI libraries in a Spark job. However, I can't find a way to make the Spark job load my native libraries. Does anyone know how to solve this problem? Thanks. Ziv Huang
-- Marcelo
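A sketch of the ld.so.conf route, assuming Linux and a hypothetical /opt/myapp/native directory holding the .so files:

    # Register the directory with the dynamic linker, then rebuild its cache
    echo "/opt/myapp/native" | sudo tee /etc/ld.so.conf.d/myapp.conf
    sudo ldconfig
    # Verify the library is now resolvable (myjni is a placeholder name)
    ldconfig -p | grep myjni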
Re: Spark SQL use of alias in where clause
Thanks, Yanbo and Nicholas. Now it makes more sense - query optimization is the answer. /Du
From: Nicholas Chammas nicholas.cham...@gmail.com Date: Thursday, September 25, 2014 at 6:43 AM To: Yanbo Liang yanboha...@gmail.com Cc: Du Li l...@yahoo-inc.com.invalid, d...@spark.apache.org, user@spark.apache.org Subject: Re: Spark SQL use of alias in where clause
That is correct. Aliases in the SELECT clause can only be referenced in the ORDER BY and HAVING clauses. Otherwise, you'll have to just repeat the expression, like concat() in this case. A more elegant alternative, which is probably not available in Spark SQL yet, is to use Common Table Expressions (http://technet.microsoft.com/en-us/library/ms190766(v=sql.105).aspx).
On Wed, Sep 24, 2014 at 11:32 PM, Yanbo Liang yanboha...@gmail.com wrote: Maybe it's just the way SQL works. The SELECT part is executed after the WHERE filter is applied, so you cannot use an alias declared in the SELECT part in the WHERE clause. Hive and Oracle behave the same as Spark SQL.
2014-09-25 8:58 GMT+08:00 Du Li l...@yahoo-inc.com.invalid: Hi, The following query does not work in Shark nor in the new Spark SQLContext or HiveContext:
SELECT key, value, concat(key, value) as combined from src where combined like '11%';
The following tweak of syntax works fine, although a bit ugly:
SELECT key, value, concat(key, value) as combined from src where concat(key,value) like '11%' order by combined;
Are you going to support aliases in the where clause soon? Thanks, Du
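Another workaround that avoids repeating the expression, assuming the HiveQL parser's support for derived tables: push the aliased projection into a subquery, so the outer WHERE sees the alias as a real column:

SELECT key, value, combined
FROM (SELECT key, value, concat(key, value) AS combined FROM src) t
WHERE combined LIKE '11%';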
Re: Multiple Kafka Receivers and Union
I suppose I have other problems, as I can't get the Scala example to work either. Puzzling, as I have literally coded like the examples (that are purported to work), but no luck. mn
On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote: Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com wrote: The part that works is the commented-out, single receiver stream below the loop. It seems that when I call KafkaUtils.createStream more than once, I don't receive any messages. I'll dig through the logs, but at first glance yesterday I didn't see anything suspect. I'll have to look closer. mn
On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote: Maybe post the before-code, as in what was the code before you did the loop (that worked)? I had similar situations where reviewing code before (worked) and after (does not work) helped. Also, what helped is the Scala REPL, because I can see what object types are returned by each statement. Other than code, in the driver logs you should see events that say Registered receiver for stream 0 from akka.tcp://sp...@node5.acme.net:53135 Now, if you go to node5 and look at the Spark or YarnContainer logs (depending on who's doing RM), you should be able to see if the receiver has any errors when trying to talk to Kafka.
On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com wrote: To my eyes, these are functionally equivalent. I'll try a Scala approach, but this may cause waves for me upstream (e.g., non-Java). Thanks for looking at this. If anyone else can see a glaring issue in the Java approach, that would be appreciated. Thanks, Matt
On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote: Sorry, I am almost Java illiterate, but here's my Scala code to do the equivalent (that I have tested to work):
// Create 10 receivers across the cluster, one for each partition potentially,
// but active receivers are only as many as the kafka partitions you have
val kInStreams = (1 to 10).map { _ => KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER) }
val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com wrote: So, this is scrubbed some for confidentiality, but the meat of it is as follows. Note that if I substitute the commented section for the loop, I receive messages from the topic.
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.streaming.unpersist", "true");
sparkConf.set("spark.logConf", "true");

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
kafkaProps.put("group.id", groupId);

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
jsc.checkpoint("hdfs://some_location");

List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
for (int i = 0; i < 5; i++) {
    streamList.add(KafkaUtils.createStream(jsc,
                                           String.class, ProtobufModel.class,
                                           StringDecoder.class, ProtobufModelDecoder.class,
                                           kafkaProps,
                                           Collections.singletonMap(topic, 1),
                                           StorageLevel.MEMORY_ONLY_SER()));
}

final JavaPairDStream<String, ProtobufModel> stream =
        jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

// final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
//         KafkaUtils.createStream(jsc,
//                                 String.class, ProtobufModel.class,
//                                 StringDecoder.class, ProtobufModelDecoder.class,
//                                 kafkaProps,
//                                 Collections.singletonMap(topic, 5),
//                                 StorageLevel.MEMORY_ONLY_SER());

final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
        new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
                return new Tuple2<>(tuple._2().getDeviceId(), 1);
            }
        });

... and further Spark functions ...
On Sep 23, 2014, at 2:55 PM, Tim Smith secs...@gmail.com wrote: Posting your code would be really helpful in figuring out gotchas.
On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote: Hey, Spark 1.1.0, Kafka 0.8.1.1, Hadoop (YARN/HDFS) 2.5.1. I have a five-partition Kafka topic. I can create a single Kafka receiver via KafkaUtils.createStream
VertexRDD partition imbalance
Hi all, VertexRDD is partitioned with HashPartitioner, and it exhibits some imbalance of tasks. For example, Connected Components with partition strategy Edge2D:

Aggregated Metrics by Executor
Executor ID | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input    | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
1           | 10 s      | 10          | 0            | 10              | 234.6 MB | 0.0 B        | 43.2 MB       | 0.0 B                  | 0.0 B
2           | 3 s       | 3           | 0            | 3               | 70.4 MB  | 0.0 B        | 13.0 MB       | 0.0 B                  | 0.0 B
3           | 6 s       | 6           | 0            | 6               | 140.7 MB | 0.0 B        | 25.9 MB       | 0.0 B                  | 0.0 B
4           | 9 s       | 8           | 0            | 8               | 187.9 MB | 0.0 B        | 34.6 MB       | 0.0 B                  | 0.0 B
5           | 10 s      | 9           | 0            | 9               | 211.4 MB | 0.0 B        | 38.9 MB       | 0.0 B                  | 0.0 B

This is for a stage on mapPartitions at VertexRDD.scala:347:

343
344   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
345   private[graphx] def shipVertexAttributes(
346       shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
347     partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
348   }
349

This is executed in every iteration in Pregel, so the imbalance is bad for performance. However, when we run PageRank with Edge2D, the tasks are even across executors (all finish 6 tasks). Our configuration is 6 nodes, 36 partitions. My questions are: What decides the number of tasks for different executors? And how can we make it balanced? Thanks! Larry
Working on LZOP Files
Hi, Is anybody using LZOP files for processing in Spark? We have a huge volume of LZOP files in HDFS to process through Spark. In the MapReduce framework, it automatically detects the file format and sends the decompressed version to the mappers. Is there any such support in Spark? As of now I am manually downloading and decompressing the files before processing. Thanks, Harsha
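One approach that should carry over from the MapReduce side, assuming the hadoop-lzo jar and its native codec are installed on the cluster, is to read through that library's LZO input format instead of plain textFile - a sketch with an illustrative path:

    import com.hadoop.mapreduce.LzoTextInputFormat
    import org.apache.hadoop.io.{LongWritable, Text}

    // Records decompress transparently, just as they would for a mapper
    val lines = sc.newAPIHadoopFile(
        "hdfs:///data/events",
        classOf[LzoTextInputFormat],
        classOf[LongWritable],
        classOf[Text]
      ).map(_._2.toString)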
RE: MLUtils.loadLibSVMFile error
Hi Liquan, Thanks. I was running this in spark-shell. I was able to resolve this issue by creating an app and then submitting it via spark-submit in yarn-client mode. I have seen this happening before as well -- submitting via spark-shell has memory issues, and the same code then works fine when submitted as an app in spark-submit yarn-client mode. I am not sure whether this is due to a difference between spark-shell and spark-submit, or yarn vs. non-yarn mode.
Date: Wed, 24 Sep 2014 22:13:35 -0700 Subject: Re: MLUtils.loadLibSVMFile error From: liquan...@gmail.com To: ssti...@live.com CC: so...@cloudera.com; user@spark.apache.org
Hi Sameer, I think there are two things that you can do: 1) What is your current driver-memory or executor-memory? You can try to increase driver-memory or executor-memory to see if that solves your problem. 2) How many features are in your data? Too many features may create a large number of temp objects, which may also cause GC to happen. Hope this helps! Liquan
On Wed, Sep 24, 2014 at 9:50 PM, Sameer Tilak ssti...@live.com wrote: Hi All, I was able to solve this formatting issue. However, I have another question. When I do the following:
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt")
I get a java.lang.OutOfMemoryError: GC overhead limit exceeded error. Is it possible to specify the number of partitions explicitly? I want to add that this dataset is sparse and is fairly small -- ~250 MB. Error log:
14/09/24 21:41:02 ERROR Executor: Exception in task ID 0
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.regex.Pattern.compile(Pattern.java:1655)
at java.util.regex.Pattern.<init>(Pattern.java:1337)
at java.util.regex.Pattern.compile(Pattern.java:1022)
at java.lang.String.split(String.java:2313)
at java.lang.String.split(String.java:2355)
at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
at scala.collection.immutable.StringOps.split(StringOps.scala:31)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
14/09/24 21:41:02 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.regex.Pattern.compile(Pattern.java:1655)
at java.util.regex.Pattern.<init>(Pattern.java:1337)
at java.util.regex.Pattern.compile(Pattern.java:1022)
at java.lang.String.split(String.java:2313)
at java.lang.String.split(String.java:2355)
at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
at scala.collection.immutable.StringOps.split(StringOps.scala:31)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
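On the explicit-partitions question, two options appear to be available in this era's API - a sketch, hedged on the four-argument loader overload (where a non-positive numFeatures lets the loader infer the dimension):

    import org.apache.spark.mllib.util.MLUtils

    // Ask the loader for at least 32 partitions up front (32 is illustrative)
    val examples = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt", -1, 32)

    // Or, with the simple loader, reshuffle afterwards
    val examples2 = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt").repartition(32)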
Optimal Partition Strategy
Hello, A bit of background: I have a dataset with about 200 million records and around 10 columns. The size of this dataset is around 1.5TB, and it is split into around 600 files. When I read this dataset using sparkContext, by default it creates around 3000 partitions if I do not specify the number of partitions in the textFile() command. Now I see that even though my Spark application has around 400 executors assigned to it, the data is spread out to only about 200 of them. I am using the .cache() method to hold my data in memory. Each of these 200 executors, each with a total available memory of 6GB, now holds multiple blocks and thus uses up its entire memory by caching the data. Even though I have about 400 machines, only about 200 of them are actually being used. Now, my question is: how do I partition my data so that all 400 executors have some chunks of the data, thus better parallelizing my work? So, instead of only about 200 machines having about 6GB of data each, I would like to have 400 machines with about 3GB of data each. Any ideas on how I can achieve the above? Thanks, Vinay
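Two knobs worth trying, assuming the data comes in via textFile (800 is an illustrative target - roughly 2x the executor count):

    // Ask for more input splits up front...
    val data = sc.textFile("hdfs:///path/to/dataset", 800)
    // ...or explicitly reshuffle before caching so blocks spread across all executors
    val spread = data.repartition(800).cache()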
Re: Pregel messages serialized in local machine?
At 2014-09-25 06:52:46 -0700, Cheuk Lam chl...@hotmail.com wrote: This is a question on using the Pregel function in GraphX. Does a message get serialized and then de-serialized in the scenario where both the source and the destination vertices are in the same compute node/machine?
Yes, message passing currently uses partitionBy, which shuffles all messages, including ones to the same machine, by serializing them and writing them to disk. Ankur
Re: Question About Submit Application
Then I think it's time for you to look at the Spark Master logs...
On Thu, Sep 25, 2014 at 7:51 AM, danilopds danilob...@gmail.com wrote: Hi Marcelo, Yes, I can ping spark-01 and I also include the IP and host in my file /etc/hosts. My VM can ping the local machine too.
-- Marcelo
Spark Streaming + Actors
Hi Team, Can I use actors in Spark Streaming based on event type? Could you please review the test program below and let me know if there is anything I need to change with respect to best practices?

import akka.actor.Actor
import akka.actor.{ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import akka.actor.ActorSystem

case class one(r: org.apache.spark.rdd.RDD[String])
case class two(s: org.apache.spark.rdd.RDD[String])

class Events extends Actor {
  def receive = {
    // Based on event type - invoke respective methods asynchronously
    case one(r) => println("ONE COUNT " + r.count) // Invoke respective functions
    case two(s) => println("TWO COUNT " + s.count) // Invoke respective functions
  }
}

object Test {
  def main(args: Array[String]) {
    val system = ActorSystem("System")
    val event: ActorRef = system.actorOf(Props[Events], "events")
    val sparkConf = new SparkConf().setAppName("AlertsLinesCount").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val lines = ssc.textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")
    lines.foreachRDD(x => {
      event ! one(x)
      event ! two(x)
    })
    ssc.start
    ssc.awaitTermination
  }
}

Regards, Rajesh
Re: Yarn number of containers
On Thu, Sep 25, 2014 at 8:55 AM, jamborta jambo...@gmail.com wrote: I am running Spark with the default settings in yarn-client mode. For some reason YARN always allocates three containers to the application (wondering where that is set?), and only uses two of them.
The default number of executors in YARN mode is 2; so you have 2 executors + the application master, hence 3 containers.
Also, the CPUs on the cluster never go over 50%. I turned off the fair scheduler and set a high spark.cores.max. Are there additional settings I am missing?
You probably need to request more cores (--executor-cores). Don't remember if that is respected in YARN, but it should be.
-- Marcelo
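For reference, the flags involved on a 1.1-era spark-submit command line (numbers, class and jar names are placeholders):

    # 4 executors x 4 cores each, plus the application master container
    spark-submit --master yarn-client \
      --num-executors 4 \
      --executor-cores 4 \
      --executor-memory 4g \
      --class com.example.MyApp myapp.jar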
Re: MLUtils.loadLibSVMFile error
Hi Sameer, When starting spark-shell, by default the JVM for spark-shell only has 512MB of memory. As a quick hack, you can use SPARK_MEM=4g bin/spark-shell to set the JVM memory to 4g. For more information, you can refer to http://spark.apache.org/docs/latest/cluster-overview.html Thanks, Liquan
On Thu, Sep 25, 2014 at 9:46 AM, Sameer Tilak ssti...@live.com wrote: Hi Liquan, Thanks. I was running this in spark-shell. I was able to resolve this issue by creating an app and then submitting it via spark-submit in yarn-client mode. I have seen this happening before as well -- submitting via spark-shell has memory issues, and the same code then works fine when submitted as an app in spark-submit yarn-client mode. I am not sure whether this is due to a difference between spark-shell and spark-submit, or yarn vs. non-yarn mode.
-- Date: Wed, 24 Sep 2014 22:13:35 -0700 Subject: Re: MLUtils.loadLibSVMFile error From: liquan...@gmail.com To: ssti...@live.com CC: so...@cloudera.com; user@spark.apache.org
Hi Sameer, I think there are two things that you can do: 1) You can try to increase driver-memory or executor-memory to see if that solves your problem. 2) How many features are in your data? Too many features may create a large number of temp objects, which may also cause GC to happen. Hope this helps! Liquan
On Wed, Sep 24, 2014 at 9:50 PM, Sameer Tilak ssti...@live.com wrote: Hi All, I was able to solve this formatting issue. However, I have another question. When I do the following:
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt")
I get a java.lang.OutOfMemoryError: GC overhead limit exceeded error. Is it possible to specify the number of partitions explicitly? I want to add that this dataset is sparse and is fairly small -- ~250 MB. Error log: (the same GC overhead limit exceeded stack traces quoted in full earlier in this thread)
Re: SPARK 1.1.0 on yarn-cluster and external JARs
You can pass the HDFS location of those extra jars in the spark-submit --jars argument. Spark will take care of using Yarn's distributed cache to make them available to the executors. Note that you may need to provide the full hdfs URL (not just the path, since that would be interpreted as a local path, IIRC). Also, you may run into SPARK-3560, which has been fixed since but is not in 1.1.0. On Thu, Sep 25, 2014 at 5:25 AM, rzykov rzy...@gmail.com wrote: We build some Spark jobs with external jars. Currently I compile jobs by including them in one assembly, but I am looking for an approach to put all external jars into HDFS. We have already put the Spark jar in an HDFS folder and set up the SPARK_JAR variable. What is the best way to do that for other external jars (MongoDB, algebird and so on)? Thanks in advance -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-1-1-0-on-yarn-cluster-and-external-JARs-tp15136.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
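A hedged example of that suggestion (the namenode address and jar names are placeholders):

    spark-submit --master yarn-cluster \
      --class com.example.MyJob \
      --jars hdfs://namenode:8020/libs/mongo-java-driver.jar,hdfs://namenode:8020/libs/algebird-core_2.10.jar \
      myjob-assembly.jar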
Re: java.lang.NegativeArraySizeException in pyspark
Hi Davies, Thanks for your help. I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models saying that the size did not fit in an int (recall that broadcasts use 32-bit ints to store size), suggesting that it was in fact over 2G. I don't know why the previous tests (described above) with duplicated portions of self.all_models worked (it could have been an error in either my debugging or my notes), but splitting self.all_models into a separate broadcast variable for each element worked. I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is, you're completely right that using broadcast is the correct way to code this. best, -Brad On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote: Or maybe there is a bug related to the base64 in py4j; could you dump the serialized bytes of the closure to verify this? You could add a line in spark/python/pyspark/rdd.py:

    ser = CloudPickleSerializer()
    pickled_command = ser.dumps(command)
    + print len(pickled_command), repr(pickled_command)

On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, That's interesting to know. Here are more details about my code. The object (self) contains pointers to the spark_context (which seems to generate errors during serialization), so I strip off the extra state using the outer lambda function and just pass the value self.all_models into the map. all_models is a list of length 9 where each element contains 3 numbers (ints or floats, can't remember) and then one LinearSVC object. The classifier was trained over ~2.5M features, so the object isn't small, but probably shouldn't be 150M either. Additionally, the call ran OK when I used either 2x the first 5 objects or 2x the last 5 objects (another reason why it seems unlikely the bug was size related).

    def _predict_all_models(all_models, sample):
        scores = []
        for _, (_, _, classifier) in all_models:
            score = classifier.decision_function(sample[VALUE][RECORD])
            scores.append(float(score))
        return (sample[VALUE][LABEL], scores)

    # fails
    #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
    # works
    #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
    #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])

I've since written a work-around into my code, but if I get a chance I'll switch to broadcast variables and see whether that works. later, -brad On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote: The traceback said that the serialized closure cannot be parsed (base64) correctly by py4j. A string in Java cannot be longer than 2G, so the serialized closure cannot be longer than 1.5G (there is overhead in base64). Is it possible that the data used in your map function is this big? If it is, you should use broadcast for it. In master of Spark, we will use broadcast automatically if the closure is too big (but using broadcast explicitly is always better). On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script I have. I've pasted the full traceback at the end of this email. I have isolated the line of code in my script which causes the exception to occur.
Although the exception seems to occur deterministically, it is very unclear why the different variants of the line would cause the exception to occur. Unfortunately, I am only able to reproduce the bug in the context of a large data processing job, and the line of code which must change to reproduce the bug has little meaning out of context. The bug occurs when I call map on an RDD with a function that references some state outside of the RDD (which is presumably bundled up and distributed with the function). The output of the function is a tuple where the first element is an int and the second element is a list of floats (same positive length every time, as verified by an 'assert' statement). Given that: -It's unclear why changes in the line would cause an exception -The exception comes from within pyspark code -The exception has to do with negative array sizes (and I couldn't have created a negative sized array anywhere in my python code) I suspect this is a bug in pyspark. Has anybody else observed or reported this bug? best, -Brad Traceback (most recent call last): File /home/bmiller1/pipeline/driver.py, line 214, in module main() File
Re:
I followed the linked JIRAs to HDFS-7005, which is in Hadoop 2.6.0. Any chance of deploying 2.6.0-SNAPSHOT to see if the problem goes away? On Wed, Sep 24, 2014 at 10:54 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like it's an HDFS issue, pretty new. https://issues.apache.org/jira/browse/HDFS-6999 Jianshi On Thu, Sep 25, 2014 at 12:10 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Ted, See my previous reply to Debasish: all region servers are idle. I don't think it's caused by hotspotting. Besides, only 6 out of 3000 tasks were stuck, and their inputs are only about 80MB each. Jianshi On Wed, Sep 24, 2014 at 11:58 PM, Ted Yu yuzhih...@gmail.com wrote: I was thinking along the same line. Jianshi: See http://hbase.apache.org/book.html#d0e6369 On Wed, Sep 24, 2014 at 8:56 AM, Debasish Das debasish.da...@gmail.com wrote: The HBase regionservers need to be balanced... you might have some skewness in row keys and one regionserver is under pressure... try finding that key and replicate it using a random salt On Wed, Sep 24, 2014 at 8:51 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Ted, It converts RDD[Edge] to HBase rowkeys and columns and inserts them into HBase (in batches). BTW, I found batched Put actually faster than generating HFiles... Jianshi On Wed, Sep 24, 2014 at 11:49 PM, Ted Yu yuzhih...@gmail.com wrote: bq. at com.paypal.risk.rds.dragon.storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.apply(HbaseRDDBatch.scala:179) Can you reveal what HbaseRDDBatch.scala does? Cheers On Wed, Sep 24, 2014 at 8:46 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: One of my big Spark programs always gets stuck at 99%, where a few tasks never finish. I debugged it by printing out thread stacktraces, and found there are workers stuck at parquet.hadoop.ParquetFileReader.readNextRowGroup. Anyone had a similar problem? I'm using Spark 1.1.0 built for HDP2.1. The parquet files are generated by pig using the latest parquet-pig-bundle v1.6.0rc1. From Spark 1.1.0's pom.xml, Spark is using parquet v1.4.3 -- will this be problematic? One weird behavior is that another program reads and sorts data from the same parquet files and works fine. The only difference seems to be that the buggy program uses foreachPartition while the working program uses map.
Here's the full stacktrace:
"Executor task launch worker-3" java.lang.Thread.State: RUNNABLE
  at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
  at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
  at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
  at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
  at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
  at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
  at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
  at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
  at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
  at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
  at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
  at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
  at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
  at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
  at java.io.DataInputStream.readFully(DataInputStream.java:195)
  at java.io.DataInputStream.readFully(DataInputStream.java:169)
  at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
  at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
  at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
  at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
  at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
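As an aside on the batched-Put remark above, here is a hedged sketch of that pattern -- not the poster's HbaseRDDBatch code. The Edge type, table name and column family are placeholders, against the HBase 0.9x client API:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._

    case class Edge(src: Long, dst: Long) // hypothetical edge type

    def batchInsertEdges(edges: org.apache.spark.rdd.RDD[Edge]): Unit =
      edges.foreachPartition { iter =>
        val table = new HTable(HBaseConfiguration.create(), "edges") // one connection per partition
        iter.grouped(1000).foreach { batch =>                        // batched Puts cut RPC round trips
          val puts = batch.map { e =>
            val put = new Put(Bytes.toBytes(e.src))
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("dst"), Bytes.toBytes(e.dst))
            put
          }
          table.put(puts.asJava)
        }
        table.close()
      }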
Re: Multiple Kafka Receivers and Union
Tim, I think I understand this now. I had a five node Spark cluster and a five partition topic, and I created five receivers. I found this: http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming indicating that if I use all my workers as receivers, there are none left to do the processing. If I drop the number of partitions/receivers down while still having multiple unioned receivers, I see messages. mn On Sep 25, 2014, at 10:18 AM, Matt Narrell matt.narr...@gmail.com wrote: I suppose I have other problems, as I can't get the Scala example to work either. Puzzling, as I have literally coded like the examples (that are purported to work), but no luck. mn On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote: Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream? On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com wrote: The part that works is the commented out, single receiver stream below the loop. It seems that when I call KafkaUtils.createStream more than once, I don't receive any messages. I'll dig through the logs, but at first glance yesterday I didn't see anything suspect. I'll have to look closer. mn On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote: Maybe post the before-code, as in what the code was before you did the loop (the version that worked)? I had similar situations where reviewing the code before (worked) and after (does not work) helped. Also, what helped is the Scala REPL, because I can see what object types are returned by each statement. Other than code, in the driver logs you should see events that say "Registered receiver for stream 0 from akka.tcp://sp...@node5.acme.net:53135". Now, if you go to node5 and look at the Spark or YarnContainer logs (depending on who's doing RM), you should be able to see if the receiver has any errors when trying to talk to Kafka. On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com wrote: To my eyes, these are functionally equivalent. I'll try a Scala approach, but this may cause waves for me upstream (e.g., non-Java). Thanks for looking at this. If anyone else can see a glaring issue in the Java approach, that would be appreciated. Thanks, Matt On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote: Sorry, I am almost Java illiterate, but here's my Scala code to do the equivalent (that I have tested to work):

    // Create 10 receivers across the cluster, one for each partition, potentially --
    // but active receivers are only as many as the kafka partitions you have
    val kInStreams = (1 to 10).map { _ =>
      KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1),
        StorageLevel.MEMORY_AND_DISK_SER)
    }
    val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com wrote: So, this is scrubbed some for confidentiality, but the meat of it is as follows. Note that if I substitute the commented section for the loop, I receive messages from the topic.
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.streaming.unpersist", "true");
sparkConf.set("spark.logConf", "true");

Map<String, String> kafkaProps = new HashMap<String, String>();
kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
kafkaProps.put("group.id", groupId);

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
jsc.checkpoint("hdfs://some_location");

List<JavaPairDStream<String, ProtobufModel>> streamList =
        new ArrayList<JavaPairDStream<String, ProtobufModel>>(5);
for (int i = 0; i < 5; i++) {
    streamList.add(KafkaUtils.createStream(jsc,
                                           String.class, ProtobufModel.class,
                                           StringDecoder.class, ProtobufModelDecoder.class,
                                           kafkaProps,
                                           Collections.singletonMap(topic, 1),
                                           StorageLevel.MEMORY_ONLY_SER()));
}

final JavaPairDStream<String, ProtobufModel> stream =
        jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

// final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
//         KafkaUtils.createStream(jsc,
//                                 String.class, ProtobufModel.class,
//                                 StringDecoder.class, ProtobufModelDecoder.class,
//                                 kafkaProps,
//                                 Collections.singletonMap(topic, 5),
//                                 StorageLevel.MEMORY_ONLY_SER());

final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
        new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple)
Add Meetup
Please add the Apache Spark Maryland meetup to the Spark website. http://www.meetup.com/Apache-Spark-Maryland Thanks! Brian *Brian Husted* *Tetra Concepts, LLC* tetraconcepts.com *301.518.6994 (c)* *866.618.1343 (f)*
Re: RDD of Iterable[String]
Hi Deep, I believe that you are referring to map for Iterable[String]. Suppose you have iter: Iterable[String]; you can do val newIter = iter.map(item => item + "a"), which will create a new Iterable[String] with an "a" appended to each string in iter. Does this answer your question? Liquan On Thu, Sep 25, 2014 at 12:43 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: what should come in the map?? Thanks Liquan for answering me... I really need some help... I am stuck on something. On Thu, Sep 25, 2014 at 12:57 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: what should come in the map?? On Wed, Sep 24, 2014 at 10:52 PM, Liquan Pei liquan...@gmail.com wrote: Hi Deep, The Iterable trait in Scala has methods like map and reduce that you can use to iterate over the elements of Iterable[String]. You can also create an Iterator from the Iterable. For example, suppose you have val rdd: RDD[Iterable[String]]. You can do:

    rdd.map { x =>
      // x has type Iterable[String]
      x.map(...) // process elements of the Iterable[String]
      val iter: Iterator[String] = x.iterator
      while (iter.hasNext) {
        iter.next()
      }
    }

Hope this helps! Liquan On Wed, Sep 24, 2014 at 8:21 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Can we iterate over an RDD of Iterable[String]? How do we do that? Because the entire Iterable[String] seems to be a single element in the RDD. Thank You -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: Multiple Kafka Receivers and Union
Additionally, if I dial up/down the number of executor cores, this does what I want. Thanks for the extra eyes! mn On Sep 25, 2014, at 12:34 PM, Matt Narrell matt.narr...@gmail.com wrote: Tim, I think I understand this now. I had a five node Spark cluster and a five partition topic, and I created five receivers. I found this: http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming indicating that if I use all my workers as receivers, there are none left to do the processing. If I drop the number of partitions/receivers down while still having multiple unioned receivers, I see messages. mn
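Pulling the thread's resolution together, a hedged sketch (hostnames, topic, group and counts are placeholders): keep the receiver count below the total executor cores so cores remain for processing:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf(), Seconds(1))
    // e.g. 5 workers x 2 cores = 10 cores; 4 receivers leave 6 cores for processing
    val numReceivers = 4
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zk1.acme.net:2181", "myGrp", Map("myTopic" -> 1),
        StorageLevel.MEMORY_AND_DISK_SER)
    }
    val unified = ssc.union(streams)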
Re: K-means faster on Mahout then on Spark
What is the size of your vector? Mine is set to 20. I am seeing slow results as well, with iterations=5 and # of elements = 200,000,000. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-means-faster-on-Mahout-then-on-Spark-tp3195p15168.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SPARK UI - Details post job processing
Hi, The details laid out in the Spark UI for a job in progress are really interesting and very useful. But these vanish once the job is done. Is there a way to get job details post processing? I am looking for the Spark UI data, not the standard input, output and error info. Thanks, Harsha
Ungroup data
Hi everyone, I need some advice about how to do the following: given an RDD of vectors (each vector being Vector(Int, Int, Int, Int)), I need to group the data, then apply a function to every group, comparing each consecutive item within a group and retaining a value (that has to be appended to the end of each vector) when a condition from the comparison holds. Here is an example:

    (1, 2, 5, 2)
    (1, 3, 4, 4)
    (1, 3, 7, 3)
    (1, 3, 4, 8)

Data are grouped by the first two fields; then for each group I have to compare each consecutive fourth field. The first fourth-field value in the group is used as the initial value, and if the next value is greater than the previously retained one, it becomes the next retained value appended to the vector. So the output should be:

    (1, 2, 5, 2, 2)
    (1, 3, 4, 4, 4)
    (1, 3, 7, 3, 4)
    (1, 3, 4, 8, 8)

My attempt is a groupBy and then a map with a for loop inside; then I have to build a vector of vectors, adding the new field. However, I am not able to get the right output since I cannot add a new field to the vector. I also do not know what the right output shape from the map should be to get the same shape as the original data once it has been grouped. Besides, my thought is that the for loop is not the best way to iterate through the elements of each group. And finally, maybe this can be done with other operations like reduceByKey or so. Any clue is very appreciated... Thanks in advance!
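A hedged sketch of one way to express this, assuming each group fits in memory and arrives in the desired order (add an explicit sort key otherwise); `sc` is a live SparkContext:

    val data = sc.parallelize(Seq(
      Vector(1, 2, 5, 2), Vector(1, 3, 4, 4), Vector(1, 3, 7, 3), Vector(1, 3, 4, 8)))

    val result = data
      .groupBy(v => (v(0), v(1)))             // group on the first two fields
      .flatMap { case (_, group) =>
        var retained = Int.MinValue
        group.map { v =>
          retained = math.max(retained, v(3)) // running max of the fourth field
          v :+ retained                       // append it to the vector
        }
      }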
Spark streaming - submit new job version
Hi. We are testing Spark Streaming. It looks awesome! We are trying to figure out how to submit a new version of a live-forever job. We have a job that streams metrics from a bunch of servers, applying transformations like .reduceByWindow, and then stores the results in HDFS. If we submit the new version, there will be two jobs fighting for the same stream, and the resulting metrics would be incomplete in each window. If we stop the original job and then submit the new one, metrics will be lost. Any thoughts? Could two jobs share and aggregate streams? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-submit-new-job-version-tp15173.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming: No parallelism in writing to database (MySQL)
I posted yesterday about a related issue but resolved it shortly after. I'm using Spark Streaming to summarize event data from Kafka and save it to a MySQL table. Currently the bottleneck is in writing to MySQL, and I'm puzzled as to how to speed it up. I've tried repartitioning with several different values, but it looks like only one worker is actually doing the writing to MySQL. Obviously this is not ideal, because I need the parallelism to insert this data in a timely manner. Here's the code: https://gist.github.com/maddenpj/5032c76aeb330371a6e6 I'm running this on a cluster of 6 Spark nodes (2 cores, 7.5 GB memory each) and tried repartition sizes of 6, 12 and 48. How do I ensure that there is parallelism in writing to the database? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-No-parallelism-in-writing-to-database-MySQL-tp15174.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Kryo UnsupportedOperationException
We're running into an error (below) when trying to read spilled shuffle data back in. Has anybody encountered this before / is anybody familiar with what causes these Kryo UnsupportedOperationExceptions? any guidance appreciated, Sandy ---
com.esotericsoftware.kryo.KryoException (com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
omitted variable name (omitted class name)
omitted variable name (omitted class name))
  com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
  com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
  com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
  com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
  com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
  com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
  com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
  ...
Re: Yarn number of containers
Thank you. Where is the number of containers set? On Thu, Sep 25, 2014 at 7:17 PM, Marcelo Vanzin van...@cloudera.com wrote: On Thu, Sep 25, 2014 at 8:55 AM, jamborta jambo...@gmail.com wrote: I am running spark with the default settings in yarn client mode. For some reason yarn always allocates three containers to the application (wondering where it is set?), and only uses two of them. The default number of executors in Yarn mode is 2; so you have 2 executors + the application master, so 3 containers. Also the cpus on the cluster never go over 50%, I turned off the fair scheduler and set high spark.cores.max. Is there some additional settings I am missing? You probably need to request more cores (--executor-cores). Don't remember if that is respected in Yarn, but should be. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Yarn number of containers
From spark-submit --help:

 YARN-only:
  --executor-cores NUM        Number of cores per executor (Default: 1).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.

On Thu, Sep 25, 2014 at 2:20 PM, Tamas Jambor jambo...@gmail.com wrote: Thank you. Where is the number of containers set? On Thu, Sep 25, 2014 at 7:17 PM, Marcelo Vanzin van...@cloudera.com wrote: On Thu, Sep 25, 2014 at 8:55 AM, jamborta jambo...@gmail.com wrote: I am running spark with the default settings in yarn client mode. For some reason yarn always allocates three containers to the application (wondering where it is set?), and only uses two of them. The default number of executors in Yarn mode is 2; so you have 2 executors + the application master, so 3 containers. Also the cpus on the cluster never go over 50%, I turned off the fair scheduler and set high spark.cores.max. Is there some additional settings I am missing? You probably need to request more cores (--executor-cores). Don't remember if that is respected in Yarn, but should be. -- Marcelo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
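For example (the values and class name are illustrative), asking Yarn for 10 executor containers with 4 cores each, plus the application master:

    spark-submit --master yarn-client \
      --num-executors 10 --executor-cores 4 --executor-memory 4g \
      --class com.example.App app.jar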
Re: java.lang.NegativeArraySizeException in pyspark
On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for your help. I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models that the size did not fit in an int (recall that broadcasts use 32-bit ints to store size), What is the error? Could you file a JIRA for it? that it was in fact over 2G. I don't know why the previous tests (described above) with duplicated portions of self.all_models worked (it could have been an error in either my debugging or my notes), but splitting self.all_models into a separate broadcast variable for each element worked. I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is, you're completely right that using broadcast is the correct way to code this. In 1.1, you could use broadcast.unpersist() to release it; also the performance of Python Broadcast was much improved in 1.1. best, -Brad
Re: java.lang.OutOfMemoryError while running SVD MLLib example
Hi Xiangrui, After setting the number of singular values (k) to a smaller value (200), it is working. Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-while-running-SVD-MLLib-example-tp14972p15179.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming: No parallelism in writing to database (MySQL)
Update for posterity: once again I solved the problem shortly after posting to the mailing list. updateStateByKey uses the default partitioner, which in my case seemed to be set to one partition. Changing my call from .updateStateByKey[Long](updateFn) to .updateStateByKey[Long](updateFn, numPartitions) resolved it for me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-No-parallelism-in-writing-to-database-MySQL-tp15174p15182.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
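A hedged sketch of that fix (the update function, key type and partition count are placeholders; `events` is an assumed DStream[(String, Long)] built elsewhere, e.g. from Kafka):

    val updateFn = (values: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + values.sum)

    // an explicit partition count makes the stateful stage (and any
    // downstream writes) run with that degree of parallelism
    val counts = events.updateStateByKey[Long](updateFn, numPartitions = 48)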
Re: Kryo UnsupportedOperationException
I would guess the field serializer is having issues being able to reconstruct the class again; it's pretty much best-effort. Is this an intermediate type? On Thu, Sep 25, 2014 at 2:12 PM, Sandy Ryza sandy.r...@cloudera.com wrote: We're running into an error (below) when trying to read spilled shuffle data back in. Has anybody encountered this before / is anybody familiar with what causes these Kryo UnsupportedOperationExceptions? any guidance appreciated, Sandy --- com.esotericsoftware.kryo.KryoException (com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: omitted variable name (omitted class name) omitted variable name (omitted class name)) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.TraversableSerializer.read(Traversable.scala:44) ...
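If an awkward intermediate type is the culprit, explicitly registering it is one hedged thing to try; registration sometimes avoids FieldSerializer's best-effort fallback. A sketch, where Payload is a hypothetical stand-in for the omitted class names above:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    case class Payload(id: Long, values: Array[Double]) // hypothetical intermediate type

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Payload])
      }
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.example.MyRegistrator")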
Re: Spark Streaming: No parallelism in writing to database (MySQL)
Thanks for the update. I'm interested in writing the results to MySQL as well; can you shed some light or share a code sample on how you set up the driver/connection pool/etc.? On Thu, Sep 25, 2014 at 4:00 PM, maddenpj madde...@gmail.com wrote: Update for posterity: once again I solved the problem shortly after posting to the mailing list. updateStateByKey uses the default partitioner, which in my case seemed to be set to one partition. Changing my call from .updateStateByKey[Long](updateFn) to .updateStateByKey[Long](updateFn, numPartitions) resolved it for me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-No-parallelism-in-writing-to-database-MySQL-tp15174p15182.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
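For what it's worth, the generic shape of such a writer -- a hedged sketch, not the gist above; the URL, credentials and schema are placeholders, and `summaries` is an assumed DStream[(String, Long)]:

    import java.sql.DriverManager

    summaries.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // one connection per partition, opened on the executor
        val conn = DriverManager.getConnection("jdbc:mysql://db-host/metrics", "user", "pass")
        val stmt = conn.prepareStatement("INSERT INTO event_counts (k, cnt) VALUES (?, ?)")
        partition.foreach { case (k, cnt) =>
          stmt.setString(1, k)
          stmt.setLong(2, cnt)
          stmt.addBatch()
        }
        stmt.executeBatch() // write parallelism = number of partitions of this RDD
        stmt.close()
        conn.close()
      }
    }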
Shuffle files
Hi, I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a directory (I am using sc.textFile("dir/*") to read in the files). I am getting the following warning: WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99, mesos12-dev.sccps.net): java.io.FileNotFoundException: /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open files) Basically, I think a lot of shuffle files are being created. 1) The tasks eventually fail and the job just hangs (after taking very long, more than an hour). If I read these 30 files in a for loop, the same job completes in a few minutes. However, I need to specify the file names, which is not convenient. I am assuming that sc.textFile("dir/*") creates one large RDD for all 30 files. Is there a way to make the operation on this large RDD efficient so as to avoid creating too many shuffle files? 2) Also, I am finding that the shuffle files for my other completed jobs are not automatically deleted even after days. I thought that sc.stop() clears the intermediate files. Is there some way to programmatically delete these temp shuffle files upon job completion? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark-ec2 ERROR: Line magic function `%matplotlib` not found
Hi, I am running into trouble using the IPython notebook on my cluster. I use the following command to set the cluster up: $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME On the master I launch PySpark as follows: $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" $SPARK_HOME/bin/pyspark It looks like the problem is that the cluster is using an old version of Python and IPython. Any idea how I can easily upgrade? The following version works on my mac. Thanks, Andy {'commit_hash': '681fd77', 'commit_source': 'installation', 'default_encoding': 'UTF-8', 'ipython_path': '/Library/Python/2.7/site-packages/IPython', 'ipython_version': '2.1.0', 'os_name': 'posix', 'platform': 'Darwin-13.3.0-x86_64-i386-64bit', 'sys_executable': '/usr/bin/python', 'sys_platform': 'darwin', 'sys_version': '2.7.5 (default, Mar 9 2014, 22:15:05) \n[GCC 4.2.1 Compatible Apple LLVM 5.0 (clang-500.0.68)]'}
Is it possible to use Parquet with Dremel encoding
Hi again! At the moment I am trying to use Parquet, and I want to keep the data in memory in an efficient way so I can make requests against the data as fast as possible. I read that Parquet is able to encode nested columns; it uses the Dremel encoding with definition and repetition levels. Is it currently possible to use this in Spark as well, or is it not implemented yet? If yes, I'm not sure how to do it. I saw some examples that put arrays or case classes inside other case classes, but I don't think that is the right way. The other thing that I saw in this context was SchemaRDDs. Input:

    Col1| Col2| Col3| Col4
    int | long| long| int
    ----------------------
    14  | 1234| 1422| 3
    14  | 3212| 1542| 2
    14  | 8910| 1422| 8
    15  | 1234| 1542| 9
    15  | 8897| 1422| 13

I want this Parquet format:

    Col3| Col1| Col4| Col2
    long| int | int | long
    ----------------------
    1422| 14  | 3   | 1234
    "   | "   | 8   | 8910
    "   | 15  | 13  | 8897
    1542| 14  | 2   | 3212
    "   | 15  | 9   | 1234

It would be awesome if somebody could give me a good hint how I can do that, or maybe point out a better way. Best, Matthes -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
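A hedged sketch against the Spark 1.1 SQL API (type names and the path are examples): nested case classes -- including sequences of them -- map to nested Parquet groups, which is exactly where the Dremel repetition/definition levels are used, so the grouping below mirrors the desired layout above:

    import org.apache.spark.sql.SQLContext

    case class Entry(col1: Int, col4: Int, col2: Long)
    case class Record(col3: Long, entries: Seq[Entry])

    val sqlContext = new SQLContext(sc) // assumes a live SparkContext `sc`
    import sqlContext.createSchemaRDD

    val records = sc.parallelize(Seq(
      Record(1422L, Seq(Entry(14, 3, 1234L), Entry(14, 8, 8910L), Entry(15, 13, 8897L))),
      Record(1542L, Seq(Entry(14, 2, 3212L), Entry(15, 9, 1234L)))))

    records.saveAsParquetFile("/tmp/records.parquet") // columnar, nested encoding
    val loaded = sqlContext.parquetFile("/tmp/records.parquet")
    loaded.registerTempTable("records")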
Re: K-means faster on Mahout then on Spark
Please also check the load balance of the RDD on YARN. How many partitions are you using? Does it match the number of CPU cores? -Xiangrui On Thu, Sep 25, 2014 at 12:28 PM, bhusted brian.hus...@gmail.com wrote: What is the size of your vector mine is set to 20? I am seeing slow results as well with iteration=5, # of elements 200,000,000. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-means-faster-on-Mahout-then-on-Spark-tp3195p15168.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
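A hedged sketch of that advice (`rawVectors` is an assumed RDD[org.apache.spark.mllib.linalg.Vector] built elsewhere; the counts are examples):

    import org.apache.spark.mllib.clustering.KMeans

    val vectors = rawVectors.repartition(200).cache() // roughly match total CPU cores
    val model = KMeans.train(vectors, 10, 5)          // k = 10 clusters, 5 iterations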
Job cancelled because SparkContext was shut down
hi all, I am getting this strange error about halfway through the job (running Spark 1.1 on yarn in client mode):

14/09/26 00:54:06 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@4d0155fb
java.nio.channels.CancelledKeyException
  at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
  at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/09/26 00:54:06 INFO YarnClientSchedulerBackend: Executor 1 disconnected, so removing it

Then a few minutes later the whole process dies:

14/09/26 01:00:12 ERROR YarnClientSchedulerBackend: Yarn application already ended: FINISHED
14/09/26 01:00:13 INFO SparkUI: Stopped Spark web UI at http://backend-dev:4040
14/09/26 01:00:13 INFO YarnClientSchedulerBackend: Shutting down all executors
14/09/26 01:00:13 INFO YarnClientSchedulerBackend: Asking each executor to shut down
[E 140926 01:00:13 base:56] Request failed
14/09/26 01:00:13 INFO YarnClientSchedulerBackend: Stopped
[E 140926 01:00:13 base:57] {'error_msg': type 'exceptions.Exception', org.apache.spark.SparkException: Job cancelled because SparkContext was shut down, traceback object at 0x4483cb0}

Any idea what's going on here? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Job-cancelled-because-SparkContext-was-shut-down-tp15189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Yarn number of containers
thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-number-of-containers-tp15148p15191.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Multiple Kafka Receivers and Union
Good to know it worked out and thanks for the update. I didn't realize you need to provision for receiver workers + processing workers. One would think a worker would process multiple stages of an app/job and receiving is just a stage of the job. On Thu, Sep 25, 2014 at 12:05 PM, Matt Narrell matt.narr...@gmail.com wrote: Additionally, if I dial up/down the number of executor cores, this does what I want. Thanks for the extra eyes! mn
Re: Memory used in Spark-0.9.0-incubating
Thanks Yi Tian! Yes, I use the fair scheduler. In the resource manager log I see the container's start shell: /home/export/Data/hadoop/tmp/nm-local-dir/usercache/hpc/appcache/application_1411693809133_0002/container_1411693809133_0002_01_02/launch_container.sh At the end: exec /bin/bash -c "$JAVA_HOME/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms4096m -Xmx4096m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Djava.io.tmpdir=$PWD/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@node99:6177/user/CoarseGrainedScheduler 2 node99 4 1> /home/export/Logs/yarn/application_1411693809133_0002/container_1411693809133_0002_01_02/stdout 2> /home/export/Logs/yarn/application_1411693809133_0002/container_1411693809133_0002_01_02/stderr" So the container's JVM maximum heap is 4096m. And I looked at the source of ContainerImpl.java. The monitor's 5 GB figure comes from: long pmemBytes = container.getResource().getMemory() * 1024 * 1024L; (the print method only shows "5 GB" for an exact gigabyte value), so only container.getResource().getMemory() = 5120 can print 5 GB. I don't know where the extra 1024 MB comes from!!! On 2014-09-25 21:43, Yi Tian wrote: You should check the log of the resource manager when you submit this job to yarn. It records how many resources your spark application actually asked from the resource manager for each container. Did you use the fair scheduler? There is a config parameter of the fair scheduler, "yarn.scheduler.increment-allocation-mb", default 1024. It means that if you ask for 4097mb of memory for a container, the resource manager will create a container which uses 5120mb of memory. But I can't figure out where the 5GB request comes from. Maybe there are some codes which mistake 1024 and 1000? Best Regards, Yi Tian tianyi.asiai...@gmail.com On Sep 25, 2014, at 18:41, 王晓雨 wangxiao...@jd.com wrote: My yarn-site.xml config: <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>16384</value> </property> ENV: Spark: 0.9.0-incubating Hadoop: 2.3.0 I run spark tasks on Yarn.
I see the log in the Nodemanager:

2014-09-25 17:43:34,141 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:37,171 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:40,210 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:43,239 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:46,269 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used

My task parameters are: --num-workers 4 --master-memory 2g --worker-memory 4g --worker-cores 4 In my opinion, with --worker-memory 4g, 4g should be the maximum memory for the container. But why does the log show "4.5 GB of 5 GB physical memory used"? And where is the 5 GB maximum container memory configured? -- WangXiaoyu -
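One hedged explanation, assuming the Spark-on-YARN allocator of that era adds a fixed 384 MB overhead per executor container: 4096 MB (--worker-memory) + 384 MB overhead = 4480 MB requested, which the fair scheduler's 1024 MB increment then rounds up to 5120 MB = 5 GB. That would account for the monitor's "5 GB" without any 1024-vs-1000 confusion.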
flume spark streaming receiver host random
Hi all, My command is as follows: /usr/local/webserver/sparkhive/bin/spark-submit --class org.apache.spark.examples.streaming.FlumeEventCount --master yarn --deploy-mode cluster --queue online --num-executors 5 --driver-memory 6g --executor-memory 20g --executor-cores 5 target/scala-2.10/simple-project_2.10-1.0.jar 10.1.15.115 6 However, the receiver does not run on 10.1.15.115; instead one of the slave hosts is chosen at random. How can I solve this problem? Thanks -- cente...@gmail.com|齐忠 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
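One hedged alternative, if the receiver placement itself cannot be pinned: Spark 1.1 added a pull-based Flume receiver that connects out to a Flume SparkSink, so it no longer matters which worker hosts the receiver. A sketch (assumes a StreamingContext `ssc` and a SparkSink configured in the Flume agent on that host; the port is a placeholder):

    import org.apache.spark.streaming.flume.FlumeUtils

    val events = FlumeUtils.createPollingStream(ssc, "10.1.15.115", 6666)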
Re: how to run spark job on yarn with jni lib?
You're right, I'm suffering from SPARK-1719. I've tried to add their location to /etc/ld.so.conf and I've submitted my job as a yarn-client, but the problem is the same: my native libraries are not loaded. Does this method work in your case? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-spark-job-on-yarn-with-jni-lib-tp15146p15195.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shuffle files
Hi SK, For the problem with lots of shuffle files and the too many open files exception there are a couple options: 1. The linux kernel has a limit on the number of open files at once. This is set with ulimit -n, and can be set permanently in /etc/sysctl.conf or /etc/sysctl.d/. Try increasing this to a large value, at the bare minimum the square of your partition count. 2. Try using shuffle consolidation -- spark.shuffle.consolidateFiles=true This option writes fewer files to disk so shouldn't hit limits nearly as much 3. Try using the sort-based shuffle by setting spark.shuffle.manager=SORT. You should likely hold off on this until https://issues.apache.org/jira/browse/SPARK-3032 is fixed, hopefully in 1.1.1 Hope that helps! Andrew On Thu, Sep 25, 2014 at 4:20 PM, SK skrishna...@gmail.com wrote: Hi, I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a directory (I am using sc.textfile(dir/*) ) to read in the files. I am getting the following warning: WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99, mesos12-dev.sccps.net): java.io.FileNotFoundException: /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open files) basically I think a lot of shuffle files are being created. 1) The tasks eventually fail and the job just hangs (after taking very long, more than an hour). If I read these 30 files in a for loop, the same job completes in a few minutes. However, I need to specify the files names, which is not convenient. I am assuming that sc.textfile(dir/*) creates a large RDD for all the 30 files. Is there a way to make the operation on this large RDD efficient so as to avoid creating too many shuffle files? 2) Also, I am finding that all the shuffle files for my other completed jobs are not being automatically deleted even after days. I thought that sc.stop() clears the intermediate files. Is there some way to programmatically delete these temp shuffle files upon job completion? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
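A hedged sketch of options 1 and 2 (paths and values are examples):

    # /etc/sysctl.d/99-spark.conf -- raise the system-wide open-file cap
    fs.file-max = 1000000

    # plus a per-user limit in the shell that starts the Spark daemons:
    ulimit -n 1000000

    # conf/spark-defaults.conf -- write far fewer shuffle files per task
    spark.shuffle.consolidateFiles true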
Re: SPARK UI - Details post job processing
Matt, you should be able to set an HDFS path so that the logs get written to a unified place instead of to local disk on a random box in the cluster. On Thu, Sep 25, 2014 at 1:38 PM, Matt Narrell matt.narr...@gmail.com wrote: How does this work with a cluster manager like YARN? mn On Sep 25, 2014, at 2:23 PM, Andrew Or and...@databricks.com wrote: Hi Harsha, You can turn on `spark.eventLog.enabled` as documented here: http://spark.apache.org/docs/latest/monitoring.html. Then, if you are running standalone mode, you can access the finished SparkUI through the Master UI. Otherwise, you can start a HistoryServer to display finished UIs. -Andrew 2014-09-25 12:55 GMT-07:00 Harsha HN 99harsha.h@gmail.com: Hi, The details laid out in the Spark UI for a job in progress are really interesting and very useful, but they vanish once the job is done. Is there a way to get job details post processing? I am looking for the Spark UI data, not the standard input, output, and error info. Thanks, Harsha
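A hedged sketch of the settings in question, using Spark 1.1 configuration keys (the HDFS path is a placeholder; a HistoryServer would then be pointed at the same directory):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      // Write event logs to HDFS so finished UIs survive both the job
      // and the node it happened to run on.
      .set("spark.eventLog.dir", "hdfs://namenode:8020/user/spark/eventlogs") // placeholder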
Re: Optimal Partition Strategy
Hi Vinay, What I'm guessing is happening is that Spark is taking the locality of files into account and you don't have node-local data on all your machines. This might be the case if you're reading out of HDFS and your 600 files are somehow skewed to sit on only about 200 of your 400 machines. A possible cause could be that your cluster doesn't have both Spark and an HDFS data node on every server. It could also happen if you're loading data into HDFS from just those 200 machines -- the HDFS block placement strategy is to store one replica locally and then place further replicas of that block elsewhere on the cluster. If you want to force the data to be evenly distributed across your executors you can call .repartition() right after the textFile() call. This will shuffle data across the network in a hash-partitioned way before continuing with the rest of your computation. Hope that helps! Andrew On Thu, Sep 25, 2014 at 10:37 AM, Muttineni, Vinay vmuttin...@ebay.com wrote: Hello, A bit of background. I have a dataset with about 200 million records and around 10 columns. The size of this dataset is around 1.5TB and it is split into around 600 files. When I read this dataset using sparkContext, by default it creates around 3000 partitions if I do not specify the number of partitions in the textFile() command. Now I see that even though my Spark application has around 400 executors assigned to it, the data is spread out to only about 200 of them. I am using the .cache() method to hold my data in memory. Each of these 200 executors, with a total available memory of 6GB, is now holding multiple blocks and thus using up its entire memory by caching the data. Even though I have about 400 machines, only about 200 of them are actually being used. Now, my question is: How do I partition my data so that all 400 executors have some chunk of the data, thus better parallelizing my work? Instead of only about 200 machines holding about 6GB of data each, I would like to have 400 machines with about 3GB of data each. Any idea how I can achieve the above? Thanks, Vinay
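A minimal sketch of that suggestion; the input path is a placeholder and the partition count is illustrative (keeping the original ~3000 partitions simply spreads them evenly, since the shuffle ignores block locality):

    // Shuffle the input evenly across the cluster before caching, so all
    // ~400 executors hold a slice rather than only the ~200 machines that
    // happen to have node-local HDFS blocks.
    val data = sc.textFile("hdfs:///path/to/dataset") // placeholder path
      .repartition(3000) // illustrative; match or exceed the default partition count
      .cache()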
Re: Working on LZOP Files
Hi Harsha, I use LZOP files extensively on my Spark cluster -- see my writeup on this mailing list post for how to do it: http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCAOoZ679ehwvT1g8=qHd2n11Z4EXOBJkP+q=Aj0qE_=shhyl...@mail.gmail.com%3E Maybe we should better document how to use LZO with Spark, because it can be tricky to get the lzo jars, native libraries, and hadoopFile() calls all set up correctly. Andrew On Thu, Sep 25, 2014 at 9:44 AM, Harsha HN 99harsha.h@gmail.com wrote: Hi, Is anybody using LZOP files to process in Spark? We have a huge volume of LZOP files in HDFS to process through Spark. In the MapReduce framework, it automatically detects the file format and sends the decompressed version to the mappers. Is there any such support in Spark? As of now I am manually downloading and decompressing the files before processing. Thanks, Harsha
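For reference, a hedged sketch of the hadoopFile-style call, assuming the hadoop-lzo jar and its native libraries are already on the classpath and java.library.path (exactly the setup the linked post walks through); the input path is a placeholder:

    import com.hadoop.mapreduce.LzoTextInputFormat
    import org.apache.hadoop.io.{LongWritable, Text}

    // Reads LZOP-compressed text transparently; splittable reads additionally
    // require the files to be indexed with hadoop-lzo's LzoIndexer.
    val lines = sc.newAPIHadoopFile(
        "hdfs:///data/logs/*.lzo", // placeholder path
        classOf[LzoTextInputFormat],
        classOf[LongWritable],
        classOf[Text])
      .map(_._2.toString)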
Re: quick start guide: building a standalone scala program
Hi Christy, I'm more of a Gradle fan, but I know SBT fits better into the Scala ecosystem as a build tool. If you'd like to give Gradle a shot, try this skeleton Gradle+Spark repo from my coworker Punya: https://github.com/punya/spark-gradle-test-example Good luck! Andrew On Thu, Sep 25, 2014 at 1:00 AM, christy 760948...@qq.com wrote: I encountered exactly the same problem. How did you solve this? Thanks
Re: Log hdfs blocks sending
Hi Alexey, You should see in the logs a locality measure like NODE_LOCAL, PROCESS_LOCAL, ANY, etc. If your Spark workers each have an HDFS data node on them and you're reading out of HDFS, then you should be seeing almost all NODE_LOCAL accesses. One cause I've seen for mismatches is Spark using short hostnames while Hadoop uses FQDNs -- in that case Spark doesn't think the data is local and does remote reads, which really kills performance. Hope that helps! Andrew On Thu, Sep 25, 2014 at 12:09 AM, Alexey Romanchuk alexey.romanc...@gmail.com wrote: Hello again spark users and developers! I have a standalone spark cluster (1.1.0) with spark sql running on it. My cluster consists of 4 datanodes, and the replication factor of the files is 3. I use the thrift server to access spark sql and have 1 table with 30+ partitions. When I run a query on the whole table (something simple like select count(*) from t), spark produces a lot of network activity, filling the entire available 1Gb link. It looks like spark sends data over the network instead of reading it locally. Is there any way to log which blocks were accessed locally and which were not? Thanks!
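One hedged way to check what Spark thinks the block locations are, from the driver (the path is a placeholder): if the hostnames printed here don't exactly match the worker hostnames shown in the Spark UI (short name vs. FQDN), you've likely found the mismatch.

    val rdd = sc.textFile("hdfs:///path/to/table") // placeholder path
    // Print the hosts HDFS reports for each input partition.
    rdd.partitions.foreach { p =>
      println(s"partition ${p.index}: ${rdd.preferredLocations(p).mkString(", ")}")
    }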
Re: Spark Streaming: No parallelism in writing to database (MySQL)
Yup, it's all in the gist: https://gist.github.com/maddenpj/5032c76aeb330371a6e6 Lines 6-9 deal with setting up the driver specifically. This sets the driver up once per partition, so a connection pool is kept around per partition rather than a connection being created per record.
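The general shape of that pattern, as a hedged sketch (this is not the gist's exact code; the JDBC URL and credentials are placeholders, and `dstream` stands for the job's input DStream):

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // One connection per partition, created on the executor itself --
        // connection objects aren't serializable, so they can't be built
        // on the driver and shipped to the workers.
        val conn = java.sql.DriverManager.getConnection(
          "jdbc:mysql://db-host:3306/mydb", "user", "pass") // placeholders
        try {
          records.foreach { r =>
            // insert r using conn (statement preparation omitted)
          }
        } finally {
          conn.close()
        }
      }
    }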
Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found
Maybe you have Python 2.7 on the master but Python 2.6 on the cluster; you should either upgrade Python to 2.7 on the cluster, or use Python 2.6 on the master by setting PYSPARK_PYTHON=python2.6. On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi, I am running into trouble using iPython notebook on my cluster. I use the following command to set the cluster up: $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME On master I launch python as follows: $ IPYTHON_OPTS=notebook --pylab inline --no-browser --port=7000 $SPARK_HOME/bin/pyspark It looks like the problem is that the cluster is using an old version of Python and IPython. Any idea how I can easily upgrade? The following version works on my mac. Thanks, Andy {'commit_hash': '681fd77', 'commit_source': 'installation', 'default_encoding': 'UTF-8', 'ipython_path': '/Library/Python/2.7/site-packages/IPython', 'ipython_version': '2.1.0', 'os_name': 'posix', 'platform': 'Darwin-13.3.0-x86_64-i386-64bit', 'sys_executable': '/usr/bin/python', 'sys_platform': 'darwin', 'sys_version': '2.7.5 (default, Mar 9 2014, 22:15:05) \n[GCC 4.2.1 Compatible Apple LLVM 5.0 (clang-500.0.68)]'}
Parallel spark jobs on standalone cluster
Hi All, I have a Java program which submits a spark job to a standalone spark cluster (2 nodes; 10 cores (6+4); 12GB (8+4)). It is called by another Java program through an ExecutorService, which invokes it multiple times with different sets of arguments and parameters. I have set spark memory usage to 3GB (and tried with 2GB and 1GB also). When I run this with 4 parallel jobs, the first job finishes successfully, the second and third jobs start executing in parallel but never complete, and the fourth job waits in the queue. I've tried using the Fair Scheduler but didn't notice any change in the behavior. Also, in the spark job submission program I'm calling SparkContext.stop at the end of execution. Sometimes all jobs fail with status Exited. Please let me know what is going wrong and how to overcome this issue. ~Sarath
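For what it's worth, a common cause of this symptom on standalone clusters is that each application grabs every available core by default, starving the ones submitted after it. A hedged sketch of capping the per-application share (the values are illustrative, and this is not a confirmed diagnosis of the issue above):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("parallel-job") // illustrative name
      // Cap this application's cores so concurrent submissions also get some;
      // e.g. 10 total cores across 4 concurrent jobs leaves at most 2 each.
      .set("spark.cores.max", "2")
    val sc = new SparkContext(conf)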
Spark Streaming: foreachRDD network output
Hello all, I have some questions regarding the foreachRDD output function in Spark Streaming. The programming guide (http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html) describes how to output data using network connections on the worker nodes. Are there any working examples of how to do this properly? (Most of the guide describes what not to do, rather than what to do.) Any suggestions on the best way to write tests for such code, to make sure that connection objects are used properly, etc.? How should network or other problems on a worker node be handled? Can I throw an exception to force Spark to try again with that data on another node? As an example: a program that writes data to an SQL database using foreachRDD. One worker node might have connection issues to the database, so it has to let another node finish the output operation. Thanks! -- Jesper Lundgren
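On the retry question specifically, the usual behavior is that an exception propagating out of an output operation fails the task, and Spark retries failed tasks (up to spark.task.maxFailures), possibly on a different executor. A hedged sketch; openDbConnection and writeRecord are hypothetical helpers, and `dstream` stands for the job's input DStream:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val conn = openDbConnection() // hypothetical helper
        try {
          records.foreach(r => writeRecord(conn, r)) // hypothetical helper
        } finally {
          conn.close()
        }
        // No catch block: if writeRecord throws, the task fails and the
        // scheduler reschedules it. Note the whole partition is replayed
        // on retry, so writes should be idempotent.
      }
    }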
Re: YARN ResourceManager and Hadoop NameNode Web UI not visible in port 8088, port 50070
The problem is solved: the web interfaces were not opening when connecting from the local network to the server through a proxy; they open fine from servers without the proxy. On Thu, Sep 25, 2014 at 1:12 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Raghuveer, This might be a better question for the cdh-user list or the Hadoop user list. The Hadoop web interfaces for both the NameNode and ResourceManager are enabled by default. Is it possible you have a firewall blocking those ports? -Sandy On Wed, Sep 24, 2014 at 9:00 PM, Raghuveer Chanda raghuveer.cha...@gmail.com wrote: Hi, I'm running a spark job in a YARN cluster, but I'm not able to see the web interface of the YARN ResourceManager (port 8088), the Hadoop NameNode Web UI (port 50070), or the spark stages. Only the Spark UI on port 18080 is visible. I got the URLs from cloudera, but maybe the web interface is disabled due to some default security option. How can I enable the web interface, i.e. is there any option in cloudera, or is the server firewall blocking it? Please help. -- Regards, Raghuveer Chanda 4th year Undergraduate Student Computer Science and Engineering IIT Kharagpur
Re:
I built a patched DFSClient jar and am now testing (it takes 3 hours...). I'd like to know if I can patch Spark builds. How about just replacing DFSClient.class in the spark-assembly jar? Jianshi On Fri, Sep 26, 2014 at 2:29 AM, Ted Yu yuzhih...@gmail.com wrote: I followed the linked JIRAs to HDFS-7005, which is in hadoop 2.6.0. Any chance of deploying 2.6.0-SNAPSHOT to see if the problem goes away? On Wed, Sep 24, 2014 at 10:54 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like it's an HDFS issue, pretty new. https://issues.apache.org/jira/browse/HDFS-6999 Jianshi On Thu, Sep 25, 2014 at 12:10 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Ted, See my previous reply to Debasish: all region servers are idle. I don't think it's caused by hotspotting. Besides, only 6 out of 3000 tasks were stuck, and their inputs are only about 80MB each. Jianshi On Wed, Sep 24, 2014 at 11:58 PM, Ted Yu yuzhih...@gmail.com wrote: I was thinking along the same line. Jianshi: See http://hbase.apache.org/book.html#d0e6369 On Wed, Sep 24, 2014 at 8:56 AM, Debasish Das debasish.da...@gmail.com wrote: The HBase region servers need to be balanced... you might have some skewness in row keys, putting one region server under pressure... try finding that key and replicating it using a random salt. On Wed, Sep 24, 2014 at 8:51 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Ted, It converts RDD[Edge] to HBase rowkeys and columns and inserts them into HBase (in batches). BTW, I found batched Puts actually faster than generating HFiles... Jianshi On Wed, Sep 24, 2014 at 11:49 PM, Ted Yu yuzhih...@gmail.com wrote: bq. at com.paypal.risk.rds.dragon.storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.apply(HbaseRDDBatch.scala:179) Can you reveal what HbaseRDDBatch.scala does? Cheers On Wed, Sep 24, 2014 at 8:46 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: One of my big Spark programs always gets stuck at 99%, where a few tasks never finish. I debugged it by printing out thread stacktraces, and found workers stuck at parquet.hadoop.ParquetFileReader.readNextRowGroup. Has anyone had a similar problem? I'm using Spark 1.1.0 built for HDP 2.1. The parquet files are generated by Pig using the latest parquet-pig-bundle v1.6.0rc1. From Spark 1.1.0's pom.xml, Spark is using parquet v1.4.3; will this be problematic? One of the weird behaviors is that another program reads and sorts data from the same parquet files and works fine. The only difference seems to be that the buggy program uses foreachPartition and the working program uses map.
Here's the full stacktrace: Executor task launch worker-3 java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
at