Re: How to do broadcast join in SparkSQL
Hi,

Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support. I got the following exception:

org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
    at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

Using the same DDL and ANALYZE script as above.

Jianshi

On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang wrote:
> It works fine, thanks for the help Michael.
>
> Liancheng also told me a trick: use a subquery with LIMIT n. It works in the latest 1.2.0.
>
> BTW, it looks like the broadcast optimization isn't recognized if I do a left join instead of an inner join. Is that true? How can I make it work for left joins?
>
> Cheers,
> Jianshi
>
> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust wrote:
>> Thanks for the input. We purposefully made sure that the config option did not make it into a release, as it is not something that we are willing to support long term. That said, we'll try to make this easier in the future, either through hints or through better support for statistics.
>>
>> In this particular case you can get what you want by registering the tables as external tables and setting a flag. Here's a helper function to do what you need:
>>
>> /**
>>  * Sugar for creating a Hive external table from a parquet path.
>>  */
>> def createParquetTable(name: String, file: String): Unit = {
>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>
>>   val rdd = parquetFile(file)
>>   val schema = rdd.schema.fields.map(f =>
>>     s"${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>   val ddl = s"""
>>     |CREATE EXTERNAL TABLE $name (
>>     |  $schema
>>     |)
>>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>>     |LOCATION '$file'""".stripMargin
>>   sql(ddl)
>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>> }
>>
>> You'll also need to run this to populate the statistics:
>>
>> ANALYZE TABLE tableName COMPUTE STATISTICS noscan;
>>
>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang wrote:
>>> OK, currently there's cost-based optimization, but Parquet statistics are not implemented...
>>>
>>> What's a good way to join a big fact table with several tiny dimension tables in Spark SQL (1.1)?
>>>
>>> I wish we could allow user hints for the join.
>>>
>>> Jianshi
>>>
>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang wrote:
>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master?
>>>>
>>>> I cannot find spark.sql.hints.broadcastTables in the latest master, but it's in the following patch:
>>>>
>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>
>>>> Jianshi
>>>>
>>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang wrote:
>>>>> Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me.
>>>>>
>>>>> How am I supposed to know the exact size of a table in bytes? I'd rather specify which join algorithm is preferred.
>>>>>
>>>>> Jianshi
>>>>>
>>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu wrote:
>>>>>> Have you looked at SPARK-1800?
>>>>>>
>>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang wrote:
>>>>>>> I cannot find it in the documentation. And I have a dozen dimension tables to (left) join...
>>>>>>>
>>>>>>> Cheers,

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
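The behavior debated in this thread boils down to a size test against spark.sql.autoBroadcastJoinThreshold. A minimal sketch of that decision rule in plain Scala (no Spark required; shouldBroadcast and the "-1 means unknown size" convention are illustrative helpers of ours, not a Spark API):

```scala
// Default threshold discussed in the thread: 10 MB (1024 * 1024 * 10 bytes).
val defaultThreshold: Long = 10L * 1024 * 1024

// A table qualifies for broadcast when its estimated size is known (>= 0)
// and at or below the threshold; a negative size (unknown) never qualifies.
def shouldBroadcast(tableSizeBytes: Long, threshold: Long = defaultThreshold): Boolean =
  tableSizeBytes >= 0 && tableSizeBytes <= threshold

println(shouldBroadcast(56166L))                   // tiny dimension table
println(shouldBroadcast(5L * 1024 * 1024 * 1024))  // huge fact table
```

This also illustrates why the statistics matter: with no usable size estimate, the planner cannot know the table is small enough to broadcast.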
Re: How to do broadcast join in SparkSQL
Oh, I found an explanation at http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/:

"The error here is a bit misleading, what it really means is that the class parquet.hive.DeprecatedParquetOutputFormat isn’t in the classpath for Hive. Sure enough, doing a ls /usr/lib/hive/lib doesn’t show any of the parquet jars, but ls /usr/lib/impala/lib shows the jar we’re looking for as parquet-hive-1.0.jar"

Is it removed from the latest Spark?

Jianshi

On Wed, Nov 26, 2014 at 2:13 PM, Jianshi Huang wrote:
> Hi,
>
> Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support. I got the following exception:
>
> org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
>     at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
>     at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
>     at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
>     at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
>
> Using the same DDL and ANALYZE script as above.
>
> Jianshi
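A quick way to test the blog post's diagnosis from a Scala REPL is to check whether the Parquet classes are actually visible on the classpath. onClasspath below is our illustrative helper, not a Spark or Hive API:

```scala
import scala.util.Try

// Returns true when the named class can be loaded from the current classpath.
def onClasspath(className: String): Boolean =
  Try(Class.forName(className)).isSuccess

// If these print false, the parquet-hive jars are missing from the classpath.
println(onClasspath("parquet.hive.DeprecatedParquetOutputFormat"))
println(onClasspath("parquet.hive.serde.ParquetHiveSerDe"))
```

If the classes are missing, adding the parquet-hive jar (e.g. via --jars or the driver classpath) would be the first thing to try.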
Exception adding resource files in latest Spark
I got the following error during Spark startup (yarn-client mode):

14/12/04 19:33:58 INFO Client: Uploading resource file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar -> hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
    at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
    at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
    at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
    at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
    at org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
    at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
    at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
    at $iwC$$iwC.<init>(<console>:9)
    at $iwC.<init>(<console>:18)
    at <init>(<console>:20)
    at .<init>(<console>:24)

I'm using the latest Spark built from master HEAD yesterday. Is this a bug?

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
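For readers puzzled by the "Wrong FS" wording: it is thrown when a path with one scheme (hdfs://) is handed to a FileSystem instance expecting another (file://). A simplified re-creation of that check in plain Scala (checkScheme is our illustration, not the actual Hadoop FileSystem.checkPath implementation):

```scala
import java.net.URI

// A scheme-less path inherits the filesystem's own scheme; any other
// mismatch reproduces the "Wrong FS" failure seen in the stack trace.
def checkScheme(path: String, expectedScheme: String): Either[String, String] = {
  val scheme = Option(new URI(path).getScheme).getOrElse(expectedScheme)
  if (scheme == expectedScheme) Right(path)
  else Left(s"Wrong FS: $path, expected: $expectedScheme:///")
}

println(checkScheme("hdfs://stampy/user/jianshuang/.sparkStaging/app/foo.jar", "file"))
println(checkScheme("/tmp/local.jar", "file"))
```

So the bug report above amounts to: the YARN client resolved an hdfs:// staging path against RawLocalFileSystem, which only accepts file:// paths.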
Re: Exception adding resource files in latest Spark
Looks like somehow Spark failed to find the core-site.xml in /etc/hadoop/conf.

I've already set the following env variables:

export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HBASE_CONF_DIR=/etc/hbase/conf

Should I add $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?

Jianshi

On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang wrote:
> I got the following error during Spark startup (yarn-client mode):
>
> java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:///
> ...
>
> I'm using the latest Spark built from master HEAD yesterday. Is this a bug?

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
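To sanity-check the env-variable setup above, one can inspect how a client-side tool would resolve the Hadoop conf dir from the environment. resolveConfDir and the HADOOP_CONF_DIR-over-YARN_CONF_DIR precedence below are our illustration; consult the Spark launch scripts for the authoritative lookup order:

```scala
// Resolve the Hadoop configuration directory from an environment map,
// preferring HADOOP_CONF_DIR and falling back to YARN_CONF_DIR.
def resolveConfDir(env: Map[String, String]): Option[String] =
  env.get("HADOOP_CONF_DIR").orElse(env.get("YARN_CONF_DIR"))

// Against the real environment of the current process:
println(resolveConfDir(sys.env).getOrElse("no Hadoop conf dir set"))
```

If this prints the expected /etc/hadoop/conf but Spark still misreads the filesystem, the conf dir is probably not making it onto the driver's classpath.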
Re: Exception adding resource files in latest Spark
Actually my HADOOP_CLASSPATH has already been set to include /etc/hadoop/conf/*:

export HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase classpath)

Jianshi

On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang wrote:
> Looks like somehow Spark failed to find the core-site.xml in /etc/hadoop/conf.
>
> I've already set the following env variables:
>
> export YARN_CONF_DIR=/etc/hadoop/conf
> export HADOOP_CONF_DIR=/etc/hadoop/conf
> export HBASE_CONF_DIR=/etc/hbase/conf
>
> Should I add $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Exception adding resource files in latest Spark
Looks like the datanucleus*.jar shouldn't appear in the HDFS path in yarn-client mode.

Maybe this patch broke yarn-client:

https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53

Jianshi

On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang wrote:
> Actually my HADOOP_CLASSPATH has already been set to include /etc/hadoop/conf/*:
>
> export HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase classpath)

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Exception adding resource files in latest Spark
Correction:

According to Liancheng, this hotfix might be the root cause:

https://github.com/apache/spark/commit/38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce

Jianshi

On Fri, Dec 5, 2014 at 12:45 PM, Jianshi Huang wrote:
> Looks like the datanucleus*.jar shouldn't appear in the HDFS path in yarn-client mode.
>
> Maybe this patch broke yarn-client:
>
> https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Exception adding resource files in latest Spark
I created a ticket for this:

https://issues.apache.org/jira/browse/SPARK-4757

Jianshi

On Fri, Dec 5, 2014 at 1:31 PM, Jianshi Huang wrote:
> Correction:
>
> According to Liancheng, this hotfix might be the root cause:
>
> https://github.com/apache/spark/commit/38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Auto BroadcastJoin optimization failed in latest Spark
Sorry for the late follow-up. I used Hao's DESC EXTENDED command and found some clues.

new (broadcast-broken Spark build):
parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892, COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}

old (broadcast-working Spark build):
parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591, totalSize=56166}

It looks like the table size computation fails in the latest version. I've run the analyze command:

ANALYZE TABLE $table COMPUTE STATISTICS noscan

And the tables are created from Parquet files, e.g.:

CREATE EXTERNAL TABLE table1 (
  code int,
  desc string
)
STORED AS PARQUET
LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'

Does anyone know what went wrong?

Thanks,
Jianshi

On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao wrote:
> Hi Jianshi,
>
> I couldn't reproduce that with the latest MASTER, and I always get a
> BroadcastHashJoin for managed tables (in .csv files) in my testing. Are
> there any external tables in your case?
>
> In general, here are a couple of things you can try first (with HiveContext):
>
> 1) ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all of the tables)
> 2) SET spark.sql.autoBroadcastJoinThreshold=xxx; (set the threshold to a
>    greater value; it is 1024*1024*10 by default. Just make sure the largest
>    dimension table's size in bytes is less than this.)
> 3) Always put the main (biggest) table left-most among the inner joins.
>
> DESC EXTENDED tablename;  -- prints the detailed table information, including
>                           -- the statistics table size (the field "totalSize")
> EXPLAIN EXTENDED query;   -- prints the detailed physical plan
>
> Let me know if you still have problems.
>
> Hao
>
> From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> Sent: Thursday, November 27, 2014 10:24 PM
> To: Cheng, Hao
> Cc: user
> Subject: Re: Auto BroadcastJoin optimization failed in latest Spark
>
> Hi Hao,
>
> I'm using an inner join, as broadcast join didn't work for left joins
> (thanks for the links to the latest improvements).
>
> And I'm using HiveContext, and it worked in a previous build (10/12) when
> joining 15 dimension tables.
>
> Jianshi
>
> On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao wrote:
>
> Are all of your join keys the same? I guess the join types are all "left"
> joins; https://github.com/apache/spark/pull/3362 is probably what you need.
>
> And SparkSQL doesn't support multiway joins (and multiway broadcast joins)
> currently; https://github.com/apache/spark/pull/3270 should be another
> optimization for this.
>
> From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> Sent: Wednesday, November 26, 2014 4:36 PM
> To: user
> Subject: Auto BroadcastJoin optimization failed in latest Spark
>
> Hi,
>
> I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails
> to optimize the auto broadcast join in my query. I have a query that joins
> a huge fact table with 15 tiny dimension tables.
>
> I'm currently using an older version of Spark, which was built on Oct. 12.
>
> Has anyone else met a similar situation?
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
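Hao's checklist above can be condensed into a short diagnostic session. This is a hedged sketch, not taken verbatim from the thread: the names fact_table and dim_table are illustrative, and the threshold value is just an example larger than the 10 MB default.

```sql
-- Populate size statistics from file sizes only (no scan of the data).
ANALYZE TABLE dim_table COMPUTE STATISTICS NOSCAN;

-- Check that totalSize was actually recorded; totalSize=0 or numRows=-1
-- means the planner has nothing to base a broadcast decision on.
DESC EXTENDED dim_table;

-- Raise the broadcast threshold in bytes (the default is 1024*1024*10).
SET spark.sql.autoBroadcastJoinThreshold=104857600;

-- The physical plan should now show BroadcastHashJoin for the small table.
EXPLAIN EXTENDED
SELECT f.*, d.desc
FROM fact_table f
JOIN dim_table d ON f.code = d.code;
```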
drop table if exists throws exception
Hi,

I got an exception saying Hive: NoSuchObjectException(message: table not found) when running "DROP TABLE IF EXISTS ".

This looks like a new regression in the Hive module. Can anyone confirm it?

Thanks,
--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Auto BroadcastJoin optimization failed in latest Spark
If I run ANALYZE without NOSCAN, then Hive can successfully get the size:

parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589, COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296}

Is Hive's Parquet support broken?

Jianshi

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Auto BroadcastJoin optimization failed in latest Spark
With Liancheng's suggestion, I've tried setting

  spark.sql.hive.convertMetastoreParquet=false

but ANALYZE ... NOSCAN still returns -1 in rawDataSize.

Jianshi

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: drop table if exists throws exception
I see. The resulting SchemaRDD is returned, so, as Michael said, the exception does not propagate to user code. However, printing the following log is confusing :)

scala> sql("drop table if exists abc")
14/12/05 16:27:02 INFO ParseDriver: Parsing command: drop table if exists abc
14/12/05 16:27:02 INFO ParseDriver: Parse Completed
14/12/05 16:27:02 INFO Driver: Concurrency mode is disabled, not creating a lock manager
14/12/05 16:27:02 INFO ParseDriver: Parsing command: DROP TABLE IF EXISTS abc
14/12/05 16:27:02 INFO ParseDriver: Parse Completed
14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc
14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr cmd=get_table : db=default tbl=abc
14/12/05 16:27:02 INFO Driver: Semantic Analysis Completed
14/12/05 16:27:02 INFO Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
14/12/05 16:27:02 INFO Driver: Starting command: DROP TABLE IF EXISTS abc
14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc
14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr cmd=get_table : db=default tbl=abc
14/12/05 16:27:02 ERROR Hive: NoSuchObjectException(message:default.abc table not found)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
    at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)

On Sat, Dec 6, 2014 at 3:45 AM, Mark Hamstra wrote:
> And that is no different from how Hive has worked for a long time.
>
> On Fri, Dec 5, 2014 at 11:42 AM, Michael Armbrust wrote:
>
>> The command runs fine for me on master. Note that Hive does print an
>> exception in the logs, but that exception does not propagate to user code.

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
OK, found another possible bug in Hive.

My current solution is to use ALTER TABLE CHANGE to rename the column names. The problem is that after renaming the column names, the values of the columns became all NULL.

Before renaming:
scala> sql("select `sorted::cre_ts` from pmt limit 1").collect
res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])

Execute renaming:
scala> sql("alter table pmt change `sorted::cre_ts` cre_ts string")
res13: org.apache.spark.sql.SchemaRDD =
SchemaRDD[972] at RDD at SchemaRDD.scala:108
== Query Plan ==

After renaming:
scala> sql("select cre_ts from pmt limit 1").collect
res16: Array[org.apache.spark.sql.Row] = Array([null])

I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-4781

Jianshi

On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang wrote:
> Hmm... another issue I found with this approach is that ANALYZE TABLE ...
> COMPUTE STATISTICS will fail to attach the metadata to the table, and
> later broadcast join and such will fail...
>
> Any idea how to fix this issue?
>
> Jianshi
>
> On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang wrote:
>
>> Very interesting: the line doing the drop table throws an exception.
>> After removing it, everything works.
>>
>> Jianshi
>>
>> On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang wrote:
>>
>>> Here's the solution I got after talking with Liancheng:
>>>
>>> 1) Use backquotes `..` to wrap up all illegal characters:
>>>
>>> val rdd = parquetFile(file)
>>> val schema = rdd.schema.fields.map(f => s"`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>
>>> val ddl_13 = s"""
>>>   |CREATE EXTERNAL TABLE $name (
>>>   |  $schema
>>>   |)
>>>   |STORED AS PARQUET
>>>   |LOCATION '$file'
>>> """.stripMargin
>>>
>>> sql(ddl_13)
>>>
>>> 2) Create a new schema and use applySchema to generate a new SchemaRDD
>>> (I had to drop and re-register the table):
>>>
>>> val t = table(name)
>>> val newSchema = StructType(t.schema.fields.map(s => s.copy(name = s.name.replaceAll(".*?::", ""))))
>>> sql(s"drop table $name")
>>> applySchema(t, newSchema).registerTempTable(name)
>>>
>>> I'm testing it for now.
>>>
>>> Thanks for the help!
>>>
>>> Jianshi
>>>
>>> On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang wrote:
>>>
>>>> Hi,
>>>>
>>>> I had to use Pig for some preprocessing and to generate Parquet files
>>>> for Spark to consume. However, due to Pig's limitations, the generated
>>>> schema contains Pig's identifiers, e.g.:
>>>>
>>>> sorted::id, sorted::cre_ts, ...
>>>>
>>>> I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.:
>>>>
>>>> create external table pmt (
>>>>   sorted::id bigint
>>>> )
>>>> stored as parquet
>>>> location '...'
>>>>
>>>> Obviously it didn't work. I also tried removing the identifier
>>>> sorted::, but the resulting rows contain only nulls.
>>>>
>>>> Any idea how to create a table in HiveContext from these Parquet files?
>>>>
>>>> Thanks,
>>>> Jianshi
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Ah... I see. Thanks for pointing it out.

Then it means we cannot mount an external table using customized column names. Hmm...

Then the only option left is to use a subquery to add a bunch of column aliases. I'll try it later.

Thanks,
Jianshi

On Tue, Dec 9, 2014 at 3:34 AM, Michael Armbrust wrote:
> This is by Hive's design. From the Hive documentation:
>
>> The column change command will only modify Hive's metadata, and will not
>> modify data. Users should make sure the actual data layout of the
>> table/partition conforms with the metadata definition.

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
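The subquery-with-aliases workaround mentioned above isn't spelled out in the thread; a minimal sketch, assuming the pmt table and the sorted:: column names from this discussion, might look like this:

```sql
-- Hypothetical sketch: alias the Pig-generated column names once, so the
-- backquoted names appear in only one place.
SELECT t.id, t.cre_ts
FROM (
  SELECT `sorted::id` AS id, `sorted::cre_ts` AS cre_ts
  FROM pmt
) t
LIMIT 1;
```

If the Spark build has view support, a CREATE VIEW with the same SELECT would make the aliases reusable across queries.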
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
FYI, the latest Hive 0.14/Parquet will have column renaming support.

Jianshi

On Wed, Dec 10, 2014 at 3:37 AM, Michael Armbrust wrote:
> You might also try out the recently added support for views.
Re: How to do broadcast join in SparkSQL
Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master?

I cannot find spark.sql.hints.broadcastTables in the latest master, but it's in the following patch:

https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5

Jianshi

On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang wrote:
> Yes, it looks like it can only be controlled by the parameter
> spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me.
>
> How am I supposed to know the exact size of a table in bytes? Letting me
> specify the preferred join algorithm would be better, I think.
>
> Jianshi
>
> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu wrote:
>
>> Have you looked at SPARK-1800?
>>
>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>> Cheers
>>
>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang wrote:
>>
>>> I cannot find it in the documentation. And I have a dozen dimension
>>> tables to (left) join...
>>>
>>> Cheers,
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: How to do broadcast join in SparkSQL
OK, so currently there's cost-based optimization, but Parquet statistics are not implemented...

What's a good way to join a big fact table with several tiny dimension tables in Spark SQL (1.1)?

I wish we could allow user hints for the join.

Jianshi

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: How to do broadcast join in SparkSQL
It works fine, thanks for the help Michael.

Liancheng also told me a trick: use a subquery with LIMIT n. It works in the latest 1.2.0.

BTW, it looks like the broadcast optimization won't be recognized if I do a left join instead of an inner join. Is that true? How can I make it work for left joins?

Cheers,
Jianshi

On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust wrote:
> Thanks for the input. We purposefully made sure that the config option did
> not make it into a release, as it is not something that we are willing to
> support long term. That said, we'll try and make this easier in the future,
> either through hints or better support for statistics.
>
> In this particular case you can get what you want by registering the
> tables as external tables and setting a flag. Here's a helper function to
> do what you need:
>
> /**
>  * Sugar for creating a Hive external table from a parquet path.
>  */
> def createParquetTable(name: String, file: String): Unit = {
>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>
>   val rdd = parquetFile(file)
>   val schema = rdd.schema.fields.map(f => s"${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>   val ddl = s"""
>     |CREATE EXTERNAL TABLE $name (
>     |  $schema
>     |)
>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>     |LOCATION '$file'""".stripMargin
>   sql(ddl)
>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
> }
>
> You'll also need to run this to populate the statistics:
>
> ANALYZE TABLE tableName COMPUTE STATISTICS noscan;

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
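Liancheng's LIMIT-n subquery trick is only mentioned in passing above; one plausible reading is that wrapping the small table in a limited subquery gives the planner a bounded size estimate it can broadcast. A hedged sketch, with illustrative table and column names:

```sql
-- Hypothetical: bound the dimension side with LIMIT so the planner can
-- treat it as small enough for a broadcast join.
SELECT f.*, d.desc
FROM fact_table f
JOIN (SELECT * FROM dim_table LIMIT 100000) d
  ON f.code = d.code;
```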
SPARK-3106 fixed?
https://issues.apache.org/jira/browse/SPARK-3106 I'm seeing the same errors described in SPARK-3106 (no other types of errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from the latest master HEAD. Any updates on this issue? My main task is to join a huge fact table with a dozen dim tables (using HiveContext) and then map it to my class objects. It failed a couple of times; after I cached the intermediate table it currently seems to work fine... I had no idea why until I found SPARK-3106. Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
Re: SPARK-3106 fixed?
Hmm... it failed again; it just lasted a little longer. Jianshi On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang wrote: > https://issues.apache.org/jira/browse/SPARK-3106 > > I'm seeing the same errors described in SPARK-3106 (no other types of > errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from > the latest master HEAD. > > Any updates on this issue? > > My main task is to join a huge fact table with a dozen dim tables (using > HiveContext) and then map it to my class objects. It failed a couple of > times; after I cached the intermediate table it currently seems to work > fine... I had no idea why until I found SPARK-3106. > > Cheers, > Jianshi > -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
Re: SPARK-3106 fixed?
It turned out to be caused by this issue: https://issues.apache.org/jira/browse/SPARK-3923 Setting spark.akka.heartbeat.interval to 100 solved it. Jianshi On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang wrote: > Hmm... it failed again; it just lasted a little longer. > > Jianshi > > On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang > wrote: > >> https://issues.apache.org/jira/browse/SPARK-3106 >> >> I'm seeing the same errors described in SPARK-3106 (no other types of >> errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from >> the latest master HEAD. >> >> Any updates on this issue? >> >> My main task is to join a huge fact table with a dozen dim tables (using >> HiveContext) and then map it to my class objects. It failed a couple of >> times; after I cached the intermediate table it currently seems to work >> fine... I had no idea why until I found SPARK-3106. >> >> Cheers, >> Jianshi >> > -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
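For anyone hitting the same Disassociated errors, the workaround above is a plain Spark configuration change; a sketch of how it might be applied follows. The property name is as given in the message; the app jar name is a placeholder, and the right value may differ by version, so check your version's configuration docs:

```shell
# Sketch: apply the SPARK-3923 workaround mentioned above.
# Either pass it on the command line...
spark-submit --conf spark.akka.heartbeat.interval=100 your-app.jar

# ...or set it once in conf/spark-defaults.conf:
# spark.akka.heartbeat.interval  100
```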
Re: SPARK-3106 fixed?
One thing that confused me during debugging was the error message. The important one, WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@xxx:50278] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. is only logged at the WARN level. Jianshi On Tue, Oct 14, 2014 at 4:36 AM, Jianshi Huang wrote: > It turned out to be caused by this issue: > https://issues.apache.org/jira/browse/SPARK-3923 > > Setting spark.akka.heartbeat.interval to 100 solved it. > > Jianshi > > On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang > wrote: > >> Hmm... it failed again; it just lasted a little longer. >> >> Jianshi >> >> On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang >> wrote: >> >>> https://issues.apache.org/jira/browse/SPARK-3106 >>> >>> I'm seeing the same errors described in SPARK-3106 (no other types of >>> errors confirmed), running a bunch of SQL queries on Spark 1.2.0 built from >>> the latest master HEAD. >>> >>> Any updates on this issue? >>> >>> My main task is to join a huge fact table with a dozen dim tables (using >>> HiveContext) and then map it to my class objects. It failed a couple of >>> times; after I cached the intermediate table it currently seems to work >>> fine... I had no idea why until I found SPARK-3106. >>> >>> Cheers, >>> Jianshi >>> >> > -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
Re: Multitenancy in Spark - within/across spark context
Upvote for the multitenancy requirement. I'm also building a data analytics platform, and there'll be multiple users running queries and computations simultaneously. One of the pain points is controlling resource size. Users don't really know how many nodes they need, so they always use as many as possible... The result is a lot of wasted resources in our YARN cluster. A way to 1) allow multiple Spark contexts to share the same resources or 2) add dynamic resource management for YARN mode is very much wanted. Jianshi On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin wrote: > On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar > wrote: > >> That's not something you might want to do usually. In general, a > >> SparkContext maps to a user application > > > > My question was basically this. In this page in the official doc, under > > "Scheduling within an application" section, it talks about multiuser and > > fair sharing within an app. How does multiuser within an application > > work (how do users connect to an app and run their stuff)? When would I want to > use > > this ? > > I see. The way I read that page is that Spark supports all those > scheduling options; but Spark doesn't give you the means to actually > be able to submit jobs from different users to a running SparkContext > hosted on a different process. For that, you'll need something like > the job server that I referenced before, or write your own framework > for supporting that. > > Personally, I'd use the information on that page when dealing with > concurrent jobs in the same SparkContext, but still restricted to the > same user. I'd avoid trying to create any application where a single > SparkContext is trying to be shared by multiple users in any way. > > >> As far as I understand, this will cause executors to be killed, which > >> means that Spark will start retrying tasks to rebuild the data that > >> was held by those executors when needed. 
> > > > I basically wanted to find out if there were any "gotchas" related to > > preemption on Spark. Things like, say, half of an application's executors got > > preempted, say while doing reduceByKey; will the application progress with > > the remaining resources/fair share ? > > Jobs should still make progress as long as at least one executor is > available. The gotcha would be the one I mentioned, where Spark will > fail your job after "x" executors failed, which might be a common > occurrence when preemption is enabled. That being said, it's a > configurable option, so you can set "x" to a very large value and your > job should keep on chugging along. > > The options you'd want to take a look at are: spark.task.maxFailures > and spark.yarn.max.executor.failures > > -- > Marcelo > -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
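The two options Marcelo names are ordinary Spark configs; a hedged sketch of setting them for a YARN job is below. The property names are from his message, but the values and the jar name are illustrative only, and defaults vary by version:

```shell
# Sketch: raise failure tolerance so preempted executors don't fail the
# whole job. Values are illustrative; tune them to your preemption rate.
spark-submit \
  --master yarn-cluster \
  --conf spark.task.maxFailures=20 \
  --conf spark.yarn.max.executor.failures=100 \
  your-app.jar
```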
Build with Hive 0.13.1 doesn't have datanucleus and parquet dependencies.
There's been a recent change in the build process for Hive 0.13 support, and we should make it more obvious. Based on the new pom.xml I tried to enable Hive 0.13.1 support by using the option -Phive-0.13.1 However, it seems the datanucleus and parquet dependencies are not available in the final build. Am I missing anything? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
Re: Build with Hive 0.13.1 doesn't have datanucleus and parquet dependencies.
Ah, I see. Thanks Hao! I'll wait for the fix. Jianshi On Mon, Oct 27, 2014 at 4:57 PM, Cheng, Hao wrote: > The hive-thriftserver module is not included when specifying the profile > hive-0.13.1. > > -Original Message- > From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] > Sent: Monday, October 27, 2014 4:48 PM > To: dev@spark.apache.org > Subject: Build with Hive 0.13.1 doesn't have datanucleus and parquet > dependencies. > > There's been a recent change in the build process for Hive 0.13 support, and we > should make it more obvious. Based on the new pom.xml I tried to enable Hive > 0.13.1 support by using the option > > -Phive-0.13.1 > > However, it seems the datanucleus and parquet dependencies are not available > in the final build. > > Am I missing anything? > > Jianshi > -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
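For reference, the profile flag discussed in this thread is passed to Maven roughly as follows. This is a sketch: only -Phive-0.13.1 comes from the thread; the companion profiles and flags shown are common for Spark builds of that era but should be checked against the pom.xml of your checkout:

```shell
# Sketch: building Spark master (circa the 1.2.0 timeframe) with the
# Hive 0.13.1 profile from the thread. Other profiles are illustrative.
mvn -Phive -Phive-0.13.1 -Pyarn -DskipTests clean package
```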