Re: Which parts of a parquet read happen on the driver vs the executor?

2019-04-11 Thread Sean Owen
Spark is a distributed compute framework, of course, so things you do
with Spark operations like map, filter, groupBy, etc. do not happen on
the driver; the function is serialized and shipped to the executors. The error
here just indicates you are creating a function that references
something that can't be serialized.
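
As an illustration of that last point, a minimal, hypothetical sketch (the object and variable names are made up, not from this thread): a closure that captures a driver-side iterator fails task serialization with exactly this kind of NotSerializableException, while per-partition state created inside the task never has to be serialized at all.

import org.apache.spark.sql.SparkSession

object ClosureSerializationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("closure-serialization-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Problematic: this iterator lives on the driver and is captured by the map
    // closure, so Spark tries to serialize it when shipping the task.
    val driverSideIterator = Iterator(10, 20, 30)
    // sc.parallelize(1 to 3).map(x => x + driverSideIterator.next()).collect()
    //   fails with "Task not serializable", caused by
    //   java.io.NotSerializableException: scala.collection.Iterator$$anon$...

    // Safer: create the stateful object inside the task, on the executor, so it
    // is never part of the serialized closure.
    val result = sc.parallelize(1 to 6, 2).mapPartitions { rows =>
      val perTaskCounter = Iterator.from(0) // built executor-side, never serialized
      rows.map(x => x + perTaskCounter.next())
    }
    result.collect().foreach(println)

    spark.stop()
  }
}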

I'm not quite clear from your code what you're doing here, but it's
not using Spark operations to read Parquet. I actually don't see it
invoking Spark end-user APIs at all? You're using some Spark internals
directly and so that's kind of executing driver-side, but this isn't
how you'd use Spark to read Parquet. Whatever this is, you could only
execute it driver-side if it's using the SparkSession.
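
For contrast, a hedged sketch of what a Parquet read through the end-user API looks like (the path and schema below are placeholders, not from this thread): the driver only builds the plan and does the file listing, and the actual row reads happen inside tasks on the executors once an action runs.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object PublicParquetReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("parquet-read-sketch").getOrCreate()

    // Placeholder schema and path, for illustration only.
    val requiredSchema = StructType(Seq(StructField("id", IntegerType)))

    // Driver side: this builds the plan (plus some file listing); no row data is read here.
    val df = spark.read.schema(requiredSchema).parquet("/tmp/example.parquet")

    // Executor side: the Parquet pages are actually read inside tasks when an
    // action such as count() runs.
    println(df.count())

    spark.stop()
  }
}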

On Thu, Apr 11, 2019 at 3:01 PM Long, Andrew
 wrote:
>
> Hey Friends,
>
>
>
> I’m working on a POC that involves reading and writing Parquet files mid-DAG.
> Writes are working, but I’m struggling with getting reads working due to
> serialization issues. I’ve got code that works in master=local but not in
> YARN.  So here are my questions.
>
>
>
> Is there an easy way to tell if a particular function in Spark will be run on
> the driver or the executor?  My current heuristic is that if the function uses
> the SparkSession it runs on the driver, but….
> Where does FileFormat.buildReaderWithPartitionValues(..) run?  The driver or
> the executor?  Due to the SparkSession I was suspecting that it was run on
> the driver and then the resulting iterator was sent to the executor to run
> the read, but I’ve been running into serialization issues.
>
>
>
> 19/04/11 12:35:29 ERROR org.apache.spark.internal.Logging$class 
> TaskSetManager: Failed to serialize task 26, not attempting to retry it.
>
> java.io.NotSerializableException: scala.collection.Iterator$$anon$12
>
> Serialization stack:
>
> - object not serializable (class: 
> scala.collection.Iterator$$anon$12, value: non-empty iterator)
>
> - writeObject data (class: 
> scala.collection.immutable.List$SerializationProxy)
>
> - object (class 
> scala.collection.immutable.List$SerializationProxy, 
> scala.collection.immutable.List$SerializationProxy@6993864a)
>
> - writeReplace data (class: 
> scala.collection.immutable.List$SerializationProxy)
>
> - object (class scala.collection.immutable.$colon$colon, 
> List(non-empty iterator))
>
> - field (class: 
> com.amazon.horizon.azulene.datasource.AzuleneSplit, name: readers, type: 
> class scala.collection.immutable.List)
>
>
>
> Is there something I’m missing here?
>
>
>
> Here’s the code I’m using to read records.
>
>
>
> def read(path: String, spark: SparkSession, fileSchema: StructType, requiredSchema: StructType): Iterator[InternalRow] = {
>   val partitionSchema = StructType(Seq.empty)
>   val status = spark.fs.getFileStatus(path)
>
>   val pFile = new PartitionedFile(
>     partitionValues = InternalRow.empty, // This should be empty for non partitioned values
>     filePath = path.toString,
>     start = 0,
>     length = status.getLen
>   )
>
>   val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>     new ParquetFileFormat().buildReaderWithPartitionValues(
>       sparkSession = spark,
>       dataSchema = fileSchema,
>       partitionSchema = partitionSchema, // this should be empty for non partitioned fields
>       requiredSchema = requiredSchema,
>       filters = Seq.empty,
>       options = Map.empty,
>       hadoopConf = spark.sparkContext.hadoopConfiguration // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
>     )
>
>   import scala.collection.JavaConverters._
>
>   val i: Iterator[Any] = readFile(pFile)
>   val rows = i.flatMap(_ match {
>     case r: InternalRow => Seq(r)
>     case b: ColumnarBatch => b.rowIterator().asScala
>   })
>
>   rows
> }
>
>
>
>
>
> Cheers Andrew

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Which parts of a parquet read happen on the driver vs the executor?

2019-04-11 Thread Long, Andrew
Hey Friends,

I’m working on a POC that involves reading and writing Parquet files mid-DAG.
Writes are working, but I’m struggling with getting reads working due to
serialization issues. I’ve got code that works in master=local but not in YARN.
So here are my questions.


  1.  Is there an easy way to tell if a particular function in Spark will be
run on the driver or the executor?  My current heuristic is that if the function
uses the SparkSession it runs on the driver, but….
  2.  Where does FileFormat.buildReaderWithPartitionValues(..) run?  The driver
or the executor?  Due to the SparkSession I was suspecting that it was run on
the driver and then the resulting iterator was sent to the executor to run the
read, but I’ve been running into serialization issues.

19/04/11 12:35:29 ERROR org.apache.spark.internal.Logging$class TaskSetManager: 
Failed to serialize task 26, not attempting to retry it.
java.io.NotSerializableException: scala.collection.Iterator$$anon$12
Serialization stack:
- object not serializable (class: 
scala.collection.Iterator$$anon$12, value: non-empty iterator)
- writeObject data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class 
scala.collection.immutable.List$SerializationProxy, 
scala.collection.immutable.List$SerializationProxy@6993864a)
- writeReplace data (class: 
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, 
List(non-empty iterator))
- field (class: 
com.amazon.horizon.azulene.datasource.AzuleneSplit, name: readers, type: class 
scala.collection.immutable.List)

Is there something I’m missing here?

Here’s the code I’m using to read records.

def read(path: String, spark: SparkSession, fileSchema: StructType, requiredSchema: StructType): Iterator[InternalRow] = {
  val partitionSchema = StructType(Seq.empty)
  val status = spark.fs.getFileStatus(path)

  val pFile = new PartitionedFile(
    partitionValues = InternalRow.empty, // This should be empty for non partitioned values
    filePath = path.toString,
    start = 0,
    length = status.getLen
  )

  val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
    new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = spark,
      dataSchema = fileSchema,
      partitionSchema = partitionSchema, // this should be empty for non partitioned fields
      requiredSchema = requiredSchema,
      filters = Seq.empty,
      options = Map.empty,
      hadoopConf = spark.sparkContext.hadoopConfiguration // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    )

  import scala.collection.JavaConverters._

  val i: Iterator[Any] = readFile(pFile)
  val rows = i.flatMap(_ match {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  })

  rows
}


Cheers Andrew
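
Regarding question 1 above, one rough runtime check, shown as a sketch with made-up names (not a complete answer): org.apache.spark.TaskContext.get() returns null on the driver and a populated context inside a running task, so it can at least tell you where a given piece of code actually ended up executing.

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object WhereDoesThisRunSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("where-does-this-run").getOrCreate()
    val sc = spark.sparkContext

    // On the driver there is no active task, so TaskContext.get() is null.
    println(s"on the driver, TaskContext.get() = ${TaskContext.get()}")

    // Inside a task the context is populated with partition and stage info
    // (these lines go to the executor's stdout, not the driver console).
    sc.parallelize(1 to 4, 2).foreachPartition { _ =>
      val tc = TaskContext.get()
      println(s"in a task: partition=${tc.partitionId()}, stage=${tc.stageId()}")
    }

    spark.stop()
  }
}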


Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Ryan Blue
I think the confusion is that the schema passed to spark.read is not a
projection schema. I don’t think it is even used in this case because the
Parquet dataset has its own schema. You’re getting the schema of the table.
I think the correct behavior is to reject a user-specified schema in this
case.
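
To make the workaround quoted below concrete, a small hedged sketch (the path is a placeholder): since the schema passed to spark.read is not treated as a projection, an explicit select restores the intended column order before converting to a typed Dataset.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object ColumnOrderWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("column-order-sketch").getOrCreate()
    import spark.implicits._

    // Reproduction from the thread: _1 becomes a partition column, so it comes back last.
    spark.createDataset(Seq((1, 2))).write.mode("overwrite").partitionBy("_1").parquet("/tmp/foo.parquet")

    val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))
    val raw = spark.read.schema(schema).parquet("/tmp/foo.parquet") // column order: _2, _1

    // Explicit projection enforces the order the caller asked for.
    val columnNames = schema.fields.map(_.name)
    val reordered = raw.select(columnNames.head, columnNames.tail: _*).as[(Int, Int)]
    reordered.show() // columns: _1, _2

    spark.stop()
  }
}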

On Thu, Apr 11, 2019 at 11:04 AM Bruce Robbins 
wrote:

> I see a Jira:
>
> https://issues.apache.org/jira/browse/SPARK-21021
>
> On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas 
> wrote:
>
>> +dev for more visibility. Is this a known issue? Is there a plan for a
>> fix?
>>
>> Thanks,
>> David
>>
>> Begin forwarded message:
>>
>> *From: *Dávid Szakállas 
>> *Subject: **Dataset schema incompatibility bug when reading column
>> partitioned data*
>> *Date: *2019. March 29. 14:15:27 CET
>> *To: *u...@spark.apache.org
>>
>> We observed the following bug on Spark 2.4.0:
>>
>> scala> 
>> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
>>
>> scala> val schema = StructType(Seq(StructField("_1", 
>> IntegerType),StructField("_2", IntegerType)))
>>
>> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
>> +---+---+
>> | _2| _1|
>> +---+---+
>> |  2|  1|
>> +---+---+
>>
>>
>> That is, when reading column partitioned Parquet files the explicitly
>> specified schema is not adhered to; instead, the partitioning columns are
>> appended to the end of the column list. This is a quite severe issue, as some
>> operations, such as union, fail if columns are in a different order in two
>> datasets. Thus we have to work around the issue with a select:
>>
>> val columnNames = schema.fields.map(_.name)
>> ds.select(columnNames.head, columnNames.tail: _*)
>>
>>
>> Thanks,
>> David Szakallas
>> Data Engineer | Whitepages, Inc.
>>
>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Bruce Robbins
I see a Jira:

https://issues.apache.org/jira/browse/SPARK-21021

On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas 
wrote:

> +dev for more visibility. Is this a known issue? Is there a plan for a fix?
>
> Thanks,
> David
>
> Begin forwarded message:
>
> *From: *Dávid Szakállas 
> *Subject: **Dataset schema incompatibility bug when reading column
> partitioned data*
> *Date: *2019. March 29. 14:15:27 CET
> *To: *u...@spark.apache.org
>
> We observed the following bug on Spark 2.4.0:
>
> scala> 
> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
>
> scala> val schema = StructType(Seq(StructField("_1", 
> IntegerType),StructField("_2", IntegerType)))
>
> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
> +---+---+
> | _2| _1|
> +---+---+
> |  2|  1|
> +---+---+
>
>
> That is, when reading column partitioned Parquet files the explicitly
> specified schema is not adhered to; instead, the partitioning columns are
> appended to the end of the column list. This is a quite severe issue, as some
> operations, such as union, fail if columns are in a different order in two
> datasets. Thus we have to work around the issue with a select:
>
> val columnNames = schema.fields.map(_.name)
> ds.select(columnNames.head, columnNames.tail: _*)
>
>
> Thanks,
> David Szakallas
> Data Engineer | Whitepages, Inc.
>
>
>


Re: [DISCUSS] Spark Columnar Processing

2019-04-11 Thread Reynold Xin
I just realized we had an earlier SPIP on a similar topic: 
https://issues.apache.org/jira/browse/SPARK-24579

Perhaps we should tie the two together. IIUC, you'd want to expose the existing 
ColumnarBatch API, but also provide utilities to directly convert from/to Arrow.

On Thu, Apr 11, 2019 at 7:13 AM, Bobby Evans <bo...@apache.org> wrote:

> 
> The SPIP has been up for almost 6 days now with really no discussion on
> it.  I am hopeful that means it's okay and we are good to call a vote on
> it, but I want to give everyone one last chance to take a look and
> comment.  If there are no comments by tomorrow I think we will start a vote
> for this.
> 
> 
> Thanks,
> 
> 
> Bobby
> 
> On Fri, Apr 5, 2019 at 2:24 PM Bobby Evans <bo...@apache.org> wrote:
> 
> 
>> I just filed SPARK-27396 as the SPIP for this proposal.  Please use that
>> JIRA for further discussions.
>> 
>> 
>> Thanks for all of the feedback,
>> 
>> 
>> Bobby
>> 
>> On Wed, Apr 3, 2019 at 7:15 PM Bobby Evans <bo...@apache.org> wrote:
>> 
>> 
>>> I am still working on the SPIP and should get it up in the next few days. 
>>> I have the basic text more or less ready, but I want to get a high-level
>>> API concept ready too just to have something more concrete.  I have not
>>> really done much with contributing new features to spark so I am not sure
>>> where a design document really fits in here because from
>>> http://spark.apache.org/improvement-proposals.html and
>>> http://spark.apache.org/contributing.html it
>>> does not mention a design anywhere.  I am happy to put one up, but I was
>>> hoping the API concept would cover most of that.
>>> 
>>> 
>>> Thanks,
>>> 
>>> 
>>> Bobby
>>> 
>>> On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu <liurenjie2...@gmail.com> wrote:
>>> 
>>> 
 Hi, Bobby:
 Do you have design doc? I'm also interested in this topic and want to help
 contribute.
 
 On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans <bo...@apache.org> wrote:
 
 
> Thanks to everyone for the feedback.
> 
> 
> Overall the feedback has been really positive for exposing columnar as a
> processing option to users.  I'll write up a SPIP on the proposed changes
> to support columnar processing (not necessarily implement it) and then
> ping the list again for more feedback and discussion.
> 
> 
> Thanks again,
> 
> 
> Bobby
> 
> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin <r...@databricks.com> wrote:
> 
> 
>> I just realized I didn't make it very clear my stance here ... here's
>> another try:
>> 
>> 
>> 
>> I think it's a no brainer to have a good columnar UDF interface. This
>> would facilitate a lot of high performance applications, e.g. GPU-based
>> accelerations for machine learning algorithms.
>> 
>> 
>> 
>> On rewriting the entire internals of Spark SQL to leverage columnar
>> processing, I don't see enough evidence to suggest that's a good idea 
>> yet.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <bo...@apache.org> wrote:
>> 
>>> Kazuaki Ishizaki,
>>> 
>>> 
>>> Yes, ColumnarBatchScan does provide a framework for doing code 
>>> generation
>>> for the processing of columnar data.  I have to admit that I don't have 
>>> a
>>> deep understanding of the code generation piece, so if I get something
>>> wrong please correct me.  From what I had seen only input formats
>>> currently inherit from ColumnarBatchScan, and from comments in the 
>>> trait
>>> 
>>> 
>>>   /**
>>>    * Generate [[ColumnVector]] expressions for our parent to consume as
>>> rows.
>>>    * This is called once per [[ColumnarBatch]].
>>>    */
>>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>> 
>>> 
>>> 
>>> It appears that ColumnarBatchScan is really only intended to pull out 
>>> the
>>> data from the batch, and not to process that data in a columnar 
>>> fashion. 
>>> The Loading stage that you mentioned.
>>> 
>>> 
>>> > The SIMDzation or GPUization capability depends on a compiler that
>>> translates native code from the code generated by the whole-stage 
>>> codegen.
>>> 

Fwd: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Dávid Szakállas
+dev for more visibility. Is this a known issue? Is there a plan for a fix?

Thanks,
David

> Begin forwarded message:
> 
> From: Dávid Szakállas 
> Subject: Dataset schema incompatibility bug when reading column partitioned 
> data
> Date: 2019. March 29. 14:15:27 CET
> To: u...@spark.apache.org
> 
> We observed the following bug on Spark 2.4.0:
> 
> scala> 
> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
> 
> scala> val schema = StructType(Seq(StructField("_1", 
> IntegerType),StructField("_2", IntegerType)))
> 
> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
> +---+---+
> | _2| _1|
> +---+---+
> |  2|  1|
> +---+---+
> 
> That is, when reading column partitioned Parquet files the explicitly
> specified schema is not adhered to; instead, the partitioning columns are
> appended to the end of the column list. This is a quite severe issue, as some
> operations, such as union, fail if columns are in a different order in two
> datasets. Thus we have to work around the issue with a select:
> 
> val columnNames = schema.fields.map(_.name)
> ds.select(columnNames.head, columnNames.tail: _*)
> 
> 
> Thanks, 
> David Szakallas
> Data Engineer | Whitepages, Inc.



Re: [DISCUSS] Spark Columnar Processing

2019-04-11 Thread Bobby Evans
The SPIP has been up for almost 6 days now with really no discussion on
it.  I am hopeful that means it's okay and we are good to call a vote on
it, but I want to give everyone one last chance to take a look and
comment.  If there are no comments by tomorrow I think we will start a vote
for this.

Thanks,

Bobby

On Fri, Apr 5, 2019 at 2:24 PM Bobby Evans  wrote:

> I just filed SPARK-27396 as the SPIP for this proposal.  Please use that
> JIRA for further discussions.
>
> Thanks for all of the feedback,
>
> Bobby
>
> On Wed, Apr 3, 2019 at 7:15 PM Bobby Evans  wrote:
>
>> I am still working on the SPIP and should get it up in the next few
>> days.  I have the basic text more or less ready, but I want to get a
>> high-level API concept ready too just to have something more concrete.  I
>> have not really done much with contributing new features to spark so I am
>> not sure where a design document really fits in here because from
>> http://spark.apache.org/improvement-proposals.html and
>> http://spark.apache.org/contributing.html it does not mention a design
>> anywhere.  I am happy to put one up, but I was hoping the API concept would
>> cover most of that.
>>
>> Thanks,
>>
>> Bobby
>>
>> On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu 
>> wrote:
>>
>>> Hi, Bobby:
>>> Do you have design doc? I'm also interested in this topic and want to
>>> help contribute.
>>>
>>> On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans  wrote:
>>>
 Thanks to everyone for the feedback.

 Overall the feedback has been really positive for exposing columnar as
 a processing option to users.  I'll write up a SPIP on the proposed changes
 to support columnar processing (not necessarily implement it) and then ping
 the list again for more feedback and discussion.

 Thanks again,

 Bobby

 On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin  wrote:

> I just realized I didn't make it very clear my stance here ... here's
> another try:
>
> I think it's a no brainer to have a good columnar UDF interface. This
> would facilitate a lot of high performance applications, e.g. GPU-based
> accelerations for machine learning algorithms.
>
> On rewriting the entire internals of Spark SQL to leverage columnar
> processing, I don't see enough evidence to suggest that's a good idea yet.
>
>
>
>
> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans  wrote:
>
>> Kazuaki Ishizaki,
>>
>> Yes, ColumnarBatchScan does provide a framework for doing code
>> generation for the processing of columnar data.  I have to admit that I
>> don't have a deep understanding of the code generation piece, so if I get
>> something wrong please correct me.  From what I had seen only input 
>> formats
>> currently inherit from ColumnarBatchScan, and from comments in the trait
>>
>>   /**
>>* Generate [[ColumnVector]] expressions for our parent to consume
>> as rows.
>>* This is called once per [[ColumnarBatch]].
>>*/
>>
>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>
>> It appears that ColumnarBatchScan is really only intended to pull out
>> the data from the batch, and not to process that data in a columnar
>> fashion.  The Loading stage that you mentioned.
>>
>> > The SIMDzation or GPUization capability depends on a compiler that
>> translates native code from the code generated by the whole-stage 
>> codegen.
>> To be able to support vectorized processing Hive stayed with pure
>> java and let the JVM detect and do the SIMDzation of the code.  To make
>> that happen they created loops to go through each element in a column and
>> remove all conditionals from the body of the loops.  To the best of my
>> knowledge that would still require a separate code path like I am 
>> proposing
>> to make the different processing phases generate code that the JVM can
>> compile down to SIMD instructions.  The generated code is full of null
>> checks for each element which would prevent the operations we want.  
>> Also,
>> the intermediate results are often stored in UnsafeRow instances.  This 
>> is
>> really fast for row-based processing, but the complexity of how they 
>> work I
>> believe would prevent the JVM from being able to vectorize the 
>> processing.
>> If you have a better way to take java code and vectorize it we should put
>> it into OpenJDK instead of spark so everyone can benefit from it.
>>
>> Trying to compile directly from generated java code to something a
>> GPU can process is something we are tackling but we decided to go a
>> different route from what you proposed.  From talking with several 
>> compiler
>> experts here at NVIDIA my understanding is that 
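
The point above about per-element null checks can be illustrated outside of Spark with a plain Scala sketch (made-up method names, not Spark-generated code): a null branch inside the hot loop versus folding a 0/1 validity mask into the arithmetic, which is the kind of branch-free columnar loop that gives the JIT a better chance at auto-vectorization.

object ColumnarLoopSketch {
  // Row-style processing: a null check on every element keeps a branch in the hot loop.
  def sumWithNullChecks(values: Array[Int], isNull: Array[Boolean]): Long = {
    var total = 0L
    var i = 0
    while (i < values.length) {
      if (!isNull(i)) total += values(i)
      i += 1
    }
    total
  }

  // Columnar-style processing: the validity mask is folded into the arithmetic, so
  // the loop body has no data-dependent branch (an illustration of the idea, not a
  // benchmark claim).
  def sumWithValidityMask(values: Array[Int], valid: Array[Int]): Long = {
    var total = 0L
    var i = 0
    while (i < values.length) {
      total += values(i).toLong * valid(i) // valid(i) is 1 for present values, 0 for nulls
      i += 1
    }
    total
  }

  def main(args: Array[String]): Unit = {
    val values = Array(10, 20, 30, 40)
    val valid  = Array(1, 0, 1, 1)
    val isNull = valid.map(_ == 0)
    println(sumWithNullChecks(values, isNull))  // 80
    println(sumWithValidityMask(values, valid)) // 80
  }
}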

Raise Jenkins test timeout? with alternatives

2019-04-11 Thread Sean Owen
I have a big PR that keeps failing because it hits the 300-minute build timeout:

https://github.com/apache/spark/pull/24314
https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4703/console

It's because it touches so much code that all tests run including
things like Kinesis. It looks like 300 mins isn't enough. We can raise
it to an eye-watering 360 minutes if that's just how long all tests
take.

I can also try splitting up the change to move out changes to a few
optional modules into separate PRs.

(Because this one makes it all the way through Python and Java tests
and almost all R tests several times, and doesn't touch Python or R
and shouldn't have any functional changes, I'm tempted to just merge
it, too, as a solution)

Thoughts?

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org