Re: Enabling push-based shuffle in Spark

2020-01-27 Thread Long, Andrew
The easiest would be to create a fork of the code on GitHub. I can also accept diffs.

Cheers Andrew

From: Min Shen 
Date: Monday, January 27, 2020 at 12:48 PM
To: "Long, Andrew" , "dev@spark.apache.org" 

Subject: Re: Enabling push-based shuffle in Spark

Hi Andrew,

We are leveraging SPARK-6237 to control the off-heap memory consumption due to Netty.
With that change, the data is processed in a streaming fashion, so Netty does not buffer an entire RPC in memory before handing it over to the RPCHandler.
We tested with our internal stress-testing framework and did not see much change in the memory consumption of the shuffle service.
In terms of sharing the code, we're not sure what would be an effective way to do that.
If you're interested, maybe we can set up a meeting to chat in more depth.

Best,
Min

On Mon, Jan 27, 2020 at 11:30 AM Long, Andrew <loand...@amazon.com> wrote:
Hey Min,

One concern would be off-heap memory utilization due to Netty, depending on the number of connections that you create.

Would it be possible to take a look at your code?  My team has a performance test harness that I'd like to test it with.

Cheers Andrew



On 1/23/20, 10:25 AM, "mshen" <ms...@apache.org> wrote:

Hi Wenchen,

Glad to know that you like this idea.
We also looked into making this pluggable in our early design phase.
While the ShuffleManager API for pluggable shuffle systems does provide quite a bit of room for customizing Spark shuffle behavior, we feel that it is still not enough for this case.

Right now, the shuffle block location information is tracked inside MapOutputTracker and updated by the DAGScheduler.
Since we are relocating shuffle blocks to improve overall shuffle throughput and efficiency, being able to update the information tracked inside MapOutputTracker, so that reducers can access their shuffle input more efficiently, is necessary.
Letting the DAGScheduler orchestrate this process also provides the benefit of better coping with stragglers.
If the DAGScheduler has no control over, or is agnostic of, the block push progress, that does leave a few gaps.

On the shuffle Netty protocol side, there is a lot that can be leveraged from the existing code.
With the improvements in SPARK-24355 and SPARK-30512, the shuffle service Netty server is becoming much more reliable.
The work in SPARK-6237 also provides quite a bit of leverage for streaming pushes of shuffle blocks.
Instead of building all of this from scratch, we took the alternative route of building on top of the existing Netty protocol to implement the shuffle block push operation.

We feel that this design has the potential to further improve the Spark shuffle system's scalability and efficiency, making Spark an even better compute engine.
We would like to explore how we can leverage the shuffle plugin API to make this design more acceptable.



-
Min Shen
Staff Software Engineer
LinkedIn




Re: How to implement a "saveAsBinaryFile" function?

2020-01-16 Thread Long, Andrew
Hey Bing,

There are a couple of different approaches you could take. The quickest and easiest would be to use the existing APIs:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val bytes = spark.range(1000)

bytes.rdd.foreachPartition(partition => {
  // WARNING: anything used in here will need to be serializable.
  // There's some magic to serializing the Hadoop conf; see the Hadoop wrapper
  // class (SerializableConfiguration) in the Spark source.
  val path = new Path("s3://...")
  val writer = path.getFileSystem(new Configuration()).create(path)
  partition.foreach(b => writer.writeLong(b))
  writer.close()
})

The more complicated but prettier approach would be to implement a custom datasource.
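
For your specific RDD[Array[Byte]] case, a small helper along these lines would skip the text encoding (and therefore the quote escaping) entirely. This is an untested sketch — the helper name and file layout are made up:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

def saveAsBinaryFile(rdd: RDD[Array[Byte]], dir: String): Unit = {
  rdd.foreachPartition { records =>
    // One raw output file per partition, named by partition id; the bytes are
    // written as-is, so no quoting or escaping can happen.
    val path = new Path(dir, f"part-${TaskContext.getPartitionId()}%05d.bin")
    val out = path.getFileSystem(new Configuration()).create(path)
    records.foreach(bytes => out.write(bytes))
    out.close()
  }
}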

From: "Duan,Bing" 
Date: Thursday, January 16, 2020 at 12:35 AM
To: "dev@spark.apache.org" 
Subject: How to implement a "saveAsBinaryFile" function?

Hi all:

I read binary data (protobuf format) from the filesystem with the binaryFiles function into an RDD[Array[Byte]], and it works fine. But when I save it back to the filesystem with saveAsTextFile, the quotation marks get escaped like this:
"\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should be "20192_1",1,24,0,2,"S66.000x001".

Could anyone give me a tip on how to implement a function like saveAsBinaryFile to persist the RDD[Array[Byte]]?

Bests!

Bing


Re: SortMergeJoinExec: Utilizing child partitioning when joining

2020-01-07 Thread Long, Andrew
“Where can I find information on how to run standard performance tests/benchmarks?”

The gold standard is spark-sql-perf, and in particular the TPC-DS benchmark. Most of the big optimization teams are using this as their primary benchmark. One word of warning: most groups have also extended it with entirely new types of benchmarks which are not open source, but the core TPC-DS benchmark will get you most of the way there.

https://aws.amazon.com/blogs/big-data/amazon-emr-introduces-emr-runtime-for-apache-spark/
https://github.com/databricks/spark-sql-perf
http://www.tpc.org/tpcds/

“Are performance degradations to existing queries that are fixable by new 
equivalent queries not allowed for a new major spark version”

The general rule of thumb for my group (which is NOT Databricks) is that as long as the geomean of TPC-DS improves you're fine, provided you don't break any existing queries. For example, regressing a couple of queries by 5% is fine, BUT causing a query that previously ran to crash is not OK. Additionally, we have a sample of user queries + ETL processes that we try not to break either.
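
To make the rule concrete, the check amounts to something like this (a minimal sketch; the per-query runtimes are made-up numbers in seconds):

// Hypothetical before/after runtimes per TPC-DS query.
val before = Map("q1" -> 12.0, "q2" -> 30.0, "q3" -> 8.0)
val after  = Map("q1" -> 12.6, "q2" -> 24.0, "q3" -> 8.1)

// Geometric mean = exp of the average of the logs.
def geomean(xs: Iterable[Double]): Double =
  math.exp(xs.map(math.log).sum / xs.size)

// Rule of thumb: the overall geomean (of runtimes, so lower is better) must improve...
val overallWin = geomean(after.values) < geomean(before.values)
// ...and nothing that used to run may now fail (modeled here as a missing entry).
val nothingBroken = before.keySet.subsetOf(after.keySet)

println(f"geomean before=${geomean(before.values)}%.2f after=${geomean(after.values)}%.2f ok=${overallWin && nothingBroken}")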

Cheers Andrew

From: Brett Marcott 
Date: Tuesday, January 7, 2020 at 12:00 AM
To: "Long, Andrew" 
Cc: "dev@spark.apache.org" 
Subject: Re: SortMergeJoinExec: Utilizing child partitioning when joining

1. Where can I find information on how to run standard performance 
tests/benchmarks?
2. Are performance degradations to existing queries that are fixable by new 
equivalent queries not allowed for a new major spark version?

On Thu, Jan 2, 2020 at 3:05 PM Brett Marcott <brett.marc...@gmail.com> wrote:
Thanks for the response Andrew.

1. The approach
The approach I mentioned will not introduce any new skew, so it should only worsen performance if the user was relying on the shuffle to fix skew they had before.
The user can address this by either not introducing their own skewed partitioning in the first place, or by repartitioning with less skew again before the join.
Today the user cannot change the partitioning without changing the join condition in a hacky way: joinkey1 >= joinkey2 && joinkey1 <= joinkey2

The condition I mentioned below ensures that the same keys on left and right 
formed their respective subsets:
  left and right partition expressions have the same subset (with regards 
to indices) of their respective join keys

I don't believe EnsureRequirements will require any changes, just what the 
Exec's are saying is required.

2. output partitionings
Yeah, I got as far as you mentioned, but I didn't at first get why for outer joins only one side is used.
Now, however, I think it makes sense: for outer joins you may be introducing nulls for at least one side, which makes that side's partitioning invalid, right?

Warm New Year Regards,
Brett

On Thu, Jan 2, 2020 at 2:28 PM Long, Andrew <loand...@amazon.com> wrote:
“Thoughts on this approach?”

Just to warn you, this is a hazardous optimization without cardinality information. Removing columns from the hash exchange reduces entropy, potentially resulting in skew. Also keep in mind that if you reduce the number of columns on one side of the join you need to do it on the other. This will require you to rewrite EnsureRequirements or add a special case to detect this.

As a word of warning, there are a whole bunch of subtle things that EnsureRequirements is doing, and it's really easy to unintentionally create performance regressions while making improvements in other areas.

“Could someone help explain why the different join types have different output partitionings”

Long story short, when a join happens the join exec zips together the partitions of the left and right sides, so that one partition of the join has the elements of both the left and the right.  In the case of an inner join this means that the resulting RDD is now partitioned by both the left join keys and the right join keys.  I'd suggest taking a look at the join execs and at how they build the result RDD from the partitions of the left and right RDDs (see doExecute(…)).  Left/right outer does look surprising, though.

You should see something like…

left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
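
For reference, the match in SortMergeJoinExec.outputPartitioning looks roughly like this — paraphrased from memory of the 2.4 branch, so double-check against master:

override def outputPartitioning: Partitioning = joinType match {
  // Inner joins keep both sides' partitionings, since every output row carries
  // valid join keys from both the left and the right.
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  // Outer joins can emit nulls for the non-preserved side, so only the
  // preserved side's partitioning still holds.
  case LeftOuter => left.outputPartitioning
  case RightOuter => right.outputPartitioning
  case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
  case LeftExistence(_) => left.outputPartitioning
  case x => throw new IllegalArgumentException(s"Unexpected join type: $x")
}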


Cheers Andrew

From: Brett Marcott <brett.marc...@gmail.com>
Date: Tuesday, December 31, 2019 at 11:49 PM
To: "dev@spark.apache.org" <dev@spark.apache.org>
Subject: SortMergeJoinExec: Utilizing child partitioning when joining

Hi all,

I found this jira for an issue I ran into recently:
https://issues.apache.org/jira/browse/SPARK-28771

My initial idea for a fix is to change SortMergeJoinExec's (and ShuffledHashJoinExec's) requiredChildDistribution.

At least if all of the conditions below are met, we could require only a subset of the keys for partitioning:
left and right children's output partiti

Re: SortMergeJoinExec: Utilizing child partitioning when joining

2020-01-02 Thread Long, Andrew
“Thoughts on this approach?”

Just to warn you, this is a hazardous optimization without cardinality information. Removing columns from the hash exchange reduces entropy, potentially resulting in skew. Also keep in mind that if you reduce the number of columns on one side of the join you need to do it on the other. This will require you to rewrite EnsureRequirements or add a special case to detect this.

As a word of warning, there are a whole bunch of subtle things that EnsureRequirements is doing, and it's really easy to unintentionally create performance regressions while making improvements in other areas.

“Could someone help explain why the different join types have different output partitionings”

Long story short, when a join happens the join exec zips together the partitions of the left and right sides, so that one partition of the join has the elements of both the left and the right.  In the case of an inner join this means that the resulting RDD is now partitioned by both the left join keys and the right join keys.  I'd suggest taking a look at the join execs and at how they build the result RDD from the partitions of the left and right RDDs (see doExecute(…)).  Left/right outer does look surprising, though.

You should see something like…

left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>


Cheers Andrew

From: Brett Marcott 
Date: Tuesday, December 31, 2019 at 11:49 PM
To: "dev@spark.apache.org" 
Subject: SortMergeJoinExec: Utilizing child partitioning when joining

Hi all,

I found this jira for an issue I ran into recently:
https://issues.apache.org/jira/browse/SPARK-28771

My initial idea for a fix is to change SortMergeJoinExec's (and ShuffledHashJoinExec's) requiredChildDistribution.

At least if all of the conditions below are met, we could require only a subset of the keys for partitioning:
left and right children's output partitionings are HashPartitioning with the same numPartitions
left and right partition expressions have the same subset (with regards to indices) of their respective join keys

If that subset of keys is returned by requiredChildDistribution, then 
EnsureRequirements.ensureDistributionAndOrdering would not add a shuffle stage, 
hence reusing the children's partitioning.
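
For reference, the extra shuffle can be seen with something like this (minimal example; assumes a plain local SparkSession and import spark.implicits._):

val l = spark.range(1000).select($"id" % 10 as "a", $"id" % 7 as "b").repartition(200, $"a")
val r = spark.range(1000).select($"id" % 10 as "c", $"id" % 7 as "d").repartition(200, $"c")

// Both sides are already hash-partitioned on a single join key, yet joining on
// (a, b) = (c, d) still inserts a new Exchange on both join keys for each side.
l.join(r, $"a" === $"c" && $"b" === $"d").explain()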

1. Thoughts on this approach?

2. Could someone help explain why the different join types have different 
output partitionings in 
SortMergeJoinExec.outputPartitioning?

Thanks,
Brett




CR for adding bucket join support to V2 Datasources

2019-11-18 Thread Long, Andrew
Hey Friends,

I recently created a pull request to add optional support for bucket joins to V2 Datasources, via a concrete class representing Spark-style hash partitioning. If anyone has some free time I'd appreciate a code review.  This also adds a concrete implementation of the V2 ClusteredDistribution to make specifying ClusteredDistributions easier.

https://github.com/apache/spark/pull/26511

Cheers Andrew



Timeline for Spark 3.0

2019-06-28 Thread Long, Andrew
Hey Friends,

Is there a timeline for Spark 3.0 in terms of the first RC and final release?

Cheers Andrew


Bucketing and catalyst

2019-05-02 Thread Long, Andrew
Hey Friends,

How aware of bucketing is Catalyst? I've been trying to piece together how Catalyst knows that it can remove a sort and shuffle given that both tables are bucketed and sorted the same way. Are there any classes in particular I should look at?
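
For context, the behavior I'm trying to trace comes from something like this (rough example; table names made up):

// Two tables bucketed and sorted the same way on the join key.
spark.range(1000000).selectExpr("id", "id % 100 AS v")
  .write.bucketBy(8, "id").sortBy("id").saveAsTable("bucketed_a")
spark.range(1000000).selectExpr("id", "id % 100 AS w")
  .write.bucketBy(8, "id").sortBy("id").saveAsTable("bucketed_b")

// With matching bucket counts and sort columns, the physical plan should show a
// SortMergeJoin with no Exchange (and, when the sort is picked up, no extra Sort).
spark.table("bucketed_a").join(spark.table("bucketed_b"), "id").explain()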

Cheers Andrew


Re: Stage 152 contains a task of very large size (12747 KB). The maximum recommended task size is 100 KB

2019-05-01 Thread Long, Andrew
It turned out that I was unintentionally shipping multiple copies of the Hadoop config to every partition in an RDD. >.<  I was able to debug this by setting a breakpoint on the warning message and inspecting the partition object itself.

Cheers Andrew

From: Russell Spitzer 
Date: Thursday, April 25, 2019 at 8:47 AM
To: "Long, Andrew" 
Cc: dev 
Subject: Re: FW: Stage 152 contains a task of very large size (12747 KB). The 
maximum recommended task size is 100 KB

I usually only see that when folks parallelize very large objects. From what I know, it's really just the data inside the "Partition" class of the RDD that is being sent back and forth, so it's usually something like spark.parallelize(Seq(reallyBigMap)). The parallelize function jams all of that data into the RDD's Partition metadata, which can easily overwhelm the task size.
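
Something like this will trigger the warning, and broadcasting is usually the fix (rough sketch, assuming sc is the SparkContext):

// A deliberately large driver-side object.
val bigLookup: Map[Int, String] = (1 to 1000000).map(i => i -> i.toString).toMap

// Anti-pattern: the whole map ends up inside the single partition's metadata,
// so the serialized task blows past the recommended size.
val rdd = sc.parallelize(Seq(bigLookup))

// Usually better: broadcast it once and reference the broadcast inside tasks.
val bc = sc.broadcast(bigLookup)
sc.parallelize(1 to 1000000).map(i => bc.value.getOrElse(i, "missing")).count()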

On Tue, Apr 23, 2019 at 3:57 PM Long, Andrew  
wrote:
Hey Friends,

Is there an easy way of figuring out what's being pulled into the task context?  I've been getting the following message, which I suspect means I've unintentionally captured some large objects, but figuring out what those objects are is stumping me.

19/04/23 13:52:13 WARN org.apache.spark.internal.Logging$class TaskSetManager: 
Stage 152 contains a task of very large size (12747 KB). The maximum 
recommended task size is 100 KB

Cheers Andrew


FW: Stage 152 contains a task of very large size (12747 KB). The maximum recommended task size is 100 KB

2019-04-23 Thread Long, Andrew
Hey Friends,

Is there an easy way of figuring out what's being pulled into the task context?  I've been getting the following message, which I suspect means I've unintentionally captured some large objects, but figuring out what those objects are is stumping me.

19/04/23 13:52:13 WARN org.apache.spark.internal.Logging$class TaskSetManager: 
Stage 152 contains a task of very large size (12747 KB). The maximum 
recommended task size is 100 KB

Cheers Andrew


Sort order in bucketing in a custom datasource

2019-04-16 Thread Long, Andrew
Hey Friends,

Is it possible to specify the sort order or bucketing in a way that can be used by the optimizer in Spark?

Cheers Andrew


Which parts of a parquet read happen on the driver vs the executor?

2019-04-11 Thread Long, Andrew
Hey Friends,

I'm working on a POC that involves reading and writing parquet files mid-DAG.  Writes are working, but I'm struggling to get reads working due to serialization issues. I've got code that works with master=local but not on YARN.  So here are my questions.


  1.  Is there an easy way to tell whether a particular function in Spark will run on the driver or the executor?  My current heuristic is that if the function uses the SparkSession it runs on the driver, but…
  2.  Where does FileFormat.buildReaderWithPartitionValues(..) run?  The driver or the executor?  Due to the SparkSession parameter I suspected it runs on the driver and the resulting iterator is then sent to the executor to perform the read, but I've been running into serialization issues.

19/04/11 12:35:29 ERROR org.apache.spark.internal.Logging$class TaskSetManager: 
Failed to serialize task 26, not attempting to retry it.
java.io.NotSerializableException: scala.collection.Iterator$$anon$12
Serialization stack:
- object not serializable (class: 
scala.collection.Iterator$$anon$12, value: non-empty iterator)
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class 
scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationProxy@6993864a)
- writeReplace data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, 
List(non-empty iterator))
- field (class: 
com.amazon.horizon.azulene.datasource.AzuleneSplit, name: readers, type: class 
scala.collection.immutable.List)

Is there something I’m missing here?

Here’s the code I’m using to read records.

def read(path: String, spark: SparkSession, fileSchema: StructType, requiredSchema: StructType): Iterator[InternalRow] = {
  // spark.fs comes from our SparkFileUtils implicit helpers
  // (com.amazon.horizon.azulene.util.SparkFileUtils).
  val partitionSchema = StructType(Seq.empty)
  val status = spark.fs.getFileStatus(path)

  val pFile = new PartitionedFile(
partitionValues = InternalRow.empty,//This should be empty for non 
partitioned values
filePath = path.toString,
start = 0,
length = status.getLen
  )

  val readFile: (PartitionedFile) => Iterator[Any] = //Iterator[InternalRow]
new ParquetFileFormat().buildReaderWithPartitionValues(
  sparkSession = spark,
  dataSchema = fileSchema,
  partitionSchema = partitionSchema,//this should be empty for non 
partitioned fields
  requiredSchema = requiredSchema,
  filters = Seq.empty,
  options = Map.empty,
  hadoopConf = 
spark.sparkContext.hadoopConfiguration//relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
)

  import scala.collection.JavaConverters._

  val i: Iterator[Any] = readFile(pFile)
  val rows = i.flatMap(_ match {
case r: InternalRow => Seq(r)
case b: ColumnarBatch => b.rowIterator().asScala
  })

  rows
}
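
As an aside, a quick way to check where a piece of code actually runs (rough, but it's been handy for debugging this) is that TaskContext.get() only returns a value inside a running task:

import org.apache.spark.TaskContext

def whereAmI(): String =
  if (TaskContext.get() == null) "driver"
  else s"executor (partition ${TaskContext.getPartitionId()})"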


Cheers Andrew


Re: Manually reading parquet files.

2019-03-21 Thread Long, Andrew
Thanks a ton for the help!

Is there a standardized way of converting InternalRows to Rows?

I've tried this but I'm getting an exception:

val encoder = RowEncoder(df.schema)
val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
})
  .map(encoder.fromRow(_))
  .toList

java.lang.RuntimeException: Error while decoding: 
java.lang.UnsupportedOperationException: Cannot evaluate expression: 
getcolumnbyordinal(0, IntegerType)
createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, 
IntegerType), getcolumnbyordinal(2, StringType).toString, 
StructField(pk,IntegerType,false), StructField(ordering,IntegerType,false), 
StructField(col_a,StringType,true))

at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
at 
com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
at 
com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Thursday, March 21, 2019 at 3:32 PM
To: "Long, Andrew" 
Cc: "dev@spark.apache.org" , "u...@spark.apache.org" 
, "horizon-...@amazon.com" 
Subject: Re: Manually reading parquet files.

You're getting InternalRow instances. They probably have the data you want, but 
the toString representation doesn't match the data for InternalRow.
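
If you want external Rows back, something along these lines usually does it (untested sketch against the 2.4 APIs; the encoder needs to be resolved and bound before fromRow will work):

import org.apache.spark.sql.catalyst.encoders.RowEncoder

val encoder = RowEncoder(df.schema).resolveAndBind()
val externalRows = rows.map(encoder.fromRow)  // `rows` = the InternalRows from your snippet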

On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew  
wrote:
Hello Friends,

I'm working on a performance improvement that reads additional parquet files in the middle of a lambda, and I'm running into some issues.  This is what I'd like to do:


ds.mapPartitions(x=>{
  //read parquet file in and perform an operation with x
})


Here’s my current POC code but I’m getting nonsense back from the row reader.

import com.amazon.horizon.azulene.util.SparkFileUtils._

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

val data = List(
  TestRow(1,1,"asdf"),
  TestRow(2,1,"asdf"),
  TestRow(3,1,"asdf"),
  TestRow(4,1,"asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")

val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)

val file = files(1)//skip _success file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath

val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty,//This should be empty for non 
partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = //Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
sparkSession = spark,
dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
requiredSchema = dataSchema,
filters = Seq.empty,
options = Map.empty,
hadoopConf = 
spark.sparkContext.hadoopConfiguration//relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)

  // This doesn't work. vector mode is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}).toList

println(rows)
//List([0,1,5b,24,66647361])
//??this is wrong I think

Has anyone attempted something similar?

Cheers Andrew



--
Ryan Blue
Software Engineer
Netflix


Manually reading parquet files.

2019-03-21 Thread Long, Andrew
Hello Friends,

I'm working on a performance improvement that reads additional parquet files in the middle of a lambda, and I'm running into some issues.  This is what I'd like to do:


ds.mapPartitions(x=>{
  //read parquet file in and perform an operation with x
})


Here’s my current POC code but I’m getting nonsense back from the row reader.

import com.amazon.horizon.azulene.util.SparkFileUtils._

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

val data = List(
  TestRow(1,1,"asdf"),
  TestRow(2,1,"asdf"),
  TestRow(3,1,"asdf"),
  TestRow(4,1,"asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")

val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)

val file = files(1)//skip _success file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath

val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty,//This should be empty for non 
partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = //Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
sparkSession = spark,
dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
requiredSchema = dataSchema,
filters = Seq.empty,
options = Map.empty,
hadoopConf = 
spark.sparkContext.hadoopConfiguration//relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)

  // This doesn't work. vector mode is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}).toList

println(rows)
//List([0,1,5b,24,66647361])
//??this is wrong I think

Has anyone attempted something similar?

Cheers Andrew



Re: Spark data quality bug when reading parquet files from hive metastore

2018-09-07 Thread Long, Andrew
Thanks Fokko,

I will definitely take a look at this.

Cheers Andrew

From: "Driesprong, Fokko" 
Date: Friday, August 24, 2018 at 2:39 AM
To: "reubensaw...@hotmail.com" 
Cc: "dev@spark.apache.org" 
Subject: Re: Spark data quality bug when reading parquet files from hive 
metastore

Hi Andrew,

This blog gives an idea how to schema is resolved: 
https://blog.godatadriven.com/multiformat-spark-partition There is some 
optimisation going on when reading Parquet using Spark. Hope this helps.

Cheers, Fokko


On Wed, Aug 22, 2018 at 11:59 PM t4 <reubensaw...@hotmail.com> wrote:
https://issues.apache.org/jira/browse/SPARK-23576 ?





Spark data quality bug when reading parquet files from hive metastore

2018-08-22 Thread Long, Andrew
Hello Friends,

I've encountered a bug where Spark silently corrupts data when reading from a Parquet Hive table whose table schema does not match the file schema.  I'd like to take a shot at adding some extra validation to the code to handle this corner case, and I was wondering if anyone had suggestions for where to start looking in the Spark code.

Cheers Andrew


Feedback on first commit + jira issue I opened

2018-05-31 Thread Long, Andrew
Hello Friends,

I'm a new contributor and I've submitted my first patch, and I had some questions about documentation standards.  In my patch (JIRA below) I've added a config parameter to adjust the number of records shown when a user calls .show() on a dataframe.  I was hoping someone could double-check my small diff to make sure I wasn't making any rookie mistakes before I submit a pull request.

https://issues.apache.org/jira/browse/SPARK-24442

Cheers Andrew