Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-28 Thread Andrew Otto
> val sqlContext = new HiveContext(sc)
> val schemaRdd = sqlContext.sql("some complex SQL")


It mostly works, but we have been having issues with tables that contain a large
amount of data:

https://issues.apache.org/jira/browse/SPARK-6910 
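
For reference, here’s a minimal sketch of what that looks like for us, assuming hive-site.xml (pointing at your existing metastore) is on Spark’s classpath; the database and table names below are made up:

```scala
import org.apache.spark.sql.hive.HiveContext

// HiveContext reads hive-site.xml from the classpath, so it talks to the
// existing Hive metastore and sees all of its tables.
val sqlContext = new HiveContext(sc)

// Hypothetical tables; plain HiveQL JOINs work as-is.
val joined = sqlContext.sql("""
  SELECT a.id, a.name, SUM(b.amount) AS total
  FROM mydb.table_a a
  JOIN mydb.table_b b ON a.id = b.id
  GROUP BY a.id, a.name
""")

joined.take(10).foreach(println)
```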



> On May 27, 2015, at 20:52, Sanjay Subramanian wrote:
> 
> hey guys
> 
> On our Hive/Hadoop ecosystem, using the Cloudera distribution CDH 5.2.x,
> there are about 300+ Hive tables.
> The data is stored as text (moving slowly to Parquet) on HDFS.
> I want to use SparkSQL, point it at the Hive metadata, and be able to define
> JOINs etc. using a programming structure like this:
> 
> import org.apache.spark.sql.hive.HiveContext
> val sqlContext = new HiveContext(sc)
> val schemaRdd = sqlContext.sql("some complex SQL")
> 
> 
> Is that the way to go? Any guidance would be great.
> 
> thanks
> 
> sanjay
> 
> 
> 



Re: HiveContext fails when querying large external Parquet tables

2015-05-22 Thread Andrew Otto
What is also strange is that this seems to work on external JSON data, but not 
Parquet.  I’ll try to do more verification of that next week.
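
Concretely, a sketch of the comparison I have in mind (assuming the HiveContext from earlier in the thread; both table names are hypothetical stand-ins for large, partitioned external tables that differ only in storage format):

```scala
// Same tiny-partition query against both tables.
val q = "SELECT COUNT(*) FROM %s WHERE year=2015 AND month=5 AND day=20 AND hour=0"

sqlContext.sql(q.format("otto.big_external_json_table")).collect()    // works for us
sqlContext.sql(q.format("otto.big_external_parquet_table")).collect() // fails for us
```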


> On May 22, 2015, at 16:24, yana wrote:
> 
> There is an open Jira on Spark not pushing predicates to the metastore. I have a
> large dataset with many partitions, but doing anything with it is very
> slow... But I am surprised Spark 1.2 worked for you: it has this problem...
> 
> -------- Original message --------
> From: Andrew Otto
> Date: 05/22/2015 3:51 PM (GMT-05:00)
> To: user@spark.apache.org
> Cc: Joseph Allemandou, Madhumitha Viswanathan
> Subject: HiveContext fails when querying large external Parquet tables
> 
> Hi all,
> 
> (This email was easier to write in markdown, so I’ve created a gist with its
> contents here: https://gist.github.com/ottomata/f91ea76cece97444e269.  I’ll paste
> the markdown content in the email body here too.)
> 
> ---
> We’ve recently upgraded to CDH 5.4.0, which comes with Spark 1.3.0 and Hive
> 1.1.0.  Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0.
> Since upgrading, we can no longer query our large webrequest dataset using
> HiveContext.  HiveContext still works fine with smaller Parquet data and with
> other file types in external tables (we have a similarly large JSON external
> table that works just fine with HiveContext).
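> 
> For illustration, a minimal sketch of the kind of query that now fails (the table name here is a hypothetical stand-in; the real DDL is shown below):
> 
> ```scala
> import org.apache.spark.sql.hive.HiveContext
> 
> val sqlContext = new HiveContext(sc)
> 
> // Select a single small hourly partition of the large Parquet table.
> sqlContext.sql("""
>   SELECT COUNT(*)
>   FROM wmf.webrequest
>   WHERE webrequest_source = 'misc'
>     AND year = 2015 AND month = 5 AND day = 20 AND hour = 0
> """).collect()
> ```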
> 
> Our webrequest dataset is stored in hourly partitioned Parquet files.  We 
> mainly interact with this dataset via a Hive external table, but also have 
> been using Spark's HiveContext.
> 
> ```
> # This single hourly directory is only 5.3M
> $ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
> 5.3 M  15.8 M  /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
> 
> # This monthly directory is 1.8T.  (There are subdirectories down to hourly level here too.)
> $ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
> 1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
> ```
> 
> If I create a Hive table on top of this data and add the single hourly
> partition, querying works via both Hive and Spark HiveContext:
> 
> ```sql
> hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS `otto.webrequest_few_partitions_big_data`(
>     `hostname`          string  COMMENT 'Source node hostname',
>     `sequence`          bigint  COMMENT 'Per host sequence number',
>     `dt`                string  COMMENT 'Timestamp at cache in ISO 8601',
>     `time_firstbyte`    double  COMMENT 'Time to first byte',
>     `ip`                string  COMMENT 'IP of packet at cache',
>     `cache_status`      string  COMMENT 'Cache status',
>     `http_status`       string  COMMENT 'HTTP status of response',
>     `response_size`     bigint  COMMENT 'Response size',
>     `http_method`       string  COMMENT 'HTTP method of request',
>     `uri_host`          string  COMMENT 'Host of request',
>     `uri_path`          string  COMMENT 'Path of request',
>     `uri_query`         string  COMMENT 'Query of request',
>     `content_type`      string  COMMENT 'Content-Type header of response',
>     `referer`           string  COMMENT 'Referer header of request',
>     `x_forwarded_for`   string  COMMENT 'X-Forwarded-For header of request',
>     `user_agent`        string  COMMENT 'User-Agent header of request',
>     `accept_language`   string  COMMENT 'Accept-Language header of request',
>     `x_analytics`       string  COMMENT 'X-Analytics header of response',
>     `range`             string  COMMENT 'Range header of response',
>     `is_pageview`       boolean COMMENT 'Indicates if this record was marked as a pageview during refinement',
>     `record_version`    string  COMMENT 'Keeps track of changes in the table content definition - https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
>     `client_ip`         string  COMMENT 'Client IP computed during refinement using ip and x_forwarded_for',
>     `geocoded_data`     map<string,string>  COMMENT 'Geocoded map with continent, country_code, country, city, subdivision, postal_code, latitude, longitude, timezone keys and associated values.',
>     `x_cache`           string  COMMENT 'X-Cache header of response',
>     `user_agent_map`    map<string,string>  COMMENT 'User-agent map w

at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

I've tested this both in local mode and in YARN client mode, and both have
similar behaviors.  What's worrisome is that the behavior changes after
adding more data to the table, even though I am querying the same very small
partition.  The whole point of Hive partitions is to allow jobs to work with
only the data that is needed.  I'm not sure what Spark HiveContext is doing
here, but it seems to couple the full size of a Hive table to the performance
of a query that only needs a very small amount of data.
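
For comparison, a workaround sketch (not something from our production setup, and assuming the same HiveContext as sqlContext): reading the single partition's Parquet directory directly, bypassing the metastore entirely, should make the query independent of the table's total size:

```scala
// Read one hourly partition's Parquet files directly (Spark 1.3 API);
// the Hive metastore and the rest of the table are never consulted.
val partDir = "/wmf/data/wmf/webrequest/" +
  "webrequest_source=misc/year=2015/month=5/day=20/hour=0"

val hourly = sqlContext.parquetFile(partDir)
hourly.registerTempTable("webrequest_misc_one_hour")

sqlContext.sql("SELECT COUNT(*) FROM webrequest_misc_one_hour").show()
```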

I poked around the Spark source, and for a minute thought this might be
related: https://github.com/apache/spark/commit/42389b17, but that commit was
included in Spark 1.2.0, where this was working fine for us.

Is HiveContext somehow trying to scan the whole table in the driver?  Has 
anyone else had this problem?

Thanks!

-Andrew Otto