Hi Radha,

I suggest you create a ticket with Hortonworks for this issue. I believe the root cause is that the version of Phoenix they have provided doesn't include all of the necessary patches for Spark 1.6 DataFrame support.
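In the meantime, if upgrading isn't an option, one thing you could try is bypassing the DataFrame integration and loading the Phoenix table through the phoenix-spark RDD API, building plain Rows yourself so Spark SQL never sees Phoenix's internal row type. This is only a rough, untested sketch against your snippet; I'm assuming here that your two join columns are named COL1 and COL2 in Phoenix and are VARCHARs, so adjust the names and types to match your table:

    import org.apache.phoenix.spark._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Load the table as an RDD of Map[String, AnyRef] (column name -> value),
    // which avoids the DataFrame conversion path that throws the cast error.
    val hist_rdd = sc.phoenixTableAsRDD(
      "Phoenix_Table_Name",
      Seq("COL1", "COL2"),
      zkUrl = Some("g4t7565.houston.hp.com:2181:/hbase-unsecure"))

    // Build ordinary org.apache.spark.sql.Row objects and re-apply a schema.
    val hist_rows = hist_rdd.map(m => Row(m("COL1"), m("COL2")))
    val hist_schema = StructType(Seq(
      StructField("col1", StringType, true),
      StructField("col2", StringType, true)))
    val hist_hist_df = sqlContext.createDataFrame(hist_rows, hist_schema)
    hist_hist_df.registerTempTable("HIST_TABLE")

You'd also need to list any other columns you want out of the history table in the Seq, and map non-string values (or the schema types) accordingly. I haven't verified this against the HDP build, though, so the Hortonworks ticket is still the right first step.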
Good luck,

Josh

On Thu, May 12, 2016 at 3:11 AM, Radha krishna <grkmc...@gmail.com> wrote:

> Hi All,
>
> I am using the Spark + Phoenix combination. After loading the data (using
> Spark + Phoenix) I tried to perform some join operations, and it gives the
> error message below. Can someone suggest a solution for this problem?
>
> Hadoop Distribution: Hortonworks
> Spark Version: 1.6
> HBase Version: 1.1.2
> Phoenix Version: 4.4.0
>
> Error
> ========
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 21
> in stage 0.0 failed 1 times, most recent failure: Lost task 21.0 in stage
> 0.0 (TID 21, localhost): java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow cannot be cast
> to org.apache.spark.sql.Row
>         at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>         at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
>         at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
>         at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
>         at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>         at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>         at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
>         at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
>         at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
>         at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
>         at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
>         at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
>         at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
>         at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
>         at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
>         at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394)
>         at org.apache.spark.sql.DataFrame.show(DataFrame.scala:355)
>         at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
>         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>         at $iwC$$iwC$$iwC.<init>(<console>:61)
>         at $iwC$$iwC.<init>(<console>:63)
>         at $iwC.<init>(<console>:65)
>         at <init>(<console>:67)
>         at .<init>(<console>:71)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>         at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>         at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>         at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
>         at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
>         at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
>         at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
>         at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
>         at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
>         at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>         at org.apache.spark.repl.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow cannot be cast
> to org.apache.spark.sql.Row
>         at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Spark code
> =========
> val sqlContext = new SQLContext(sc)
>
> val schemaString = "<Column Names>"
>
> // Generate the schema based on schemaString
> val schema = StructType(schemaString.split(" ").map(fieldName =>
>   StructField(fieldName, StringType, true)))
>
> // Convert records of the text-file RDD to Rows.
> val input_incr_rdd = sc.textFile("Incremental_Data_File_Path")
>   .map(_.split("\u001c"))
>   .map(p => Row(p(0), p(1).trim().toUpperCase(), p(2).trim().toUpperCase(),
>     p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13),
>     p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21), p(22), p(23),
>     p(24), p(25), p(26), p(27), p(28), p(29), p(30), p(31), p(32), p(33),
>     p(34), p(35), p(36), p(37), p(38), p(39), p(40), p(41), p(42), p(43),
>     p(44), p(45), p(46), p(47), p(48), p(49), p(50), p(51), p(52), p(53),
>     p(54), p(55), p(56), p(57), p(58), p(59), p(60), p(61), p(62), p(63),
>     p(64), p(65), p(66), p(67), p(68), p(69), p(70), p(71), p(72), p(73),
>     p(74), p(75), p(76), p(77), p(78), p(79), p(80), p(81), p(82), p(83),
>     p(84), p(85), p(86), p(87), p(88), p(89), p(90), p(91), p(92), p(93),
>     p(94), p(95), p(96), p(97), p(98), p(99), p(100), p(101), p(102)))
>
> // Apply the schema to the RDD.
> val input_incr_rdd_df = sqlContext.createDataFrame(input_incr_rdd, schema)
> input_incr_rdd_df.registerTempTable("INCR_TABLE")
>
> val hist_hist_df = sqlContext.read.format("org.apache.phoenix.spark")
>   .options(Map("table" -> "Phoenix_Table_Name",
>     "zkUrl" -> "g4t7565.houston.hp.com:2181:/hbase-unsecure"))
>   .load()
> hist_hist_df.registerTempTable("HIST_TABLE")
>
> val matched_rc = input_incr_rdd_df.join(hist_hist_df,
>   input_incr_rdd_df("Col1") <=> hist_hist_df("col1")
>     && input_incr_rdd_df("col2") <=> hist_hist_df("col2"))
>
> matched_rc.show()
>
> Thanks & Regards
> Radha krishna