I have tried this:

    val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
      val ab = ArrayBuffer[Any]()
      for (i <- 0 until schemaType.length) {
        if (schemaType(i).equalsIgnoreCase("int")) {
          ab += attributes(i).toInt
        } else if (schemaType(i).equalsIgnoreCase("long")) {
          ab += attributes(i).toLong
        } else {
          ab += attributes(i)
        }
      }
      Row(ab.toArray)
    })

    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.show

I got this error:

    Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid external type for schema of string
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)

all the fields were typed as Any; what should I do?
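For reference, this error usually means Row(...) received the whole array as a single value: Row is a varargs factory, so Row(ab.toArray) builds one column whose value is the Object[] itself, which cannot match a string column. A minimal sketch of a likely fix, assuming the surrounding code above, is to expand the buffer into individual fields:

    // Row(...) is varargs: Row(ab.toArray) creates ONE field holding the array.
    // Expanding the elements gives one field per value:
    Row(ab.toArray: _*)
    // or, equivalently:
    Row.fromSeq(ab)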



2017-01-12 

lk_spark 



From: "lk_spark" <lk_sp...@163.com>
Sent: 2017-01-12 14:38
Subject: Re: Re: how to change datatype by useing StructType
To: "ayan guha" <guha.a...@gmail.com>, "user.spark" <user@spark.apache.org>
Cc:

yes, the field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work:

    val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
      Row(attributes(0), attributes(1), attributes(2)))

but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha <guha.a...@gmail.com>
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by useing StructType
To: "lk_spark" <lk_sp...@163.com>, "user.spark" <user@spark.apache.org>
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote:
hi, all

I have a txt file, and I want to process it as a dataframe. The data looks like this:

    name1,30
    name2,18
    val schemaString = "name age year"
    val xMap = new scala.collection.mutable.HashMap[String, DataType]()
    xMap.put("name", StringType)
    xMap.put("age", IntegerType)
    xMap.put("year", IntegerType)

    val fields = schemaString.split(" ").map(fieldName =>
      StructField(fieldName, xMap.get(fieldName).get, nullable = true))
    val schema = StructType(fields)

    val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
    //spark.read.schema(schema).text("/sourcedata/test/test*")

    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1)))







    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it, I get this error:
    Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
    +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)







if I change my code it will work:

    val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
      Row(attributes(0), attributes(1).toInt))

but this is not a good idea.
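As a sketch of one configurable alternative (assuming Spark 2.x, where DataFrameReader has a csv method; the path and option values here are illustrative), the schema built above can be handed straight to the CSV reader, which then performs the per-column casts itself instead of hard-coded .toInt calls:

    // Let Spark parse and cast each column using `schema`
    // (built above from the configurable name -> DataType map).
    val peopleDF = spark.read
      .schema(schema)
      .option("header", "false")
      .csv("/sourcedata/test/test*")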







2017-01-12

lk_spark
