Re: Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query

2015-02-28 Thread anamika gupta
The issue is now resolved.

One of the CSV files had a malformed record at the end.
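
That failure mode fits the symptom: when a table is built by splitting CSV
lines and indexing into the resulting array, one short record makes a field
lookup throw java.lang.ArrayIndexOutOfBoundsException inside the job, and
only queries that scan the bad file trip over it. Since the single-table
query on date_d below worked, the bad record was presumably in the
interval_f file. A minimal guard, assuming the usual
sc.textFile(...).map(_.split(",")) loading pattern (the loading code is not
shown in this thread, so the path and names below are illustrative):

  val raw    = sc.textFile("hdfs:///data/interval_f.csv")  // illustrative path
  val fields = raw.map(_.split(",", -1))
  // interval_f has 17 columns; a truncated record would otherwise throw
  // ArrayIndexOutOfBoundsException when its last field is read by the query
  val good = fields.filter(_.length == 17)
  val bad  = fields.filter(_.length != 17)
  bad.take(5).foreach(r => println(r.mkString("|")))  // inspect offending rows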

On Fri, Feb 27, 2015 at 4:24 PM, anamika gupta anamika.guo...@gmail.com
wrote:

 I have three tables with the following schemas:

 case class *date_d*(WID: Int, CALENDAR_DATE: java.sql.Timestamp,
 DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR: Int,
 END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String,
 MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int)



 case class *interval_f*(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int,
 MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int,
 VAL_FAIL_CD: Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID: Int,
 SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME: java.sql.Timestamp,
 LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp, INTERVAL_VALUE: Double,
 INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME: java.sql.Timestamp)



 class *sdp_d*(WID: Option[Int], BATCH_ID: Option[Int], SRC_ID: Option[String],
 ORG_ID: Option[Int], CLASS_WID: Option[Int], DESC_TEXT: Option[String],
 PREMISE_WID: Option[Int], FEED_LOC: Option[String], GPS_LAT: Option[Double],
 GPS_LONG: Option[Double], PULSE_OUTPUT_BLOCK: Option[String],
 UDC_ID: Option[String], UNIVERSAL_ID: Option[String],
 IS_VIRTUAL_FLG: Option[String], SEAL_INFO: Option[String],
 ACCESS_INFO: Option[String], ALT_ACCESS_INFO: Option[String],
 LOC_INFO: Option[String], ALT_LOC_INFO: Option[String], TYPE: Option[String],
 SUB_TYPE: Option[String], TIMEZONE_ID: Option[Int], GIS_ID: Option[String],
 BILLED_UPTO_TIME: Option[java.sql.Timestamp], POWER_STATUS: Option[String],
 LOAD_STATUS: Option[String], BILLING_HOLD_STATUS: Option[String],
 INSERT_TIME: Option[java.sql.Timestamp],
 LAST_UPD_TIME: Option[java.sql.Timestamp]) extends Product {

 @throws(classOf[IndexOutOfBoundsException])
 override def productElement(n: Int) = n match {
   case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 => ORG_ID
   case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID
   case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG
   case 10 => PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID
   case 13 => IS_VIRTUAL_FLG; case 14 => SEAL_INFO; case 15 => ACCESS_INFO
   case 16 => ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO
   case 19 => TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID
   case 22 => GIS_ID; case 23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS
   case 25 => LOAD_STATUS; case 26 => BILLING_HOLD_STATUS
   case 27 => INSERT_TIME; case 28 => LAST_UPD_TIME
   case _ => throw new IndexOutOfBoundsException(n.toString)
 }

 override def productArity: Int = 29
 override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d]
 }
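
 Note: sdp_d is a plain class with a hand-written Product implementation,
 presumably because its 29 fields exceed Scala 2.10's 22-field limit on
 case classes. A quick REPL sanity check, with dummy constructor arguments,
 that the hand-written methods agree with each other:

   val dummy = new sdp_d(Some(1), None, None, None, None, None, None, None,
     None, None, None, None, None, None, None, None, None, None, None, None,
     None, None, None, None, None, None, None, None, None)
   assert(dummy.productArity == 29)
   (0 until dummy.productArity).foreach(dummy.productElement)  // must not throw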


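 The thread does not show how these classes become queryable tables. In
 Spark 1.x the usual pattern is an RDD of such Products registered as a
 temp table, roughly as in this sketch (path and variable names are
 illustrative):

   import java.sql.Timestamp
   import sqlContext.createSchemaRDD  // implicit RDD[Product] -> SchemaRDD

   val dateD = sc.textFile("date_d.csv").map(_.split(",")).map(p =>
     date_d(p(0).trim.toInt, Timestamp.valueOf(p(1).trim), p(2).trim, p(3).trim,
       p(4).trim.toInt, p(5).trim.toInt, p(6).trim, p(7).trim.toInt, p(8).trim,
       p(9).trim.toInt, p(10).trim.toInt, p(11).trim.toInt, p(12).trim.toInt))
   dateD.registerTempTable("date_d")

 A row with too few fields survives the split but fails in a map like this
 when the job actually runs, which is one way an
 ArrayIndexOutOfBoundsException can surface only at query time.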

 Non-join queries work fine:

 *val q1 = sqlContext.sql("SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID),
 COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR,
 DAY_OF_YEAR")*

 res4: Array[org.apache.spark.sql.Row] =
 Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1],
 [2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1],
 [2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1],
 [2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1],
 [2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1],
 [2014,315,20141111,20141111,1], [2014,316,20141112,20141112,1],
 [2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1],
 [2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1],
 [2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1],
 [2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1],
 [2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1],
 [2014,327,20141123,20141123,1], [2014,328,20141...



 *But the join queries throw this error:
 java.lang.ArrayIndexOutOfBoundsException*

 *scala> val q = sqlContext.sql("select * from date_d dd join interval_f
 intf on intf.DATE_WID = dd.WID where intf.DATE_WID >= 20141101 and
 intf.DATE_WID <= 20141110")*

 q: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[38] at RDD at SchemaRDD.scala:103
 == Query Plan ==
 == Physical Plan ==
 Project
 [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29]
  ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight
   Exchange (HashPartitioning [WID#0], 200)
    InMemoryColumnarTableScan
 [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA...


 *scala> q.take(5).foreach(println)*

 15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at
 basicOperators.scala:136
 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46 (mapPartitions at
 Exchange.scala:48)
 15/02/27 15:50:26 INFO FileInputFormat: Total input paths to process : 1
 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 42 (mapPartitions at
 Exchange.scala:48)
 15/02/27 15:50:26 INFO DAGScheduler: Got job 2 ...
