Its failing to sort because the columns are of Binary type (though maybe we
should support this as well).  Is this parquet data that was generated by
impala that you would expect to be a String?  If so turn on
spark.sql.parquet.binaryAsString
<http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration>
. Otherwise you could file a JIRA asking us to add support for sorting
binary data (or both :) ).

Michael

On Thu, Sep 18, 2014 at 9:31 AM, Paul Magid <paul_ma...@toyota.com> wrote:

>  All:
>
>
>
> I am putting Spark SQL 1.1 through its paces (in a POC) and have been
> pleasantly surprised with what can be done with such a young technology.
> I have run into an exception (listed below) that I suspect relates to the
> number of columns in the table I am querying.   There are 336 columns in
> the table.   I have included the Scala / Spark SQL I am running.  This
> Spark SQL code runs just fine when run against “narrower” tables.   Also,
> we have purpose built this POC cluster with lots of memory and we have set
> up Impala and Spark SQL with roughly the same amounts of memory.   There
> are 7 worker nodes with 20GB memory for Impala and Spark SQL each.  We are
> using Impala as a comparative benchmark and sanity check.  The equivalent
> SQL runs just fine in Impala (see below).   I am a bit of a noob and any
> help (even with the code below) is greatly appreciated.  Also, is there a
> document that lists current Spark SQL limitations/issues?
>
>
>
> Paul Magid
>
> Toyota Motor Sales IS Enterprise Architecture (EA)
>
> Architect I R&D
>
> Ph: 310-468-9091 (X69091)
>
> PCN 1C2970, Mail Drop PN12
>
>
>
>
>
> *Successful Result In Impala*
>
>
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> +----------------+
>
> | marital_status |
>
> +----------------+
>
> | M              |
>
> | S              |
>
> | U              |
>
> | null           |
>
> +----------------+
>
> Returned 4 row(s) in 0.91s
>
>
>
> *Code*
>
>
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> //Timer code
>
> def time[R](block: => R): R = {
>
>     val t0 = System.nanoTime()
>
>     val result = block    // call-by-name
>
>     val t1 = System.nanoTime()
>
>     println("Elapsed time: " + (t1 - t0).toFloat/1000000000 + "s")
>
>     result
>
> }
>
>
>
> //Declare and import SQLContext
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext._
>
>
>
> //Load Parquet file into a table
>
> val parquetFile_db2 =
> sqlContext.parquetFile("hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/")
>
> parquetFile_db2.registerAsTable("customer_demographic_pq")
>
>
>
> //Run SQL code with timer
>
> val records= time {sql("select marital_status from customer_demographic_pq
> group by marital_status order by marital_status
> ").collect().foreach(println)}
>
>
>
>
>
> *Exception*
>
>
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> 14/09/18 08:50:39 INFO SparkContext: Job finished: RangePartitioner at
> Exchange.scala:79, took 21.885859255 s
>
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
>
> Sort [marital_status#9 ASC], true
>
> Exchange (RangePartitioning [marital_status#9 ASC], 200)
>
>   Aggregate false, [marital_status#9], [marital_status#9]
>
>    Exchange (HashPartitioning [marital_status#9], 200)
>
>     Aggregate true, [marital_status#9], [marital_status#9]
>
>      ParquetTableScan [marital_status#9], (ParquetRelation
> hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/,
> Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
> mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
> hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []
>
>
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
>
>         at
> org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
>
>         at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>
>         at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcV$sp(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC.time(<console>:12)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
>
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
>
>         at $iwC$$iwC$$iwC.<init>(<console>:28)
>
>         at $iwC$$iwC.<init>(<console>:30)
>
>         at $iwC.<init>(<console>:32)
>
>         at <init>(<console>:34)
>
>         at .<init>(<console>:38)
>
>         at .<clinit>(<console>)
>
>         at .<init>(<console>:7)
>
>         at .<clinit>(<console>)
>
>         at $print(<console>)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
>         at java.lang.reflect.Method.invoke(Unknown Source)
>
>         at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>
>         at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>
>         at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>
>         at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
>
>         at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
>
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
>
>         at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
>
>         at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
>
>         at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
>
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
>
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>
>         at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
>
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
>
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>
>         at org.apache.spark.repl.Main.main(Main.scala)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
>         at java.lang.reflect.Method.invoke(Unknown Source)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
>
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> execute, tree:
>
> Exchange (RangePartitioning [marital_status#9 ASC], 200)
>
> Aggregate false, [marital_status#9], [marital_status#9]
>
>   Exchange (HashPartitioning [marital_status#9], 200)
>
>    Aggregate true, [marital_status#9], [marital_status#9]
>
>     ParquetTableScan [marital_status#9], (ParquetRelation
> hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/,
> Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
> mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
> hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []
>
>
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
>
>         at
> org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
>
>         at
> org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:192)
>
>         at
> org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:193)
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
>
>         ... 49 more
>
> Caused by: scala.MatchError: BinaryType (of class
> org.apache.spark.sql.catalyst.types.BinaryType$)
>
>         at
> org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:256)
>
>         at
> org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:238)
>
>         at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
>
>         at java.util.TimSort.countRunAndMakeAscending(Unknown Source)
>
>         at java.util.TimSort.sort(Unknown Source)
>
>         at java.util.TimSort.sort(Unknown Source)
>
>         at java.util.Arrays.sort(Unknown Source)
>
>         at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
>
>         at scala.collection.AbstractSeq.sorted(Seq.scala:40)
>
>         at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
>
>         at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
>
>         at
> org.apache.spark.RangePartitioner$.determineBounds(Partitioner.scala:279)
>
>         at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:152)
>
>         at
> org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:79)
>
>         at
> org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
>
>         ... 53 more
>

Reply via email to