different schemas per row with DataFrames
I am reading JSON data that has different schemas for every record. That is, for a given field that would have a null value, it's simply absent from that record (and therefore, its schema). I would like to use the DataFrame API to select specific fields from this data, and for fields that are missing from a record, to default to null or an empty string. Is this possible or can DataFrames only handle a single consistent schema throughout the data? One thing I noticed is that the schema of the DataFrame is the superset of all the records in it, so if record A has field X, but record B does not, it will show up in B as null because it's part of the DataFrame's schema (because A has it). But if none of the records have field X, then referencing that field will result in an error about not being able to resolve that column. If I know the schema of all possible fields and the order in which they occur, it may be possible to get the RDD from the DataFrame and build my own DataFrame with createDataFrame and passing it my fabricated super-schema. However, this is brittle, as the super-schema is not in my control and may change in the future. Thanks for any suggestions, Alex.
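Below is a minimal sketch of one workaround. It assumes you know the names and types of the top-level fields you want to select, but not the full super-schema. The idea is to check `df.columns` first and substitute a typed null literal for any field that no record contained, which avoids the "cannot resolve column" error. The `SchemaDefaults` object and `selectWithDefaults` name are hypothetical. The test uses the Spark 2.2+ `spark.read.json(Dataset[String])` overload.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DataType

object SchemaDefaults {
  /** Selects the `wanted` top-level columns in the given order. A column that
    * exists in `df` is cast to the requested type; a column that no record
    * contained (so it is absent from the inferred schema) becomes a typed null. */
  def selectWithDefaults(df: DataFrame, wanted: Seq[(String, DataType)]): DataFrame = {
    val present = df.columns.toSet
    val cols: Seq[Column] = wanted.map { case (name, dataType) =>
      if (present.contains(name)) col(name).cast(dataType).as(name)
      else lit(null).cast(dataType).as(name)
    }
    df.select(cols: _*)
  }
}
```

For string fields, swapping `lit(null)` for `lit("")` would give an empty-string default instead. The helper only needs the fields being selected, so new fields that appear in the data later do not break it.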
Calling rdd() on a DataFrame causes stage boundary
When I call rdd() on a DataFrame, it ends the current stage and starts a new one that just maps the DataFrame to an RDD and nothing else. It doesn't seem to do a shuffle (which is good and expected), but then why is there a separate stage? I also thought that stages only end when there's a shuffle or when the job ends with the action that triggered it. Thanks.
spark sql aggregate function "Nth"
Spark SQL has a "first" function that returns the first item in a group. Is there a similar function, perhaps in a third-party lib, that allows you to return an arbitrary (e.g. 3rd) item from the group? I was thinking of writing a UDAF for it, but didn't want to reinvent the wheel. My end goal is to be able to select a random item from the group, using a random number generator. Thanks.
Re: spark sql aggregate function "Nth"
Ah, that gives me an idea.

val window = Window.partitionBy()
val getRand = udf((cnt:Int) => )

df
  .withColumn("cnt", count().over(window))
  .withColumn("rnd", getRand($"cnt"))
  .where($"rnd" === $"cnt")

Not sure how performant this would be, but writing a UDF is much simpler than a UDAF.

On Tue, Jul 26, 2016 at 11:48 AM, ayan guha wrote:
> You can use rank with window function. Rank=1 is same as calling first().
>
> Not sure how you would randomly pick records though, if there is no Nth record. In your example, what happens if data is of only 2 rows?
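Building on ayan's rank suggestion, one way to get exactly one random row per group is to number each group's rows in a random order and keep row number 1. The `rnd === cnt` filter behaves differently. If `getRand` returns a uniform value in 1..cnt, that filter keeps each row independently with probability 1/cnt, so a group can yield zero rows or several.

The sketch below assumes Spark 2.x or later; releases before 1.6 named the function `rowNumber`. `RandomPerGroup` and the `_rand_key`/`_rn` helper columns are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rand, row_number}

object RandomPerGroup {
  /** Keeps exactly one randomly chosen row per distinct value of `groupCol`. */
  def pickRandom(df: DataFrame, groupCol: String, seed: Long): DataFrame = {
    // Helper column names are hypothetical; choose names that don't clash with real columns.
    val withKey = df.withColumn("_rand_key", rand(seed))
    // Number the rows of each group in random order...
    val w = Window.partitionBy(col(groupCol)).orderBy(col("_rand_key"))
    withKey
      .withColumn("_rn", row_number().over(w))
      // ...and keep the first. Ordering by a real column and filtering on n instead
      // gives the nth row; groups with fewer than n rows then produce no output.
      .where(col("_rn") === 1)
      .drop("_rand_key", "_rn")
  }
}
```

In ayan's 2-row case, asking for the 3rd row this way simply returns nothing for that group, rather than an error.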
Re: Enabling mapreduce.input.fileinputformat.list-status.num-threads in Spark?
Ran into this need myself. Does Spark have an equivalent of "mapreduce.input.fileinputformat.list-status.num-threads"?

Thanks.

On Thu, Jul 23, 2015 at 8:50 PM, Cheolsoo Park wrote:
> Hi,
>
> I am wondering if anyone has successfully enabled "mapreduce.input.fileinputformat.list-status.num-threads" in Spark jobs. I usually set this property to 25 to speed up file listing in MR jobs (Hive and Pig). But for some reason, this property does not take effect in Spark HadoopRDD resulting in serious delay in file listing.
>
> I verified that the property is indeed set in HadoopRDD by logging the value of the property in the getPartitions() function. I also tried to attach VisualVM to Spark and Pig clients, which look as follows-
>
> In Pig, I can see 25 threads running in parallel for file listing-
> [image: Inline image 1]
>
> In Spark, I only see 2 threads running in parallel for file listing-
> [image: Inline image 2]
>
> What's strange is that the # of concurrent threads in Spark is throttled no matter how high I set "mapreduce.input.fileinputformat.list-status.num-threads".
>
> Is anyone using Spark with this property enabled? If so, can you please share how you do it?
>
> Thanks!
> Cheolsoo
Re: Enabling mapreduce.input.fileinputformat.list-status.num-threads in Spark?
Thanks. I was actually able to get mapreduce.input.fileinputformat.list-status.num-threads working in Spark against a regular fileset in S3, in Spark 1.5.2 ... looks like the issue is isolated to Hive.

On Tue, Jan 12, 2016 at 6:48 PM, Cheolsoo Park wrote:
> Alex, see this jira-
> https://issues.apache.org/jira/browse/SPARK-9926
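One way to pass this Hadoop property through Spark is the `spark.hadoop.` prefix. Spark strips the prefix and copies the entry into the Hadoop Configuration it builds. Setting the property directly on `sc.hadoopConfiguration` after the context is created also works.

Whether the listing code actually honors the property depends on the read path. In this thread it worked for a plain S3 fileset in 1.5.2 but not for Hive tables (see the SPARK-9926 JIRA Cheolsoo linked). A minimal sketch follows; the `ListingThreads` object is hypothetical.

```scala
import org.apache.spark.SparkConf

object ListingThreads {
  // Hadoop property from this thread; Spark forwards it when prefixed with "spark.hadoop.".
  val HadoopKey = "mapreduce.input.fileinputformat.list-status.num-threads"

  /** Returns `conf` with the listing thread count set, so that the Hadoop
    * Configuration Spark builds from it carries the property. */
  def withListingThreads(conf: SparkConf, threads: Int): SparkConf =
    conf.set(s"spark.hadoop.$HadoopKey", threads.toString)
}
```

Usage is either `new SparkContext(ListingThreads.withListingThreads(new SparkConf().setAppName("listing"), 25))` or, on an existing context, `sc.hadoopConfiguration.setInt(ListingThreads.HadoopKey, 25)`.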
Databricks Cloud vs AWS EMR
As a user of AWS EMR (running Spark and MapReduce), I am interested in the potential benefits I might gain from Databricks Cloud. I was wondering if anyone has used both and has compared the two services. In general, which resource manager(s) does Databricks Cloud use for Spark? If it's YARN, can you also run MapReduce jobs in Databricks Cloud? Thanks.
spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)
I save my dataframe to avro with spark-avro 1.0.0 and it looks like this (using avro-tools tojson):

{"field1":"value1","field2":976200}
{"field1":"value2","field2":976200}
{"field1":"value3","field2":614100}

But when I use spark-avro 2.0.1, it looks like this:

{"field1":{"string":"value1"},"field2":{"long":976200}}
{"field1":{"string":"value2"},"field2":{"long":976200}}
{"field1":{"string":"value3"},"field2":{"long":614100}}

At this point I'd be happy to use spark-avro 1.0.0, except that it doesn't seem to support specifying a compression codec (I want deflate).
Re: spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)
Here you go: https://github.com/databricks/spark-avro/issues/92

Thanks.

On Wed, Oct 14, 2015 at 4:41 PM, Josh Rosen wrote:
> Can you report this as an issue at https://github.com/databricks/spark-avro/issues so that it's easier to track? Thanks!
dataframes and numPartitions
A lot of RDD methods take a numPartitions parameter that lets you specify the number of partitions in the result, for example groupByKey. The DataFrame counterparts don't have a numPartitions parameter; e.g. groupBy only takes a bunch of Columns as params. I understand that the DataFrame API is supposed to be smarter and go through a LogicalPlan, and perhaps determine the optimal number of partitions for you, but sometimes you want to specify the number of partitions yourself. One such use case is when you are preparing to do a "merge" join with another dataset that is similarly partitioned with the same number of partitions.
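Two settings may help here, assuming Spark 1.6 or later:

- `spark.sql.shuffle.partitions` (default 200) sets the partition count for all DataFrame shuffles, such as `groupBy` and joins.
- `repartition(numPartitions, cols: _*)` hash-partitions on specific columns into an exact number of partitions.

The `PartitionControl` helper below is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object PartitionControl {
  /** Hash-partitions `df` on the given key columns into exactly `n` partitions. */
  def partitionByKeys(df: DataFrame, n: Int, keys: String*): DataFrame =
    df.repartition(n, keys.map(col): _*)
}
```

Suppose two DataFrames are repartitioned this way with the same `n` and key columns of the same types. Rows with equal keys then sit at the same partition index on both sides, which is the precondition for the partition-wise "merge" join described above. The planner may also be able to skip an extra exchange when joining on those keys.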
writing avro parquet
Using Spark 1.5.1, Parquet 1.7.0.

I'm trying to write Avro/Parquet files. I have this code:

sc.hadoopConfiguration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, classOf[AvroWriteSupport].getName)
AvroWriteSupport.setSchema(sc.hadoopConfiguration, MyClass.SCHEMA$)
myDF.write.parquet(outputPath)

The problem is that the write support class gets overwritten in org.apache.spark.sql.execution.datasources.parquet.ParquetRelation#prepareJobForWrite:

val writeSupportClass =
  if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
    classOf[MutableRowWriteSupport]
  } else {
    classOf[RowWriteSupport]
  }

ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)

So it doesn't seem to actually write Avro data. When I look at the metadata of the Parquet files it writes, it looks like this:

extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"foo","type":"string","nullable":true,"metadata":{}},{"name":"bar","type":"long","nullable":true,"metadata":{}}]}

I would expect to see something like "extra: avro.schema" instead.
Re: writing avro parquet
Figured it out ... needed to use saveAsNewAPIHadoopFile, but was trying to use it on myDF.rdd instead of converting it to a PairRDD first.
foreachPartition
I'm just trying to do some operation inside foreachPartition, but I can't even get a simple println to work. Nothing gets printed.

scala> val a = sc.parallelize(List(1,2,3))
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :21

scala> a.foreachPartition(p => println("foo"))
2015-10-30 23:38:54,643 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Starting job: foreachPartition at :24
2015-10-30 23:38:54,644 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 9 (foreachPartition at :24) with 3 output partitions (allowLocal=false)
2015-10-30 23:38:54,644 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage: ResultStage 9(foreachPartition at :24)
2015-10-30 23:38:54,645 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final stage: List()
2015-10-30 23:38:54,645 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List()
2015-10-30 23:38:54,646 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting ResultStage 9 (ParallelCollectionRDD[2] at parallelize at :21), which has no missing parents
2015-10-30 23:38:54,648 INFO [dag-scheduler-event-loop] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(1224) called with curMem=14486, maxMem=280496701
2015-10-30 23:38:54,649 INFO [dag-scheduler-event-loop] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_9 stored as values in memory (estimated size 1224.0 B, free 267.5 MB)
2015-10-30 23:38:54,680 INFO [dag-scheduler-event-loop] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(871) called with curMem=15710, maxMem=280496701
2015-10-30 23:38:54,681 INFO [dag-scheduler-event-loop] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_9_piece0 stored as bytes in memory (estimated size 871.0 B, free 267.5 MB)
2015-10-30 23:38:54,685 INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_9_piece0 in memory on 10.170.11.94:35814 (size: 871.0 B, free: 267.5 MB)
2015-10-30 23:38:54,688 INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on 10.170.11.94:35814 in memory (size: 2.0 KB, free: 267.5 MB)
2015-10-30 23:38:54,691 INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on ip-10-51-144-180.ec2.internal:35111 in memory (size: 2.0 KB, free: 535.0 MB)
2015-10-30 23:38:54,691 INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on ip-10-51-144-180.ec2.internal:49833 in memory (size: 2.0 KB, free: 535.0 MB)
2015-10-30 23:38:54,694 INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on ip-10-51-144-180.ec2.internal:34776 in memory (size: 2.0 KB, free: 535.0 MB)
2015-10-30 23:38:54,702 INFO [dag-scheduler-event-loop] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 9 from broadcast at DAGScheduler.scala:874
2015-10-30 23:38:54,703 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 3 missing tasks from ResultStage 9 (ParallelCollectionRDD[2] at parallelize at :21)
2015-10-30 23:38:54,703 INFO [dag-scheduler-event-loop] cluster.YarnScheduler (Logging.scala:logInfo(59)) - Adding task set 9.0 with 3 tasks
2015-10-30 23:38:54,708 INFO [sparkDriver-akka.actor.default-dispatcher-15] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Starting task 0.0 in stage 9.0 (TID 27, ip-10-51-144-180.ec2.internal, PROCESS_LOCAL, 1313 bytes)
2015-10-30 23:38:54,711 INFO [sparkDriver-akka.actor.default-dispatcher-15] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Starting task 1.0 in stage 9.0 (TID 28, ip-10-51-144-180.ec2.internal, PROCESS_LOCAL, 1313 bytes)
2015-10-30 23:38:54,713 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed broadcast_5_piece0 on 10.170.11.94:35814 in memory (size: 802.0 B, free: 267.5 MB)
2015-10-30 23:38:54,714 INFO [sparkDriver-akka.actor.default-dispatcher-15] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Starting task 2.0 in stage 9.0 (TID 29, ip-10-51-144-180.ec2.internal, PROCESS_LOCAL, 1313 bytes)
2015-10-30 23:38:54,716 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed broadcast_5_piece0 on ip-10-51-144-180.ec2.internal:34776 in memory (size: 802.0 B, free: 535.0 MB)
2015-10-30 23:38:54,719 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed broadcast_5_piece0 on ip-10-51-
Re: foreachPartition
Ahh, makes sense. Knew it was going to be something simple. Thanks.

On Fri, Oct 30, 2015 at 7:45 PM, Mark Hamstra wrote:
> The closure is sent to and executed on an Executor, so you need to be looking at the stdout of the Executors, not the Driver's.
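Mark's explanation means the function passed to `foreachPartition` runs inside executor JVMs. Its `println` output therefore lands in the executors' stdout (on YARN, the container logs) rather than in the shell. To see something on the driver, compute a small per-partition result on the executors and `collect()` it back. A minimal sketch; the `PartitionDebug` name is hypothetical.

```scala
import org.apache.spark.rdd.RDD

object PartitionDebug {
  /** Builds one summary line per partition on the executors and returns the
    * lines to the driver, where printing them is visible in the shell. */
  def describePartitions(rdd: RDD[Int]): Array[String] =
    rdd.mapPartitionsWithIndex { (idx, it) =>
      Iterator(s"partition $idx: ${it.size} element(s)")
    }.collect()
}
```

In the shell, `PartitionDebug.describePartitions(a).foreach(println)` then prints one line per partition on the driver. This only suits small summaries, since `collect()` brings the whole result to the driver.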
CompositeInputFormat in Spark
Does Spark have an implementation similar to CompositeInputFormat in MapReduce? CompositeInputFormat joins, before the mapper runs, multiple datasets that are partitioned the same way with the same number of partitions. It uses the "part" number in each file name to figure out which file to join with its counterparts in the other datasets.

Here is a similar question from earlier this year: http://mail-archives.us.apache.org/mod_mbox/spark-user/201505.mbox/%3CCADrn=epwl6ghs9hfyo3csuxhshtycsrlbujcmpxrtz4zype...@mail.gmail.com%3E

From what I can tell, there's no way to tell Spark how a dataset was previously partitioned, other than repartitioning it, in order to achieve a map-side join with a similarly partitioned dataset.
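For readers on later releases: Spark 2.0 added bucketing for tables written with `DataFrameWriter.bucketBy`. It records the bucket spec in the catalog, so a later join on the bucket column can skip the exchange. This is not available in the 1.5-era releases discussed here. It also relies on metadata stored by `saveAsTable`, so it generally does not help with arbitrary files laid out by another tool.

A minimal sketch follows; the `Bucketed` object and the table names in the test are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object Bucketed {
  /** Writes `df` as a catalog table hash-bucketed and sorted on `key`. Bucketing
    * only works through saveAsTable, because the bucket spec lives in the catalog. */
  def saveBucketed(df: DataFrame, table: String, key: String, buckets: Int): Unit =
    df.write
      .mode(SaveMode.Overwrite)
      .bucketBy(buckets, key)
      .sortBy(key)
      .saveAsTable(table)
}
```

Joining two tables written this way with the same bucket count on the same key should produce a SortMergeJoin plan with no exchange on either side. This is roughly the "bucket join" Hao mentions in the Sort Merge Join thread.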
Sort Merge Join
Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For example, in the code below, the two datasets have a different number of partitions, but it still does a SortMerge join after a "hashpartitioning".

CODE:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setAppName("SortMergeJoinTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.eventLog.enabled", "true")
  .set("spark.sql.planner.sortMergeJoin", "true")
sparkConf.setMaster("local-cluster[3,1,1024]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val inputpath = "input.gz.parquet"
val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
val result = df1.join(df2.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
result.explain()

OUTPUT:

== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
 TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
   ConvertToUnsafe
    Repartition 3, true
     Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
 TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
   TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
    Repartition 5, true
     Scan ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) Suppose both datasets have already been partitioned and sorted the same way and stored on the file system (e.g. by a previous job). Is there a way to tell Spark this, so that it won't want to do a "hashpartitioning" on them? It looks like Spark just considers datasets that have just been read from the file system to have UnknownPartitioning. In the example below, I try to join a dataframe to itself, and it still wants to hash repartition.

CODE:

...
val df1 = sqlContext.read.parquet(inputpath)
val result = df1.join(df1.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
result.explain()

OUTPUT:

== Physical Plan ==
SortMergeJoin [foo#0], [foo2#4]
 TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
   ConvertToUnsafe
    Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
 TungstenSort [foo2#4 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#4)
   ConvertToUnsafe
    Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
     Scan ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]

Thanks.
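The plans above show the two preparation steps SortMergeJoin relies on. A TungstenExchange hashpartitioning brings equal keys into the same partition, and then a TungstenSort runs on each side. Within one partition, the merge step itself can be sketched in plain Scala. This is an illustrative inner join over inputs already sorted by key (the `MergeJoinSketch` name is hypothetical), not Spark's actual implementation.

```scala
object MergeJoinSketch {
  /** Inner-joins two sequences of (key, value) pairs that are already sorted by key,
    * walking both with one cursor each, as in the merge phase of a sort-merge join. */
  def mergeJoin[K, A, B](left: IndexedSeq[(K, A)], right: IndexedSeq[(K, B)])
                        (implicit ord: Ordering[K]): Vector[(K, A, B)] = {
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val c = ord.compare(left(i)._1, right(j)._1)
      if (c < 0) i += 1        // left key is smaller: advance the left cursor
      else if (c > 0) j += 1   // right key is smaller: advance the right cursor
      else {
        // Equal keys: find the run of this key on each side and emit their cross product.
        val key = left(i)._1
        var iEnd = i
        while (iEnd < left.length && ord.equiv(left(iEnd)._1, key)) iEnd += 1
        var jEnd = j
        while (jEnd < right.length && ord.equiv(right(jEnd)._1, key)) jEnd += 1
        for (a <- i until iEnd; b <- j until jEnd) out += ((key, left(a)._2, right(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

Both inputs must be sorted on the same key and contain all rows for that key in this partition. That is why the planner inserts the exchange whenever it cannot prove the inputs are already distributed that way.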
Re: Sort Merge Join
Thanks for the response.

> Taking the file system based data source as “UnknownPartitioning”, will be a simple and SAFE way for JOIN, as it’s hard to guarantee the records from different data sets with the identical join keys will be loaded by the same node/task , since lots of factors need to be considered, like task pool size, cluster size, source format, storage, data locality etc.,.
> I’ll agree it’s worth to optimize it for performance concerns, and actually in Hive, it is called bucket join. I am not sure will that happens soon in Spark SQL.

Yes, this is supported in:
- Hive with bucket join
- Pig with USING "merge" <https://pig.apache.org/docs/r0.15.0/perf.html#merge-joins>
- MR with CompositeInputFormat

But I guess it's not supported in Spark?

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao wrote:
> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For example, in the code below, the two datasets have different number of partitions, but it still does a SortMerge join after a "hashpartitioning".
>
> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join) requires the records with the identical join keys MUST BE shuffled to the same “reducer” node / task, hashpartitioning is just a strategy to tell spark shuffle service how to achieve that, in theory, we even can use the `RangePartitioning` instead (but it’s less efficient, that’s why we don’t choose it for JOIN). So conceptually the JOIN operator doesn’t care so much about the shuffle strategy so much if it satisfies the demand on data distribution.
>
> 2) If both datasets have already been previously partitioned/sorted the same and stored on the file system (e.g. in a previous job), is there a way to tell Spark this so that it won't want to do a "hashpartitioning" on them? It looks like Spark just considers datasets that have been just read from the the file system to have UnknownPartitioning. In the example below, I try to join a dataframe to itself, and it still wants to hash repartition.
>
> [Hao:] Take this as example:
>
> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key JOIN src c ON b.key=c.key
>
> == Physical Plan ==
> TungstenProject [value#20,value#22,value#24]
>  SortMergeJoin [key#21], [key#23]
>   TungstenSort [key#21 ASC], false, 0
>    TungstenProject [key#21,value#22,value#20]
>     SortMergeJoin [key#19], [key#21]
>      TungstenSort [key#19 ASC], false, 0
>       TungstenExchange hashpartitioning(key#19,200)
>        ConvertToUnsafe
>         HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
>      TungstenSort [key#21 ASC], false, 0
>       TungstenExchange hashpartitioning(key#21,200)
>        ConvertToUnsafe
>         HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
>   TungstenSort [key#23 ASC], false, 0
>    TungstenExchange hashpartitioning(key#23,200)
>     ConvertToUnsafe
>      HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))
>
> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b ON a.key=b.key”, as we didn’t change the data distribution after it, so we can join another table “JOIN src c ON b.key=c.key” directly, which only require the table “c” for repartitioning on “key”.
>
> Taking the file system based data source as “UnknownPartitioning”, will be a simple and SAFE way for JOIN, as it’s hard to guarantee the records from different data sets with the identical join keys will be loaded by the same node/task , since lots of factors need to be considered, like task pool size, cluster size, source format, storage, data locality etc.,.
>
> I’ll agree it’s worth to optimize it for performance concerns, and actually in Hive, it is called bucket join. I am not sure will that happens soon in Spark SQL.
> Hao
Re: Spark 1.5 UDAF ArrayType
Hi,

I believe I ran into the same bug in 1.5.0, although my error looks like this:

Caused by: java.lang.ClassCastException: [Lcom.verve.spark.sql.ElementWithCount; cannot be cast to org.apache.spark.sql.types.ArrayData
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
  at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
  at org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
  ...

I confirmed that it's fixed in 1.5.1, but unfortunately I'm using AWS EMR 4.1.0 (the latest), which has Spark 1.5.0. Are there any workarounds in 1.5.0?

Thanks.

> Michael
>
> Thank you for your prompt answer. I will repost after I try this again on 1.5.1 or branch-1.5. In addition a blog post on SparkSQL data types would be very helpful. I am familiar with the Hive data types, but there is very little documentation on Spark SQL data types.
>
> Regards
> Deenar
>
> On 22 September 2015 at 19:28, Michael Armbrust <mich...@databricks.com> wrote:
>
>> I think that you are hitting a bug (which should be fixed in Spark 1.5.1). I'm hoping we can cut an RC for that this week. Until then you could try building branch-1.5.
>>
>> On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar <deenar.toras...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am trying to write an UDAF ArraySum, that does element wise sum of arrays of Doubles returning an array of Double following the sample in
>>> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
>>> I am getting the following error. Any guidance on handle complex type in Spark SQL would be appreciated.
> >> Regards
> >> Deenar
> >>
> >> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> >> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> >> import org.apache.spark.sql.Row
> >> import org.apache.spark.sql.types._
> >> import org.apache.spark.sql.functions._
> >>
> >> class ArraySum extends UserDefinedAggregateFunction {
> >>   def inputSchema: org.apache.spark.sql.types.StructType =
> >>     StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
> >>
> >>   def bufferSchema: StructType =
> >>     StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
> >>
> >>   def dataType: DataType = ArrayType(DoubleType, false)
> >>
> >>   def deterministic: Boolean = true
> >>
> >>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> >>     buffer(0) = Nil
> >>   }
> >>
> >>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> >>     val currentSum: Seq[Double] = buffer.getSeq(0)
> >>     val currentRow: Seq[Double] = input.getSeq(0)
> >>     buffer(0) = (currentSum, currentRow) match {
> >>       case (Nil, Nil) => Nil
> >>       case (Nil, row) => row
> >>       case (sum, Nil) => sum
> >>       case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
> >>       // TODO handle different sizes arrays here
> >>     }
> >>   }
> >>
> >>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> >>     val currentSum: Seq[Double] = buffer1.getSeq(0)
> >>     val currentRow: Seq[Double] = buffer2.getSeq(0)
> >>     buffer1(0) = (currentSum, currentRow) match {
> >>       case (Nil, Nil) => Nil
> >>       case (Nil, row) => row
> >>       case (sum, Nil) => sum
> >>       case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
> >>       // TODO handle different sizes arrays here
> >>     }
> >>   }
> >>
> >>   def evaluate(buffer: Row): Any = {
> >>     buffer.getSeq(0)
> >>   }
> >> }
> >>
> >> val arraySum = new ArraySum
> >> sqlContext.udf.register("ArraySum", arraySum)
> >>
> >> %sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date = '2015-05-22' limit 10
> >>
> >> gives me the following error
> >>
> >> Error in SQL statement: SparkException: Job aborted due to stage failure: Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in stage 219.0 (TID 11242, 10.172.255.236): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.types.ArrayData
> >>     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
> >>     at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
> >>     at org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
> >>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
> >>     at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
> >>     at org.apache.spark.sql.execution.aggre
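For the "TODO handle different sizes arrays here" in the quoted code, one possible semantics is to treat the shorter array as if it were padded with zeros. Below is a minimal plain-Scala sketch of that, assuming zero-padding is what is wanted; `ArraySumHelper` and `addPadded` are hypothetical names, not part of Spark.

```scala
// Hypothetical helper for the "different sizes" TODO: element-wise sum of two
// sequences, where missing trailing elements of the shorter one count as 0.0.
object ArraySumHelper {
  def addPadded(sum: Seq[Double], row: Seq[Double]): Seq[Double] =
    // zipAll pads whichever side is shorter with the given default (0.0),
    // so empty inputs need no special case.
    sum.zipAll(row, 0.0, 0.0).map { case (a, b) => a + b }
}
```

Because `zipAll` also handles empty inputs, the four-way match in `update` and `merge` could then shrink to something like `buffer(0) = ArraySumHelper.addPadded(buffer.getSeq[Double](0), input.getSeq[Double](0))`. This only covers the size mismatch; it is not meant as a fix for the ClassCastException above.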
Spark SQL UDAF works fine locally, OutOfMemory on YARN
Hi,

I am using Spark 1.5.1. I have a Spark SQL UDAF that works fine on a tiny dataset (13 narrow rows) in local mode, but runs out of memory on YARN about half the time (OutOfMemory: Java Heap Space). The rest of the time, it works on YARN. Note that in all instances, the input data is the same.

Here is the UDAF: https://gist.github.com/alexnastetsky/581af2672328c4b8b023

I am also using a trivial UDT to keep track of each unique value and its count.

The basic idea is to have a secondary grouping and to count the number of occurrences of each value in the group. For example, we want to group on column X; then for each group, we want to aggregate the rows by column Y and count how many times each unique value of Y appears.

So, given the following data:

X Y
a 1
a 2
a 2
a 2
b 3
b 3
b 4

I would do:

val myudaf = new MergeArraysOfElementWithCountUDAF()
val df = ... // load data
df.groupBy($"X")
  .agg(
    myudaf($"Y").as("aggY")
  )

which should produce data like the following:

X aggY
a [{"element":"1", "count":"1"}, {"element":"2", "count":"3"}]
b [{"element":"3", "count":"2"}, {"element":"4", "count":"1"}]

There's also an option to take an array as input, instead of a scalar, in which case it just loops through the array and performs the same operation.

I've added some logging to show Runtime.getRuntime.freeMemory right before it throws the OOM error, and it shows plenty of memory (16 GB, when I was running on a large node) still available. So I'm not sure if it's some huge memory spike, or it's not actually seeing that available memory.
When the OOM does happen, it consistently happens at this line: https://gist.github.com/alexnastetsky/581af2672328c4b8b023#file-mergearraysofelementwithcountudaf-scala-L59

java.lang.OutOfMemoryError: Java heap space
    at scala.reflect.ManifestFactory$$anon$1.newArray(Manifest.scala:165)
    at scala.reflect.ManifestFactory$$anon$1.newArray(Manifest.scala:164)
    at org.apache.spark.sql.types.ArrayData.toArray(ArrayData.scala:108)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toScala(CatalystTypeConverters.scala:235)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toScala(CatalystTypeConverters.scala:193)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$2.apply(CatalystTypeConverters.scala:414)
    at org.apache.spark.sql.execution.aggregate.InputAggregationBuffer.get(udaf.scala:298)
    at org.apache.spark.sql.Row$class.getAs(Row.scala:316)
    at org.apache.spark.sql.execution.aggregate.InputAggregationBuffer.getAs(udaf.scala:269)
    at com.verve.scorecard.spark.sql.MergeArraysOfElementWithCountUDAF.merge(MergeArraysOfElementWithCountUDAF.scala:59)

Again, the data is tiny, and that doesn't explain why it only happens some of the time on YARN, and never when running in local mode.

Here's how I am running the app:

spark-submit \
  --deploy-mode cluster \
  --master yarn \
  --num-executors 1 \
  --executor-cores 1 \
  --executor-memory 18g \
  --driver-java-options "-XX:MaxPermSize=256m" \
  --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=256m" \
  --conf spark.storage.memoryFraction=0.2 \
  --conf spark.shuffle.memoryFraction=0.6 \
  --conf spark.sql.shuffle.partitions=1000 \
  [...app specific stuff...]

Note that I am using a single executor and single core to help with debugging, but I have the same issue with more executors/nodes. I am running this on EMR on AWS, so this is unlikely to be a hardware issue (different hardware each time I launch a cluster).
I've also isolated the issue to this UDAF, as removing it from my Spark SQL makes the issue go away. Any ideas would be appreciated. Thanks, Alex.
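To pin down the intended result independently of the UDAF, here is a plain-Scala model of the aggregation described above (group on X, then count each distinct Y within the group), run on the example data. `ElementWithCount` and `SecondaryGrouping` are hypothetical stand-ins for the UDT and UDAF in the post, and counts are kept as numbers rather than the strings shown in the expected output.

```scala
// Hypothetical stand-in for the post's UDT: one distinct value and its count.
final case class ElementWithCount(element: String, count: Long)

object SecondaryGrouping {
  // rows are (X, Y) pairs; result maps each X to the counts of its distinct Ys.
  def aggregate(rows: Seq[(String, String)]): Map[String, Seq[ElementWithCount]] =
    rows
      .groupBy { case (x, _) => x }           // primary grouping on X
      .map { case (x, group) =>
        val counts = group
          .groupBy { case (_, y) => y }       // secondary grouping on Y
          .map { case (y, ys) => ElementWithCount(y, ys.size.toLong) }
          .toSeq
          .sortBy(_.element)                  // deterministic order for comparison
        x -> counts
      }
}
```

Assuming Spark 2.x or later, where `collect_list` accepts struct columns, a similar shape can probably be produced without a custom UDAF: `df.groupBy($"X", $"Y").count()` followed by `.groupBy($"X").agg(collect_list(struct($"Y".as("element"), $"count")).as("aggY"))`. The order of elements inside `aggY` is not guaranteed there, and this is not claimed to avoid the OOM.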
restart from last successful stage
Is it possible to restart the job from the last successful stage instead of from the beginning? For example, if your job has stages 0, 1 and 2 .. and stage 0 takes a long time and is successful, but the job fails on stage 1, it would be useful to be able to restart from the output of stage 0 instead of from the beginning. Note that I am NOT talking about Spark Streaming, just Spark Core (and DataFrames), not sure if the case would be different with Streaming. Thanks.
Re: restart from last successful stage
I meant a restart by the user, as ayan said.

I was thinking of a case where e.g. a Spark conf setting was wrong and the job failed in Stage 1, in my example, and we want to rerun the job with the right conf without rerunning Stage 0. Having this "restart" capability may cause some chaos if the change would have affected how Stage 0 runs, possibly creating partition incompatibilities or something else.

Also, another option is to just persist the data from Stage 0 (e.g. with one of the RDD saveAs* methods) and then run a modified version of the job that skips Stage 0, assuming you have a full understanding of the breakdown of stages in your job.

On Tue, Jul 28, 2015 at 9:28 PM, Tathagata Das wrote:

> Okay, may I am confused on the word "would be useful to *restart* from the
> output of stage 0" ... did the OP mean restart by the user or restart
> automatically by the system?
>
> On Tue, Jul 28, 2015 at 3:43 PM, ayan guha wrote:
>
>> Hi
>>
>> I do not think op asks about attempt failure but stage failure and
>> finally leading to job failure. In that case, rdd info from last run is
>> gone even if from cache, isn't it?
>>
>> Ayan
>> On 29 Jul 2015 07:01, "Tathagata Das" wrote:
>>
>>> If you are using the same RDDs in the both the attempts to run the job,
>>> the previous stage outputs generated in the previous job will indeed be
>>> reused.
>>> This applies to core though. For dataframes, depending on what you do,
>>> the physical plan may get generated again leading to new RDDs which may
>>> cause recomputing all the stages. Consider running the job by generating
>>> the RDD from Dataframe and then using that.
>>>
>>> Of course, you can use caching in both core and DataFrames, which will
>>> solve all these concerns.
>>>
>>> On Tue, Jul 28, 2015 at 1:03 PM, Alex Nastetsky <
>>> alex.nastet...@vervemobile.com> wrote:
>>>
>>>> Is it possible to restart the job from the last successful stage
>>>> instead of from the beginning?
>>>>
>>>> For example, if your job has stages 0, 1 and 2 .. and stage 0 takes a
>>>> long time and is successful, but the job fails on stage 1, it would be
>>>> useful to be able to restart from the output of stage 0 instead of from the
>>>> beginning.
>>>>
>>>> Note that I am NOT talking about Spark Streaming, just Spark Core (and
>>>> DataFrames), not sure if the case would be different with Streaming.
>>>>
>>>> Thanks.
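The "persist Stage 0's output, then run a job that reads it instead of recomputing it" option boils down to a load-or-compute step. Below is a plain-Scala sketch of that pattern with a local text file standing in for Spark storage; `Checkpointed.loadOrCompute` is a hypothetical helper, not a Spark API. In a real job the save and load would be Spark calls such as `rdd.saveAsObjectFile(path)` / `sc.objectFile[T](path)`, or `df.write.parquet(path)` / `sqlContext.read.parquet(path)`.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}

// Plain-Scala sketch of "persist the expensive step's output once, reuse it on
// reruns". Checkpointed is hypothetical; records are assumed to be non-empty
// strings without newlines, since they are stored one per line.
object Checkpointed {
  def loadOrCompute(path: Path)(compute: => Seq[String]): Seq[String] =
    if (Files.exists(path)) {
      // An earlier run already completed this step: reuse its output.
      val text = new String(Files.readAllBytes(path), UTF_8)
      if (text.isEmpty) Seq.empty else text.split("\n", -1).toSeq
    } else {
      val result = compute
      // Write to a temp file in the same directory, then move it into place,
      // so an interrupted write never leaves a file that looks complete.
      val dir = path.toAbsolutePath.getParent
      val tmp = Files.createTempFile(dir, "partial-", ".tmp")
      Files.write(tmp, result.mkString("\n").getBytes(UTF_8))
      Files.move(tmp, path, StandardCopyOption.ATOMIC_MOVE)
      result
    }
}
```

As with the manual approach, this relies on knowing where the expensive step ends and on the persisted output still matching what the rest of the (possibly reconfigured) job expects.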
dataframe json schema scan
The doc for DataFrameReader#json(RDD[String]) method says "Unless the schema is specified using schema function, this function goes through the input once to determine the input schema." https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader Why is this necessary? Why can't it create the dataframe at the same time as it's determining the schema? Thanks.
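One way to see why the extra pass is needed: a DataFrame's schema has to exist as soon as the DataFrame is defined, because later transformations are analyzed against it before any job runs, and with schemaless JSON a field can first appear in the very last record. The plain-Scala model below (hypothetical `SchemaInference` object; records as `Map`s, field types ignored) infers the schema as the union of field names, so a schema built from only part of the input can miss columns.

```scala
// Plain-Scala model of schema inference over JSON-like records whose fields
// vary from record to record. Field types are ignored; only names matter here.
object SchemaInference {
  type Record = Map[String, Any]

  // The inferred schema is the sorted union of all field names, so it is only
  // known after the last record has been seen.
  def inferSchema(records: Seq[Record]): Seq[String] =
    records.flatMap(_.keys).distinct.sorted

  // Rows are laid out positionally against the schema; absent fields become null.
  def toRows(records: Seq[Record], schema: Seq[String]): Seq[Seq[Any]] =
    records.map(r => schema.map(f => r.getOrElse(f, null)))
}
```

Supplying the schema up front, e.g. `sqlContext.read.schema(mySchema).json(rdd)` with a `StructType` you define (`mySchema` is a placeholder), is what skips the inference pass mentioned in the doc.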
Dataset API inconsistencies
I am finding the Dataset API very cumbersome to use, which is unfortunate, as I was looking forward to the type safety after coming from a DataFrame codebase.

This link summarizes my troubles: http://loicdescotte.github.io/posts/spark2-datasets-type-safety/

The problem is having to continuously switch back and forth between typed and untyped semantics, which really kills productivity. In contrast, the RDD API is consistently typed and the DataFrame API is consistently untyped. I don't have to continuously stop and think about which one to use for each operation.

I gave the Frameless framework (mentioned in the link) a shot, but eventually started running into oddities, a lack of documentation and little community support, and did not want to sink too much time into it.

At this point I'm considering just sticking with DataFrames, as I don't really consider Datasets to be usable. Has anyone had a similar experience or had better luck?

Alex.
partitionBy with partitioned column in output?
Is there a way to make outputs created with "partitionBy" contain the partitioned column? When reading the output with Spark or Hive or similar, it's less of an issue because those tools know how to perform partition discovery. But if I were to load the output into an external data warehouse or database, it would have no idea.

Example below: a dataframe with two columns "foo" and "bar" is partitioned by "foo", but the data only contains "bar", since it expects the reader to know how to derive the value of "foo" from the parent directory. Note that it's the same with Parquet and Avro as well; I just chose JSON for my example.

scala> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.partitionBy("foo").json("json-out")

$ ls json-out/
foo=1  foo=2  _SUCCESS
$ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
{"bar":10}
$ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
{"bar":20}

Thanks,
Alex.
Re: partitionBy with partitioned column in output?
Yeah, was just discussing this with a co-worker and came to the same conclusion: we essentially need to create a copy of the partition column. Thanks.

Hacky, but it works. It seems counter-intuitive that Spark would remove the column from the output... it should at least give you an option to keep it.

On Mon, Feb 26, 2018 at 5:47 PM, naresh Goud wrote:

> is this helps?
>
> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").map(("foo","bar")=>("foo",("foo","bar"))).partitionBy("foo").json("json-out")
>
> On Mon, Feb 26, 2018 at 4:28 PM, Alex Nastetsky wrote:
>
>> Is there a way to make outputs created with "partitionBy" to contain the
>> partitioned column? When reading the output with Spark or Hive or similar,
>> it's less of an issue because those tools know how to perform partition
>> discovery. But if I were to load the output into an external data warehouse
>> or database, it would have no idea.
>>
>> Example below -- a dataframe with two columns "foo" and "bar" is
>> partitioned by "foo", but the data only contains "bar", since it expects
>> the reader to know how to derive the value of "foo" from the parent
>> directory. Note that it's the same thing with Parquet and Avro as well, I
>> just chose to use JSON in my example.
>>
>> scala> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.partitionBy("foo").json("json-out")
>>
>> $ ls json-out/
>> foo=1 foo=2 _SUCCESS
>> $ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
>> {"bar":10}
>> $ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
>> {"bar":20}
>>
>> Thanks,
>> Alex.
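A sketch of the copy-the-column workaround in the DataFrame API, assuming Spark 2.x or later; `PartitionCopy`, `writeKeepingColumn` and the `_part` suffix are hypothetical names. Spark moves only the copy into the directory names, so the original column stays in every data file, which is what an external loader needs.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch of the "copy the partition column" workaround from this thread.
object PartitionCopy {
  // Writes `df` as JSON, partitioned by a copy of `column`, so the data files
  // still contain `column` itself; only the copy goes into directory names.
  def writeKeepingColumn(df: DataFrame, column: String, path: String): Unit = {
    val partCol = s"${column}_part"
    df.withColumn(partCol, col(column))
      .write
      .partitionBy(partCol) // directories like <path>/foo_part=1/
      .json(path)           // records like {"foo":1,"bar":10}
  }
}
```

When Spark itself reads such output back, partition discovery adds `foo_part` next to `foo`, so the copy may need to be dropped on that path.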
"where" clause able to access fields not in its schema
I don't know if this is a bug or a feature, but it's a bit counter-intuitive when reading code.

The "b" dataframe does not have field "bar" in its schema, but is still able to filter on that field.

scala> val a = sc.parallelize(Seq((1,10),(2,20))).toDF("foo","bar")
a: org.apache.spark.sql.DataFrame = [foo: int, bar: int]

scala> a.show
+---+---+
|foo|bar|
+---+---+
|  1| 10|
|  2| 20|
+---+---+

scala> val b = a.select($"foo")
b: org.apache.spark.sql.DataFrame = [foo: int]

scala> b.schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(foo,IntegerType,false))

scala> b.select($"bar").show
org.apache.spark.sql.AnalysisException: cannot resolve '`bar`' given input columns: [foo];;
[...snip...]

scala> b.where($"bar" === 20).show
+---+
|foo|
+---+
|  2|
+---+
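As far as I can tell this is deliberate analyzer behavior rather than a bug: Catalyst's `ResolveMissingReferences` rule lets Sort and Filter refer to attributes that a child projection dropped (the same mechanism that allows `ORDER BY` on a column not in the `SELECT` list), while a plain `select` gets no such help, hence the `AnalysisException`. The sketch below (hypothetical `WhereOnDroppedColumn` object, assuming Spark 2.x or later) reproduces the example next to the query written so that every reference is visible.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reproduces the example above plus its schema-respecting equivalent.
object WhereOnDroppedColumn {
  def demo(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val a = Seq((1, 10), (2, 20)).toDF("foo", "bar")
    val b = a.select($"foo")
    // Analyzes even though b's schema is only [foo]: the analyzer adds "bar"
    // back to the projection under the filter and removes it again on top.
    val viaB = b.where($"bar" === 20)
    // The same query written so that the dependency on "bar" is explicit.
    val explicit = a.where($"bar" === 20).select($"foo")
    (viaB, explicit)
  }
}
```

Calling `viaB.explain(true)` prints the analyzed plan, where the extra projection carrying `bar` should be visible.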
to_avro/from_avro inserts extra values from Kafka
Hi all,

I create a dataframe, convert it to Avro with to_avro and write it to Kafka. Then I read it back out with from_avro. (Not using Schema Registry.) The problem is that the values are shifted, so every other field in the result is empty.

I expect:

+---------+--------+------+-------+
|firstName|lastName| color|   mood|
+---------+--------+------+-------+
|     Suzy|  Samson|  blue|grimmer|
|      Jim| Johnson|indigo|   grim|
+---------+--------+------+-------+

Instead I get:

+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

Here's what I'm doing:

$ kt admin -createtopic persons-avro-spark9 -topicdetail <(jsonify =NumPartitions 1 =ReplicationFactor 1)

$ cat person.avsc
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "color", "type": "string" },
    { "name": "mood", "type": "string" }
  ]
}

$ spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.avro._
import java.nio.file.Files;
import java.nio.file.Paths;

val topic = "persons-avro-spark9"

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("person.avsc")))

val personDF = sc.parallelize(Seq(
    ("Jim","Johnson","indigo","grim"),
    ("Suzy","Samson","blue","grimmer")
  )).toDF("firstName","lastName","color","mood")

personDF.select(to_avro(struct(personDF.columns.map(column):_*)).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic",topic)
  .option("avroSchema",jsonFormatSchema)
  .save()

val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro('value, jsonFormatSchema) as 'person)
  .select($"person.firstName",$"person.lastName",$"person.color",$"person.mood")
  .writeStream
  .format("console")
  .start()

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.avro._
import java.nio.file.Files
import java.nio.file.Paths
topic: String = persons-avro-spark9
jsonFormatSchema: String =
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "color", "type": "string" },
    { "name": "mood", "type": "string" }
  ]
}
personDF: org.apache.spark.sql.DataFrame = [firstName: string, lastName: string ... 2 more fields]
stream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3990c36c

scala> ---
Batch: 0
---
+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

See the raw bytes:

$ kt consume -topic persons-avro-spark9
{
  "partition": 0,
  "offset": 0,
  "key": null,
  "value": "\u\u0008Suzy\u\u000cSamson\u\u0008blue\u\u000egrimmer",
  "timestamp": "2020-05-12T17:18:53.858-04:00"
}
{
  "partition": 0,
  "offset": 1,
  "key": null,
  "value": "\u\u0006Jim\u\u000eJohnson\u\u000cindigo\u\u0008grim",
  "timestamp": "2020-05-12T17:18:53.859-04:00"
}

Thanks,
Alex.
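The raw bytes suggest what is going on. Avro encodes a string as a zigzag-varint length followed by UTF-8 bytes, so `\u0008` (8) decodes to length 4 ("Suzy") and `\u000c` (12) to length 6 ("Samson"). Each string is also preceded by one extra byte, shown as `\u` in the dump; assuming it is a 0x00, that is how Avro encodes branch 0 of a union. That would fit Spark 2.4's `to_avro`, which takes only the column, derives the writer schema from the DataFrame rather than from the `avroSchema` option, and maps each nullable column to `["string", "null"]`. Reading those bytes with a schema of plain strings then consumes each union index as an empty string and shifts every field. The hand-rolled decoder below (hypothetical `AvroStrings` object, no Avro library) decodes the Suzy record both ways.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Decoder for the tiny subset of the Avro binary encoding needed here;
// AvroStrings is a hypothetical helper, not part of any Avro library.
object AvroStrings {
  // Reads one zigzag-encoded varint (Avro int/long) starting at `pos`.
  // Returns the decoded value and the position just after it.
  def readLong(bytes: Array[Byte], pos: Int): (Long, Int) = {
    var raw = 0L
    var shift = 0
    var p = pos
    var more = true
    while (more) {
      val b = bytes(p) & 0xff
      raw |= (b & 0x7f).toLong << shift
      shift += 7
      p += 1
      more = (b & 0x80) != 0
    }
    ((raw >>> 1) ^ -(raw & 1L), p) // undo the zigzag mapping
  }

  // Reads `n` values of Avro type "string": a length, then UTF-8 bytes.
  def readStrings(bytes: Array[Byte], n: Int): Seq[String] = {
    var p = 0
    (1 to n).map { _ =>
      val (len, start) = readLong(bytes, p)
      p = start + len.toInt
      new String(bytes, start, len.toInt, UTF_8)
    }
  }

  // Reads `n` values of Avro type ["string", "null"]: a union branch index
  // (0 = string, 1 = null), followed by the string when the branch is 0.
  def readNullableStrings(bytes: Array[Byte], n: Int): Seq[Option[String]] = {
    var p = 0
    (1 to n).map { _ =>
      val (branch, afterIndex) = readLong(bytes, p)
      if (branch == 1L) {
        p = afterIndex
        None
      } else {
        val (len, start) = readLong(bytes, afterIndex)
        p = start + len.toInt
        Some(new String(bytes, start, len.toInt, UTF_8))
      }
    }
  }
}
```

Decoding the Suzy bytes as four plain strings gives `"", "Suzy", "", "Samson"`, matching the first row of the console output, while decoding them as four `["string", "null"]` values gives all four names. If that reading is right, one possible fix is to declare each field in person.avsc as, for example, `{ "name": "firstName", "type": ["string", "null"] }`, so the schema passed to `from_avro` matches what `to_avro` actually wrote.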