Thanks Jon, great learning resource. Thanks Pandees, addresses[0].city would work, but I want all the cities, not just the one from addresses[0]. In the end, I wrote the following function to get the cities.

def getCities(addresses: Seq[Address]) : String = {
  var cities:String = ""
  if (addresses.size > 0) {
    cities = (for(a <- addresses) yield a.city.getOrElse("")).mkString(",")
    // cities = addresses.foldLeft("")((str,addr) => str + addr.city.getOrElse(""))
  }
  cities
}

Great help. Thanks again

On Sun, Nov 20, 2016 at 1:10 PM, Jon Gregg <jonrgr...@gmail.com> wrote:

> In these cases it might help to just flatten the DataFrame. Here's a
> helper function from the tutorial (scroll down to the "Flattening" header:
> https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/02%20Introduction%20to%20DataFrames%20-%20scala.html
>
> On Sun, Nov 20, 2016 at 1:24 PM, pandees waran <pande...@gmail.com> wrote:
>
>> have you tried using "." access method?
>>
>> e.g:
>> ds1.select("name","addresses[0].element.city")
>>
>> On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> The following my dataframe schema
>>>
>>> root
>>>  |-- name: string (nullable = true)
>>>  |-- addresses: array (nullable = true)
>>>  |    |-- element: struct (containsNull = true)
>>>  |    |    |-- street: string (nullable = true)
>>>  |    |    |-- city: string (nullable = true)
>>>
>>> I want to output name and city. The following is my spark streaming app
>>> which outputs name and addresses, but I want name and cities in the output.
>>>
>>> object PersonConsumer {
>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>   import com.example.protos.demo._
>>>
>>>   def main(args : Array[String]) {
>>>
>>>     val spark = SparkSession.builder.
>>>       master("local")
>>>       .appName("spark session example")
>>>       .getOrCreate()
>>>
>>>     import spark.implicits._
>>>
>>>     val ds1 = spark.readStream.format("kafka").
>>>       option("kafka.bootstrap.servers","localhost:9092").
>>>       option("subscribe","person").load()
>>>
>>>     val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value"))
>>>       .map(Person.parseFrom(_)).select($"name", $"addresses")
>>>
>>>     ds2.printSchema()
>>>
>>>     val query = ds2.writeStream
>>>       .outputMode("append")
>>>       .format("console")
>>>       .start()
>>>
>>>     query.awaitTermination()
>>>   }
>>> }
>>>
>>> Appreciate your help. Thanks.
>>>
>>
>> --
>> Thanks,
>> Pandeeswaran
>>
>
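
A note on the getCities function at the top of this message: the size check and the var are not strictly needed, because mkString on an empty sequence already returns "". Below is a minimal sketch of a more compact version, plus a variant that skips missing cities instead of emitting empty entries. The Address case class here is a hypothetical stand-in for the generated class in com.example.protos.demo (assumed to expose city as an Option[String], as the getOrElse call suggests), and getKnownCities is a name made up for this illustration.

```scala
// Hypothetical stand-in for the generated protobuf class; the real Address
// comes from com.example.protos.demo and may have other fields.
final case class Address(street: Option[String], city: Option[String])

object CitiesSketch {
  // Same result as the original getCities: a missing city becomes "".
  def getCities(addresses: Seq[Address]): String =
    addresses.map(_.city.getOrElse("")).mkString(",")

  // Variant (an assumption about what may be wanted): drop missing cities
  // so the output has no empty entries between commas.
  def getKnownCities(addresses: Seq[Address]): String =
    addresses.flatMap(_.city).mkString(",")

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Address(Some("1 Main St"), Some("Austin")),
      Address(Some("2 Oak Ave"), None),
      Address(None, Some("Boston"))
    )
    println(getCities(sample))      // Austin,,Boston
    println(getKnownCities(sample)) // Austin,Boston
  }
}
```

Which variant fits depends on whether downstream consumers need the positions of the cities to line up with the positions of the addresses; if they do, keeping the empty entries is the safer choice.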