Re: Cache sparkSql data without uncompressing it in memory
Thanks Cheng. Just one more question: does that mean we still need enough memory in the cluster to uncompress the data before it can be compressed again, or does it just read the raw data as is?

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote:

Currently there's no way to cache the compressed sequence file directly. Spark SQL uses an in-memory columnar format while caching table rows, so we must read all the raw data and convert it into columnar format. However, you can enable in-memory columnar compression by setting spark.sql.inMemoryColumnarStorage.compressed to true. This property is already set to true by default in the master branch and branch-1.2.

On 11/13/14 7:16 AM, Sadhan Sood wrote:

We noticed while caching data from our Hive tables, which contain data in compressed sequence file format, that it gets uncompressed in memory when cached. Is there a way to turn this off and cache the compressed data as is?
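Cheng's setting can be exercised directly from a Spark SQL session; a minimal sketch (the table and partition values here are just the ones used elsewhere in this thread, and the default may differ across versions):

```sql
-- Enable compression of the in-memory columnar cache
-- (already true by default on master and branch-1.2).
SET spark.sql.inMemoryColumnarStorage.compressed=true;

-- Rows are still read and decoded from the compressed sequence file,
-- then re-encoded into compressed columnar batches when cached.
CACHE TABLE xyz_cached AS SELECT * FROM xyz WHERE date_prefix = 20141112;
```

Note this only controls how the cached columnar data is stored; it does not avoid the initial decompression pass Cheng describes.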
Too many failed collects when trying to cache a table in SparkSQL
We are running Spark on YARN with 1 TB of combined memory, and when trying to cache a table partition (which is ~100 GB) we see a lot of failed collect stages in the UI, and the caching never succeeds. Because of the failed collect, it seems like the mapPartitions stage keeps getting resubmitted. We have more than enough memory, so it's surprising we are seeing this issue. Can someone please help? Thanks!

The stack trace of the failed collect from the UI is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
	at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Re: Too many failed collects when trying to cache a table in SparkSQL
at DAGScheduler.scala:838
2014-11-12 19:10:08,990 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 66.475 s
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
2014-11-12 19:11:15,482 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at DAGScheduler.scala:838
2014-11-12 19:11:15,482 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 372 on ip-10-95-163-84.ec2.internal: remote Akka client disassociated
2014-11-12 19:11:21,655 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 372
2014-11-12 19:11:21,655 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)

On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

We are running Spark on YARN with 1 TB of combined memory, and when trying to cache a table partition (which is ~100 GB) we see a lot of failed collect stages in the UI, and the caching never succeeds. Because of the failed collect, it seems like the mapPartitions stage keeps getting resubmitted. We have more than enough memory, so it's surprising we are seeing this issue. Can someone please help? Thanks!

The stack trace of the failed collect from the UI is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
	at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195
Re: Too many failed collects when trying to cache a table in SparkSQL
On re-running the cache statement, from the logs I see that when the collect (stage 1) fails, it always leads to the mapPartitions (stage 0) for one partition being re-run. This can be seen in the collect log as well as in the container log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

The data is an lzo-compressed sequence file with a compressed size of ~26 GB. Is there a way to understand why the shuffle keeps failing for one partition? I believe we have enough memory to store the uncompressed data in memory.

On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

This is the log output:

2014-11-12 19:07:16,561 INFO thriftserver.SparkExecuteStatementOperation (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112'
2014-11-12 19:07:17,455 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2014-11-12 19:07:17,756 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at TableReader.scala:68
2014-11-12 19:07:18,292 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
2014-11-12 19:07:22,801 INFO mapred.FileInputFormat (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
2014-11-12 19:07:22,835 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at Exchange.scala:86)
2014-11-12 19:07:22,837 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at SparkPlan.scala:84)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
2014-11-12 19:07:22,842 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
2014-11-12 19:07:22,871 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:07:22,916 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at DAGScheduler.scala:838
2014-11-12 19:07:22,963 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:10:04,088 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 161.113 s
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:10:04,094 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:10:04,097 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
2014-11-12 19:10:04,112 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at DAGScheduler.scala:838
2014-11-12 19:10:04,115 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 52 on ip-10-61-175-167.ec2.internal: remote Akka client disassociated
2014-11-12 19:10:08,543 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
2014-11-12 19:10:08,550 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
2014-11-12 19:10:08,555 INFO scheduler.Stage (Logging.scala:logInfo(59)) - Stage 0 is now unavailable on executor 52 (460/461, false)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at SparkPlan.scala:84) as failed due to a fetch failure from Stage 0 (mapPartitions at Exchange.scala:86)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59
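For what it's worth, executor losses that surface as "remote Akka client disassociated" on YARN are often caused by the container exceeding its memory limit and being killed by YARN rather than by a Spark-level fault. That is an assumption about the failure above, not something established in this thread; checking the YARN NodeManager logs for a "running beyond physical memory limits" message would confirm or rule it out. A spark-defaults.conf sketch with illustrative values only:

```properties
# spark.yarn.executor.memoryOverhead (MB, Spark 1.x name) pads the YARN
# container beyond the executor JVM heap; raising it can stop YARN from
# killing executors whose off-heap usage exceeds the container limit.
spark.yarn.executor.memoryOverhead  2048
```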
Cache sparkSql data without uncompressing it in memory
We noticed while caching data from our Hive tables, which contain data in compressed sequence file format, that it gets uncompressed in memory when cached. Is there a way to turn this off and cache the compressed data as is?
Re: thrift jdbc server probably running queries as hive query
Hi Cheng, I made sure the only Hive server running on the machine is HiveThriftServer2:

/usr/lib/jvm/default-java/bin/java -cp /usr/lib/hadoop/lib/hadoop-lzo.jar::/mnt/sadhan/spark-3/sbin/../conf:/mnt/sadhan/spark-3/spark-assembly-1.2.0-SNAPSHOT-hadoop2.3.0-cdh5.0.2.jar:/etc/hadoop/conf -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master yarn --jars reporting.jar spark-internal

The query I am running is a simple count(*): select count(*) from Xyz where date_prefix=20141031, and I'm pretty sure it's submitting a map reduce job based on the spark logs:

TakesRest=false
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=number
In order to limit the maximum number of reducers: set hive.exec.reducers.max=number
In order to set a constant number of reducers: set mapreduce.job.reduces=number
14/11/11 16:23:17 INFO ql.Context: New scratch dir is hdfs://fdsfdsfsdfsdf:9000/tmp/hive-ubuntu/hive_2014-11-11_16-23-17_333_5669798325805509526-2
Starting Job = job_1414084656759_0142, Tracking URL = http://xxx:8100/proxy/application_1414084656759_0142/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1414084656759_0142

On Mon, Nov 10, 2014 at 9:59 PM, Cheng Lian lian.cs@gmail.com wrote:

Hey Sadhan, I really don't think this is a Spark log... Unlike Shark, Spark SQL doesn't even provide a Hive mode to let you execute queries against Hive. Would you please check whether there is an existing HiveServer2 running there?
Spark SQL HiveThriftServer2 is just a Spark port of HiveServer2, and they share the same default listening port. I guess the Thrift server didn't start successfully because HiveServer2 occupied the port, and your Beeline session was probably connected to HiveServer2. Cheng

On 11/11/14 8:29 AM, Sadhan Sood wrote:

I was testing out the Spark Thrift JDBC server by running a simple query in the beeline client. Spark itself is running on a YARN cluster. However, when I run a query in beeline I see no running jobs in the Spark UI (completely empty), and the YARN UI seems to indicate that the submitted query is being run as a map reduce job. This is probably also indicated by the spark logs, but I am not completely sure:

2014-11-11 00:19:00,492 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1
2014-11-11 00:19:00,877 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,152 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,425 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication is deprecated. Instead, use mapreduce.client.submit.file.replication
2014-11-11 00:19:04,516 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,607 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,639 WARN mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this
2014-11-11 00:00:08,806 INFO input.FileInputFormat (FileInputFormat.java:listStatus(287)) - Total input paths to process : 14912
2014-11-11 00:00:08,864 INFO lzo.GPLNativeCodeLoader (GPLNativeCodeLoader.java:clinit(34)) - Loaded native gpl library
2014-11-11 00:00:08,866 INFO lzo.LzoCodec (LzoCodec.java:clinit(76)) - Successfully loaded initialized native-lzo library [hadoop-lzo rev 8e266e052e423af592871e2dfe09d54c03f6a0e8]
2014-11-11 00:00:09,873 INFO input.CombineFileInputFormat (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 194541317
2014-11-11 00:00:10,017 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:615
2014-11-11 00:00:10,095 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1414084656759_0115
2014-11-11 00:00:10,241 INFO impl.YarnClientImpl (YarnClientImpl.java:submitApplication(167
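If an existing HiveServer2 really is holding the default port, one way to test Cheng's theory is to move the Spark Thrift server to a different port. A hive-site.xml sketch for the Spark side (the port value 10001 is arbitrary; hive.server2.thrift.port is the standard HiveServer2 property that Spark's port reuses):

```xml
<property>
  <!-- Both HiveServer2 and Spark's HiveThriftServer2 default to port
       10000; give the Spark one its own port to avoid the clash. -->
  <name>hive.server2.thrift.port</name>
  <value>10001</value>
</property>
```

If Beeline then connects on 10001 and the jobs show up in the Spark UI, the earlier sessions were indeed talking to HiveServer2.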
Partition caching taking too long
While testing SparkSQL on top of our Hive metastore, we were trying to cache the data for one partition of a table in memory like this:

CACHE TABLE xyz_20141029 AS SELECT * FROM xyz where date_prefix = 20141029

Table xyz is a Hive table partitioned by date_prefix. The data in the date_prefix = 20141029 directory is a single Parquet file:

hdfs dfs -ls /event_logs/xyz/20141029
Found 1 items
-rw-r--r-- 3 ubuntu hadoop 854521061 2014-11-11 22:20 /event_logs/xyz/20141029/part-01493178cd7f2-31eb-3f9d-b004-149a97ac4d79-r-01493.lzo.parquet

The file size is no more than 800 MB, but the cache command is still taking longer than an hour, and from what the UI shows it is reading many gigabytes of data, with multiple failures. Stage 0 (mapPartitions), which took the longest, was running as (from the UI):

0 mapPartitions at Exchange.scala:86 +details
RDD: HiveTableScan [tid#46,compact#47,date_prefix#45], (MetastoreRelation default, bid, None), Some((CAST(date_prefix#45, DoubleType) = 2.0141029E7))
org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:602)
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:86)
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:128)
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:423)
org.apache.spark.sql.SchemaRDD.count(SchemaRDD.scala:343)
org.apache.spark.sql.execution.CacheTableCommand.sideEffectResult$lzycompute(commands.scala:168)
org.apache.spark.sql.execution.CacheTableCommand.sideEffectResult(commands.scala:159)
org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
org.apache.spark.sql.execution.CacheTableCommand.execute(commands.scala:153)
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:105)

Submitted: 2014/11/11 22:28:47; Duration: 40 min; Tasks: 19546/19546 (succeeded/total); Input: 201.1 GB; Shuffle write: 973.5 KB

I need help understanding what is going on and how we can optimize the caching.
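One thing that stands out in the plan above is the predicate Some((CAST(date_prefix#45, DoubleType) = 2.0141029E7)): the partition column is being cast to a double before comparison, which can prevent Hive partition pruning and force a scan of every partition, consistent with 201.1 GB read for an 800 MB partition. This is a hypothesis, not a confirmed diagnosis; one cheap experiment is to compare against a string literal so the predicate stays on the partition column's native type:

```sql
-- Hypothetical rewrite: quote the partition value so no numeric cast is
-- introduced and the scan can be pruned to the single partition directory.
CACHE TABLE xyz_20141029 AS
SELECT * FROM xyz WHERE date_prefix = '20141029';
```

If the input size in the UI drops to roughly the size of the one Parquet file, the cast was the culprit.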
getting exception when trying to build spark from master
Getting an exception while trying to build spark in spark-core: [ERROR] while compiling: /Users/dev/tellapart_spark/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala during phase: typer library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: -deprecation -feature -classpath last tree to typer: Ident(enumDispatcher) symbol: value enumDispatcher (flags: triedcooking) symbol definition: val enumDispatcher: java.util.EnumSet[javax.servlet.DispatcherType] tpe: java.util.EnumSet[javax.servlet.DispatcherType] symbol owners: value enumDispatcher - value $anonfun - method addFilters - object JettyUtils - package ui context owners: value $anonfun - value $anonfun - method addFilters - object JettyUtils - package ui == Enclosing template or block == Block( ValDef( // val filters: Array[String] triedcooking filters AppliedTypeTree( Array String ) Apply( conf.get(spark.ui.filters, ).split(',').map Function( // val $anonfun: notype, tree.tpe=String = String ValDef( // x$1: String param synthetic triedcooking x$1 tpt // tree.tpe=String empty ) Apply( // def trim(): String in class String, tree.tpe=String x$1.trim // def trim(): String in class String, tree.tpe=()String Nil ) ) ) ) Apply( filters.foreach Match( empty CaseDef( Bind( // val filter: String filter Typed( _ // tree.tpe=String String ) ) If( filter.isEmpty.unary_$bang Block( // 7 statements Apply( logInfo Apply( // final def +(x$1: Any): String in class String, tree.tpe=String Adding filter: .$plus // final def +(x$1: Any): String in class String, tree.tpe=(x$1: Any)String filter // val filter: String, tree.tpe=String ) ) ValDef( // val holder: org.eclipse.jetty.servlet.FilterHolder triedcooking holder FilterHolder Apply( new FilterHolder.init Nil ) ) Apply( // def setClassName(x$1: String): Unit in class Holder, tree.tpe=Unit holder.setClassName // def setClassName(x$1: String): Unit in class Holder, tree.tpe=(x$1: String)Unit filter // val filter: String, tree.tpe=String ) Apply( 
conf.get(spark..+(filter).+(.params), ).split(',').map(((x$2: String) = x$2.trim())).toSet.foreach Function( // val $anonfun: notype ValDef( // param: String param triedcooking param String empty ) If( param.isEmpty.unary_$bang Block( ValDef( // val parts: Array[String] triedcooking parts tpt // tree.tpe=Array[String] Apply( // def split(x$1: String): Array[String] in class String, tree.tpe=Array[String] param.split // def split(x$1: String): Array[String] in class String, tree.tpe=(x$1: String)Array[String] = ) ) If( Apply( // def ==(x: Int): Boolean in class Int, tree.tpe=Boolean parts.length.$eq$eq // def ==(x: Int): Boolean in class Int, tree.tpe=(x: Int)Boolean 2 ) Apply( // def setInitParameter(x$1: String,x$2: String): Unit in class Holder holder.setInitParameter // def setInitParameter(x$1: String,x$2: String): Unit in class Holder, tree.tpe=(x$1: String, x$2: String)Unit // 2 arguments Apply( // val parts: Array[String] parts // val parts: Array[String], tree.tpe=parts.type 0 ) Apply( // val parts: Array[String] parts // val parts: Array[String], tree.tpe=parts.type 1 ) ) () ) ) () ) ) ) ValDef( // val prefix: String triedcooking prefix tpt // tree.tpe=String Apply( StringContext(spark., .param.).s
Re: getting exception when trying to build spark from master
I reverted the patch locally, seems to be working for me. On Mon, Nov 10, 2014 at 6:00 PM, Patrick Wendell pwend...@gmail.com wrote: I reverted that patch to see if it fixes it. On Mon, Nov 10, 2014 at 1:45 PM, Josh Rosen rosenvi...@gmail.com wrote: It looks like the Jenkins maven builds are broken, too. Based on the Jenkins logs, I think that this pull request may have broken things (although I'm not sure why): https://github.com/apache/spark/pull/3030#issuecomment-62436181 On Mon, Nov 10, 2014 at 1:42 PM, Sadhan Sood sadhan.s...@gmail.com wrote: Getting an exception while trying to build spark in spark-core: [ERROR] while compiling: /Users/dev/tellapart_spark/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala during phase: typer library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: -deprecation -feature -classpath last tree to typer: Ident(enumDispatcher) symbol: value enumDispatcher (flags: triedcooking) symbol definition: val enumDispatcher: java.util.EnumSet[javax.servlet.DispatcherType] tpe: java.util.EnumSet[javax.servlet.DispatcherType] symbol owners: value enumDispatcher - value $anonfun - method addFilters - object JettyUtils - package ui context owners: value $anonfun - value $anonfun - method addFilters - object JettyUtils - package ui == Enclosing template or block == Block( ValDef( // val filters: Array[String] triedcooking filters AppliedTypeTree( Array String ) Apply( conf.get(spark.ui.filters, ).split(',').map Function( // val $anonfun: notype, tree.tpe=String = String ValDef( // x$1: String param synthetic triedcooking x$1 tpt // tree.tpe=String empty ) Apply( // def trim(): String in class String, tree.tpe=String x$1.trim // def trim(): String in class String, tree.tpe=()String Nil ) ) ) ) Apply( filters.foreach Match( empty CaseDef( Bind( // val filter: String filter Typed( _ // tree.tpe=String String ) ) If( filter.isEmpty.unary_$bang Block( // 7 statements Apply( logInfo Apply( // final def +(x$1: Any): 
String in class String, tree.tpe=String Adding filter: .$plus // final def +(x$1: Any): String in class String, tree.tpe=(x$1: Any)String filter // val filter: String, tree.tpe=String ) ) ValDef( // val holder: org.eclipse.jetty.servlet.FilterHolder triedcooking holder FilterHolder Apply( new FilterHolder.init Nil ) ) Apply( // def setClassName(x$1: String): Unit in class Holder, tree.tpe=Unit holder.setClassName // def setClassName(x$1: String): Unit in class Holder, tree.tpe=(x$1: String)Unit filter // val filter: String, tree.tpe=String ) Apply( conf.get(spark..+(filter).+(.params), ).split(',').map(((x$2: String) = x$2.trim())).toSet.foreach Function( // val $anonfun: notype ValDef( // param: String param triedcooking param String empty ) If( param.isEmpty.unary_$bang Block( ValDef( // val parts: Array[String] triedcooking parts tpt // tree.tpe=Array[String] Apply( // def split(x$1: String): Array[String] in class String, tree.tpe=Array[String] param.split // def split(x$1: String): Array[String] in class String, tree.tpe=(x$1: String)Array[String] = ) ) If( Apply( // def ==(x: Int): Boolean in class Int, tree.tpe=Boolean parts.length.$eq$eq // def ==(x: Int): Boolean in class Int, tree.tpe=(x: Int)Boolean 2 ) Apply( // def setInitParameter(x$1: String,x$2: String): Unit in class Holder holder.setInitParameter // def
thrift jdbc server probably running queries as hive query
I was testing out the Spark Thrift JDBC server by running a simple query in the beeline client. Spark itself is running on a YARN cluster. However, when I run a query in beeline I see no running jobs in the Spark UI (completely empty), and the YARN UI seems to indicate that the submitted query is being run as a map reduce job. This is probably also indicated by the spark logs, but I am not completely sure:

2014-11-11 00:19:00,492 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1
2014-11-11 00:19:00,877 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,152 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,425 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication is deprecated. Instead, use mapreduce.client.submit.file.replication
2014-11-11 00:19:04,516 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,607 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,639 WARN mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this
2014-11-11 00:00:08,806 INFO input.FileInputFormat (FileInputFormat.java:listStatus(287)) - Total input paths to process : 14912
2014-11-11 00:00:08,864 INFO lzo.GPLNativeCodeLoader (GPLNativeCodeLoader.java:clinit(34)) - Loaded native gpl library
2014-11-11 00:00:08,866 INFO lzo.LzoCodec (LzoCodec.java:clinit(76)) - Successfully loaded initialized native-lzo library [hadoop-lzo rev 8e266e052e423af592871e2dfe09d54c03f6a0e8]
2014-11-11 00:00:09,873 INFO input.CombineFileInputFormat (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 194541317
2014-11-11 00:00:10,017 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:615
2014-11-11 00:00:10,095 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1414084656759_0115
2014-11-11 00:00:10,241 INFO impl.YarnClientImpl (YarnClientImpl.java:submitApplication(167)) - Submitted application application_1414084656759_0115

It seems like the query is being run as a Hive query instead of a Spark query. The same query works fine when run from the spark-sql cli.
Sharing spark context across multiple spark sql cli initializations
We want to run multiple instances of the spark-sql cli on our YARN cluster, with each instance of the cli used by a different user. This would be non-optimal if each user brings up a different cli, given how Spark works on YARN: executor processes run (and hence consume resources) on worker nodes for the lifetime of the application. Imagine each user trying to cache a table in memory when there is limited memory across the cluster. The right way seems to be to share the same SparkContext across the multiple sessions and run just one Spark SQL application. Is my understanding correct about resource usage on YARN for spark-sql? Is there a way to share the SparkContext currently? It seems like it needs some kind of thrift interface hooked into the cli driver. *Apologies if you have already seen this on the user group*
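One way to approximate this today (a sketch, assuming the Spark SQL Thrift server shipped since Spark 1.1, the default listening port 10000, and a placeholder host name) is to run a single HiveThriftServer2 application and have each user attach their own beeline session to it; all sessions then share one SparkContext, one YARN application, and one in-memory table cache:

```shell
# Start one long-running Thrift server (one shared SparkContext on YARN):
$SPARK_HOME/sbin/start-thriftserver.sh --master yarn

# Each user opens an independent beeline session against it; a table
# cached from one session is served from the same executors for everyone:
$SPARK_HOME/bin/beeline -u jdbc:hive2://thrift-server-host:10000
```

The trade-off is that users share the server's resource allocation and a query from one user can queue behind another's, so the shared application should be sized for the combined workload.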
Fwd: Sharing spark context across multiple spark sql cli initializations
We want to run multiple instances of the spark-sql cli on our YARN cluster, with each instance of the cli used by a different user. This looks non-optimal if each user brings up a different cli, given how Spark works on YARN: executor processes run (and hence consume resources) on worker nodes for the lifetime of the application. So the right way seems to be to share the same SparkContext across the multiple sessions and run just one Spark SQL application. Is that understanding correct? Is there a way to do it currently? It seems like it needs some kind of thrift interface hooked into the cli driver.