Your parentheses don't look right: you're embedding the filter inside the
Row.fromSeq() call, so it runs against the individual field strings rather
than against the split row.
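
Because of that, .filter(_.length == 15) keeps only fields that are 15
characters long, not rows that have 15 fields. For a typical line that
leaves an empty array, so Row.fromSeq() builds a zero-column Row, and the
first column access fails with exactly the
java.lang.ArrayIndexOutOfBoundsException: 0 in your stack trace. Roughly
what happens to one line (made-up sample values, just to illustrate):

  // the misplaced filter tests each field's character length
  val fields = "39, State-gov, 77516".split(",").filter(_.length == 15)
  // fields is almost always empty, so Row.fromSeq(fields) has no columns
  // and the first column access throws ArrayIndexOutOfBoundsException: 0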

Try this:

  val trainRDD  = rawTrainData
         .filter(!_.isEmpty)                  // drop blank lines
         .map(_.split(",").map(_.trim))       // split the line, trim each field
         .filter(_.length == 15)              // keep only rows with all 15 fields
         .map(fields => Row.fromSeq(fields))  // build the Row last
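
One CSV gotcha to be aware of: String.split(",") silently drops trailing
empty strings, so a line whose last field is missing comes back with fewer
than 15 elements and is quietly filtered out instead of flagged. If your
data can contain trailing empty fields, pass a negative limit:

  "a,b,,".split(",")      // Array("a", "b")         (trailing empties dropped)
  "a,b,,".split(",", -1)  // Array("a", "b", "", "") (all four fields kept)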

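If you want to verify the 15-fields-per-line claim up front, here's a quick
sanity check (a sketch, assuming sparkCxt is the SparkContext from your
script):

  // histogram of field counts per line; any key other than 15 flags bad lines
  rawTrainData.filter(!_.isEmpty)
              .map(_.split(",", -1).length)
              .countByValue()
              .foreach(println)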

-Don

On Fri, May 15, 2015 at 11:17 PM, Mike Frampton <mike_framp...@hotmail.com>
wrote:

> Hi
>
> I'm getting the following error when trying to process a CSV-based data
> file.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent
> failure: Lost task 1.3 in stage 10.0 (TID 262,
> hc2r1m3.semtech-solutions.co.nz):
> java.lang.ArrayIndexOutOfBoundsException: 0
>         at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
>         at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
>         at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
>         at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>
> I have made sure that none of my data rows are empty and that they all
> have 15 fields. I have also checked the data by hand. The error occurs
> when I run the actual Spark SQL query on the last line. The script is
> as follows.
>
>   val server    = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
>   val path      = "/data/spark/h2o/"
>
>   val train_csv =  server + path + "adult.train.data" // 32,562 rows
>   val test_csv  =  server + path + "adult.test.data"  // 16,283 rows
>
>   // load the data
>
>   val rawTrainData = sparkCxt.textFile(train_csv)
>   val rawTestData  = sparkCxt.textFile(test_csv)
>
>   // create a spark sql schema for the row
>
>   val schemaString = "age workclass fnlwgt education educationalnum maritalstatus" +
>                      " occupation relationship race gender capitalgain capitalloss" +
>                      " hoursperweek nativecountry income"
>
>   val schema = StructType( schemaString.split(" ")
>       .map(fieldName => StructField(fieldName, StringType, false)))
>
>   // create an RDD from the raw training data
>
>   val trainRDD  = rawTrainData
>          .filter(!_.isEmpty)
>          .map(rawRow => Row.fromSeq(rawRow.split(",")
>          .filter(_.length == 15)
>          .map(_.toString).map(_.trim) ))
>
>   println( ">>>>> Raw Training Data Count = " + trainRDD.count() )
>
>   val testRDD   = rawTestData
>          .filter(!_.isEmpty)
>          .map(rawRow  => Row.fromSeq(rawRow.split(",")
>          .filter(_.length == 15)
>          .map(_.toString).map(_.trim) ))
>
>   println( ">>>>> Raw Testing Data Count = " + testRDD.count() )
>
>   // create a schema RDD
>
>   val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
>   val testSchemaRDD  = sqlContext.applySchema(testRDD,  schema)
>
>   // register schema RDD as a table
>
>   trainSchemaRDD.registerTempTable("trainingTable")
>   testSchemaRDD.registerTempTable("testingTable")
>
>   println( ">>>>> Schema RDD Training Data Count = " + trainSchemaRDD.count() )
>   println( ">>>>> Schema RDD Testing Data Count  = " + testSchemaRDD.count() )
>
>   // now run sql against the table to filter the data
>
>   val schemaRddTrain = sqlContext.sql(
>     "SELECT " +
>     "age,workclass,education,maritalstatus,occupation,relationship,race," +
>     "gender,hoursperweek,nativecountry,income " +
>     "FROM trainingTable LIMIT 5000")
>
>   println( ">>>>> Training Data Count = " + schemaRddTrain.count() )
>
> Any advice is appreciated :)
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
http://www.DrudgeSiren.com/
http://plu.gd/
800-733-2143
