Your parentheses don't look right: you're embedding the filter inside the Row.fromSeq() call.
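To see why that misplacement leads to ArrayIndexOutOfBoundsException: 0, here is a minimal plain-Scala sketch (no Spark needed; the sample line is made up for illustration). With the filter nested inside the split, `.length == 15` tests each field string's character count, not the row's field count, so every field is dropped and the resulting Row is empty:

```scala
object ParenDemo extends App {
  // Hypothetical CSV line with 4 fields (values invented for illustration).
  val rawRow = "39, State-gov, 77516, Bachelors"

  // As posted: the filter sits inside the split, so it keeps only the
  // individual field strings that are exactly 15 characters long.
  val asPosted = rawRow.split(",").filter(_.length == 15).map(_.trim)
  println(asPosted.length)  // 0 -- no field is 15 chars, so the Row built
                            // from this is empty and row(0) later throws

  // Intended: split and trim first, then test the number of fields.
  val fields = rawRow.split(",").map(_.trim)
  println(fields.length == 15)  // false -- this short row would be filtered out
}
```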
Try this: split and trim first, then filter on the field count, and only then build the Row:

val trainRDD = rawTrainData
  .filter(!_.isEmpty)
  .map(_.split(",").map(_.trim))
  .filter(_.length == 15)
  .map(fields => Row.fromSeq(fields))

-Don

On Fri, May 15, 2015 at 11:17 PM, Mike Frampton <mike_framp...@hotmail.com> wrote:

> Hi
>
> I'm getting the following error when trying to process a CSV-based data
> file.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent
> failure: Lost task 1.3 in stage 10.0 (TID 262,
> hc2r1m3.semtech-solutions.co.nz):
> java.lang.ArrayIndexOutOfBoundsException: 0
>     at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
>     at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
>     at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
>     at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>
> I have made sure that none of my data rows are empty and that they all
> have 15 records. I have also physically checked the data. The error
> occurs when I run the actual Spark SQL on the last line. The script is
> as follows.
>
> val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
> val path   = "/data/spark/h2o/"
>
> val train_csv = server + path + "adult.train.data" // 32,562 rows
> val test_csv  = server + path + "adult.test.data"  // 16,283 rows
>
> // load the data
>
> val rawTrainData = sparkCxt.textFile(train_csv)
> val rawTestData  = sparkCxt.textFile(test_csv)
>
> // create a Spark SQL schema for the row
>
> val schemaString = "age workclass fnlwgt education educationalnum maritalstatus" +
>   " occupation relationship race gender capitalgain capitalloss" +
>   " hoursperweek nativecountry income"
>
> val schema = StructType( schemaString.split(" ")
>   .map(fieldName => StructField(fieldName, StringType, false)))
>
> // create an RDD from the raw training data
>
> val trainRDD = rawTrainData
>   .filter(!_.isEmpty)
>   .map(rawRow => Row.fromSeq(rawRow.split(",")
>     .filter(_.length == 15)
>     .map(_.toString).map(_.trim) ))
>
> println( ">>>>> Raw Training Data Count = " + trainRDD.count() )
>
> val testRDD = rawTestData
>   .filter(!_.isEmpty)
>   .map(rawRow => Row.fromSeq(rawRow.split(",")
>     .filter(_.length == 15)
>     .map(_.toString).map(_.trim) ))
>
> println( ">>>>> Raw Testing Data Count = " + testRDD.count() )
>
> // create a schema RDD
>
> val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
> val testSchemaRDD  = sqlContext.applySchema(testRDD, schema)
>
> // register each schema RDD as a table
>
> trainSchemaRDD.registerTempTable("trainingTable")
> testSchemaRDD.registerTempTable("testingTable")
>
> println( ">>>>> Schema RDD Training Data Count = " + trainSchemaRDD.count() )
> println( ">>>>> Schema RDD Testing Data Count = " + testSchemaRDD.count() )
>
> // now run SQL against the table to filter the data
>
> val schemaRddTrain = sqlContext.sql(
>   "SELECT " +
>   "age,workclass,education,maritalstatus,occupation,relationship,race," +
>   "gender,hoursperweek,nativecountry,income " +
>   "FROM trainingTable LIMIT 5000")
>
> println( ">>>>> Training Data Count = " +
>   schemaRddTrain.count() )
>
> Any advice is appreciated :)

--
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
http://www.DrudgeSiren.com/
http://plu.gd/
800-733-2143
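For anyone finding this thread later: since the post asserts that every row has 15 fields, it's worth verifying that claim directly. Below is a hedged plain-Scala sketch (the helper name `malformedLines` is mine, not from the thread); in Spark you would apply the same predicate with `rawTrainData.filter(...).count()`.

```scala
// Hypothetical helper: return the lines whose comma-split does not yield
// the expected field count. Any non-empty result explains an
// ArrayIndexOutOfBoundsException downstream.
def malformedLines(lines: Seq[String], expectedFields: Int = 15): Seq[String] =
  lines.filter(line => line.nonEmpty && line.split(",").length != expectedFields)

// Usage with made-up sample lines:
val sample = Seq(
  List.fill(15)("x").mkString(","),  // 15 fields: ok
  "a,b,c"                            // 3 fields: malformed
)
println(malformedLines(sample))  // List(a,b,c)
```

Note that Java's `String.split` drops trailing empty strings, so a line ending in stray commas can also come up short of 15 fields even when it "looks" complete.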