Hi,
I'm getting the following error when trying to process a CSV-based data file.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to
stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost
task 1.3 in stage 10.0 (TID 262, hc2r1m3.semtech-solutions.co.nz):
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
    at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
I have made sure that none of my data rows are empty and that they all have 15
fields. I have also checked the data by hand. The error occurs when the Spark
SQL query on the last line runs. The script is as follows.
val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
val path = "/data/spark/h2o/"
val train_csv = server + path + "adult.train.data" // 32,562 rows
val test_csv = server + path + "adult.test.data" // 16,283 rows
// load the data
val rawTrainData = sparkCxt.textFile(train_csv)
val rawTestData = sparkCxt.textFile(test_csv)
// create a spark sql schema for the row
val schemaString = "age workclass fnlwgt education educationalnum maritalstatus" +
  " occupation relationship race gender capitalgain capitalloss" +
  " hoursperweek nativecountry income"
val schema = StructType( schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, false)))
// create an RDD from the raw training data
val trainRDD = rawTrainData
  .filter(!_.isEmpty)
  .map(rawRow => Row.fromSeq(rawRow.split(",")
    .filter(_.length == 15)
    .map(_.toString).map(_.trim) ))
println( ">>>>> Raw Training Data Count = " + trainRDD.count() )
val testRDD = rawTestData
  .filter(!_.isEmpty)
  .map(rawRow => Row.fromSeq(rawRow.split(",")
    .filter(_.length == 15)
    .map(_.toString).map(_.trim) ))
println( ">>>>> Raw Testing Data Count = " + testRDD.count() )
// create a schema RDD
val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
val testSchemaRDD = sqlContext.applySchema(testRDD, schema)
// register schema RDD as a table
trainSchemaRDD.registerTempTable("trainingTable")
testSchemaRDD.registerTempTable("testingTable")
println( ">>>>> Schema RDD Training Data Count = " + trainSchemaRDD.count() )
println( ">>>>> Schema RDD Testing Data Count = " + testSchemaRDD.count() )
// now run sql against the table to filter the data
val schemaRddTrain = sqlContext.sql(
"SELECT "+
"age,workclass,education,maritalstatus,occupation,relationship,race,"+
"gender,hoursperweek,nativecountry,income "+
"FROM trainingTable LIMIT 5000")
println( ">>>>> Training Data Count = " + schemaRddTrain.count() )
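While debugging I also tried running the per-row parsing expression on its own, outside Spark, to see what it actually returns. The sample row below is made up by me but has the same 15-field, comma-separated shape as the adult data; this is just a standalone sanity-check sketch, not part of the job:

```scala
// Standalone sanity check of the per-row parsing expression used in the
// script above (plain Scala, no Spark needed).
object RowParseCheck {
  // A made-up 15-field row in the adult-data shape (an assumption, not
  // copied from the real file).
  val sampleRow = "39, State-gov, 77516, Bachelors, 13, Never-married," +
    " Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K"

  // Exactly the expression applied to each raw row in the script.
  def parseRow(rawRow: String): Array[String] =
    rawRow.split(",").filter(_.length == 15).map(_.toString).map(_.trim)

  def main(args: Array[String]): Unit = {
    // Compare fields produced by split alone with fields that survive
    // the filter(_.length == 15) step.
    println("split(\",\") field count      = " + sampleRow.split(",").length)
    println("fields surviving the filter = " + parseRow(sampleRow).length)
  }
}
```

Comparing the two printed counts for a real row shows how many fields the `filter` step keeps in each `Row`.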
Any advice is appreciated :)