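You can use the built-in SQL function *from_json* to parse the JSON string column into a struct column with a schema you supply; this keeps everything in the DataFrame API, so no custom Encoder for Row objects is needed. See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-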
On Mon, Sep 16, 2019 at 7:39 PM lk_spark <lk_sp...@163.com> wrote: > Hi all, > I'm using Structured Streaming to read from Kafka. The data is a JSON > string; I want to parse it and convert it to a DataFrame, but my code does not > compile and I don't know how to fix it: > > > val lines = messages.selectExpr("CAST(value AS STRING) as value").as[ > String] > > val words = lines.map(line => { > var json: JValue = null > try { > json = parse(line) > } catch { > case ex: Exception => { println(ex.getMessage + " " + line) } > } > //var result: scala.collection.mutable.Map[String,String] = > scala.collection.mutable.Map() > val jsonObj = json.values.asInstanceOf[Map[String, _]] > val valuse = jsonObj.values.toArray > val schema = StructType(List()) > for ((k, v) <- jsonObj){ > //result += (k -> jsonObj.get(k).toString()) > > if(v.isInstanceOf[String]){ > schema.add(k,StringType) > }else if (v.isInstanceOf[Int]){ > schema.add(k,IntegerType) > }/*else if (v.isInstanceOf[Array[String]]){ > schema.add(k,ArrayType(StringType)) > }else if (v.isInstanceOf[Map[String,String]]){ > schema.add(k,MapType(StringType,StringType)) > }*/ > } > val row = new GenericRowWithSchema(valuse,schema) > row > }) > > > Error:(45, 26) Unable to find encoder for type > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit > Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is > needed to store > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in > a Dataset. Primitive types (Int, String, etc) and Product types (case > classes) are supported by importing spark.implicits._ Support for > serializing other types will be added in future releases. > val words = lines.map(line => { > > Error:(45, 26) not enough arguments for method map: (implicit evidence$6: > org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema]. 
> Unspecified value parameter evidence$6. > val words = lines.map(line => { > > > > 2019-09-17 > ------------------------------ > lk_spark >