Re: Cache sparkSql data without uncompressing it in memory

2014-11-13 Thread Sadhan Sood
Thanks Cheng. Just one more question - does that mean we still need
enough memory in the cluster to uncompress the data before it can be
compressed again, or does that just read the raw data as is?

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote:

  Currently there’s no way to cache the compressed sequence file directly.
 Spark SQL uses an in-memory columnar format when caching table rows, so we
 must read all the raw data and convert it into that format. However,
 you can enable in-memory columnar compression by setting
 spark.sql.inMemoryColumnarStorage.compressed to true. This property is
 already set to true by default in the master branch and branch-1.2.
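
 A minimal sketch of that setting, assuming an existing SQLContext (or
 HiveContext) named sqlContext and a registered table named xyz; both names
 are placeholders:

   // Ensure cached columnar batches are compressed (already the default in 1.2).
   sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
   // Rows are still decoded from the compressed sequence file while reading,
   // but the in-memory columnar batches built from them are stored compressed.
   sqlContext.cacheTable("xyz")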

 On 11/13/14 7:16 AM, Sadhan Sood wrote:

   We noticed, while caching data from our Hive tables (which contain data
 in compressed sequence file format), that it gets uncompressed in memory
 when being cached. Is there a way to turn this off and cache the compressed
 data as is?



Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
We are running Spark on YARN with combined memory > 1TB, and when trying to
cache a table partition (which is < 100G), we are seeing a lot of failed
collect stages in the UI and the caching never succeeds. Because of the
failed collects, it seems like the mapPartitions stages keep getting
resubmitted. We have more than enough memory, so it's surprising we are
seeing this issue. Can someone please help. Thanks!

The stack trace of the failed collect from UI is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
 at
DAGScheduler.scala:838

2014-11-12 19:10:08,990 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 66.475 s

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:11:15,482 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:11:15,482 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 372 on
ip-10-95-163-84.ec2.internal: remote Akka client disassociated

2014-11-12 19:11:21,655 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 372

2014-11-12 19:11:21,655 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)




On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

 We are running Spark on YARN with combined memory > 1TB, and when trying to
 cache a table partition (which is < 100G), we are seeing a lot of failed
 collect stages in the UI and the caching never succeeds. Because of the
 failed collects, it seems like the mapPartitions stages keep getting
 resubmitted. We have more than enough memory, so it's surprising we are
 seeing this issue. Can someone please help. Thanks!

 The stack trace of the failed collect from UI is:

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
 location for shuffle 0
   at 
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
   at 
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at 
 org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
   at 
 org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
   at 
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195

Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
On re-running the cache statement, I see from the logs that when
collect (Stage 1) fails, it always leads to mapPartitions (Stage 0) being
re-run for one partition. This can be seen in the collect log as well as
in the container log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0

The data is an lzo-compressed sequence file with a compressed size of ~26G.
Is there a way to understand why the shuffle keeps failing for one partition?
I believe we have enough memory to store the uncompressed data in memory.

On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

 This is the log output:

 2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
 (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
 SELECT * FROM xyz where date_prefix = 20141112'

 2014-11-12 19:07:17,455 INFO  Configuration.deprecation
 (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
 deprecated. Instead, use mapreduce.job.maps

 2014-11-12 19:07:17,756 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
 TableReader.scala:68

 2014-11-12 19:07:18,292 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84

 2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
 (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200

 2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
 Exchange.scala:86)

 2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
 with 1 output partitions (allowLocal=false)

 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
 SparkPlan.scala:84)

 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)

 2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)

 2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
 mapPartitions at Exchange.scala:86), which has no missing parents

 2014-11-12 19:07:22,916 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
 DAGScheduler.scala:838

 2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

 2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
 finished in 161.113 s

 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - looking for newly runnable stages

 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - running: Set()

 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - failed: Set()

 2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

 2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
 SparkPlan.scala:84), which is now runnable

 2014-11-12 19:10:04,112 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
 DAGScheduler.scala:838

 2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
 (MappedRDD[16] at map at SparkPlan.scala:84)

 2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
 (Logging.scala:logError(75)) - Lost executor 52 on
 ip-10-61-175-167.ec2.internal: remote Akka client disassociated

 2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
 (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
 [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has
 failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
 (Logging.scala:logError(75)) - Asked to remove non-existent executor 52

 2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)

 2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.scala:logInfo(59))
 - Stage 0 is now unavailable on executor 52 (460/461, false)

 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
 SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
 (mapPartitions at Exchange.scala:86)

 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59

Cache sparkSql data without uncompressing it in memory

2014-11-12 Thread Sadhan Sood
We noticed, while caching data from our Hive tables (which contain data in
compressed sequence file format), that it gets uncompressed in memory when
being cached. Is there a way to turn this off and cache the compressed
data as is?


Re: thrift jdbc server probably running queries as hive query

2014-11-11 Thread Sadhan Sood
Hi Cheng,

I made sure the only Hive server running on the machine is
HiveThriftServer2:

/usr/lib/jvm/default-java/bin/java -cp
/usr/lib/hadoop/lib/hadoop-lzo.jar::/mnt/sadhan/spark-3/sbin/../conf:/mnt/sadhan/spark-3/spark-assembly-1.2.0-SNAPSHOT-hadoop2.3.0-cdh5.0.2.jar:/etc/hadoop/conf
-Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master yarn
--jars reporting.jar spark-internal

The query I am running is a simple count(*): select count(*) from Xyz
where date_prefix=20141031. I am pretty sure it's being submitted as a map
reduce job, based on the Spark logs:

TakesRest=false

Total jobs = 1

Launching Job 1 out of 1

Number of reduce tasks determined at compile time: 1

In order to change the average load for a reducer (in bytes):

  set hive.exec.reducers.bytes.per.reducer=number

In order to limit the maximum number of reducers:

  set hive.exec.reducers.max=number

In order to set a constant number of reducers:

  set mapreduce.job.reduces=number

14/11/11 16:23:17 INFO ql.Context: New scratch dir is
hdfs://fdsfdsfsdfsdf:9000/tmp/hive-ubuntu/hive_2014-11-11_16-23-17_333_5669798325805509526-2

Starting Job = job_1414084656759_0142, Tracking URL =
http://xxx:8100/proxy/application_1414084656759_0142/

Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1414084656759_0142

On Mon, Nov 10, 2014 at 9:59 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Sadhan,

 I really don't think this is Spark log output... Unlike Shark, Spark SQL doesn't
 even provide a Hive mode to let you execute queries against Hive. Would you
 please check whether there is an existing HiveServer2 running there? Spark
 SQL HiveThriftServer2 is just a Spark port of HiveServer2, and they share
 the same default listening port. I guess the Thrift server didn't start
 successfully because the HiveServer2 occupied the port, and your Beeline
 session was probably linked against HiveServer2.

 Cheng


 On 11/11/14 8:29 AM, Sadhan Sood wrote:

  I was testing out the Spark thrift JDBC server by running a simple query
 in the beeline client. Spark itself is running on a YARN cluster.

 However, when I run a query in beeline, I see no running jobs in the
 Spark UI (completely empty), and the YARN UI seems to indicate that the
 submitted query is being run as a map reduce job. This is probably also
 indicated by the Spark logs, but I am not completely sure:

  2014-11-11 00:19:00,492 INFO  ql.Context
 (Context.java:getMRScratchDir(267)) - New scratch dir is
 hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1

 2014-11-11 00:19:00,877 INFO  ql.Context
 (Context.java:getMRScratchDir(267)) - New scratch dir is
 hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

 2014-11-11 00:19:04,152 INFO  ql.Context
 (Context.java:getMRScratchDir(267)) - New scratch dir is
 hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

 2014-11-11 00:19:04,425 INFO  Configuration.deprecation
 (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication
 is deprecated. Instead, use mapreduce.client.submit.file.replication

 2014-11-11 00:19:04,516 INFO  client.RMProxy
 (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
 at :8032

 2014-11-11 00:19:04,607 INFO  client.RMProxy
 (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
 at :8032

 2014-11-11 00:19:04,639 WARN  mapreduce.JobSubmitter
 (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option
 parsing not performed. Implement the Tool interface and execute your
 application with ToolRunner to remedy this

 2014-11-11 00:00:08,806 INFO  input.FileInputFormat
 (FileInputFormat.java:listStatus(287)) - Total input paths to process :
 14912

 2014-11-11 00:00:08,864 INFO  lzo.GPLNativeCodeLoader
 (GPLNativeCodeLoader.java:<clinit>(34)) - Loaded native gpl library

 2014-11-11 00:00:08,866 INFO  lzo.LzoCodec (LzoCodec.java:<clinit>(76)) -
 Successfully loaded & initialized native-lzo library [hadoop-lzo rev
 8e266e052e423af592871e2dfe09d54c03f6a0e8]

 2014-11-11 00:00:09,873 INFO  input.CombineFileInputFormat
 (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node
 allocation with : CompletedNodes: 1, size left: 194541317

 2014-11-11 00:00:10,017 INFO  mapreduce.JobSubmitter
 (JobSubmitter.java:submitJobInternal(396)) - number of splits:615

 2014-11-11 00:00:10,095 INFO  mapreduce.JobSubmitter
 (JobSubmitter.java:printTokens(479)) - Submitting tokens for job:
 job_1414084656759_0115

 2014-11-11 00:00:10,241 INFO  impl.YarnClientImpl
 (YarnClientImpl.java:submitApplication(167

Partition caching taking too long

2014-11-11 Thread Sadhan Sood
While testing SparkSQL on top of our Hive metastore, we were trying to
cache the data for one partition of the table in memory like this:

CACHE TABLE xyz_20141029 AS SELECT * FROM xyz where date_prefix = 20141029
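
For comparison, a minimal sketch of the same partition-caching step through
the programmatic API, assuming a HiveContext named hiveContext over the same
metastore (the names are placeholders):

  // Register the filtered partition as a temporary table and cache it in the
  // in-memory columnar store; the data is materialized on the first action.
  val partition = hiveContext.sql("SELECT * FROM xyz WHERE date_prefix = 20141029")
  partition.registerTempTable("xyz_20141029")
  hiveContext.cacheTable("xyz_20141029")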

Table xyz is a Hive table which is partitioned by date_prefix. The data in
the date_prefix = 20141029 directory is one parquet file:

hdfs dfs -ls /event_logs/xyz/20141029

Found 1 items

-rw-r--r--   3 ubuntu hadoop  854521061 2014-11-11 22:20
/event_logs/xyz/20141029/part-01493178cd7f2-31eb-3f9d-b004-149a97ac4d79-r-01493.lzo.parquet

The file size is no more than 800 MB, but the cache command is still taking
longer than an hour and, judging from the UI, is reading multiple gigabytes
of data, with multiple failures.

Stage 0 (mapPartitions), which took the longest, was running as (from the UI):

mapPartitions at Exchange.scala:86

RDD: HiveTableScan [tid#46,compact#47,date_prefix#45], (MetastoreRelation
default, bid, None), Some((CAST(date_prefix#45, DoubleType) = 2.0141029E7))

org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:602)

org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:86)

org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)

org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)

org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:128)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127)

org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)

org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126)

org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)

org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:423)

org.apache.spark.sql.SchemaRDD.count(SchemaRDD.scala:343)

org.apache.spark.sql.execution.CacheTableCommand.sideEffectResult$lzycompute(commands.scala:168)

org.apache.spark.sql.execution.CacheTableCommand.sideEffectResult(commands.scala:159)

org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)

org.apache.spark.sql.execution.CacheTableCommand.execute(commands.scala:153)

org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)

org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)

org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)

org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:105)

Submitted: 2014/11/11 22:28:47, Duration: 40 min

Tasks (Succeeded/Total): 19546/19546, Input: 201.1 GB, Shuffle Write: 973.5 KB


I need help understanding what is going on and how we can optimize the
caching.


getting exception when trying to build spark from master

2014-11-10 Thread Sadhan Sood
Getting an exception while trying to build spark in spark-core:

[ERROR]

 while compiling:
/Users/dev/tellapart_spark/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

during phase: typer

 library version: version 2.10.4

compiler version: version 2.10.4

  reconstructed args: -deprecation -feature -classpath


  last tree to typer: Ident(enumDispatcher)

  symbol: value enumDispatcher (flags: triedcooking)

   symbol definition: val enumDispatcher:
java.util.EnumSet[javax.servlet.DispatcherType]

 tpe: java.util.EnumSet[javax.servlet.DispatcherType]

   symbol owners: value enumDispatcher - value $anonfun - method
addFilters - object JettyUtils - package ui

  context owners: value $anonfun - value $anonfun - method addFilters
- object JettyUtils - package ui


== Enclosing template or block ==


Block(

  ValDef( // val filters: Array[String]

triedcooking

filters

AppliedTypeTree(

  Array

  String

)

Apply(

  conf.get(spark.ui.filters, ).split(',').map

  Function( // val $anonfun: notype, tree.tpe=String = String

ValDef( // x$1: String

  param synthetic triedcooking

  x$1

  tpt // tree.tpe=String

  empty

)

Apply( // def trim(): String in class String, tree.tpe=String

  x$1.trim // def trim(): String in class String,
tree.tpe=()String

  Nil

)

  )

)

  )

  Apply(

filters.foreach

Match(

  empty

  CaseDef(

Bind( // val filter: String

  filter

  Typed(

_ // tree.tpe=String

String

  )

)

If(

  filter.isEmpty.unary_$bang

  Block(

// 7 statements

Apply(

  logInfo

  Apply( // final def +(x$1: Any): String in class String,
tree.tpe=String

Adding filter: .$plus // final def +(x$1: Any): String
in class String, tree.tpe=(x$1: Any)String

filter // val filter: String, tree.tpe=String

  )

)

ValDef( // val holder: org.eclipse.jetty.servlet.FilterHolder

  triedcooking

  holder

  FilterHolder

  Apply(

new FilterHolder.init

Nil

  )

)

Apply( // def setClassName(x$1: String): Unit in class Holder,
tree.tpe=Unit

  holder.setClassName // def setClassName(x$1: String):
Unit in class Holder, tree.tpe=(x$1: String)Unit

  filter // val filter: String, tree.tpe=String

)

Apply(

  conf.get(spark..+(filter).+(.params),
).split(',').map(((x$2: String) = x$2.trim())).toSet.foreach

  Function( // val $anonfun: notype

ValDef( // param: String

  param triedcooking

  param

  String

  empty

)

If(

  param.isEmpty.unary_$bang

  Block(

ValDef( // val parts: Array[String]

  triedcooking

  parts

  tpt // tree.tpe=Array[String]

  Apply( // def split(x$1: String): Array[String] in
class String, tree.tpe=Array[String]

param.split // def split(x$1: String):
Array[String] in class String, tree.tpe=(x$1: String)Array[String]

=

  )

)

If(

  Apply( // def ==(x: Int): Boolean in class Int,
tree.tpe=Boolean

parts.length.$eq$eq // def ==(x: Int):
Boolean in class Int, tree.tpe=(x: Int)Boolean

2

  )

  Apply( // def setInitParameter(x$1: String,x$2:
String): Unit in class Holder

holder.setInitParameter // def
setInitParameter(x$1: String,x$2: String): Unit in class Holder,
tree.tpe=(x$1: String, x$2: String)Unit

// 2 arguments

Apply( // val parts: Array[String]

  parts // val parts: Array[String],
tree.tpe=parts.type

  0

)

Apply( // val parts: Array[String]

  parts // val parts: Array[String],
tree.tpe=parts.type

  1

)

  )

  ()

)

  )

  ()

)

  )

)

ValDef( // val prefix: String

  triedcooking

  prefix

  tpt // tree.tpe=String

  Apply(

StringContext(spark., .param.).s

  

Re: getting exception when trying to build spark from master

2014-11-10 Thread Sadhan Sood
I reverted the patch locally; it seems to be working for me.

On Mon, Nov 10, 2014 at 6:00 PM, Patrick Wendell pwend...@gmail.com wrote:

 I reverted that patch to see if it fixes it.

 On Mon, Nov 10, 2014 at 1:45 PM, Josh Rosen rosenvi...@gmail.com wrote:
  It looks like the Jenkins maven builds are broken, too.  Based on the
  Jenkins logs, I think that this pull request may have broken things
  (although I'm not sure why):
 
  https://github.com/apache/spark/pull/3030#issuecomment-62436181
 
  On Mon, Nov 10, 2014 at 1:42 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:
 
  Getting an exception while trying to build spark in spark-core:
 
  [ERROR]
 
   while compiling:
 
 
 /Users/dev/tellapart_spark/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
 
  during phase: typer
 
   library version: version 2.10.4
 
  compiler version: version 2.10.4
 
reconstructed args: -deprecation -feature -classpath
 
 
last tree to typer: Ident(enumDispatcher)
 
symbol: value enumDispatcher (flags: triedcooking)
 
 symbol definition: val enumDispatcher:
  java.util.EnumSet[javax.servlet.DispatcherType]
 
   tpe: java.util.EnumSet[javax.servlet.DispatcherType]
 
 symbol owners: value enumDispatcher - value $anonfun - method
  addFilters - object JettyUtils - package ui
 
context owners: value $anonfun - value $anonfun - method
 addFilters
  - object JettyUtils - package ui
 
 
  == Enclosing template or block ==
 
 
  Block(
 
ValDef( // val filters: Array[String]
 
  triedcooking
 
  filters
 
  AppliedTypeTree(
 
Array
 
String
 
  )
 
  Apply(
 
conf.get(spark.ui.filters, ).split(',').map
 
Function( // val $anonfun: notype, tree.tpe=String = String
 
  ValDef( // x$1: String
 
param synthetic triedcooking
 
x$1
 
tpt // tree.tpe=String
 
empty
 
  )
 
  Apply( // def trim(): String in class String, tree.tpe=String
 
x$1.trim // def trim(): String in class String,
  tree.tpe=()String
 
Nil
 
  )
 
)
 
  )
 
)
 
Apply(
 
  filters.foreach
 
  Match(
 
empty
 
CaseDef(
 
  Bind( // val filter: String
 
filter
 
Typed(
 
  _ // tree.tpe=String
 
  String
 
)
 
  )
 
  If(
 
filter.isEmpty.unary_$bang
 
Block(
 
  // 7 statements
 
  Apply(
 
logInfo
 
Apply( // final def +(x$1: Any): String in class String,
  tree.tpe=String
 
  Adding filter: .$plus // final def +(x$1: Any):
 String
  in class String, tree.tpe=(x$1: Any)String
 
  filter // val filter: String, tree.tpe=String
 
)
 
  )
 
  ValDef( // val holder:
 org.eclipse.jetty.servlet.FilterHolder
 
triedcooking
 
holder
 
FilterHolder
 
Apply(
 
  new FilterHolder.init
 
  Nil
 
)
 
  )
 
  Apply( // def setClassName(x$1: String): Unit in class
 Holder,
  tree.tpe=Unit
 
holder.setClassName // def setClassName(x$1: String):
  Unit in class Holder, tree.tpe=(x$1: String)Unit
 
filter // val filter: String, tree.tpe=String
 
  )
 
  Apply(
 
conf.get(spark..+(filter).+(.params),
  ).split(',').map(((x$2: String) = x$2.trim())).toSet.foreach
 
Function( // val $anonfun: notype
 
  ValDef( // param: String
 
param triedcooking
 
param
 
String
 
empty
 
  )
 
  If(
 
param.isEmpty.unary_$bang
 
Block(
 
  ValDef( // val parts: Array[String]
 
triedcooking
 
parts
 
tpt // tree.tpe=Array[String]
 
Apply( // def split(x$1: String): Array[String] in
  class String, tree.tpe=Array[String]
 
  param.split // def split(x$1: String):
  Array[String] in class String, tree.tpe=(x$1: String)Array[String]
 
  =
 
)
 
  )
 
  If(
 
Apply( // def ==(x: Int): Boolean in class Int,
  tree.tpe=Boolean
 
  parts.length.$eq$eq // def ==(x: Int):
  Boolean in class Int, tree.tpe=(x: Int)Boolean
 
  2
 
)
 
Apply( // def setInitParameter(x$1: String,x$2:
  String): Unit in class Holder
 
  holder.setInitParameter // def

thrift jdbc server probably running queries as hive query

2014-11-10 Thread Sadhan Sood
I was testing out the Spark thrift JDBC server by running a simple query in
the beeline client. Spark itself is running on a YARN cluster.

However, when I run a query in beeline, I see no running jobs in the
Spark UI (completely empty), and the YARN UI seems to indicate that the
submitted query is being run as a map reduce job. This is probably also
indicated by the Spark logs, but I am not completely sure:

2014-11-11 00:19:00,492 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1

2014-11-11 00:19:00,877 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

2014-11-11 00:19:04,152 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

2014-11-11 00:19:04,425 INFO  Configuration.deprecation
(Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication
is deprecated. Instead, use mapreduce.client.submit.file.replication

2014-11-11 00:19:04,516 INFO  client.RMProxy
(RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
at :8032

2014-11-11 00:19:04,607 INFO  client.RMProxy
(RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
at :8032

2014-11-11 00:19:04,639 WARN  mapreduce.JobSubmitter
(JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your
application with ToolRunner to remedy this

2014-11-11 00:00:08,806 INFO  input.FileInputFormat
(FileInputFormat.java:listStatus(287)) - Total input paths to process :
14912

2014-11-11 00:00:08,864 INFO  lzo.GPLNativeCodeLoader
(GPLNativeCodeLoader.java:<clinit>(34)) - Loaded native gpl library

2014-11-11 00:00:08,866 INFO  lzo.LzoCodec (LzoCodec.java:<clinit>(76)) -
Successfully loaded & initialized native-lzo library [hadoop-lzo rev
8e266e052e423af592871e2dfe09d54c03f6a0e8]

2014-11-11 00:00:09,873 INFO  input.CombineFileInputFormat
(CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node
allocation with : CompletedNodes: 1, size left: 194541317

2014-11-11 00:00:10,017 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:submitJobInternal(396)) - number of splits:615

2014-11-11 00:00:10,095 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:printTokens(479)) - Submitting tokens for job:
job_1414084656759_0115

2014-11-11 00:00:10,241 INFO  impl.YarnClientImpl
(YarnClientImpl.java:submitApplication(167)) - Submitted application
application_1414084656759_0115


It seems like the query is being run as a Hive query instead of a Spark
query. The same query works fine when run from the spark-sql CLI.


Sharing spark context across multiple spark sql cli initializations

2014-10-22 Thread Sadhan Sood
We want to run multiple instances of the Spark SQL CLI on our YARN cluster.
Each instance of the CLI is to be used by a different user. This would be
non-optimal if each user brings up a different CLI, given how Spark works on
YARN by running executor processes (and hence consuming resources) on
worker nodes for the lifetime of the application. Imagine each user trying
to cache a table in memory when there is limited memory across the
cluster. The right way seems to be to share the same Spark context across
multiple initializations and run just one Spark SQL application. Is my
understanding correct about resource usage on YARN for spark-sql? Is there
a way to share the Spark context currently? It seems like it needs some
kind of thrift interface hooked into the CLI driver.

*Apologies if you have already seen this on the user group.*
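
A rough sketch of that shared-context idea, as a hypothetical wrapper (not an
existing Spark facility; all names are illustrative) that a thrift or RPC
front end could call into:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  // One long-lived context owns the executors; every user query goes through
  // it, so cached tables and cluster resources are shared across users.
  object SharedSqlService {
    private val sc = new SparkContext(new SparkConf().setAppName("shared-spark-sql"))
    private val hiveContext = new HiveContext(sc)

    // Entry point the front end would call once per user query.
    def runQuery(sqlText: String): Array[String] =
      hiveContext.sql(sqlText).collect().map(_.toString)
  }

This is essentially the role a thrift server sitting in front of a single
HiveContext would play.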


Fwd: Sharing spark context across multiple spark sql cli initializations

2014-10-22 Thread Sadhan Sood
We want to run multiple instances of the Spark SQL CLI on our YARN cluster.
Each instance of the CLI is to be used by a different user. This looks
non-optimal if each user brings up a different CLI, given how Spark works on
YARN by running executor processes (and hence consuming resources) on
worker nodes for the lifetime of the application. So, the right way seems
to be to share the same Spark context across multiple initializations and
run just one Spark SQL application. Is this understanding correct?
Is there a way to do it currently? It seems like it needs some kind of
thrift interface hooked into the CLI driver.