Re: Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query
The issue is now resolved. One of the CSV files had an incorrect record at the end. On Fri, Feb 27, 2015 at 4:24 PM, anamika gupta anamika.guo...@gmail.com wrote: I have three tables with the following schema: case class *date_d*(WID: Int, CALENDAR_DATE: java.sql.Timestamp, DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR: Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String, MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int) case class *interval_f*(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int, MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int, VAL_FAIL_CD:Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID:Int, SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME: java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp, INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME: java.sql.Timestamp) class *sdp_d*( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String], GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String], IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String], ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String], BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String], LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String], INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME :Option[java.sql.Timestamp]) extends Product{ @throws(classOf[IndexOutOfBoundsException]) override def productElement(n: Int) = n match { case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 => ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID; case 7
=> FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 => PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 => IS_VIRTUAL_FLG; case 14 => SEAL_INFO; case 15 => ACCESS_INFO; case 16 => ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 => TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case 23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS; case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 => LAST_UPD_TIME; case _ => throw new IndexOutOfBoundsException(n.toString()) } override def productArity: Int = 29; override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d] } Non-join queries work fine: *val q1 = sqlContext.sql("SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID), COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR, DAY_OF_YEAR")* res4: Array[org.apache.spark.sql.Row] = Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1], [2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1], [2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1], [2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1], [2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1], [2014,315,2014,2014,1], [2014,316,20141112,20141112,1], [2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1], [2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1], [2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1], [2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1], [2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1], [2014,327,20141123,20141123,1], [2014,328,20141...
But the join queries throw this error: *java.lang.ArrayIndexOutOfBoundsException* *scala> val q = sqlContext.sql("select * from date_d dd join interval_f intf on intf.DATE_WID = dd.WID Where intf.DATE_WID = 20141101 AND intf.DATE_WID = 20141110")* q: org.apache.spark.sql.SchemaRDD = SchemaRDD[38] at RDD at SchemaRDD.scala:103 == Query Plan == == Physical Plan == Project [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29] ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight Exchange (HashPartitioning [WID#0], 200) InMemoryColumnarTableScan [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA... *scala> q.take(5).foreach(println)* 15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at basicOperators.scala:136 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46
Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query
I have three tables with the following schema: case class *date_d*(WID: Int, CALENDAR_DATE: java.sql.Timestamp, DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR: Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String, MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int) case class *interval_f*(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int, MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int, VAL_FAIL_CD:Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID:Int, SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME: java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp, INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME: java.sql.Timestamp) class *sdp_d*( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String], GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String], IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String], ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String], BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String], LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String], INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME :Option[java.sql.Timestamp]) extends Product{ @throws(classOf[IndexOutOfBoundsException]) override def productElement(n: Int) = n match { case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 => ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID; case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 => PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 => IS_VIRTUAL_FLG; case 14 =>
SEAL_INFO; case 15 => ACCESS_INFO; case 16 => ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 => TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case 23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS; case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 => LAST_UPD_TIME; case _ => throw new IndexOutOfBoundsException(n.toString()) } override def productArity: Int = 29; override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d] } Non-join queries work fine: *val q1 = sqlContext.sql("SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID), COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR, DAY_OF_YEAR")* res4: Array[org.apache.spark.sql.Row] = Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1], [2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1], [2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1], [2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1], [2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1], [2014,315,2014,2014,1], [2014,316,20141112,20141112,1], [2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1], [2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1], [2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1], [2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1], [2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1], [2014,327,20141123,20141123,1], [2014,328,20141...
But the join queries throw this error: *java.lang.ArrayIndexOutOfBoundsException* *scala> val q = sqlContext.sql("select * from date_d dd join interval_f intf on intf.DATE_WID = dd.WID Where intf.DATE_WID = 20141101 AND intf.DATE_WID = 20141110")* q: org.apache.spark.sql.SchemaRDD = SchemaRDD[38] at RDD at SchemaRDD.scala:103 == Query Plan == == Physical Plan == Project [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29] ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight Exchange (HashPartitioning [WID#0], 200) InMemoryColumnarTableScan [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA... *scala> q.take(5).foreach(println)* 15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at basicOperators.scala:136 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46 (mapPartitions at Exchange.scala:48) 15/02/27 15:50:26 INFO FileInputFormat: Total input paths to process : 1 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 42 (mapPartitions at Exchange.scala:48) 15/02/27 15:50:26 INFO DAGScheduler: Got job 2
Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query
I have three tables with the following schema: case class *date_d*(WID: Int, CALENDAR_DATE: java.sql.Timestamp, DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR: Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String, MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int) case class *interval_f*(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int, MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int, VAL_FAIL_CD:Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID:Int, SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME: java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp, INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME: java.sql.Timestamp) class *sdp_d*( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String], GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String], IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String], ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String], BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String], LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String], INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME :Option[java.sql.Timestamp]) extends Product{ @throws(classOf[IndexOutOfBoundsException]) override def productElement(n: Int) = n match { case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 => ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID; case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 => PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 => IS_VIRTUAL_FLG; case 14 =>
SEAL_INFO; case 15 => ACCESS_INFO; case 16 => ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 => TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case 23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS; case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 => LAST_UPD_TIME; case _ => throw new IndexOutOfBoundsException(n.toString()) } override def productArity: Int = 29; override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d] } Non-join queries work fine: *val q1 = sqlContext.sql("SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID), COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR, DAY_OF_YEAR")* res4: Array[org.apache.spark.sql.Row] = Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1], [2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1], [2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1], [2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1], [2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1], [2014,315,2014,2014,1], [2014,316,20141112,20141112,1], [2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1], [2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1], [2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1], [2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1], [2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1], [2014,327,20141123,20141123,1], [2014,328,20141...
*But the join queries throw this error: java.lang.ArrayIndexOutOfBoundsException* *scala> val q = sqlContext.sql("select * from date_d dd join interval_f intf on intf.DATE_WID = dd.WID Where intf.DATE_WID = 20141101 AND intf.DATE_WID = 20141110")* q: org.apache.spark.sql.SchemaRDD = SchemaRDD[38] at RDD at SchemaRDD.scala:103 == Query Plan == == Physical Plan == Project [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29] ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight Exchange (HashPartitioning [WID#0], 200) InMemoryColumnarTableScan [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA... *scala> q.take(5).foreach(println)* 15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at basicOperators.scala:136 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46 (mapPartitions at Exchange.scala:48) 15/02/27 15:50:26 INFO FileInputFormat: Total input paths to process : 1 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 42 (mapPartitions at Exchange.scala:48) 15/02/27 15:50:26 INFO DAGScheduler: Got job 2