Re: How to do broadcast join in SparkSQL

2014-11-25 Thread Jianshi Huang
Hi,

Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support.
I got the following exception:

org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
implement HiveOutputFormat, otherwise it should be either
IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
at
org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

I'm using the same DDL and ANALYZE script as before.
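That is, roughly (a sketch; the table name and path are placeholders):

  createParquetTable("table1", "/path/to/table1.parquet")
  sql("ANALYZE TABLE table1 COMPUTE STATISTICS noscan")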

Jianshi


On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang 
wrote:

> It works fine, thanks for the help Michael.
>
> Liancheng also told me a trick: using a subquery with LIMIT n. It works in
> the latest 1.2.0.
>
> BTW, it looks like the broadcast optimization won't be recognized if I do a
> left join instead of an inner join. Is that true? How can I make it work for
> left joins?
>
> Cheers,
> Jianshi
>
> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust 
> wrote:
>
>> Thanks for the input.  We purposefully made sure that the config option
>> did not make it into a release, as it is not something that we are willing
>> to support long term.  That said, we'll try to make this easier in the
>> future, either through hints or better support for statistics.
>>
>> In this particular case you can get what you want by registering the
>> tables as external tables and setting a flag.  Here's a helper function to
>> do what you need.
>>
>> /**
>>  * Sugar for creating a Hive external table from a parquet path.
>>  */
>> def createParquetTable(name: String, file: String): Unit = {
>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>
>>   val rdd = parquetFile(file)
>>   val schema = rdd.schema.fields.map(f => s"${f.name}
>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>   val ddl = s"""
>> |CREATE EXTERNAL TABLE $name (
>> |  $schema
>> |)
>> |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>> |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>> |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>> |LOCATION '$file'""".stripMargin
>>   sql(ddl)
>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>> }
>>
>> You'll also need to run this to populate the statistics:
>>
>> ANALYZE TABLE tableName COMPUTE STATISTICS noscan;
>>
>>
>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang 
>> wrote:
>>
>>> OK, so currently there's cost-based optimization, but Parquet statistics
>>> are not implemented...
>>>
>>> What's a good way to join a big fact table with several tiny
>>> dimension tables in Spark SQL (1.1)?
>>>
>>> I wish we could allow user hints for the join.
>>>
>>> Jianshi
>>>
>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang 
>>> wrote:
>>>
>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>>> merged into master?
>>>>
>>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>>> it's in the following patch.
>>>>
>>>>
>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>
>>>>
>>>> Jianshi
>>>>
>>>>
>>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang >>> > wrote:
>>>>
>>>>> Yes, looks like it can only be controlled by the
>>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit 
>>>>> weird
>>>>> to me.
>>>>>
>>>>> How am I supposed to know the exact size of a table in bytes? Letting me
>>>>> specify which join algorithm is preferred would be better, I think.
>>>>>
>>>>> Jianshi
>>>>>
>>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu  wrote:
>>>>>
>>>>>> Have you looked at SPARK-1800 ?
>>>>>>
>>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>>> Cheers
>>>>>>
>>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>>> jianshi.hu...@gmail.com> wrote:
>>>>>>
>>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>>> tables to (left) join...
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> --
>>>>>>> Jianshi Huang
>>>>>>>
>>>>>>> LinkedIn: jianshi
>>>>>>> Twitter: @jshuang
>>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: How to do broadcast join in SparkSQL

2014-11-25 Thread Jianshi Huang
Oh, I found an explanation at
http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/

The error here is a bit misleading; what it really means is that the class
parquet.hive.DeprecatedParquetOutputFormat isn't in the classpath for Hive.
Sure enough, doing an ls of /usr/lib/hive/lib doesn't show any of the parquet
jars, but ls /usr/lib/impala/lib shows the jar we're looking for as
parquet-hive-1.0.jar.

Is it removed from the latest Spark?
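A quick way to check from the Spark shell (just a sketch) is to see whether the
class can be loaded at all; it throws ClassNotFoundException if the parquet-hive
jar is missing from the classpath:

  Class.forName("parquet.hive.DeprecatedParquetOutputFormat")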

Jianshi


On Wed, Nov 26, 2014 at 2:13 PM, Jianshi Huang 
wrote:

> Hi,
>
> Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet
> support. I got the following exceptions:
>
> org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
> implement HiveOutputFormat, otherwise it should be either
> IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
> at
> org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
> at
> org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
> at
> org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
> at
> org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
>
> Using the same DDL and Analyze script above.
>
> Jianshi
>
>
> On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang 
> wrote:
>
>> It works fine, thanks for the help Michael.
>>
>> Liancheng also told me a trick, using a subquery with LIMIT n. It works
>> in latest 1.2.0
>>
>> BTW, looks like the broadcast optimization won't be recognized if I do a
>> left join instead of a inner join. Is that true? How can I make it work for
>> left joins?
>>
>> Cheers,
>> Jianshi
>>
>> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust 
>> wrote:
>>
>>> Thanks for the input.  We purposefully made sure that the config option
>>> did not make it into a release as it is not something that we are willing
>>> to support long term.  That said we'll try and make this easier in the
>>> future either through hints or better support for statistics.
>>>
>>> In this particular case you can get what you want by registering the
>>> tables as external tables and setting an flag.  Here's a helper function to
>>> do what you need.
>>>
>>> /**
>>>  * Sugar for creating a Hive external table from a parquet path.
>>>  */
>>> def createParquetTable(name: String, file: String): Unit = {
>>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>>
>>>   val rdd = parquetFile(file)
>>>   val schema = rdd.schema.fields.map(f => s"${f.name}
>>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>   val ddl = s"""
>>> |CREATE EXTERNAL TABLE $name (
>>> |  $schema
>>> |)
>>> |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>>>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>>> |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>>> |LOCATION '$file'""".stripMargin
>>>   sql(ddl)
>>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>>> }
>>>
>>> You'll also need to run this to populate the statistics:
>>>
>>> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>>>
>>>
>>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang 
>>> wrote:
>>>
>>>> Ok, currently there's cost-based optimization however Parquet
>>>> statistics is not implemented...
>>>>
>>>> What's the good way if I want to join a big fact table with several
>>>> tiny dimension tables in Spark SQL (1.1)?
>>>>
>>>> I wish we can allow user hint for the join.
>>>>
>>>> Jianshi
>>>>
>>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang 
>>>> wrote:
>>>>
>>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>>>> merged into master?
>>>>>
>>>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>>>> it's in the following patch.
>>>>>
>>>>>
>>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>>
>>>>>
>>>>> Jianshi
>>>>>

Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
I got the following error during Spark startup (Yarn-client mode):

14/12/04 19:33:58 INFO Client: Uploading resource
file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
->
hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
java.lang.IllegalArgumentException: Wrong FS:
hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
at
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
at
org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
at
org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
at
org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
at
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
at
org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
at
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
at
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
at
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
at $iwC$$iwC.<init>(<console>:9)
at $iwC.<init>(<console>:18)
at <init>(<console>:20)
at .<init>(<console>:24)

I'm using the latest Spark, built from master HEAD yesterday. Is this a bug?
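For what it's worth, "Wrong FS ... expected: file:///" is what Hadoop's FileSystem
throws when an hdfs:// path is handed to the local file system. A minimal sketch of
how that happens (assuming a Configuration that never picked up core-site.xml):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  val conf = new Configuration()  // without core-site.xml, fs.defaultFS falls back to file:///
  val fs = FileSystem.get(conf)   // -> local file system
  fs.getFileStatus(new Path("hdfs://somenamenode/user/someone/some.jar"))
  // java.lang.IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///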

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Looks like somehow Spark failed to find the core-site.xml in /etc/hadoop/conf

I've already set the following env variables:

export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HBASE_CONF_DIR=/etc/hbase/conf

Should I add $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?
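
A quick sanity check from spark-shell (a sketch): if the Hadoop conf dir is really
on the classpath, fs.defaultFS should resolve to the hdfs:// namenode rather than
the built-in file:/// default:

  import org.apache.hadoop.conf.Configuration
  new Configuration().get("fs.defaultFS")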

Jianshi

On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang 
wrote:

> I got the following error during Spark startup (Yarn-client mode):
>
> 14/12/04 19:33:58 INFO Client: Uploading resource
> file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
> ->
> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
> java.lang.IllegalArgumentException: Wrong FS:
> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
> expected: file:///
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
> at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
> at
> org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
> at
> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
> at
> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
> at
> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
> at
> org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
> at
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
> at
> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
> at org.apache.spark.SparkContext.(SparkContext.scala:335)
> at
> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
> at $iwC$$iwC.(:9)
>     at $iwC.(:18)
> at (:20)
> at .(:24)
>
> I'm using latest Spark built from master HEAD yesterday. Is this a bug?
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Actually my HADOOP_CLASSPATH has already been set to include
/etc/hadoop/conf/*

export
HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase
classpath)

Jianshi

On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang 
wrote:

> Looks like somehow Spark failed to find the core-site.xml in
> /et/hadoop/conf
>
> I've already set the following env variables:
>
> export YARN_CONF_DIR=/etc/hadoop/conf
> export HADOOP_CONF_DIR=/etc/hadoop/conf
> export HBASE_CONF_DIR=/etc/hbase/conf
>
> Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?
>
> Jianshi
>
> On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang 
> wrote:
>
>> I got the following error during Spark startup (Yarn-client mode):
>>
>> 14/12/04 19:33:58 INFO Client: Uploading resource
>> file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
>> ->
>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
>> java.lang.IllegalArgumentException: Wrong FS:
>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
>> expected: file:///
>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
>> at
>> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
>> at
>> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
>> at
>> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
>> at
>> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
>> at
>> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
>> at
>> org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
>> at
>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
>> at
>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
>> at
>> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
>> at
>> org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
>> at
>> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
>> at
>> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
>> at org.apache.spark.SparkContext.(SparkContext.scala:335)
>> at
>> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
>> at $iwC$$iwC.(:9)
>> at $iwC.(:18)
>> at (:20)
>>     at .(:24)
>>
>> I'm using latest Spark built from master HEAD yesterday. Is this a bug?
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Looks like the datanucleus*.jar files shouldn't appear in the HDFS path in
yarn-client mode.

Maybe this patch broke yarn-client:

https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53

Jianshi

On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang 
wrote:

> Actually my HADOOP_CLASSPATH has already been set to include
> /etc/hadoop/conf/*
>
> export
> HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase
> classpath)
>
> Jianshi
>
> On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang 
> wrote:
>
>> Looks like somehow Spark failed to find the core-site.xml in
>> /et/hadoop/conf
>>
>> I've already set the following env variables:
>>
>> export YARN_CONF_DIR=/etc/hadoop/conf
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>> export HBASE_CONF_DIR=/etc/hbase/conf
>>
>> Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?
>>
>> Jianshi
>>
>> On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang 
>> wrote:
>>
>>> I got the following error during Spark startup (Yarn-client mode):
>>>
>>> 14/12/04 19:33:58 INFO Client: Uploading resource
>>> file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
>>> ->
>>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
>>> java.lang.IllegalArgumentException: Wrong FS:
>>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
>>> expected: file:///
>>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
>>> at
>>> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
>>> at
>>> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
>>> at
>>> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
>>> at
>>> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
>>> at
>>> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
>>> at
>>> org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
>>> at
>>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
>>> at
>>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
>>> at scala.Option.foreach(Option.scala:236)
>>> at
>>> org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
>>> at
>>> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
>>> at
>>> org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
>>> at
>>> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
>>> at
>>> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
>>> at
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
>>> at
>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
>>> at org.apache.spark.SparkContext.(SparkContext.scala:335)
>>>     at
>>> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
>>>     at $iwC$$iwC.(:9)
>>> at $iwC.(:18)
>>> at (:20)
>>> at .(:24)
>>>
>>> I'm using latest Spark built from master HEAD yesterday. Is this a bug?
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Correction:

According to Liancheng, this hotfix might be the root cause:


https://github.com/apache/spark/commit/38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce

Jianshi

On Fri, Dec 5, 2014 at 12:45 PM, Jianshi Huang 
wrote:

> Looks like the datanucleus*.jar shouldn't appear in the hdfs path in
> Yarn-client mode.
>
> Maybe this patch broke yarn-client.
>
>
> https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53
>
> Jianshi
>
> On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang 
> wrote:
>
>> Actually my HADOOP_CLASSPATH has already been set to include
>> /etc/hadoop/conf/*
>>
>> export
>> HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase
>> classpath)
>>
>> Jianshi
>>
>> On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang 
>> wrote:
>>
>>> Looks like somehow Spark failed to find the core-site.xml in
>>> /et/hadoop/conf
>>>
>>> I've already set the following env variables:
>>>
>>> export YARN_CONF_DIR=/etc/hadoop/conf
>>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> export HBASE_CONF_DIR=/etc/hbase/conf
>>>
>>> Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?
>>>
>>> Jianshi
>>>
>>> On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang 
>>> wrote:
>>>
>>>> I got the following error during Spark startup (Yarn-client mode):
>>>>
>>>> 14/12/04 19:33:58 INFO Client: Uploading resource
>>>> file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
>>>> ->
>>>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
>>>> java.lang.IllegalArgumentException: Wrong FS:
>>>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
>>>> expected: file:///
>>>> at
>>>> org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
>>>> at
>>>> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
>>>> at
>>>> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
>>>> at
>>>> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
>>>> at
>>>> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
>>>> at
>>>> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
>>>> at
>>>> org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
>>>> at
>>>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
>>>> at
>>>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
>>>> at scala.Option.foreach(Option.scala:236)
>>>> at
>>>> org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
>>>> at
>>>> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
>>>> at
>>>> org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
>>>> at
>>>> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
>>>> at
>>>> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
>>>>     at
>>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
>>>> at
>>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
>>>> at org.apache.spark.SparkContext.(SparkContext.scala:335)
>>>> at
>>>> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
>>>> at $iwC$$iwC.(:9)
>>>> at $iwC.(:18)
>>>> at (:20)
>>>> at .(:24)
>>>>
>>>> I'm using latest Spark built from master HEAD yesterday. Is this a bug?
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
I created a ticket for this:

  https://issues.apache.org/jira/browse/SPARK-4757


Jianshi

On Fri, Dec 5, 2014 at 1:31 PM, Jianshi Huang 
wrote:

> Correction:
>
> According to Liancheng, this hotfix might be the root cause:
>
>
> https://github.com/apache/spark/commit/38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce
>
> Jianshi
>
> On Fri, Dec 5, 2014 at 12:45 PM, Jianshi Huang 
> wrote:
>
>> Looks like the datanucleus*.jar shouldn't appear in the hdfs path in
>> Yarn-client mode.
>>
>> Maybe this patch broke yarn-client.
>>
>>
>> https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53
>>
>> Jianshi
>>
>> On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang 
>> wrote:
>>
>>> Actually my HADOOP_CLASSPATH has already been set to include
>>> /etc/hadoop/conf/*
>>>
>>> export
>>> HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase
>>> classpath)
>>>
>>> Jianshi
>>>
>>> On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang 
>>> wrote:
>>>
>>>> Looks like somehow Spark failed to find the core-site.xml in
>>>> /et/hadoop/conf
>>>>
>>>> I've already set the following env variables:
>>>>
>>>> export YARN_CONF_DIR=/etc/hadoop/conf
>>>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>> export HBASE_CONF_DIR=/etc/hbase/conf
>>>>
>>>> Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?
>>>>
>>>> Jianshi
>>>>
>>>> On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang >>> > wrote:
>>>>
>>>>> I got the following error during Spark startup (Yarn-client mode):
>>>>>
>>>>> 14/12/04 19:33:58 INFO Client: Uploading resource
>>>>> file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
>>>>> ->
>>>>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
>>>>> java.lang.IllegalArgumentException: Wrong FS:
>>>>> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
>>>>> expected: file:///
>>>>> at
>>>>> org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
>>>>> at
>>>>> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
>>>>> at
>>>>> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
>>>>> at
>>>>> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
>>>>> at
>>>>> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
>>>>> at
>>>>> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
>>>>> at scala.Option.foreach(Option.scala:236)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
>>>>> at
>>>>> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
>>>>> at
>>>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
>>>>> at org.apache.spark.SparkContext.(SparkContext.scala:335)
>>>>> at
>>>>> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
>>>>> at $iwC$$iwC.(:9)
>>>>> at $iwC.(:18)
>>>>> at (:20)
>>>>> at .(:24)
>>>>>
>>>>> I'm using latest Spark built from master HEAD yesterday. Is this a bug?
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Auto BroadcastJoin optimization failed in latest Spark

2014-12-04 Thread Jianshi Huang
Sorry for the late follow-up.

I used Hao's DESC EXTENDED command and found some clues:

new (broadcast broken Spark build):
parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892,
COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}

old (broadcast working Spark build):
parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591,
totalSize=56166}

Looks like the table size computation failed in the latest version.

I've run the analyze command:

  ANALYZE TABLE $table COMPUTE STATISTICS noscan

And the tables are created from Parquet files:

e.g.
CREATE EXTERNAL TABLE table1 (
  code int,
  desc string
)
STORED AS PARQUET
LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'
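
For reference, this is roughly how I'm checking the statistics and the chosen plan
from the HiveContext (a sketch; "big_fact" is a placeholder for my fact table):

  sql("ANALYZE TABLE table1 COMPUTE STATISTICS noscan")
  sql("DESC EXTENDED table1").collect().foreach(println)    // check totalSize/rawDataSize
  sql("EXPLAIN EXTENDED SELECT * FROM big_fact f JOIN table1 d ON f.code = d.code")
    .collect().foreach(println)                             // expect BroadcastHashJoin for tiny dims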


Does anyone know what went wrong?


Thanks,
Jianshi



On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao  wrote:

>  Hi Jianshi,
>
> I couldn’t reproduce that with the latest master; I can always get the
> BroadcastHashJoin for managed tables (in .csv files) in my testing. Are
> there any external tables in your case?
>
>
>
> In general, there are a couple of things you can try first (with HiveContext):
>
> 1)  ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all
> of the tables);
>
> 2)  SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the threshold
> to a greater value; it is 1024*1024*10 by default. Just make sure the
> maximum dimension table size (in bytes) is less than this.)
>
> 3)  Always put the main table (the biggest table) left-most
> among the inner joins;
>
>
>
> DESC EXTENDED tablename; -- this will print the detailed information, including
> the statistics for table size (the field “totalSize”)
>
> EXPLAIN EXTENDED query; -- this will print the detailed physical plan.
>
>
>
> Let me know if you still have problem.
>
>
>
> Hao
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, November 27, 2014 10:24 PM
> *To:* Cheng, Hao
> *Cc:* user
> *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark
>
>
>
> Hi Hao,
>
>
>
> I'm using inner join as Broadcast join didn't work for left joins (thanks
> for the links for the latest improvements).
>
>
>
> And I'm using HiveContext, and it worked in a previous build (10/12) when
> joining 15 dimension tables.
>
>
>
> Jianshi
>
>
>
> On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao  wrote:
>
>  Are all of your join keys the same? and I guess the join type are all
> “Left” join, https://github.com/apache/spark/pull/3362 probably is what
> you need.
>
>
>
> And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast
> join) currently,  https://github.com/apache/spark/pull/3270 should be
> another optimization for this.
>
>
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Wednesday, November 26, 2014 4:36 PM
> *To:* user
> *Subject:* Auto BroadcastJoin optimization failed in latest Spark
>
>
>
> Hi,
>
>
>
> I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails
> optimizing auto broadcast join in my query. I have a query that joins a
> huge fact table with 15 tiny dimension tables.
>
>
>
> I'm currently using an older version of Spark which was built on Oct. 12.
>
>
>
> Anyone else has met similar situation?
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


drop table if exists throws exception

2014-12-04 Thread Jianshi Huang
Hi,

I got an exception saying Hive: NoSuchObjectException(message: table
not found)

when running "DROP TABLE IF EXISTS "

Looks like a new regression in the Hive module.

Anyone can confirm this?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Auto BroadcastJoin optimization failed in latest Spark

2014-12-04 Thread Jianshi Huang
If I run ANALYZE without NOSCAN, then Hive can successfully get the size:

parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589,
COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296}
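
The two variants, for comparison (a sketch; the table name is a placeholder):

  sql("ANALYZE TABLE table1 COMPUTE STATISTICS noscan")  // leaves totalSize=0, rawDataSize=-1
  sql("ANALYZE TABLE table1 COMPUTE STATISTICS")         // full scan; fills in numRows and rawDataSize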

Is Hive's PARQUET support broken?

Jianshi


On Fri, Dec 5, 2014 at 3:30 PM, Jianshi Huang 
wrote:

> Sorry for the late of follow-up.
>
> I used Hao's DESC EXTENDED command and found some clue:
>
> new (broadcast broken Spark build):
> parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892,
> COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}
>
> old (broadcast working Spark build):
> parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591,
> totalSize=56166}
>
> Looks like the table size computation failed in the latest version.
>
> I've run the analyze command:
>
>   ANALYZE TABLE $table COMPUTE STATISTICS noscan
>
> And the tables are created from Parquet files:
>
> e.g.
> CREATE EXTERNAL TABLE table1 (
>   code int,
>   desc string
> )
> STORED AS PARQUET
> LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'
>
>
> Anyone knows what went wrong?
>
>
> Thanks,
> Jianshi
>
>
>
> On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao  wrote:
>
>>  Hi Jianshi,
>>
>> I couldn’t reproduce that with latest MASTER, and I can always get the
>> BroadcastHashJoin for managed tables (in .csv file) in my testing, are
>> there any external tables in your case?
>>
>>
>>
>> In general probably couple of things you can try first (with HiveContext):
>>
>> 1)  ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all
>> of the tables);
>>
>> 2)  SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the threshold
>> as a greater value, it is 1024*1024*10 by default, just make sure the
>> maximum dimension tables size (in bytes) is less than this)
>>
>> 3)  Always put the main table(the biggest table) in the left-most
>> among the inner joins;
>>
>>
>>
>> DESC EXTENDED tablename; -- this will print the detail information for
>> the statistic table size (the field “totalSize”)
>>
>> EXPLAIN EXTENDED query; -- this will print the detail physical plan.
>>
>>
>>
>> Let me know if you still have problem.
>>
>>
>>
>> Hao
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Thursday, November 27, 2014 10:24 PM
>> *To:* Cheng, Hao
>> *Cc:* user
>> *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark
>>
>>
>>
>> Hi Hao,
>>
>>
>>
>> I'm using inner join as Broadcast join didn't work for left joins (thanks
>> for the links for the latest improvements).
>>
>>
>>
>> And I'm using HiveConext and it worked in a previous build (10/12) when
>> joining 15 dimension tables.
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao  wrote:
>>
>>  Are all of your join keys the same? and I guess the join type are all
>> “Left” join, https://github.com/apache/spark/pull/3362 probably is what
>> you need.
>>
>>
>>
>> And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast
>> join) currently,  https://github.com/apache/spark/pull/3270 should be
>> another optimization for this.
>>
>>
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Wednesday, November 26, 2014 4:36 PM
>> *To:* user
>> *Subject:* Auto BroadcastJoin optimization failed in latest Spark
>>
>>
>>
>> Hi,
>>
>>
>>
>> I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1
>> fails optimizing auto broadcast join in my query. I have a query that joins
>> a huge fact table with 15 tiny dimension tables.
>>
>>
>>
>> I'm currently using an older version of Spark which was built on Oct. 12.
>>
>>
>>
>> Anyone else has met similar situation?
>>
>>
>>
>> --
>>
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>>
>>
>>
>>
>> --
>>
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Auto BroadcastJoin optimization failed in latest Spark

2014-12-04 Thread Jianshi Huang
With Liancheng's suggestion, I've tried setting

 spark.sql.hive.convertMetastoreParquet  false

but ANALYZE ... COMPUTE STATISTICS noscan still returns -1 for rawDataSize.
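
Roughly what I ran (a sketch; the table name is a placeholder):

  setConf("spark.sql.hive.convertMetastoreParquet", "false")
  sql("ANALYZE TABLE table1 COMPUTE STATISTICS noscan")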

Jianshi


On Fri, Dec 5, 2014 at 3:33 PM, Jianshi Huang 
wrote:

> If I run ANALYZE without NOSCAN, then Hive can successfully get the size:
>
> parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589,
> COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296}
>
> Is Hive's PARQUET support broken?
>
> Jianshi
>
>
> On Fri, Dec 5, 2014 at 3:30 PM, Jianshi Huang 
> wrote:
>
>> Sorry for the late of follow-up.
>>
>> I used Hao's DESC EXTENDED command and found some clue:
>>
>> new (broadcast broken Spark build):
>> parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892,
>> COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}
>>
>> old (broadcast working Spark build):
>> parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591,
>> totalSize=56166}
>>
>> Looks like the table size computation failed in the latest version.
>>
>> I've run the analyze command:
>>
>>   ANALYZE TABLE $table COMPUTE STATISTICS noscan
>>
>> And the tables are created from Parquet files:
>>
>> e.g.
>> CREATE EXTERNAL TABLE table1 (
>>   code int,
>>   desc string
>> )
>> STORED AS PARQUET
>> LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'
>>
>>
>> Anyone knows what went wrong?
>>
>>
>> Thanks,
>> Jianshi
>>
>>
>>
>> On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao  wrote:
>>
>>>  Hi Jianshi,
>>>
>>> I couldn’t reproduce that with latest MASTER, and I can always get the
>>> BroadcastHashJoin for managed tables (in .csv file) in my testing, are
>>> there any external tables in your case?
>>>
>>>
>>>
>>> In general probably couple of things you can try first (with
>>> HiveContext):
>>>
>>> 1)  ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all
>>> of the tables);
>>>
>>> 2)  SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the
>>> threshold as a greater value, it is 1024*1024*10 by default, just make sure
>>> the maximum dimension tables size (in bytes) is less than this)
>>>
>>> 3)  Always put the main table(the biggest table) in the left-most
>>> among the inner joins;
>>>
>>>
>>>
>>> DESC EXTENDED tablename; -- this will print the detail information for
>>> the statistic table size (the field “totalSize”)
>>>
>>> EXPLAIN EXTENDED query; -- this will print the detail physical plan.
>>>
>>>
>>>
>>> Let me know if you still have problem.
>>>
>>>
>>>
>>> Hao
>>>
>>>
>>>
>>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>>> *Sent:* Thursday, November 27, 2014 10:24 PM
>>> *To:* Cheng, Hao
>>> *Cc:* user
>>> *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark
>>>
>>>
>>>
>>> Hi Hao,
>>>
>>>
>>>
>>> I'm using inner join as Broadcast join didn't work for left joins
>>> (thanks for the links for the latest improvements).
>>>
>>>
>>>
>>> And I'm using HiveConext and it worked in a previous build (10/12) when
>>> joining 15 dimension tables.
>>>
>>>
>>>
>>> Jianshi
>>>
>>>
>>>
>>> On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao  wrote:
>>>
>>>  Are all of your join keys the same? and I guess the join type are all
>>> “Left” join, https://github.com/apache/spark/pull/3362 probably is what
>>> you need.
>>>
>>>
>>>
>>> And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast
>>> join) currently,  https://github.com/apache/spark/pull/3270 should be
>>> another optimization for this.
>>>
>>>
>>>
>>>
>>>
>>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>>> *Sent:* Wednesday, November 26, 2014 4:36 PM
>>> *To:* user
>>> *Subject:* Auto BroadcastJoin optimization failed in latest Spark
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1
>>> fails optimizing auto broadcast join in my query. I have a query that joins
>>> a huge fact table with 15 tiny dimension tables.
>>>
>>>
>>>
>>> I'm currently using an older version of Spark which was built on Oct. 12.
>>>
>>>
>>>
>>> Anyone else has met similar situation?
>>>
>>>
>>>
>>> --
>>>
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: drop table if exists throws exception

2014-12-05 Thread Jianshi Huang
I see. The resulting SchemaRDD is returned, so, like Michael said, the
exception does not propagate to user code.

However, printing out the following log is confusing :)

scala> sql("drop table if exists abc")
14/12/05 16:27:02 INFO ParseDriver: Parsing command: drop table if exists
abc
14/12/05 16:27:02 INFO ParseDriver: Parse Completed
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO Driver: Concurrency mode is disabled, not creating a
lock manager
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO ParseDriver: Parsing command: DROP TABLE IF EXISTS
abc
14/12/05 16:27:02 INFO ParseDriver: Parse Completed
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc
14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr
 cmd=get_table : db=default tbl=abc
14/12/05 16:27:02 INFO Driver: Semantic Analysis Completed
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO Driver: Returning Hive schema:
Schema(fieldSchemas:null, properties:null)
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO Driver: Starting command: DROP TABLE IF EXISTS abc
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO PerfLogger: 
14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc
14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr
 cmd=get_table : db=default tbl=abc
14/12/05 16:27:02 ERROR Hive: NoSuchObjectException(message:default.abc
table not found)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)

On Sat, Dec 6, 2014 at 3:45 AM, Mark Hamstra 
wrote:

> And that is no different from how Hive has worked for a long time.
>
> On Fri, Dec 5, 2014 at 11:42 AM, Michael Armbrust 
> wrote:
>
>> The command run fine for me on master.  Note that Hive does print an
>> exception in the logs, but that exception does not propogate to user code.
>>
>> On Thu, Dec 4, 2014 at 11:31 PM, Jianshi Huang 
>> wrote:
>>
>> > Hi,
>> >
>> > I got exception saying Hive: NoSuchObjectException(message: table
>> > not found)
>> >
>> > when running "DROP TABLE IF EXISTS "
>> >
>> > Looks like a new regression in Hive module.
>> >
>> > Anyone can confirm this?
>> >
>> > Thanks,
>> > --
>> > Jianshi Huang
>> >
>> > LinkedIn: jianshi
>> > Twitter: @jshuang
>> > Github & Blog: http://huangjs.github.com/
>> >
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-06 Thread Jianshi Huang
OK, I found another possible bug in Hive.

My current solution is to use ALTER TABLE ... CHANGE to rename the columns.

The problem is that after renaming the columns, the values in those columns
all became NULL.

Before renaming:
scala> sql("select `sorted::cre_ts` from pmt limit 1").collect
res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])

Execute renaming:
scala> sql("alter table pmt change `sorted::cre_ts` cre_ts string")
res13: org.apache.spark.sql.SchemaRDD =
SchemaRDD[972] at RDD at SchemaRDD.scala:108
== Query Plan ==


After renaming:
scala> sql("select cre_ts from pmt limit 1").collect
res16: Array[org.apache.spark.sql.Row] = Array([null])

I created a JIRA for it:

  https://issues.apache.org/jira/browse/SPARK-4781


Jianshi

On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang 
wrote:

> Hmm... another issue I found doing this approach is that ANALYZE TABLE ...
> COMPUTE STATISTICS will fail to attach the metadata to the table, and later
> broadcast join and such will fail...
>
> Any idea how to fix this issue?
>
> Jianshi
>
> On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang 
> wrote:
>
>> Very interesting: the line doing the drop table will throw an exception.
>> After removing it, everything works.
>>
>> Jianshi
>>
>> On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang 
>> wrote:
>>
>>> Here's the solution I got after talking with Liancheng:
>>>
>>> 1) using backquote `..` to wrap up all illegal characters
>>>
>>> val rdd = parquetFile(file)
>>> val schema = rdd.schema.fields.map(f => s"`${f.name}`
>>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>
>>> val ddl_13 = s"""
>>>   |CREATE EXTERNAL TABLE $name (
>>>   |  $schema
>>>   |)
>>>   |STORED AS PARQUET
>>>   |LOCATION '$file'
>>>   """.stripMargin
>>>
>>> sql(ddl_13)
>>>
>>> 2) create a new Schema and do applySchema to generate a new SchemaRDD,
>>> had to drop and register table
>>>
>>> val t = table(name)
>>> val newSchema = StructType(t.schema.fields.map(s => s.copy(name =
>>> s.name.replaceAll(".*?::", ""))))
>>> sql(s"drop table $name")
>>> applySchema(t, newSchema).registerTempTable(name)
>>>
>>> I'm testing it for now.
>>>
>>> Thanks for the help!
>>>
>>>
>>> Jianshi
>>>
>>> On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I had to use Pig for some preprocessing and to generate Parquet files
>>>> for Spark to consume.
>>>>
>>>> However, due to Pig's limitation, the generated schema contains Pig's
>>>> identifier
>>>>
>>>> e.g.
>>>> sorted::id, sorted::cre_ts, ...
>>>>
>>>> I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.
>>>>
>>>>   create external table pmt (
>>>> sorted::id bigint
>>>>   )
>>>>   stored as parquet
>>>>   location '...'
>>>>
>>>> Obviously it didn't work, I also tried removing the identifier
>>>> sorted::, but the resulting rows contain only nulls.
>>>>
>>>> Any idea how to create a table in HiveContext from these Parquet files?
>>>>
>>>> Thanks,
>>>> Jianshi
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-08 Thread Jianshi Huang
Ah... I see. Thanks for pointing it out.

Then it means we cannot mount an external table using customized column names.
Hmm...

Then the only option left is to use a subquery to add a bunch of column
aliases. I'll try it later.
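
Something along these lines (a sketch; only a couple of the columns shown):

  sql("SELECT `sorted::id` AS id, `sorted::cre_ts` AS cre_ts FROM pmt")
    .registerTempTable("pmt_renamed")
  sql("SELECT cre_ts FROM pmt_renamed LIMIT 1").collect()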

Thanks,
Jianshi

On Tue, Dec 9, 2014 at 3:34 AM, Michael Armbrust 
wrote:

> This is by hive's design.  From the Hive documentation:
>
> The column change command will only modify Hive's metadata, and will not
>> modify data. Users should make sure the actual data layout of the
>> table/partition conforms with the metadata definition.
>
>
>
> On Sat, Dec 6, 2014 at 8:28 PM, Jianshi Huang 
> wrote:
>
>> Ok, found another possible bug in Hive.
>>
>> My current solution is to use ALTER TABLE CHANGE to rename the column
>> names.
>>
>> The problem is after renaming the column names, the value of the columns
>> became all NULL.
>>
>> Before renaming:
>> scala> sql("select `sorted::cre_ts` from pmt limit 1").collect
>> res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])
>>
>> Execute renaming:
>> scala> sql("alter table pmt change `sorted::cre_ts` cre_ts string")
>> res13: org.apache.spark.sql.SchemaRDD =
>> SchemaRDD[972] at RDD at SchemaRDD.scala:108
>> == Query Plan ==
>> 
>>
>> After renaming:
>> scala> sql("select cre_ts from pmt limit 1").collect
>> res16: Array[org.apache.spark.sql.Row] = Array([null])
>>
>> I created a JIRA for it:
>>
>>   https://issues.apache.org/jira/browse/SPARK-4781
>>
>>
>> Jianshi
>>
>> On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang 
>> wrote:
>>
>>> Hmm... another issue I found doing this approach is that ANALYZE TABLE
>>> ... COMPUTE STATISTICS will fail to attach the metadata to the table, and
>>> later broadcast join and such will fail...
>>>
>>> Any idea how to fix this issue?
>>>
>>> Jianshi
>>>
>>> On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang 
>>> wrote:
>>>
>>>> Very interesting, the line doing drop table will throws an exception.
>>>> After removing it all works.
>>>>
>>>> Jianshi
>>>>
>>>> On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang 
>>>> wrote:
>>>>
>>>>> Here's the solution I got after talking with Liancheng:
>>>>>
>>>>> 1) using backquote `..` to wrap up all illegal characters
>>>>>
>>>>> val rdd = parquetFile(file)
>>>>> val schema = rdd.schema.fields.map(f => s"`${f.name}`
>>>>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>>>
>>>>> val ddl_13 = s"""
>>>>>   |CREATE EXTERNAL TABLE $name (
>>>>>   |  $schema
>>>>>   |)
>>>>>   |STORED AS PARQUET
>>>>>   |LOCATION '$file'
>>>>>   """.stripMargin
>>>>>
>>>>> sql(ddl_13)
>>>>>
>>>>> 2) create a new Schema and do applySchema to generate a new SchemaRDD,
>>>>> had to drop and register table
>>>>>
>>>>> val t = table(name)
>>>>> val newSchema = StructType(t.schema.fields.map(s => s.copy(name =
>>>>> s.name.replaceAll(".*?::", ""
>>>>> sql(s"drop table $name")
>>>>> applySchema(t, newSchema).registerTempTable(name)
>>>>>
>>>>> I'm testing it for now.
>>>>>
>>>>> Thanks for the help!
>>>>>
>>>>>
>>>>> Jianshi
>>>>>
>>>>> On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang >>>> > wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I had to use Pig for some preprocessing and to generate Parquet files
>>>>>> for Spark to consume.
>>>>>>
>>>>>> However, due to Pig's limitation, the generated schema contains Pig's
>>>>>> identifier
>>>>>>
>>>>>> e.g.
>>>>>> sorted::id, sorted::cre_ts, ...
>>>>>>
>>>>>> I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.
>>>>>>
>>>>>>   create external table pmt (
>>>>>> sorted::id bigint
>>>>>>   )
>>>>>>   stored as parquet
>>>>>>   location '...'
>>>>>>
>>>>>> Obviously it didn't work, I also tried removing the identifier
>>>>>> sorted::, but the resulting rows contain only nulls.
>>>>>>
>>>>>> Any idea how to create a table in HiveContext from these Parquet
>>>>>> files?
>>>>>>
>>>>>> Thanks,
>>>>>> Jianshi
>>>>>> --
>>>>>> Jianshi Huang
>>>>>>
>>>>>> LinkedIn: jianshi
>>>>>> Twitter: @jshuang
>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-23 Thread Jianshi Huang
FYI,

The latest Hive 0.14/Parquet will have column renaming support.

Jianshi

On Wed, Dec 10, 2014 at 3:37 AM, Michael Armbrust 
wrote:

> You might also try out the recently added support for views.
>
> On Mon, Dec 8, 2014 at 9:31 PM, Jianshi Huang 
> wrote:
>
>> Ah... I see. Thanks for pointing it out.
>>
>> Then it means we cannot mount external table using customized column
>> names. hmm...
>>
>> Then the only option left is to use a subquery to add a bunch of column
>> alias. I'll try it later.
>>
>> Thanks,
>> Jianshi
>>
>> On Tue, Dec 9, 2014 at 3:34 AM, Michael Armbrust 
>> wrote:
>>
>>> This is by hive's design.  From the Hive documentation:
>>>
>>> The column change command will only modify Hive's metadata, and will not
>>>> modify data. Users should make sure the actual data layout of the
>>>> table/partition conforms with the metadata definition.
>>>
>>>
>>>
>>> On Sat, Dec 6, 2014 at 8:28 PM, Jianshi Huang 
>>> wrote:
>>>
>>>> Ok, found another possible bug in Hive.
>>>>
>>>> My current solution is to use ALTER TABLE CHANGE to rename the column
>>>> names.
>>>>
>>>> The problem is that after renaming the column names, the values of the
>>>> columns all became NULL.
>>>>
>>>> Before renaming:
>>>> scala> sql("select `sorted::cre_ts` from pmt limit 1").collect
>>>> res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])
>>>>
>>>> Execute renaming:
>>>> scala> sql("alter table pmt change `sorted::cre_ts` cre_ts string")
>>>> res13: org.apache.spark.sql.SchemaRDD =
>>>> SchemaRDD[972] at RDD at SchemaRDD.scala:108
>>>> == Query Plan ==
>>>> 
>>>>
>>>> After renaming:
>>>> scala> sql("select cre_ts from pmt limit 1").collect
>>>> res16: Array[org.apache.spark.sql.Row] = Array([null])
>>>>
>>>> I created a JIRA for it:
>>>>
>>>>   https://issues.apache.org/jira/browse/SPARK-4781
>>>>
>>>>
>>>> Jianshi
>>>>
>>>> On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang 
>>>> wrote:
>>>>
>>>>> Hmm... another issue I found with this approach is that ANALYZE TABLE
>>>>> ... COMPUTE STATISTICS will fail to attach the metadata to the table, and
>>>>> later broadcast joins and such will fail...
>>>>>
>>>>> Any idea how to fix this issue?
>>>>>
>>>>> Jianshi
>>>>>
>>>>> On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang >>>> > wrote:
>>>>>
>>>>>> Very interesting: the line doing the drop table throws an exception.
>>>>>> After removing it, everything works.
>>>>>>
>>>>>> Jianshi
>>>>>>
>>>>>> On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang <
>>>>>> jianshi.hu...@gmail.com> wrote:
>>>>>>
>>>>>>> Here's the solution I got after talking with Liancheng:
>>>>>>>
>>>>>>> 1) using backquote `..` to wrap up all illegal characters
>>>>>>>
>>>>>>> val rdd = parquetFile(file)
>>>>>>> val schema = rdd.schema.fields.map(f => s"`${f.name}`
>>>>>>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>>>>>
>>>>>>> val ddl_13 = s"""
>>>>>>>   |CREATE EXTERNAL TABLE $name (
>>>>>>>   |  $schema
>>>>>>>   |)
>>>>>>>   |STORED AS PARQUET
>>>>>>>   |LOCATION '$file'
>>>>>>>   """.stripMargin
>>>>>>>
>>>>>>> sql(ddl_13)
>>>>>>>
>>>>>>> 2) create a new schema and use applySchema to generate a new
>>>>>>> SchemaRDD; I had to drop and re-register the table
>>>>>>>
>>>>>>> val t = table(name)
>>>>>>> val newSchema = StructType(t.schema.fields.map(s => s.copy(name
>>>>>>> = s.name.replaceAll(".*?::", ""))))
>>>>>>> sql(s"drop table $name")

Re: How to do broadcast join in SparkSQL

2014-10-07 Thread Jianshi Huang
Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged
into master?

I cannot find spark.sql.hints.broadcastTables in latest master, but it's in
the following patch.


https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
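
For reference, if that patch were applied, my reading of it is that the hint
would be used roughly like this (untested sketch; the table names are made up
and the exact semantics are whatever the patch defines):

  setConf("spark.sql.hints.broadcastTables", "dim_country,dim_device")
  sql("SELECT * FROM fact f JOIN dim_country c ON f.country_id = c.id")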


Jianshi


On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang 
wrote:

> Yes, looks like it can only be controlled by the
> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
> to me.
>
> How am I supposed to know the exact size in bytes of a table? Letting me
> specify the join algorithm would be preferable, I think.
>
> Jianshi
>
> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu  wrote:
>
>> Have you looked at SPARK-1800 ?
>>
>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>> Cheers
>>
>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang 
>> wrote:
>>
>>> I cannot find it in the documentation. And I have a dozen dimension
>>> tables to (left) join...
>>>
>>>
>>> Cheers,
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: How to do broadcast join in SparkSQL

2014-10-08 Thread Jianshi Huang
Ok, currently there's cost-based optimization; however, Parquet statistics
are not implemented...

What's a good way to join a big fact table with several tiny dimension
tables in Spark SQL (1.1)?

I wish we could allow user hints for the join.
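
For now the only knob I see is the size threshold, e.g. something like this
(sketch; the 50 MB value is arbitrary, and it is compared against table
statistics, which Parquet tables don't have yet):

  setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)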

Jianshi

On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang 
wrote:

> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged
> into master?
>
> I cannot find spark.sql.hints.broadcastTables in latest master, but it's
> in the following patch.
>
>
> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>
>
> Jianshi
>
>
> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang 
> wrote:
>
>> Yes, looks like it can only be controlled by the
>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>> to me.
>>
>> How am I supposed to know the exact size in bytes of a table? Letting me
>> specify the join algorithm would be preferable, I think.
>>
>> Jianshi
>>
>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu  wrote:
>>
>>> Have you looked at SPARK-1800 ?
>>>
>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>> Cheers
>>>
>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang 
>>> wrote:
>>>
>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>> tables to (left) join...
>>>>
>>>>
>>>> Cheers,
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: How to do broadcast join in SparkSQL

2014-10-10 Thread Jianshi Huang
It works fine, thanks for the help Michael.

Liancheng also told me a trick: using a subquery with LIMIT n. It works in
the latest 1.2.0.
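
Roughly something like this (untested sketch; the table and column names are
made up). As I understand it, the LIMIT gives the planner a size estimate for
that side of the join, so it can fall under
spark.sql.autoBroadcastJoinThreshold and be broadcast:

  val joined = sql("""
    |SELECT f.*, d.dim_name
    |FROM fact_table f
    |JOIN (SELECT * FROM dim_table LIMIT 100000) d
    |  ON f.dim_id = d.dim_id
  """.stripMargin)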

BTW, it looks like the broadcast optimization won't be recognized if I do a
left join instead of an inner join. Is that true? How can I make it work for
left joins?

Cheers,
Jianshi

On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust 
wrote:

> Thanks for the input.  We purposefully made sure that the config option
> did not make it into a release as it is not something that we are willing
> to support long term.  That said we'll try and make this easier in the
> future either through hints or better support for statistics.
>
> In this particular case you can get what you want by registering the
> tables as external tables and setting a flag.  Here's a helper function to
> do what you need.
>
> /**
>  * Sugar for creating a Hive external table from a parquet path.
>  */
> def createParquetTable(name: String, file: String): Unit = {
>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>
>   val rdd = parquetFile(file)
>   val schema = rdd.schema.fields.map(f => s"${f.name}
> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>   val ddl = s"""
> |CREATE EXTERNAL TABLE $name (
> |  $schema
> |)
> |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
> |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
> |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
> |LOCATION '$file'""".stripMargin
>   sql(ddl)
>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
> }
>
> You'll also need to run this to populate the statistics:
>
> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>
>
> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang 
> wrote:
>
>> Ok, currently there's cost-based optimization; however, Parquet statistics
>> are not implemented...
>>
>> What's a good way to join a big fact table with several tiny dimension
>> tables in Spark SQL (1.1)?
>>
>> I wish we could allow user hints for the join.
>>
>> Jianshi
>>
>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang 
>> wrote:
>>
>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>> merged into master?
>>>
>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>> it's in the following patch.
>>>
>>>
>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>
>>>
>>> Jianshi
>>>
>>>
>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang 
>>> wrote:
>>>
>>>> Yes, looks like it can only be controlled by the
>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>> to me.
>>>>
>>>> How am I supposed to know the exact size in bytes of a table? Letting me
>>>> specify the join algorithm would be preferable, I think.
>>>>
>>>> Jianshi
>>>>
>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu  wrote:
>>>>
>>>>> Have you looked at SPARK-1800 ?
>>>>>
>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>> Cheers
>>>>>
>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>> jianshi.hu...@gmail.com> wrote:
>>>>>
>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>> tables to (left) join...
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> --
>>>>>> Jianshi Huang
>>>>>>
>>>>>> LinkedIn: jianshi
>>>>>> Twitter: @jshuang
>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
https://issues.apache.org/jira/browse/SPARK-3106

I'm having the same errors described in SPARK-3106 (no other types of
errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from
the latest master HEAD.

Any updates on this issue?

My main task is to join a huge fact table with a dozen dim tables (using
HiveContext) and then map the result to my class objects. It failed a couple
of times; after I cached the intermediate table it currently seems to work
fine... I had no idea why until I found SPARK-3106.
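
By "cached the intermediate table" I mean roughly the following (the table
and column names are just placeholders):

  sql("""
    |SELECT f.*, d1.name AS d1_name, d2.name AS d2_name
    |FROM fact f
    |JOIN dim1 d1 ON f.d1_id = d1.id
    |JOIN dim2 d2 ON f.d2_id = d2.id
  """.stripMargin).registerTempTable("joined_tmp")
  sql("CACHE TABLE joined_tmp")

and the map to my class objects then runs over table("joined_tmp").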

Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
Hmm... it failed again, just lasted a little bit longer.

Jianshi

On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang 
wrote:

> https://issues.apache.org/jira/browse/SPARK-3106
>
> I'm having the same errors described in SPARK-3106 (no other types of
> errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from
> the latest master HEAD.
>
> Any updates on this issue?
>
> My main task is to join a huge fact table with a dozen dim tables (using
> HiveContext) and then map the result to my class objects. It failed a couple
> of times; after I cached the intermediate table it currently seems to work
> fine... I had no idea why until I found SPARK-3106.
>
> Cheers,
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
Turned out it was caused by this issue:
https://issues.apache.org/jira/browse/SPARK-3923

Setting spark.akka.heartbeat.interval to 100 solved it.
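
In case it helps anyone else, I mean something along these lines (the value
is the one that worked for me; passing it via --conf on spark-submit or in
spark-defaults.conf should work too):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.akka.heartbeat.interval", "100")
  val sc = new org.apache.spark.SparkContext(conf)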

Jianshi

On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang 
wrote:

> Hmm... it failed again, just lasted a little bit longer.
>
> Jianshi
>
> On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang 
> wrote:
>
>> https://issues.apache.org/jira/browse/SPARK-3106
>>
>> I'm having the same errors described in SPARK-3106 (no other types of
>> errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from
>> the latest master HEAD.
>>
>> Any updates on this issue?
>>
>> My main task is to join a huge fact table with a dozen dim tables (using
>> HiveContext) and then map the result to my class objects. It failed a couple
>> of times; after I cached the intermediate table it currently seems to work
>> fine... I had no idea why until I found SPARK-3106.
>>
>> Cheers,
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
One thing that made me very confused during debugging is the error message.
The important one

  WARN ReliableDeliverySupervisor: Association with remote system
[akka.tcp://sparkDriver@xxx:50278] has failed, address is now gated for
[5000] ms. Reason is: [Disassociated].

is only at log level WARN.

Jianshi


On Tue, Oct 14, 2014 at 4:36 AM, Jianshi Huang 
wrote:

> Turned out it was caused by this issue:
> https://issues.apache.org/jira/browse/SPARK-3923
>
> Setting spark.akka.heartbeat.interval to 100 solved it.
>
> Jianshi
>
> On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang 
> wrote:
>
>> Hmm... it failed again, just lasted a little bit longer.
>>
>> Jianshi
>>
>> On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang 
>> wrote:
>>
>>> https://issues.apache.org/jira/browse/SPARK-3106
>>>
>>> I'm having the same errors described in SPARK-3106 (no other types of
>>> errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from
>>> the latest master HEAD.
>>>
>>> Any updates on this issue?
>>>
>>> My main task is to join a huge fact table with a dozen dim tables (using
>>> HiveContext) and then map the result to my class objects. It failed a couple
>>> of times; after I cached the intermediate table it currently seems to work
>>> fine... I had no idea why until I found SPARK-3106.
>>>
>>> Cheers,
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Multitenancy in Spark - within/across spark context

2014-10-23 Thread Jianshi Huang
Upvote for the multitenancy requirement.

I'm also building a data analytics platform, and there will be multiple users
running queries and computations simultaneously. One of the pain points is
control of resource size. Users don't really know how many nodes they need,
so they always use as many as possible... The result is a lot of wasted
resources in our YARN cluster.

A way to 1) allow multiple Spark contexts to share the same resources or 2)
add dynamic resource management for YARN mode is very much wanted.

Jianshi

On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin  wrote:

> On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar
>  wrote:
> >> That's not something you might want to do usually. In general, a
> >> SparkContext maps to a user application
> >
> > My question was basically this. In this page in the official doc, under
> > "Scheduling within an application" section, it talks about multiuser and
> > fair sharing within an app. How does multiuser within an application
> > work(how users connect to an app,run their stuff) ? When would I want to
> use
> > this ?
>
> I see. The way I read that page is that Spark supports all those
> scheduling options; but Spark doesn't give you the means to actually
> be able to submit jobs from different users to a running SparkContext
> hosted on a different process. For that, you'll need something like
> the job server that I referenced before, or write your own framework
> for supporting that.
>
> Personally, I'd use the information on that page when dealing with
> concurrent jobs in the same SparkContext, but still restricted to the
> same user. I'd avoid trying to create any application where a single
> SparkContext is trying to be shared by multiple users in any way.
>
> >> As far as I understand, this will cause executors to be killed, which
> >> means that Spark will start retrying tasks to rebuild the data that
> >> was held by those executors when needed.
> >
> > I basically wanted to find out if there were any "gotchas" related to
> > preemption on Spark. Things like say half of an application's executors
> got
> > preempted say while doing reduceByKey, will the application progress with
> > the remaining resources/fair share ?
>
> Jobs should still make progress as long as at least one executor is
> available. The gotcha would be the one I mentioned, where Spark will
> fail your job after "x" executors failed, which might be a common
> occurrence when preemption is enabled. That being said, it's a
> configurable option, so you can set "x" to a very large value and your
> job should keep on chugging along.
>
> The options you'd want to take a look at are: spark.task.maxFailures
> and spark.yarn.max.executor.failures
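
A quick sketch of what bumping those limits might look like, purely as an
example (the values are arbitrary):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.task.maxFailures", "16")
    .set("spark.yarn.max.executor.failures", "100")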
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Build with Hive 0.13.1 doesn't have datanucleus and parquet dependencies.

2014-10-27 Thread Jianshi Huang
There's been a change in the build process lately for Hive 0.13 support, and
we should make it obvious. Based on the new pom.xml, I tried to enable Hive
0.13.1 support by using the option

  -Phive-0.13.1

However, it seems the datanucleus and parquet dependencies are not available
in the final build.

Am I missing anything?

Jianshi

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Build with Hive 0.13.1 doesn't have datanucleus and parquet dependencies.

2014-10-27 Thread Jianshi Huang
Ah I see. Thanks Hao! I'll wait for the fix.

Jianshi

On Mon, Oct 27, 2014 at 4:57 PM, Cheng, Hao  wrote:

> The hive-thriftserver module is not included when the profile
> hive-0.13.1 is specified.
>
> -Original Message-
> From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> Sent: Monday, October 27, 2014 4:48 PM
> To: dev@spark.apache.org
> Subject: Build with Hive 0.13.1 doesn't have datanucleus and parquet
> dependencies.
>
> There's been a change in the build process lately for Hive 0.13 support, and
> we should make it obvious. Based on the new pom.xml, I tried to enable Hive
> 0.13.1 support by using the option
>
>   -Phive-0.13.1
>
> However, it seems the datanucleus and parquet dependencies are not available
> in the final build.
>
> Am I missing anything?
>
> Jianshi
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/