Re: Dynamic allocator requests -1 executors
Hi Patrick,

The fix you need is SPARK-6954: https://github.com/apache/spark/pull/5704. If possible, you may cherry-pick the following commit into your Spark deployment and it should resolve the issue: https://github.com/apache/spark/commit/98ac39d2f5828fbdad8c9a4e563ad1169e3b9948

Note that this commit is only for the 1.3 branch. If you could upgrade to 1.4.0 then you do not need to apply that commit yourself.

-Andrew

2015-06-13 12:01 GMT-07:00 Patrick Woody patrick.woo...@gmail.com:

Hey Sandy, I'll test it out on 1.4. Do you have a bug number or PR that I could reference as well? Thanks! -Pat

Sent from my iPhone

On Jun 13, 2015, at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Patrick, I'm noticing that you're using Spark 1.3.1. We fixed a bug in dynamic allocation in 1.4 that permitted requesting negative numbers of executors. Any chance you'd be able to try with the newer version and see if the problem persists? -Sandy

On Fri, Jun 12, 2015 at 7:42 PM, Patrick Woody patrick.woo...@gmail.com wrote:

Hey all, I've recently run into an issue where spark dynamicAllocation has asked for -1 executors from YARN. Unfortunately, this raises an exception that kills the executor-allocation thread and the application can't request more resources. Has anyone seen this before? It is spurious and the application usually works, but when this gets hit it becomes unusable when getting stuck at minimum YARN resources. Stacktrace below. Thanks! -Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
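If cherry-picking, a short sketch of applying that commit to a 1.3-based checkout; the remote and branch names here are assumptions:

    # assumes a clone of your Spark fork with apache/spark as a remote named 'apache'
    git fetch apache
    git checkout my-deployment-branch        # based on branch-1.3
    git cherry-pick 98ac39d2f5828fbdad8c9a4e563ad1169e3b9948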
What is most efficient to do a large union and remove duplicates?
I have 10 folders, each with 6000 files. Each folder is roughly 500GB, so 5TB of data in total. The data is formatted as key \t value. After the union, I want to remove the duplicates among keys, so each key should be unique and have only one value. Here is what I am doing:

val folders = Array("folder1", "folder2", ..., "folder10")
var rawData = sc.textFile(folders(0)).map(x => (x.split("\t")(0), x.split("\t")(1)))
for (a <- 1 to folders.length - 1) {
  rawData = rawData.union(sc.textFile(folders(a)).map(x => (x.split("\t")(0), x.split("\t")(1))))
}
val nodups = rawData.reduceByKey((a, b) => if (a.length > b.length) a else b)
nodups.saveAsTextFile("/nodups")

Anything I could do to make this process faster? Right now my process dies when outputting the data to HDFS. Thank you!
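Since sc.textFile accepts a comma-separated list of paths, the union loop can be avoided entirely. A minimal sketch of the same job under that assumption, reusing the folders array above:

    // one RDD over all folders, splitting each line once instead of twice
    val rawData = sc.textFile(folders.mkString(","))
      .map { x =>
        val parts = x.split("\t")
        (parts(0), parts(1))
      }
    val nodups = rawData.reduceByKey((a, b) => if (a.length > b.length) a else b)
    nodups.saveAsTextFile("/nodups")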
How to set up a Spark Client node?
I have the following Hadoop/Spark cluster node configuration: nodes 1 and 2 are the resourceManager and nameNode respectively; nodes 3, 4, and 5 each include a nodeManager and dataNode; node 7 is the Spark master, configured to run in yarn-client or yarn-cluster mode. I have tested it and it works fine. Are there any instructions on how to set up a Spark client in cluster mode? I am not sure if I am doing it right. Thanks in advance
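For context, a minimal sketch of submitting in cluster mode from a client (gateway) node; HADOOP_CONF_DIR must point at the cluster's client configs, and the class and jar names are placeholders:

    export HADOOP_CONF_DIR=/etc/hadoop/conf
    spark-submit --master yarn-cluster --class com.example.MyApp /path/to/my-app.jar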
Re: [Spark] What is the most efficient way to do such a join and column manipulation?
Yes, it's all just RDDs under the covers. DataFrames/SQL is just a more concise way to express your parallel programs.

On Sat, Jun 13, 2015 at 5:25 PM, Rex X dnsr...@gmail.com wrote:

Thanks, Don! Does the SQL implementation of Spark do parallel processing on records by default? -Rex

On Sat, Jun 13, 2015 at 10:13 AM, Don Drake dondr...@gmail.com wrote:

Take a look at https://github.com/databricks/spark-csv to read in the tab-delimited file (change the default delimiter) and once you have that as a DataFrame, SQL can do the rest. https://spark.apache.org/docs/latest/sql-programming-guide.html -Don

On Fri, Jun 12, 2015 at 8:46 PM, Rex X dnsr...@gmail.com wrote:

Hi, I want to use Spark to select N columns and the top M rows of all CSV files under a folder. To be concrete, say we have a folder with thousands of tab-delimited CSV files with the following attribute format (each CSV file is about 10GB):

id  name  address  city  ...
1   Matt  add1     LA    ...
2   Will  add2     LA    ...
3   Lucy  add3     SF    ...
...

And we have a lookup table based on "name" above:

name  gender
Matt  M
Lucy  F
...

Now we want to output from the top 100K rows of each CSV file in the following format:

id  name  gender
1   Matt  M
...

Can we use pyspark to efficiently handle this?

-- Donald Drake Drake Consulting http://www.drakeconsulting.com/ http://www.MailLaunder.com/ 800-733-2143
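A minimal sketch of the reading-and-joining step described above, assuming spark-csv 1.0.x on Spark 1.4, that the lookup table has been loaded and registered as "genders", and that paths are placeholders:

    // load every tab-delimited file under the folder as one DataFrame
    val people = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", "\t")
      .load("/path/to/folder/*")
    people.registerTempTable("people")

    // the projection and join can then be plain SQL
    val out = sqlContext.sql(
      "SELECT p.id, p.name, g.gender FROM people p JOIN genders g ON p.name = g.name")

The per-file "top 100K rows" selection is left out here; it would need the file origin of each row, which plain SQL over a glob of files does not preserve.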
Re: How to read avro in SparkR
Hi,

Not sure if this is it, but could you please try com.databricks.spark.avro instead of just avro, i.e. read.df(sqlContext, "file:///home/matmsh/myfile.avro", "com.databricks.spark.avro").

Thanks,
Burak

On Jun 13, 2015 9:55 AM, Shing Hing Man mat...@yahoo.com.invalid wrote:

Hi, I am trying to read an avro file in SparkR (in Spark 1.4.0). I started R using the following:

matmsh@gauss:~$ sparkR --packages com.databricks:spark-avro_2.10:1.0.0

Inside the R shell, when I issue the following:

read.df(sqlContext, "file:///home/matmsh/myfile.avro", "avro")

I get the following exception:

Caused by: java.lang.RuntimeException: Failed to load class for data source: avro

Below is the stack trace.

matmsh@gauss:~$ sparkR --packages com.databricks:spark-avro_2.10:1.0.0

R version 3.2.0 (2015-04-16) -- "Full of Ingredients"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-suse-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and 'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or 'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/matmsh/installed/spark/bin/spark-submit --packages com.databricks:spark-avro_2.10:1.0.0 sparkr-shell /tmp/RtmpoT7FrF/backend_port464e1e2fb16a

Ivy Default Cache set to: /home/matmsh/.ivy2/cache
The jars for the packages stored in: /home/matmsh/.ivy2/jars
:: loading settings :: url = jar:file:/home/matmsh/installed/sparks/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-avro_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.databricks#spark-avro_2.10;1.0.0 in list
	found org.apache.avro#avro;1.7.6 in local-m2-cache
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in list
	found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in list
	found com.thoughtworks.paranamer#paranamer;2.3 in list
	found org.xerial.snappy#snappy-java;1.0.5 in list
	found org.apache.commons#commons-compress;1.4.1 in list
	found org.tukaani#xz;1.0 in list
	found org.slf4j#slf4j-api;1.6.4 in list
:: resolution report :: resolve 421ms :: artifacts dl 16ms
	:: modules in use:
	com.databricks#spark-avro_2.10;1.0.0 from list in [default]
	com.thoughtworks.paranamer#paranamer;2.3 from list in [default]
	org.apache.avro#avro;1.7.6 from local-m2-cache in [default]
	org.apache.commons#commons-compress;1.4.1 from list in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from list in [default]
	org.codehaus.jackson#jackson-mapper-asl;1.9.13 from list in [default]
	org.slf4j#slf4j-api;1.6.4 from list in [default]
	org.tukaani#xz;1.0 from list in [default]
	org.xerial.snappy#snappy-java;1.0.5 from list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	0 artifacts copied, 9 already retrieved (0kB/9ms)
15/06/13 17:37:42 INFO spark.SparkContext: Running Spark version 1.4.0
15/06/13 17:37:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/13 17:37:42 WARN util.Utils: Your hostname, gauss resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface enp3s0)
15/06/13 17:37:42 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/06/13 17:37:42 INFO spark.SecurityManager: Changing view acls to: matmsh
15/06/13 17:37:42 INFO spark.SecurityManager: Changing modify acls to: matmsh
15/06/13 17:37:42 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(matmsh); users with modify permissions: Set(matmsh)
15/06/13 17:37:43 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/13 17:37:43 INFO Remoting: Starting remoting
15/06/13 17:37:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.10:46219]
15/06/13 17:37:43 INFO util.Utils: Successfully started service 'sparkDriver' on port 46219.
15/06/13 17:37:43 INFO spark.SparkEnv: Registering MapOutputTracker
15/06/13 17:37:43 INFO spark.SparkEnv: Registering BlockManagerMaster
15/06/13 17:37:43 INFO storage.DiskBlockManager: Created local directory at
Re: How to read avro in SparkR
Yep - Burak's answer should work. FWIW, the error message from the stack trace that shows this is the line: Failed to load class for data source: avro

Thanks
Shivaram

On Sat, Jun 13, 2015 at 6:13 PM, Burak Yavuz brk...@gmail.com wrote:

Hi, Not sure if this is it, but could you please try com.databricks.spark.avro instead of just avro. Thanks, Burak

On Jun 13, 2015 9:55 AM, Shing Hing Man mat...@yahoo.com.invalid wrote:

Hi, I am trying to read an avro file in SparkR (in Spark 1.4.0). I started R using the following:

matmsh@gauss:~$ sparkR --packages com.databricks:spark-avro_2.10:1.0.0

Inside the R shell, when I issue the following:

read.df(sqlContext, "file:///home/matmsh/myfile.avro", "avro")

I get the following exception:

Caused by: java.lang.RuntimeException: Failed to load class for data source: avro
Re: Dataframe Write : Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
I got it. Thanks! Patcharee

On 13/06/15 23:00, Will Briggs wrote:

The context that is created by spark-shell is actually an instance of HiveContext. If you want to use it programmatically in your driver, you need to make sure that your context is a HiveContext, and not a SQLContext. https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables Hope this helps, Will

On June 13, 2015, at 3:36 PM, pth001 patcharee.thong...@uni.no wrote:

Hi, I am using spark 0.14. I try to insert data into a hive table (in orc format) from a DF:

partitionedTestDF.write
  .format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .partitionBy("zone", "z", "year", "month")
  .saveAsTable("testorc")

When this job is submitted by spark-submit I get: Exception in thread main java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead. But the job works fine in spark-shell. What can be wrong? BR, Patcharee
Re: Dataframe Write : Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
As the error message says, were you using a SQLContext instead of a HiveContext to create the DataFrame? In the Spark shell, although the variable name is sqlContext, the type of that variable is actually org.apache.spark.sql.hive.HiveContext, which has the ability to communicate with the Hive metastore.

Cheng

On 6/13/15 12:36 PM, pth001 wrote:

Hi, I am using spark 0.14. I try to insert data into a hive table (in orc format) from a DF:

partitionedTestDF.write
  .format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .partitionBy("zone", "z", "year", "month")
  .saveAsTable("testorc")

When this job is submitted by spark-submit I get: Exception in thread main java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead. But the job works fine in spark-shell. What can be wrong? BR, Patcharee
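A minimal sketch of the fix both replies point to, for a standalone driver on the Spark 1.3/1.4 API:

    import org.apache.spark.sql.hive.HiveContext

    // use a HiveContext instead of new SQLContext(sc); saveAsTable on a
    // persistent (non-temporary) table then goes through the Hive metastore
    val sqlContext = new HiveContext(sc)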
Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space
hey guys

I tried the following settings as well. No luck.

--total-executor-cores 24 --executor-memory 4G

BTW on the same cluster, Impala absolutely kills it. Same query: 9 seconds. No memory issues. No issues. In fact I am pretty disappointed with Spark-SQL. I have worked with Hive during the 0.9.x stages and taken projects to production successfully, and Hive actually very rarely craps out. Whether the spark folks like what I say or not, yes, my expectations are pretty high of Spark-SQL if I were to change the ways we are doing things at my workplace. Until that time, we are going to be hugely dependent on Impala and Hive (with SSD speeding up the shuffle stage, even MR jobs are not that slow now).

I want to clarify for those of you who may be asking why I am not using Spark with Scala and insisting on using spark-sql:
- I have already pipelined data from enterprise tables to Hive
- I am using CDH 5.3.3 (Cloudera starving developers version)
- I have close to 300 tables defined in Hive external tables
- Data is on HDFS
- On average we have 150 columns per table
- On an everyday basis, we do crazy amounts of ad-hoc joining of new and old tables in getting datasets ready for supervised ML
- I thought that quite simply I can point Spark to the Hive meta and do queries as I do - in fact the existing queries would work as is unless I am using some esoteric Hive/Impala function

Anyway, if there are some settings I can use and get spark-sql to run even in standalone mode, that will be a huge help. On the pre-production cluster I have Spark on YARN but could never get it to run fairly complex queries, and I have no answers from this group or the CDH groups. So my assumption is that it's possibly not solved; otherwise I have always got very quick answers and responses :-) to my questions on all the CDH, Spark and Hive groups.

best regards
sanjay

From: Josh Rosen rosenvi...@gmail.com
To: Sanjay Subramanian sanjaysubraman...@yahoo.com
Cc: user@spark.apache.org
Sent: Friday, June 12, 2015 7:15 AM
Subject: Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space

It sounds like this might be caused by a memory configuration problem. In addition to looking at the executor memory, I'd also bump up the driver memory, since it appears that your shell is running out of memory when collecting a large query result.

Sent from my phone

On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote:

hey guys

Using Hive and Impala daily intensively. Want to transition to spark-sql in CLI mode.

Currently in my sandbox I am using Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3):

3 datanode hadoop cluster
32GB RAM per node
8 cores per node
spark 1.2.0+cdh5.3.3+371

I am testing some stuff on one view and getting memory errors. The possible reason is that the default memory per executor showing on 18080 is 512M. These options, when used to start the spark-sql CLI, do not seem to have any effect:

--total-executor-cores 12 --executor-memory 4G

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"

aers.aers_demo_view (7 million+ records)
===
isr       bigint   case id
event_dt  bigint   event date
age       double   age of patient
age_cod   string   days, months, years
sex       string   M or F
year      int
quarter   int

VIEW DEFINITION
CREATE VIEW `aers.aers_demo_view` AS
SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter`
FROM (SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`, `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter`
FROM `aers`.`aers_demo_v1`
UNION ALL
SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`, `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter`
FROM `aers`.`aers_demo_v2`
UNION ALL
SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`, `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter`
FROM `aers`.`aers_demo_v3`
UNION ALL
SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`, `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter`
FROM `aers`.`aers_demo_v4`
UNION ALL
SELECT `aers_demo_v5`.`primaryid` AS `ISR`, `aers_demo_v5`.`event_dt`, `aers_demo_v5`.`age`, `aers_demo_v5`.`age_cod`, `aers_demo_v5`.`gndr_cod`, `aers_demo_v5`.`year`, `aers_demo_v5`.`quarter`
FROM `aers`.`aers_demo_v5`
UNION ALL
SELECT `aers_demo_v6`.`primaryid` AS `ISR`, `aers_demo_v6`.`event_dt`, `aers_demo_v6`.`age`, `aers_demo_v6`.`age_cod`,
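Following Josh's suggestion above, a hedged sketch of the CLI flags with the driver memory raised as well (the values are illustrative, not tuned):

    /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql \
      --driver-memory 8G \
      --total-executor-cores 24 \
      --executor-memory 4G \
      -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"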
spark stream twitter question ..
Hi, I have a question about Spark Twitter stream processing in Spark 1.3.1. The code sample below just opens up a Twitter stream, uses auth keys, splits out hash tags and creates a temp table. However, when I try to compile it using sbt (CentOS 6.5) I get the error:

[error] /home/hadoop/spark/twitter1/src/main/scala/twitter1.scala:54: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] val dfHashTags = rdd.map(hashT => Row(hashT)).toDF()

I know that I need to import sqlContext.implicits._, which is what I've tried, but I still get the error. Can anyone advise?

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType}

object twitter1 {

  def main(args: Array[String]) {

    // create a spark conf and context
    val appName = "Twitter example 1"
    val conf = new SparkConf()
    conf.setAppName(appName)
    val sc = new SparkContext(conf)

    // set twitter auth key values
    val consumerKey = "QQxxx"
    val consumerSecret = "0HFxxx"
    val accessToken = "32394xxx"
    val accessTokenSecret = "IlQvscxxx"

    // set twitter auth properties
    // https://apps.twitter.com/
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None)
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    hashTags.foreachRDD { rdd =>
      val dfHashTags = rdd.map(hashT => Row(hashT)).toDF()
      dfHashTags.registerTempTable("tweets")
    }

    // extra stuff here

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end twitter1

cheers Mike F
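For what it's worth, a hedged sketch of one pattern that compiles on Spark 1.3.x: the toDF() implicit is only defined for RDDs of Products (e.g. case classes), not for RDD[Row], so mapping to a case class avoids the error (alternatively, use sqlContext.createDataFrame with an explicit schema):

    case class HashTag(tag: String)

    hashTags.foreachRDD { rdd =>
      // obtain a SQLContext from the RDD's context inside the closure
      val sqlContext = new org.apache.spark.sql.SQLContext(rdd.sparkContext)
      import sqlContext.implicits._
      val dfHashTags = rdd.map(HashTag(_)).toDF()  // RDD[HashTag] is an RDD of Products
      dfHashTags.registerTempTable("tweets")
    }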
Re: [Spark] What is the most efficient way to do such a join and column manipulation?
Thanks, Don! Does the SQL implementation of Spark do parallel processing on records by default? -Rex

On Sat, Jun 13, 2015 at 10:13 AM, Don Drake dondr...@gmail.com wrote:

Take a look at https://github.com/databricks/spark-csv to read in the tab-delimited file (change the default delimiter) and once you have that as a DataFrame, SQL can do the rest. https://spark.apache.org/docs/latest/sql-programming-guide.html -Don

On Fri, Jun 12, 2015 at 8:46 PM, Rex X dnsr...@gmail.com wrote:

Hi, I want to use Spark to select N columns and the top M rows of all CSV files under a folder. To be concrete, say we have a folder with thousands of tab-delimited CSV files with the following attribute format (each CSV file is about 10GB):

id  name  address  city  ...
1   Matt  add1     LA    ...
2   Will  add2     LA    ...
3   Lucy  add3     SF    ...
...

And we have a lookup table based on "name" above:

name  gender
Matt  M
Lucy  F
...

Now we want to output from the top 100K rows of each CSV file in the following format:

id  name  gender
1   Matt  M
...

Can we use pyspark to efficiently handle this?

-- Donald Drake Drake Consulting http://www.drakeconsulting.com/ http://www.MailLaunder.com/ 800-733-2143
Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space
Try using Spark 1.4.0 with SQL code generation turned on; this should make a huge difference.

On Sat, Jun 13, 2015 at 5:08 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com wrote:

hey guys

I tried the following settings as well. No luck.

--total-executor-cores 24 --executor-memory 4G

BTW on the same cluster, Impala absolutely kills it. Same query: 9 seconds. No memory issues. No issues. In fact I am pretty disappointed with Spark-SQL. I have worked with Hive during the 0.9.x stages and taken projects to production successfully, and Hive actually very rarely craps out. Whether the spark folks like what I say or not, yes, my expectations are pretty high of Spark-SQL if I were to change the ways we are doing things at my workplace. Until that time, we are going to be hugely dependent on Impala and Hive (with SSD speeding up the shuffle stage, even MR jobs are not that slow now).

I want to clarify for those of you who may be asking why I am not using Spark with Scala and insisting on using spark-sql:
- I have already pipelined data from enterprise tables to Hive
- I am using CDH 5.3.3 (Cloudera starving developers version)
- I have close to 300 tables defined in Hive external tables
- Data is on HDFS
- On average we have 150 columns per table
- On an everyday basis, we do crazy amounts of ad-hoc joining of new and old tables in getting datasets ready for supervised ML
- I thought that quite simply I can point Spark to the Hive meta and do queries as I do - in fact the existing queries would work as is unless I am using some esoteric Hive/Impala function

Anyway, if there are some settings I can use and get spark-sql to run even in standalone mode, that will be a huge help. On the pre-production cluster I have Spark on YARN but could never get it to run fairly complex queries, and I have no answers from this group or the CDH groups. So my assumption is that it's possibly not solved; otherwise I have always got very quick answers and responses :-) to my questions on all the CDH, Spark and Hive groups.

best regards
sanjay

From: Josh Rosen rosenvi...@gmail.com
To: Sanjay Subramanian sanjaysubraman...@yahoo.com
Cc: user@spark.apache.org
Sent: Friday, June 12, 2015 7:15 AM
Subject: Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space

It sounds like this might be caused by a memory configuration problem. In addition to looking at the executor memory, I'd also bump up the driver memory, since it appears that your shell is running out of memory when collecting a large query result.

Sent from my phone

On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote:

hey guys

Using Hive and Impala daily intensively. Want to transition to spark-sql in CLI mode.

Currently in my sandbox I am using Spark (standalone mode) in the CDH distribution (starving developer version 5.3.3):

3 datanode hadoop cluster
32GB RAM per node
8 cores per node
spark 1.2.0+cdh5.3.3+371

I am testing some stuff on one view and getting memory errors. The possible reason is that the default memory per executor showing on 18080 is 512M. These options, when used to start the spark-sql CLI, do not seem to have any effect:

--total-executor-cores 12 --executor-memory 4G

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"

aers.aers_demo_view (7 million+ records)
===
isr       bigint   case id
event_dt  bigint   event date
age       double   age of patient
age_cod   string   days, months, years
sex       string   M or F
year      int
quarter   int

VIEW DEFINITION
CREATE VIEW `aers.aers_demo_view` AS
SELECT `isr` AS `isr`, `event_dt` AS `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, `year` AS `year`, `quarter` AS `quarter`
FROM (SELECT `aers_demo_v1`.`isr`, `aers_demo_v1`.`event_dt`, `aers_demo_v1`.`age`, `aers_demo_v1`.`age_cod`, `aers_demo_v1`.`gndr_cod`, `aers_demo_v1`.`year`, `aers_demo_v1`.`quarter`
FROM `aers`.`aers_demo_v1`
UNION ALL
SELECT `aers_demo_v2`.`isr`, `aers_demo_v2`.`event_dt`, `aers_demo_v2`.`age`, `aers_demo_v2`.`age_cod`, `aers_demo_v2`.`gndr_cod`, `aers_demo_v2`.`year`, `aers_demo_v2`.`quarter`
FROM `aers`.`aers_demo_v2`
UNION ALL
SELECT `aers_demo_v3`.`isr`, `aers_demo_v3`.`event_dt`, `aers_demo_v3`.`age`, `aers_demo_v3`.`age_cod`, `aers_demo_v3`.`gndr_cod`, `aers_demo_v3`.`year`, `aers_demo_v3`.`quarter`
FROM `aers`.`aers_demo_v3`
UNION ALL
SELECT `aers_demo_v4`.`isr`, `aers_demo_v4`.`event_dt`, `aers_demo_v4`.`age`, `aers_demo_v4`.`age_cod`, `aers_demo_v4`.`gndr_cod`, `aers_demo_v4`.`year`, `aers_demo_v4`.`quarter`
FROM `aers`.`aers_demo_v4`
UNION ALL
SELECT
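A hedged sketch of turning code generation on from the CLI; spark.sql.codegen is the flag name per the Spark 1.4 configuration docs, worth verifying against your build:

    spark-sql --conf spark.sql.codegen=true \
      -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"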
Re: Extracting k-means cluster values along with centers?
trying again

On 13 Jun 2015, at 10:15, Robin East robin.e...@xense.co.uk wrote:

Here’s a typical way to do it:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val model = KMeans.train(parsedData, numClusters, numIterations)
val parsedDataClusters = model.predict(parsedData)
val dataWithClusters = parsedData.zip(parsedDataClusters)

On 12 Jun 2015, at 23:44, Minnow Noir minnown...@gmail.com wrote:

Greetings. I have been following some of the tutorials online for Spark k-means clustering. I would like to be able to just dump all the cluster values and their centroids to a text file so I can explore the data. I have the clusters as such:

val clusters = KMeans.train(parsedData, numClusters, numIterations)

clusters
res2: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@59de440b

Is there a way to build something akin to a key value RDD that has the center as the key and the array of values associated with that center as the value? I don't see anything in the tutorials, API docs, or the Learning book for how to do this. Thank you
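To get the centre-as-key structure asked for in the original message, a short sketch building on Robin's model and dataWithClusters (groupByKey can be expensive at scale; this is illustrative only):

    // key each point by the centre of its predicted cluster
    val byCenter = dataWithClusters.map { case (point, clusterIdx) =>
      (model.clusterCenters(clusterIdx), point)
    }
    // centre -> all points assigned to it
    val pointsPerCenter = byCenter.groupByKey()
    pointsPerCenter.saveAsTextFile("kmeans_clusters")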
Re: Building scaladoc using build/sbt unidoc failure
Try build/sbt clean first.

On Tue, May 26, 2015 at 4:45 PM, Justin Yip yipjus...@prediction.io wrote:

Hello, I am trying to build the scaladoc from the 1.4 branch, but it failed due to:

[error] (sql/compile:compile) java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode)

I followed the instructions on github https://github.com/apache/spark/tree/branch-1.4/docs and used the following command:

$ build/sbt unidoc

Please see the attachment for the detailed error: http://apache-spark-user-list.1001560.n3.nabble.com/attachment/23044/0/unidoc_error.txt

Did I miss anything? Thanks. Justin
Re: --jars not working?
Are you using a build for Scala 2.11? I’ve encountered the same behaviour trying to run on YARN with Scala 2.11 and Spark 1.3.0, 1.3.1 and 1.4.0.RC3, and raised a JIRA issue here: https://issues.apache.org/jira/browse/SPARK-7944. Would be good to know if this is identical to what you’re seeing on Mesos. Thanks Alex

On Fri, Jun 12, 2015 at 8:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can verify whether the jars are shipped properly by looking at the driver UI (running on 4040), Environment tab.

Thanks Best Regards

On Sat, Jun 13, 2015 at 12:43 AM, Jonathan Coveney jcove...@gmail.com wrote:

Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos 0.19.0)... Regardless, I'm running into a really weird situation where when I pass --jars to bin/spark-shell I can't reference those classes in the repl. Is this expected? The logs even tell me that my jars have been added, and yet the classes inside of them are not available. Am I missing something obvious?
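A quick hedged way to check whether --jars actually put a class on the repl classpath (the jar path and class name are placeholders):

    spark-shell --jars /path/to/mylib.jar
    # then, inside the repl:
    scala> Class.forName("com.example.mylib.SomeClass")   // throws ClassNotFoundException if the jar was not shipped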
Re: [Spark 1.4.0] java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation
That's the Tachyon FS there, which appears to be missing a method override.

On 12 Jun 2015, at 19:58, Peter Haumer phau...@us.ibm.com wrote:

Exception in thread main java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation
at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:213)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2401)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2411)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:166)
at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:653)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:389)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$28.apply(SparkContext.scala:762)
at org.apache.spark.SparkContext$$anonfun$28.apply(SparkContext.scala:762)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:172)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:172)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:172)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:196)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)

A quick look at the Tachyon source says it now does implement getScheme: https://github.com/amplab/tachyon/blob/8408edd04430b11bf9ccfc1dbe1e8a7e502bb582/clients/unshaded/src/main/java/tachyon/hadoop/TFS.java, which means you really need a version of Tachyon consistent with the rest of the code, or somehow get TFS out of the pipeline.
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Perfect! I'll start working on it

2015-06-13 2:23 GMT+02:00 Amit Ramesh a...@yelp.com:

Hi Juan, I have created a ticket for this: https://issues.apache.org/jira/browse/SPARK-8337 Thanks! Amit

On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote:

Hi, If you want I would be happy to work in this. I have worked with KafkaUtils.createDirectStream before, in a pull request that wasn't accepted https://github.com/apache/spark/pull/5367. I'm fluent with Python and I'm starting to feel comfortable with Scala, so if someone opens a JIRA I can take it. Greetings, Juan Rodriguez

2015-06-12 15:59 GMT+02:00 Cody Koeninger c...@koeninger.org:

The scala api has 2 ways of calling createDirectStream. One of them allows you to pass a message handler that gets full access to the kafka MessageAndMetadata, including offset. I don't know why the python api was developed with only one way to call createDirectStream, but the first thing I'd look at would be adding that functionality back in. If someone wants help creating a patch for that, just let me know. Dealing with offsets on a per-message basis may not be as efficient as dealing with them on a batch basis using the HasOffsetRanges interface... but if efficiency was a primary concern, you probably wouldn't be using Python anyway.

On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

Scala KafkaRDD uses a trait to handle this problem, but it is not so easy and straightforward in Python, where we need to have a specific API to handle this, I'm not sure is there any simple workaround to fix this, maybe we should think carefully about it.

2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:

Thanks, Jerry. That's what I suspected based on the code I looked at. Any pointers on what is needed to build in this support would be great. This is critical to the project we are currently working on. Thanks!

On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com wrote:

OK, I get it, I think currently Python based Kafka direct API do not provide such equivalence like Scala, maybe we should figure out to add this into Python API also.

2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:

Hi Jerry, Take a look at this example: https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2 The offsets are needed because as RDDs get generated within spark the offsets move further along. With direct Kafka mode the current offsets are no more persisted in Zookeeper but rather within Spark itself. If you want to be able to use zookeeper based monitoring tools to keep track of progress, then this is needed. In my specific case we need to persist Kafka offsets externally so that we can continue from where we left off after a code deployment. In other words, we need exactly-once processing guarantees across code deployments. Spark does not support any state persistence across deployments so this is something we need to handle on our own. Hope that helps. Let me know if not. Thanks! Amit

On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com wrote:

Hi, What is your meaning of getting the offsets from the RDD, from my understanding, the offsetRange is a parameter you offered to KafkaRDD, why do you still want to get the one previous you set into? Thanks Jerry

2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:

Congratulations on the release of 1.4!

I have been trying out the direct Kafka support in python but haven't been able to figure out how to get the offsets from the RDD. Looks like the documentation is yet to be updated to include Python examples ( https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I am specifically looking for the equivalent of https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2. I tried digging through the python code but could not find anything related. Any pointers would be greatly appreciated. Thanks! Amit
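For reference, the Scala-side pattern the thread keeps referring to, sketched per the streaming-kafka integration docs (ssc, kafkaParams and topics assumed defined elsewhere):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    directKafkaStream.foreachRDD { rdd =>
      // each batch RDD produced by the direct stream carries its Kafka offset ranges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }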
Re: [Spark] What is the most efficient way to do such a join and column manipulation?
Take a look at https://github.com/databricks/spark-csv to read in the tab-delimited file (change the default delimiter) and once you have that as a DataFrame, SQL can do the rest. https://spark.apache.org/docs/latest/sql-programming-guide.html -Don

On Fri, Jun 12, 2015 at 8:46 PM, Rex X dnsr...@gmail.com wrote:

Hi, I want to use Spark to select N columns and the top M rows of all CSV files under a folder. To be concrete, say we have a folder with thousands of tab-delimited CSV files with the following attribute format (each CSV file is about 10GB):

id  name  address  city  ...
1   Matt  add1     LA    ...
2   Will  add2     LA    ...
3   Lucy  add3     SF    ...
...

And we have a lookup table based on "name" above:

name  gender
Matt  M
Lucy  F
...

Now we want to output from the top 100K rows of each CSV file in the following format:

id  name  gender
1   Matt  M
...

Can we use pyspark to efficiently handle this?

-- Donald Drake Drake Consulting http://www.drakeconsulting.com/ http://www.MailLaunder.com/ 800-733-2143
What is most efficient to do a large union and remove duplicates?
I have 10 folders, each with 6000 files. Each folder is roughly 500GB, so 5TB of data in total. The data is formatted as key \t value. After the union, I want to remove the duplicates among keys, so each key should be unique and have only one value. Here is what I am doing:

val folders = Array("folder1", "folder2", ..., "folder10")
var rawData = sc.textFile(folders(0)).map(x => (x.split("\t")(0), x.split("\t")(1)))
for (a <- 1 to folders.length - 1) {
  rawData = rawData.union(sc.textFile(folders(a)).map(x => (x.split("\t")(0), x.split("\t")(1))))
}
val nodups = rawData.reduceByKey((a, b) => if (a.length > b.length) a else b)
nodups.saveAsTextFile("/nodups")

Anything I could do to make this process faster? Right now my process dies when outputting the data to HDFS. Thank you!
Re: Dynamic allocator requests -1 executors
Hi Patrick,

I'm noticing that you're using Spark 1.3.1. We fixed a bug in dynamic allocation in 1.4 that permitted requesting negative numbers of executors. Any chance you'd be able to try with the newer version and see if the problem persists?

-Sandy

On Fri, Jun 12, 2015 at 7:42 PM, Patrick Woody patrick.woo...@gmail.com wrote:

Hey all, I've recently run into an issue where spark dynamicAllocation has asked for -1 executors from YARN. Unfortunately, this raises an exception that kills the executor-allocation thread and the application can't request more resources. Has anyone seen this before? It is spurious and the application usually works, but when this gets hit it becomes unusable when getting stuck at minimum YARN resources. Stacktrace below. Thanks! -Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
Re: How to split log data into different files according to severity
I am currently using filter inside a loop over all severity levels to do this, which I think is pretty inefficient. It has to read the entire data set once for each severity. I wonder if there is a more efficient way that takes just one pass of the data? Thanks.

Best, Hao Wang

On Jun 13, 2015, at 3:48 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Are you looking for something like filter? See a similar example here: https://spark.apache.org/examples.html

Thanks Best Regards

On Sat, Jun 13, 2015 at 3:11 PM, Hao Wang bill...@gmail.com wrote:

Hi, I have a bunch of large log files on Hadoop. Each line contains a log and its severity. Is there a way that I can use Spark to split the entire data set into different files on Hadoop according to the severity field? Thanks. Below is an example of the input and output.

Input:
[ERROR] log1
[INFO] log2
[ERROR] log3
[INFO] log4

Output:

error_file
[ERROR] log1
[ERROR] log3

info_file
[INFO] log2
[INFO] log4

Best, Hao Wang
Re: How to split log data into different files according to severity
Check out this recent post by Cheng Lian regarding dynamic partitioning in Spark 1.4: https://www.mail-archive.com/user@spark.apache.org/msg30204.html

On June 13, 2015, at 5:41 AM, Hao Wang bill...@gmail.com wrote:

Hi, I have a bunch of large log files on Hadoop. Each line contains a log and its severity. Is there a way that I can use Spark to split the entire data set into different files on Hadoop according to the severity field? Thanks. Below is an example of the input and output.

Input:
[ERROR] log1
[INFO] log2
[ERROR] log3
[INFO] log4

Output:

error_file
[ERROR] log1
[ERROR] log3

info_file
[INFO] log2
[INFO] log4

Best, Hao Wang
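A hedged sketch of that dynamic-partitioning approach on Spark 1.4, assuming the logs are parsed into a DataFrame with severity and message columns (all names here are placeholders); this writes every severity in a single pass:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("severity", StringType), StructField("message", StringType)))
    val rows = sc.textFile("/logs").map { line =>
      // "[ERROR] log1" -> ("ERROR", "[ERROR] log1")
      Row(line.substring(1, line.indexOf(']')), line)
    }
    val logsDF = sqlContext.createDataFrame(rows, schema)
    // produces one directory per severity, e.g. /logs_by_severity/severity=ERROR/
    logsDF.write.partitionBy("severity").format("parquet").save("/logs_by_severity")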
How to read avro in SparkR
Hi, I am trying to read an avro file in SparkR (in Spark 1.4.0). I started R using the following:

matmsh@gauss:~$ sparkR --packages com.databricks:spark-avro_2.10:1.0.0

Inside the R shell, when I issue the following:

read.df(sqlContext, "file:///home/matmsh/myfile.avro", "avro")

I get the following exception:

Caused by: java.lang.RuntimeException: Failed to load class for data source: avro

Below is the stack trace.

matmsh@gauss:~$ sparkR --packages com.databricks:spark-avro_2.10:1.0.0

R version 3.2.0 (2015-04-16) -- "Full of Ingredients"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-suse-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and 'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or 'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/matmsh/installed/spark/bin/spark-submit --packages com.databricks:spark-avro_2.10:1.0.0 sparkr-shell /tmp/RtmpoT7FrF/backend_port464e1e2fb16a

Ivy Default Cache set to: /home/matmsh/.ivy2/cache
The jars for the packages stored in: /home/matmsh/.ivy2/jars
:: loading settings :: url = jar:file:/home/matmsh/installed/sparks/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-avro_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.databricks#spark-avro_2.10;1.0.0 in list
	found org.apache.avro#avro;1.7.6 in local-m2-cache
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in list
	found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in list
	found com.thoughtworks.paranamer#paranamer;2.3 in list
	found org.xerial.snappy#snappy-java;1.0.5 in list
	found org.apache.commons#commons-compress;1.4.1 in list
	found org.tukaani#xz;1.0 in list
	found org.slf4j#slf4j-api;1.6.4 in list
:: resolution report :: resolve 421ms :: artifacts dl 16ms
	:: modules in use:
	com.databricks#spark-avro_2.10;1.0.0 from list in [default]
	com.thoughtworks.paranamer#paranamer;2.3 from list in [default]
	org.apache.avro#avro;1.7.6 from local-m2-cache in [default]
	org.apache.commons#commons-compress;1.4.1 from list in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from list in [default]
	org.codehaus.jackson#jackson-mapper-asl;1.9.13 from list in [default]
	org.slf4j#slf4j-api;1.6.4 from list in [default]
	org.tukaani#xz;1.0 from list in [default]
	org.xerial.snappy#snappy-java;1.0.5 from list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	0 artifacts copied, 9 already retrieved (0kB/9ms)
15/06/13 17:37:42 INFO spark.SparkContext: Running Spark version 1.4.0
15/06/13 17:37:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/13 17:37:42 WARN util.Utils: Your hostname, gauss resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface enp3s0)
15/06/13 17:37:42 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/06/13 17:37:42 INFO spark.SecurityManager: Changing view acls to: matmsh
15/06/13 17:37:42 INFO spark.SecurityManager: Changing modify acls to: matmsh
15/06/13 17:37:42 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(matmsh); users with modify permissions: Set(matmsh)
15/06/13 17:37:43 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/13 17:37:43 INFO Remoting: Starting remoting
15/06/13 17:37:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.10:46219]
15/06/13 17:37:43 INFO util.Utils: Successfully started service 'sparkDriver' on port 46219.
15/06/13 17:37:43 INFO spark.SparkEnv: Registering MapOutputTracker
15/06/13 17:37:43 INFO spark.SparkEnv: Registering BlockManagerMaster
15/06/13 17:37:43 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-c8661016-d922-4ad3-a171-7b0f719c40a2/blockmgr-e79853e5-e046-4b13-a3ba-0b4c46831461
15/06/13 17:37:43 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB
15/06/13 17:37:43 INFO spark.HttpFileServer: HTTP File server directory is
How to silence Parquet logging?
Hey everyone, I’m trying to figure out how to silence all of the logging info that gets printed to the console when dealing with Parquet files. I’ve seen that there have been several PRs addressing this issue, but I can’t seem to figure out how to actually change the logging config. I’ve already messed with the log4j.properties in conf/, like so:

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=ERROR
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR

This does, in fact, silence the logging for everything else, but the Parquet config seems totally unchanged. Does anyone know how to do this? Thanks!

-Chris Freeman
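One possible explanation, offered as an assumption to verify: the Parquet libraries of that era log through java.util.logging rather than log4j, so log4j.properties never affects them. A sketch of silencing them from the driver (run before touching any Parquet files):

    import java.util.logging.{Level, Logger}

    // JUL, not log4j: quiet the "parquet" logger tree; keep a reference so the
    // weakly-referenced JUL logger (and its level setting) is not garbage-collected
    val parquetLogger = Logger.getLogger("parquet")
    parquetLogger.setLevel(Level.SEVERE)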
--packages Failed to load class for data source v1.4
I downloaded the pre-compiled Spark 1.4.0 and attempted to run an existing Python Spark application against it and got the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o90.save.
: java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv

I pass the following on the command-line to my spark-submit: --packages com.databricks:spark-csv_2.10:1.0.3. This worked fine on 1.3.1, but not in 1.4. I was able to replicate it with the following pyspark:

a = {'a': 1.0, 'b': 'asdf'}
rdd = sc.parallelize([a])
df = sqlContext.createDataFrame(rdd)
df.save("/tmp/d.csv", "com.databricks.spark.csv")

Even using the new df.write.format('com.databricks.spark.csv').save('/tmp/d.csv') gives the same error. I see the jars were added in the web UI:

file:/Users/drake/.ivy2/jars/com.databricks_spark-csv_2.10-1.0.3.jar  Added By User
file:/Users/drake/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar   Added By User
http://10.0.0.222:56871/jars/com.databricks_spark-csv_2.10-1.0.3.jar  Added By User
http://10.0.0.222:56871/jars/org.apache.commons_commons-csv-1.1.jar   Added By User

Thoughts?

-Don

Gory details:

$ pyspark --packages com.databricks:spark-csv_2.10:1.0.3
Python 2.7.6 (default, Sep 9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /Users/drake/.ivy2/cache
The jars for the packages stored in: /Users/drake/.ivy2/jars
:: loading settings :: url = jar:file:/Users/drake/spark/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-csv_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.databricks#spark-csv_2.10;1.0.3 in central
	found org.apache.commons#commons-csv;1.1 in central
:: resolution report :: resolve 590ms :: artifacts dl 17ms
	:: modules in use:
	com.databricks#spark-csv_2.10;1.0.3 from central in [default]
	org.apache.commons#commons-csv;1.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/15ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/13 11:06:08 INFO SparkContext: Running Spark version 1.4.0
2015-06-13 11:06:08.921 java[19233:2145789] Unable to load realm info from SCDynamicStore
15/06/13 11:06:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/13 11:06:09 WARN Utils: Your hostname, Dons-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on interface en0)
15/06/13 11:06:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/06/13 11:06:09 INFO SecurityManager: Changing view acls to: drake
15/06/13 11:06:09 INFO SecurityManager: Changing modify acls to: drake
15/06/13 11:06:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(drake); users with modify permissions: Set(drake)
15/06/13 11:06:10 INFO Slf4jLogger: Slf4jLogger started
15/06/13 11:06:10 INFO Remoting: Starting remoting
15/06/13 11:06:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.0.222:56870]
15/06/13 11:06:10 INFO Utils: Successfully started service 'sparkDriver' on port 56870.
15/06/13 11:06:10 INFO SparkEnv: Registering MapOutputTracker
15/06/13 11:06:10 INFO SparkEnv: Registering BlockManagerMaster
15/06/13 11:06:10 INFO DiskBlockManager: Created local directory at /private/var/folders/7_/k5h82ws97b95v5f5h8wf9j0hgn/T/spark-f36f39f5-7f82-42e0-b3e0-9eb1e1cc0816/blockmgr-a1412b71-fe56-429c-a193-ce3fb95d2ffd
15/06/13 11:06:10 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/06/13 11:06:10 INFO HttpFileServer: HTTP File server directory is /private/var/folders/7_/k5h82ws97b95v5f5h8wf9j0hgn/T/spark-f36f39f5-7f82-42e0-b3e0-9eb1e1cc0816/httpd-84d178da-7e60-4eed-8031-e6a0c465bd4c
15/06/13 11:06:10 INFO HttpServer: Starting HTTP Server
15/06/13 11:06:10 INFO Utils: Successfully started service 'HTTP file server' on port 56871.
15/06/13 11:06:10 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/13 11:06:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
15/06/13 11:06:11 INFO Utils: Successfully started service 'SparkUI' on port 4041.
15/06/13 11:06:11 INFO SparkUI: Started SparkUI at
Not able to run FP-growth Example
Hi everyone, I am trying to run the FP-growth example. I have tried to compile the following POM file:

<project>
  <groupId>com.oreilly.learningsparkexamples.mini</groupId>
  <artifactId>learning-spark-mini-example</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <properties>
    <java.version>1.7</java.version>
  </properties>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

It successfully builds the project, but the IDE is complaining that:

Error:(29, 34) java: package org.apache.spark.mllib.fpm does not exist

Just as a side note, I downloaded version 1.3 of Spark, so the FP-growth algorithm should be part of it? Thanks.
Reliable SQS Receiver for Spark Streaming
I would like to have a Spark Streaming *SQS Receiver* which deletes SQS messages only *after* they have been successfully stored on S3. For this, a *Custom Receiver* can be implemented with the semantics of the Reliable Receiver: the store(multiple-records) call blocks until the given records have been stored and replicated inside Spark (https://spark.apache.org/docs/latest/streaming-custom-receivers.html#receiver-reliability). If *write-ahead logs* are enabled, all the data received from a receiver gets written into a write-ahead log in the configured checkpoint directory (https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing), and the checkpoint directory can be pointed to S3. After the store(multiple-records) blocking call finishes, are the records already stored in the checkpoint directory (and thus can they be safely deleted from SQS)?
Re: Reliable SQS Receiver for Spark Streaming
Yes, if you have enabled the WAL and checkpointing, then after the store you can simply delete the SQS messages from your receiver. Thanks Best Regards On Sat, Jun 13, 2015 at 6:14 AM, Michal Čizmazia mici...@gmail.com wrote: I would like to have a Spark Streaming SQS Receiver which deletes SQS messages only after they have been successfully stored on S3. For this, a Custom Receiver can be implemented with the semantics of the Reliable Receiver. The store(multiple-records) call blocks until the given records have been stored and replicated inside Spark. If write-ahead logs are enabled, all the data received from a receiver gets written into a write-ahead log in the configured checkpoint directory. The checkpoint directory can be pointed to S3. After the store(multiple-records) blocking call finishes, are the records already stored in the checkpoint directory (and thus can they be safely deleted from SQS)?
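For concreteness, a rough sketch of such a receiver in Scala, against the AWS SDK for Java; the queue URL is supplied by the caller and the batch size of 10 is illustrative. The key point is that messages are deleted only after the blocking store(...) returns:

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    import com.amazonaws.services.sqs.AmazonSQSClient
    import com.amazonaws.services.sqs.model.ReceiveMessageRequest
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class ReliableSQSReceiver(queueUrl: String)
        extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

      def onStart(): Unit = {
        // Receive on a dedicated thread; onStart must not block.
        new Thread("SQS Receiver") {
          override def run(): Unit = receive()
        }.start()
      }

      def onStop(): Unit = { } // the receive loop exits once isStopped() is true

      private def receive(): Unit = {
        val sqs = new AmazonSQSClient() // default credential provider chain
        while (!isStopped()) {
          val messages = sqs.receiveMessage(
            new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10))
            .getMessages.asScala
          if (messages.nonEmpty) {
            // Blocks until the records are stored and replicated inside Spark
            // (and written to the WAL when it is enabled).
            store(ArrayBuffer(messages.map(_.getBody): _*))
            // Only now is it safe to ack: delete the messages from SQS.
            messages.foreach(m => sqs.deleteMessage(queueUrl, m.getReceiptHandle))
          }
        }
      }
    }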
Re: Not able to run FP-growth Example
Thanks for the answer. Any example? On Jun 13, 2015 2:13 PM, Sonal Goyal sonalgoy...@gmail.com wrote: I think you need to add a dependency on spark-mllib too. On Jun 13, 2015 11:10 AM, masoom alam masoom.a...@wanclouds.net wrote: Hi everyone, I am trying to run the FP-growth example. I have tried to compile the following POM file:

<project>
  <groupId>com.oreilly.learningsparkexamples.mini</groupId>
  <artifactId>learning-spark-mini-example</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <properties>
    <java.version>1.7</java.version>
  </properties>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

It successfully builds the project, but the IDE is complaining that: Error:(29, 34) java: package org.apache.spark.mllib.fpm does not exist. Just as a side note, I downloaded version 1.3 of Spark, so the FP-growth algorithm should be part of it? Thanks.
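For reference, a minimal sketch of the dependency Sonal is referring to, matching the Scala and Spark versions already used for spark-core in the POM above. The org.apache.spark.mllib.fpm package (including FPGrowth) ships in the spark-mllib artifact, not spark-core:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.3.0</version>
      <scope>provided</scope>
    </dependency>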
How to split log data into different files according to severity
Hi, I have a bunch of large log files on Hadoop. Each line contains a log message and its severity. Is there a way to use Spark to split the entire data set into different files on Hadoop according to the severity field? Thanks. Below is an example of the input and output.

Input:
[ERROR] log1
[INFO] log2
[ERROR] log3
[INFO] log4

Output:
error_file
[ERROR] log1
[ERROR] log3

info_file
[INFO] log2
[INFO] log4

Best, Hao Wang
Re: How to split log data into different files according to severity
Are you looking for something like filter? See a similar example here: https://spark.apache.org/examples.html Thanks Best Regards On Sat, Jun 13, 2015 at 3:11 PM, Hao Wang bill...@gmail.com wrote: Hi, I have a bunch of large log files on Hadoop. Each line contains a log message and its severity. Is there a way to use Spark to split the entire data set into different files on Hadoop according to the severity field? Thanks. Below is an example of the input and output. Input: [ERROR] log1 [INFO] log2 [ERROR] log3 [INFO] log4 Output: error_file [ERROR] log1 [ERROR] log3 info_file [INFO] log2 [INFO] log4 Best, Hao Wang
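A minimal sketch of that approach in Scala, assuming the severity tag is the line prefix (the HDFS paths are illustrative). Note that saveAsTextFile writes a directory of part files, not a single file:

    val logs = sc.textFile("hdfs:///logs/input")
    logs.cache() // both filters scan the data, so caching avoids a second read from HDFS
    logs.filter(_.startsWith("[ERROR]")).saveAsTextFile("hdfs:///logs/error_file")
    logs.filter(_.startsWith("[INFO]")).saveAsTextFile("hdfs:///logs/info_file")

With many severity levels you would likely key each line by its severity and use a custom Hadoop output format such as MultipleTextOutputFormat, so that all splits are written in a single pass rather than one filter pass per level.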
Re: Are there ways to restrict what parameters users can set for a Spark job?
I think the straight answer would be no, but you can hardcode these parameters if you want. Look at SparkContext.scala (https://github.com/apache/spark/blob/master/core%2Fsrc%2Fmain%2Fscala%2Forg%2Fapache%2Fspark%2FSparkContext.scala#L364), where all these properties are initialized; if you hardcode them there and rebuild your Spark, then it doesn't matter what the user sets. Thanks Best Regards On Sat, Jun 13, 2015 at 10:56 AM, YaoPau jonrgr...@gmail.com wrote: For example, Hive lets you set a whole bunch of parameters (# of reducers, # of mappers, size of reducers, cache size, max memory to use for a join), while Impala gives users a much smaller subset of parameters to work with, which makes it nice to give to a BI team. Is there a way to restrict which parameters a user can set for a Spark job? Maybe to cap the # of executors, or cap the memory for each executor, or to enforce a default setting no matter what parameters are used.
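If rebuilding Spark is too heavy-handed, another option is to hand the BI team a shared launcher that clamps whatever they set before the context is created. A rough sketch in Scala; the cap values and the helper name are illustrative, not an existing API:

    import org.apache.spark.{SparkConf, SparkContext}

    def restrictedContext(userConf: SparkConf): SparkContext = {
      val conf = userConf.clone()
      // Cap the number of executors at an illustrative limit of 20.
      val executors = math.min(conf.getInt("spark.executor.instances", 2), 20)
      conf.set("spark.executor.instances", executors.toString)
      // Enforce a fixed executor size regardless of what the user asked for.
      conf.set("spark.executor.memory", "4g")
      new SparkContext(conf)
    }

On YARN, cluster-side limits such as yarn.scheduler.maximum-allocation-mb additionally cap what any single container can request, independent of Spark-side settings.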
Re: Spark 1.4 release date
Does the pre-built package come with Hive support? Namely, has it been built with -Phive and -Phive-thriftserver? On Fri, Jun 12, 2015, 9:32 AM ayan guha guha.a...@gmail.com wrote: Thanks guys, my question must look like a stupid one today :) Looking forward to testing out 1.4.0, just downloaded it. Congrats to the team for this much anticipated release. On Fri, Jun 12, 2015 at 10:12 PM, Guru Medasani gdm...@gmail.com wrote: Here is a Spark 1.4 release blog by Databricks: https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html Guru Medasani gdm...@gmail.com On Jun 12, 2015, at 7:08 AM, ayan guha guha.a...@gmail.com wrote: Thanks a lot. On 12 Jun 2015 19:46, Todd Nist tsind...@gmail.com wrote: It was released yesterday. On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote: Hi, When is the official Spark 1.4 release date? Best Ayan -- Best Regards, Ayan Guha
Re: Reliable SQS Receiver for Spark Streaming
Thanks Akhil! I just looked it up in the code as well. The call chain is:

Receiver.store(ArrayBuffer[T], ...)
ReceiverSupervisorImpl.pushArrayBuffer(ArrayBuffer[T], ...)
ReceiverSupervisorImpl.pushAndReportBlock(...)
WriteAheadLogBasedBlockHandler.storeBlock(...)

"This implementation stores the block into the block manager as well as a write ahead log. It does this in parallel, using Scala Futures, and returns only after the block has been stored in both places."

https://www.codatlas.com/github.com/apache/spark/master/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala?keyword=WriteAheadLogBasedBlockHandler&line=160

On 13 June 2015 at 06:46, Akhil Das ak...@sigmoidanalytics.com wrote: Yes, if you have enabled the WAL and checkpointing, then after the store you can simply delete the SQS messages from your receiver. Thanks Best Regards On Sat, Jun 13, 2015 at 6:14 AM, Michal Čizmazia mici...@gmail.com wrote: I would like to have a Spark Streaming SQS Receiver which deletes SQS messages only after they have been successfully stored on S3. For this, a Custom Receiver can be implemented with the semantics of the Reliable Receiver. The store(multiple-records) call blocks until the given records have been stored and replicated inside Spark. If write-ahead logs are enabled, all the data received from a receiver gets written into a write-ahead log in the configured checkpoint directory. The checkpoint directory can be pointed to S3. After the store(multiple-records) blocking call finishes, are the records already stored in the checkpoint directory (and thus can they be safely deleted from SQS)?
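For reference, the reliable path above (WriteAheadLogBasedBlockHandler) is only taken when the receiver write-ahead log is switched on. A minimal sketch of the driver-side setup, with an illustrative bucket name:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Enable the receiver write-ahead log so store(...) also writes to the WAL.
    val conf = new SparkConf()
      .setAppName("sqs-receiver")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    // The WAL lives under the checkpoint directory; pointing it at S3 is the
    // idea discussed above ("my-bucket" is a hypothetical name).
    ssc.checkpoint("s3n://my-bucket/checkpoints")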
Dataframe Write: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
Hi, I am using Spark 1.4. I try to insert data into a Hive table (in ORC format) from a DataFrame:

partitionedTestDF.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .partitionBy("zone", "z", "year", "month")
  .saveAsTable("testorc")

When this job is submitted by spark-submit I get:

Exception in thread "main" java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.

But the job works fine in spark-shell. What can be wrong? BR, Patcharee
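The likely difference is that spark-shell (when built with Hive support) hands you a HiveContext as sqlContext, while a submitted application typically constructs a plain SQLContext. A minimal sketch of the fix, assuming sc is the application's SparkContext:

    import org.apache.spark.sql.hive.HiveContext

    // saveAsTable on a persistent (non-TEMPORARY) table requires Hive support,
    // so create the context as a HiveContext rather than a plain SQLContext.
    val sqlContext = new HiveContext(sc)

HiveContext is a superset of SQLContext, so the rest of the code should work unchanged.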
Re: Dynamic allocator requests -1 executors
Hey Sandy, I'll test it out on 1.4. Do you have a bug number or PR that I could reference as well? Thanks! -Pat

Sent from my iPhone

On Jun 13, 2015, at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Patrick, I'm noticing that you're using Spark 1.3.1. We fixed a bug in dynamic allocation in 1.4 that permitted requesting negative numbers of executors. Any chance you'd be able to try with the newer version and see if the problem persists? -Sandy

On Fri, Jun 12, 2015 at 7:42 PM, Patrick Woody patrick.woo...@gmail.com wrote: Hey all, I've recently run into an issue where Spark dynamicAllocation has asked for -1 executors from YARN. Unfortunately, this raises an exception that kills the executor-allocation thread and the application can't request more resources. Has anyone seen this before? It is spurious and the application usually works, but when this gets hit it becomes unusable, stuck at minimum YARN resources. Stacktrace below. Thanks! -Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]