Spark performance tests

2017-01-09 Thread Prasun Ratn
Hi

Are there performance tests or microbenchmarks for Spark, especially ones
directed towards the CPU-specific parts? I looked at spark-perf, but that
doesn't seem to have been updated recently.

Thanks
Prasun




Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Liang-Chi Hsieh

Hi Andy,

Because the hash-based aggregate uses UnsafeRow as the aggregation state, the
aggregation buffer schema must consist of types that are mutable in UnsafeRow.

If you can use TypedImperativeAggregate to implement your aggregation
function, Spark SQL has ObjectHashAggregateExec, which supports hash-based
aggregation using arbitrary JVM objects as aggregation states.
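
For illustration, here is a rough way to check whether a given aggregation
buffer schema qualifies for the UnsafeRow-based hash aggregate. This pokes at
the internal classes linked in the quoted message below, so treat it as a
sketch against the Spark 2.x code rather than a stable API; the schemas are
made-up examples:

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// A buffer holding an array of structs (roughly what a UDAF that aggregates
// rows into an array uses) is not mutable inside UnsafeRow, so the
// hash-based path is rejected:
val arrayBuffer = StructType(Seq(
  StructField("rows", ArrayType(StructType(Seq(StructField("k", StringType)))))))
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(arrayBuffer)   // false

// A buffer of fixed-width primitive fields qualifies:
val primitiveBuffer = StructType(Seq(
  StructField("sum", LongType),
  StructField("cnt", LongType)))
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(primitiveBuffer) // true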



Andy Dang wrote
> Hi Takeshi,
> 
> Thanks for the answer. My UDAF aggregates data into an array of rows.
> 
> Apparently this makes it ineligible for the hash-based aggregate, based on
> the logic at:
> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108
>
> The list of supported data types is VERY limited, unfortunately.
>
> It doesn't make sense to me that the data types must be mutable for the UDAF
> to use the hash-based aggregate, but I could be missing something here :). I
> could achieve hash-based aggregation by rewriting this query with the RDD
> API, but that is counter-intuitive IMO.
> 
> ---
> Regards,
> Andy
> 
> On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin.m.s@> wrote:
> 
>> Hi,
>>
>> Spark uses hash-based aggregates whenever the types of the aggregated data
>> are supported; otherwise, it falls back to sort-based aggregates.
>> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>>
>> I'm not sure about the details of your query, but it seems the types of the
>> aggregated data in it are not supported by hash-based aggregates.
>>
>> // maropu
>>
>>
>>
>> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <namd88@> wrote:
>>
>>> Hi all,
>>>
>>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>>> which is very inefficient for certain aggregations:
>>>
>>> The code is very simple:
>>> - I have a UDAF
>>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>>
>>> The physical plan I got was:
>>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>>> +- Exchange SinglePartition
>>>+- *HashAggregate(keys=[], functions=[partial_count(1)],
>>> output=[count#71L])
>>>   +- *Project
>>>  +- Generate explode(internal_col#31), false, false,
>>> [internal_col#42]
>>> +- SortAggregate(key=[key#0],
>>> functions=[aggregatefunction(key#0,
>>> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
>>> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>>+- *Sort [key#0 ASC], false, 0
>>>   +- Exchange hashpartitioning(key#0, 200)
>>>  +- SortAggregate(key=[key#0],
>>> functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2,
>>> nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
>>> output=[key#0,internal_col#37])
>>> +- *Sort [key#0 ASC], false, 0
>>>+- Scan ExistingRDD[key#0,nested#1,nes
>>> tedArray#2,nestedObjectArray#3,value#4L]
>>>
>>> How can I make Spark use HashAggregate (like it does for the count(*)
>>> expression) instead of SortAggregate with my UDAF?
>>>
>>> Is it intentional? Is there an issue tracking this?
>>>
>>> ---
>>> Regards,
>>> Andy
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>





-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 




Re: Parquet patch release

2017-01-09 Thread Cheng Lian
Finished reviewing the list and it LGTM now (left comments in the 
spreadsheet and Ryan already made corresponding changes).


Ryan - Thanks a lot for pushing this and making it happen!

Cheng


On 1/6/17 3:46 PM, Ryan Blue wrote:
Last month, there was interest in a Parquet patch release on PR #16281. I went
ahead and reviewed commits that should go into a Parquet patch release and
started a 1.8.2 discussion on the Parquet dev list. If you're interested in
reviewing what goes into 1.8.2 or have suggestions, please follow that thread
on the Parquet list.


Thanks!

rb

--
Ryan Blue
Software Engineer
Netflix




Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows.

Apparently this makes it ineligible for the hash-based aggregate, based on
the logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

The list of supported data types is VERY limited, unfortunately.

It doesn't make sense to me that the data types must be mutable for the UDAF to
use the hash-based aggregate, but I could be missing something here :). I could
achieve hash-based aggregation by rewriting this query with the RDD API, but
that is counter-intuitive IMO.

---
Regards,
Andy

On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro wrote:

> Hi,
>
> Spark uses hash-based aggregates whenever the types of the aggregated data
> are supported; otherwise, it falls back to sort-based aggregates.
> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>
> I'm not sure about the details of your query, but it seems the types of the
> aggregated data in it are not supported by hash-based aggregates.
>
> // maropu
>
>
>
> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang  wrote:
>
>> Hi all,
>>
>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>> which is very inefficient for certain aggregations:
>>
>> The code is very simple:
>> - I have a UDAF
>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>
>> The physical plan I got was:
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>> +- Exchange SinglePartition
>>+- *HashAggregate(keys=[], functions=[partial_count(1)],
>> output=[count#71L])
>>   +- *Project
>>  +- Generate explode(internal_col#31), false, false,
>> [internal_col#42]
>> +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0,
>> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
>> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>+- *Sort [key#0 ASC], false, 0
>>   +- Exchange hashpartitioning(key#0, 200)
>>  +- SortAggregate(key=[key#0],
>> functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2,
>> nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
>> output=[key#0,internal_col#37])
>> +- *Sort [key#0 ASC], false, 0
>>+- Scan ExistingRDD[key#0,nested#1,nes
>> tedArray#2,nestedObjectArray#3,value#4L]
>>
>> How can I make Spark use HashAggregate (like it does for the count(*)
>> expression) instead of SortAggregate with my UDAF?
>>
>> Is it intentional? Is there an issue tracking this?
>>
>> ---
>> Regards,
>> Andy
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Takeshi Yamamuro
Hi,

Spark uses hash-based aggregates whenever the types of the aggregated data are
supported; otherwise, it falls back to sort-based aggregates.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38

I'm not sure about the details of your query, but it seems the types of the
aggregated data in it are not supported by hash-based aggregates.
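
For what it's worth, a quick way to see which aggregate operator the planner
chose is to look at the physical plan; the dataset, column, and UDAF names
below are placeholders, not from this thread:

import org.apache.spark.sql.functions.col

val aggregated = dataset.groupBy(col("key")).agg(myUdaf(col("value")))
aggregated.explain()
// A HashAggregate (or ObjectHashAggregate, where available) node means the
// hash-based path was taken; a SortAggregate node means the aggregation
// buffer types forced the sort-based fallback.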

// maropu



On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang  wrote:

> Hi all,
>
> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
> which is very inefficient for certain aggregations:
>
> The code is very simple:
> - I have a UDAF
> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>
> The physical plan I got was:
> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
> +- Exchange SinglePartition
>+- *HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#71L])
>   +- *Project
>  +- Generate explode(internal_col#31), false, false,
> [internal_col#42]
> +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0,
> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>+- *Sort [key#0 ASC], false, 0
>   +- Exchange hashpartitioning(key#0, 200)
>  +- SortAggregate(key=[key#0], 
> functions=[partial_aggregatefunction(key#0,
> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
> com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
> +- *Sort [key#0 ASC], false, 0
>+- Scan ExistingRDD[key#0,nested#1,
> nestedArray#2,nestedObjectArray#3,value#4L]
>
> How can I make Spark use HashAggregate (like it does for the count(*)
> expression) instead of SortAggregate with my UDAF?
>
> Is it intentional? Is there an issue tracking this?
>
> ---
> Regards,
> Andy
>



-- 
---
Takeshi Yamamuro


How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
which is very inefficient for certain aggregations:

The code is very simple:
- I have a UDAF
- What I want to do is: dataset.groupBy(cols).agg(udaf).count()

The physical plan I got was:
*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)],
output=[count#71L])
  +- *Project
 +- Generate explode(internal_col#31), false, false,
[internal_col#42]
+- SortAggregate(key=[key#0],
functions=[aggregatefunction(key#0, nested#1, nestedArray#2,
nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
output=[internal_col#31])
   +- *Sort [key#0 ASC], false, 0
  +- Exchange hashpartitioning(key#0, 200)
 +- SortAggregate(key=[key#0],
functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2,
nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
output=[key#0,internal_col#37])
+- *Sort [key#0 ASC], false, 0
   +- Scan
ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]

How can I make Spark use HashAggregate (like it does for the count(*)
expression) instead of SortAggregate with my UDAF?

Is it intentional? Is there an issue tracking this?

---
Regards,
Andy


Re: A note about MLlib's StandardScaler

2017-01-09 Thread Sean Owen
This could be true if you knew you were just going to scale the input to
StandardScaler and nothing else. It's probably more typical you'd scale
some other data. The current behavior is therefore the sensible default,
because the input is a sample of some unknown larger population.

I think it doesn't matter much except for toy problems, because at any
scale, the difference between 1/n and 1/(n-1) is negligible, and for most
purposes for which the scaler is used, it won't matter anyway (faster
convergence of an optimizer for example). I'm neutral on whether it's worth
complicating the API to do both, therefore.
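
For concreteness, a minimal plain-Scala sketch of the 1/(n-1) vs. 1/n
difference on one column of the example quoted below (values -2.0 and 3.8);
this is just arithmetic, not Spark API:

val xs = Seq(-2.0, 3.8)
val mean = xs.sum / xs.size                         // 0.9
val sumSq = xs.map(x => math.pow(x - mean, 2)).sum  // 16.82

val sampleStd = math.sqrt(sumSq / (xs.size - 1))    // ~4.101 (1/(n-1), what MLlib uses)
val populationStd = math.sqrt(sumSq / xs.size)      // 2.9    (1/n, what was expected)

xs.map(x => (x - mean) / sampleStd)      // List(-0.7071..., 0.7071...), matches StandardScaler
xs.map(x => (x - mean) / populationStd)  // List(-1.0, 1.0), unit variance in the 1/n sense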

On Mon, Jan 9, 2017 at 6:50 AM Liang-Chi Hsieh  wrote:

>
> Actually I think it is possible that a user/developer needs the features
> standardized with the population mean and std in some cases. It would be
> better if StandardScaler offered the option to do that.
>
>
>
> Holden Karau wrote
> > Hi Gilad,
> >
> > Spark uses the sample standard deviation inside of the StandardScaler (see
> > https://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.mllib.feature.StandardScaler
> > ), which I think would explain the results you are seeing. I believe the
> > scalers are intended to be used on larger datasets. You can verify this
> > yourself by doing the same computation in Python and seeing that scaling
> > with the sample deviation results in the values you are seeing from Spark.
> >
> > Cheers,
> >
> > Holden :)
> >
> >
> > On Sun, Jan 8, 2017 at 12:06 PM, Gilad Barkan <gilad.barkan@> wrote:
> >
> >> Hi
> >>
> >> It seems that the output of MLlib's *StandardScaler*(*withMean=*True,
> >> *withStd*=True) is not as expected.
> >>
> >> The above configuration is expected to do the following transformation:
> >>
> >> X -> Y = (X-Mean)/Std  - Eq.1
> >>
> >> This transformation (a.k.a. Standardization) should result in a
> >> "standardized" vector with unit-variance and zero-mean.
> >>
> >> I'll demonstrate my claim using the current documentation example:
> >>
> >> >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
> >> >>> dataset = sc.parallelize(vs)
> >> >>> standardizer = StandardScaler(True, True)
> >> >>> model = standardizer.fit(dataset)
> >> >>> result = model.transform(dataset)
> >> >>> for r in result.collect(): print r
> >> DenseVector([-0.7071, 0.7071, -0.7071])
> >> DenseVector([0.7071, -0.7071, 0.7071])
> >>
> >> This results in std = sqrt(1/2) for each column instead of std = 1.
> >>
> >> Applying the standardization transformation to the above 2 vectors results
> >> in the following output:
> >>
> >> DenseVector([-1.0, 1.0, -1.0])
> >> DenseVector([1.0, -1.0, 1.0])
> >>
> >>
> >> Another example:
> >>
> >> Adding another DenseVector([2.4, 0.8, 3.5]) to the above, we get 3 rows
> >> of DenseVectors:
> >> [DenseVector([-2.0, 2.3, 0.0]), DenseVector([3.8, 0.0, 1.9]),
> >> DenseVector([2.4, 0.8, 3.5])]
> >>
> >> The StandardScaler results in the following scaled vectors:
> >> [DenseVector([-1.12339, 1.084829, -1.02731]), DenseVector([0.792982,
> >> -0.88499, 0.057073]), DenseVector([0.330409, -0.19984, 0.970241])]
> >>
> >> This result has std = sqrt(2/3).
> >>
> >> Instead, it should have resulted in 3 vectors with std = 1 for each
> >> column.
> >>
> >> Adding another vector (4 total) results in 4 scaled vectors with
> >> std = sqrt(3/4) instead of std = 1.
> >>
> >> I hope these examples make my point clear, and I hope I'm not missing
> >> something here.
> >>
> >> Thank you
> >>
> >> Gilad Barkan
> >>
> >>
> >>
> >>
> >>
> >>
> >
> >
> > --
> > Cell : 425-233-8271
> > Twitter: https://twitter.com/holdenkarau
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
>


Re: Spark checkpointing

2017-01-09 Thread Steve Loughran

On 7 Jan 2017, at 08:29, Felix Cheung wrote:

Thanks Steve.

As you have pointed out, we have seen some issues related to cloud storage used
as a "file system". I've been looking at checkpointing recently. What do you
think would be the improvement we could make for "non-local" (== reliable?)
checkpointing?


Right now? I wouldn't checkpoint to S3. Azure WASB works; S3 doesn't, not
reliably. Checkpoint to HDFS and use DistCp to back it up to S3.
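
A minimal sketch of that setup (paths and bucket names are placeholders):

// Checkpoint to HDFS rather than S3 ...
sc.setCheckpointDir("hdfs://namenode:8020/checkpoints/my-job")

// ... then copy the checkpoint directory to S3 out-of-band, e.g. from a shell:
//   hadoop distcp hdfs://namenode:8020/checkpoints/my-job s3a://my-bucket/checkpoints/my-job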




From: Steve Loughran
Sent: Friday, January 6, 2017 9:57:05 AM
To: Ankur Srivastava
Cc: Felix Cheung; u...@spark.apache.org
Subject: Re: Spark GraphFrame ConnectedComponents


On 5 Jan 2017, at 21:10, Ankur Srivastava wrote:

Yes, I did try it out, and it chooses the local file system because my
checkpoint location starts with s3n://.

I am not sure how I can make it load the S3 FileSystem.

Set fs.default.name to s3n://whatever, or, in the Spark context,
spark.hadoop.fs.default.name.
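
As a sketch, the second option can be wired up through the Spark conf; the
bucket name is a placeholder, and the spark.hadoop.* prefix just forwards the
setting into the Hadoop Configuration (fs.defaultFS is the newer name for the
legacy fs.default.name key):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.hadoop.fs.defaultFS", "s3a://my-bucket")      // newer key
  .set("spark.hadoop.fs.default.name", "s3a://my-bucket")   // legacy alias
  // use s3n:// instead if only the older connector is on the classpath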

However:

1. You should really use s3a, if you have the Hadoop 2.7 JARs on your classpath.
2. Neither s3n nor s3a is a real filesystem, and certain assumptions that
checkpointing code tends to make ("renames are O(1) atomic calls") do not hold.
It may be that checkpointing to S3 isn't as robust as you'd like.




On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung wrote:
Right, I'd agree, it seems to be only with delete.

Could you by chance run just the delete to see if it fails?

import org.apache.hadoop.fs.{FileSystem, Path}

FileSystem.get(sc.hadoopConfiguration)
  .delete(new Path(somepath), true)

From: Ankur Srivastava
Sent: Thursday, January 5, 2017 10:05:03 AM
To: Felix Cheung
Cc: u...@spark.apache.org

Subject: Re: Spark GraphFrame ConnectedComponents

Yes, it works to read the vertex and edge data from the S3 location, and it is
also able to write the checkpoint files to S3. It only fails when deleting the
data, and that is because it tries to use the default file system. I tried
looking up how to update the default file system but could not find anything
in that regard.

Thanks
Ankur

On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung wrote:
From the stack it looks to be an error from the explicit call to
hadoop.fs.FileSystem.

Is the URL scheme for s3n registered?
Does it work when you try to read from S3 in Spark?

_
From: Ankur Srivastava
Sent: Wednesday, January 4, 2017 9:23 PM
Subject: Re: Spark GraphFrame ConnectedComponents
To: Felix Cheung
Cc:



This is the exact trace from the driver logs

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: 
s3n:///8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3,
 expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
at 
org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
at GraphTest.main(GraphTest.java:31) --- Application Class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And I am running Spark v1.6.2 and GraphFrames v0.3.0-spark1.6-s_2.10.

Thanks
Ankur

On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava wrote:
Hi

I am rerunning the pipeline to generate the exact trace, I have below part of 

Re: scala.MatchError: scala.collection.immutable.Range.Inclusive from catalyst.ScalaReflection.serializerFor?

2017-01-09 Thread Holden Karau
If you want to check whether it's your modifications or something already in
mainline, you can always check out mainline, or stash your current changes and
rebuild (this is something I do pretty often when I run into bugs I don't
think I would have introduced).

On Mon, Jan 9, 2017 at 1:01 AM Liang-Chi Hsieh  wrote:

>
> Hi,
>
> As Seq(0 to 8) is:
>
> scala> Seq(0 to 8)
> res1: Seq[scala.collection.immutable.Range.Inclusive] = List(Range(0, 1, 2,
> 3, 4, 5, 6, 7, 8))
>
> Do you actually want to create a Dataset of Range? If so, I think
> ScalaReflection, which the encoder relies on, currently doesn't support Range.
>
>
> Jacek Laskowski wrote
> > Hi,
> >
> > Just got this this morning using the fresh build of Spark
> > 2.2.0-SNAPSHOT (with a few local modifications):
> >
> > scala> Seq(0 to 8).toDF
> > scala.MatchError: scala.collection.immutable.Range.Inclusive (of class
> > scala.reflect.internal.Types$ClassNoArgsTypeRef)
> >   at
> > org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:520)
> >   at
> > org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:463)
> >   at
> > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
> >   at
> > org.apache.spark.sql.SQLImplicits.newIntSequenceEncoder(SQLImplicits.scala:168)
> >   ... 48 elided
> >
> > Is this something I've introduced, a known issue or a bug?
> >
> > ./bin/spark-shell --version
> > Welcome to
> >     __
> >  / __/__  ___ _/ /__
> > _\ \/ _ \/ _ `/ __/  '_/
> >/___/ .__/\_,_/_/ /_/\_\   version 2.2.0-SNAPSHOT
> >   /_/
> >
> > Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
> > Branch master
> > Compiled by user jacek on 2017-01-09T05:01:47Z
> > Revision 19d9d4c855eab8f647a5ec66b079172de81221d0
> > Url https://github.com/apache/spark.git
> > Type --help for more information.
> >
> > Regards,
> > Jacek Laskowski
> >
> > https://medium.com/@jaceklaskowski/
> > Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> > Follow me at https://twitter.com/jaceklaskowski
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
>


Re: scala.MatchError: scala.collection.immutable.Range.Inclusive from catalyst.ScalaReflection.serializerFor?

2017-01-09 Thread Liang-Chi Hsieh

Hi,

As Seq(0 to 8) is:

scala> Seq(0 to 8)
res1: Seq[scala.collection.immutable.Range.Inclusive] = List(Range(0, 1, 2,
3, 4, 5, 6, 7, 8))

Do you actually want to create a Dataset of Range? If so, I think
ScalaReflection, which the encoder relies on, currently doesn't support Range.
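
For illustration, a couple of workarounds that avoid encoding the Range object
itself, assuming the intent was a Dataset of the numbers 0 to 8 and a
spark-shell-style session (spark and spark.implicits._ in scope):

// Materialize the Range into its elements so the encoder only deals with Int:
(0 to 8).toDF            // Range is a Seq[Int], so the Int encoder applies
Seq(0 to 8: _*).toDF     // same thing, spelled out

// Or use the built-in range source:
spark.range(0, 9).toDF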



Jacek Laskowski wrote
> Hi,
> 
> Just got this this morning using the fresh build of Spark
> 2.2.0-SNAPSHOT (with a few local modifications):
> 
> scala> Seq(0 to 8).toDF
> scala.MatchError: scala.collection.immutable.Range.Inclusive (of class
> scala.reflect.internal.Types$ClassNoArgsTypeRef)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:520)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:463)
>   at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
>   at
> org.apache.spark.sql.SQLImplicits.newIntSequenceEncoder(SQLImplicits.scala:168)
>   ... 48 elided
> 
> Is this something I've introduced, a known issue or a bug?
> 
> ./bin/spark-shell --version
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 2.2.0-SNAPSHOT
>   /_/
> 
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
> Branch master
> Compiled by user jacek on 2017-01-09T05:01:47Z
> Revision 19d9d4c855eab8f647a5ec66b079172de81221d0
> Url https://github.com/apache/spark.git
> Type --help for more information.
> 
> Regards,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 





-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



Re: handling of empty partitions

2017-01-09 Thread Georg Heiler
Hi Liang-Chi Hsieh,

Strange: you wrote that the "toCarry" returned was the following when you
tested my code:

Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
Some(FooBar(Some(2016-01-02),second)))
For me it always looked like:

## carry
Map(2 -> None, 5 -> None, 4 -> None, 7 ->
Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 ->
Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)),
6 -> None, 0 -> None)
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
()
## carry


I updated the code to contain a fixed default parallelism
.set("spark.default.parallelism", "12")

Also:
I updated the sample code:
https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

To cope with "empty / None" partitions I added:

var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
  if (lastNotNullRow == None) {
lastNotNullRow = toCarryBd.value.get(i + 1).get
  }


But that will result in

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-04|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

You can see that noValidFormat should have been filled with 2016-01-02, i.e.
with the last known good value (forward fill).
Cheers,
Georg
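
Following Holden's suggestion above, here is a hedged sketch of the backward
lookup. It assumes the broadcast map (toCarryBd in the gist) maps a partition
index to the last valid row seen in that partition, and the FooBar shape below
is just an approximation of the case class in the gist:

case class FooBar(foo: Option[java.sql.Date], bar: String)  // assumed shape

// Walk backwards from the previous partition until a non-empty carry value is
// found; None is only returned if no earlier partition holds a valid row
// (e.g. when the very first record of the dataset is itself invalid).
def lastGoodValueBefore(i: Int, carry: Map[Int, Option[FooBar]]): Option[FooBar] = {
  var j = i - 1
  while (j >= 0 && carry.getOrElse(j, None).isEmpty) {
    j -= 1
  }
  if (j >= 0) carry(j) else None
}

// Inside mapPartitionsWithIndex, instead of looking only at i (or i + 1):
// val lastNotNullRow: Option[FooBar] = lastGoodValueBefore(i, toCarryBd.value)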

Liang-Chi Hsieh wrote on Mon, 9 Jan 2017 at 09:08:

>
> The map "toCarry" will return (partitionIndex, None) for an empty
> partition.
>
> So I think line 51 won't fail. Line 58 can fail if "lastNotNullRow" is
> None.
> You should of course check whether an Option has a value before you access
> it.
>
> The "toCarry" returned was the following when I tested your code:
>
> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> Some(FooBar(Some(2016-01-02),second)))
>
> As you can see, there is no None, so the code works without failure. But of
> course it depends on how your data is partitioned.
>
> For an empty partition, when you do mapPartitions, it just gives you an empty
> iterator as input. You can do what you need. You already return a None when
> you find an empty iterator in preparing "toCarry". So I was wondering what
> you wanted to ask in your previous reply.
>
>
>
> geoHeil wrote
> > Thanks a lot, Holden.
> >
> > @Liang-Chi Hsieh did you try to run
> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2? For me
> > it crashes at either line 51 or 58. Holden described the problem
> > pretty well. Is it clear for you now?
> >
> > Cheers,
> > Georg
> >
> > Holden Karau [via Apache Spark Developers List]
> > <ml-node+s1001551n20516h45@.nabble> wrote on Mon, 9 Jan 2017 at 06:40:
> >
> >> Hi Georg,
> >>
> >> Thanks for the question along with the code (as well as posting to stack
> >> overflow). In general if a question is well suited for stackoverflow its
> >> probably better suited to the user@ list instead of the dev@ list so
> I've
> >> cc'd the user@ list for you.
> >>
> >> As far as handling empty partitions when working with mapPartitions (and
> >> similar), the general approach is to return an empty iterator of the
> >> correct type when you have an empty input iterator.
> >>
> >> It looks like your code is doing this, however it seems like you likely
> >> have a bug in your application logic (namely it assumes that if a
> >> partition
> >> has a record missing a value it will either have had a previous row in
> >> the
> >> same partition which is good OR that the previous partition is not empty
> >> and has a good row - which need not necessarily be the case). You've
> >> partially fixed this problem by going through and for each partition
> >> collecting the last previous good value, and then if you don't have a
> >> good
> >> value at the start of a partition look up the value in the collected
> >> array.
> >>
> >> However, if this also happens at the same time the previous partition is
> >> empty, you will need to go and lookup the previous previous partition
> >> value
> >> until you find the one you are looking for. (Note this assumes that the
> >> first record in your dataset is valid, if it isn't your code will still
> >> fail).
> >>
> >> Your solution is really close to working but just has some minor
> >> assumptions which don't always necessarily hold.
> >>
> >> Cheers,
> >>
> >> Holden :)
> >> On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh <[hidden email]> wrote:
> >>
> >>
> >> Hi Georg,
> >>
> >> Can you describe your 

scala.MatchError: scala.collection.immutable.Range.Inclusive from catalyst.ScalaReflection.serializerFor?

2017-01-09 Thread Jacek Laskowski
Hi,

Just got this this morning using the fresh build of Spark
2.2.0-SNAPSHOT (with a few local modifications):

scala> Seq(0 to 8).toDF
scala.MatchError: scala.collection.immutable.Range.Inclusive (of class
scala.reflect.internal.Types$ClassNoArgsTypeRef)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:520)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:463)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at 
org.apache.spark.sql.SQLImplicits.newIntSequenceEncoder(SQLImplicits.scala:168)
  ... 48 elided

Is this something I've introduced, a known issue or a bug?

./bin/spark-shell --version
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0-SNAPSHOT
  /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
Branch master
Compiled by user jacek on 2017-01-09T05:01:47Z
Revision 19d9d4c855eab8f647a5ec66b079172de81221d0
Url https://github.com/apache/spark.git
Type --help for more information.

Regards,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski




Re: handling of empty partitions

2017-01-09 Thread Liang-Chi Hsieh

The map "toCarry" will return (partitionIndex, None) for an empty
partition.

So I think line 51 won't fail. Line 58 can fail if "lastNotNullRow" is None.
You should of course check whether an Option has a value before you access
it.

The "toCarry" returned was the following when I tested your code:

Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
Some(FooBar(Some(2016-01-02),second)))

As you can see, there is no None, so the code works without failure. But of
course it depends on how your data is partitioned.

For an empty partition, when you do mapPartitions, it just gives you an empty
iterator as input. You can do what you need. You already return a None when
you find an empty iterator in preparing "toCarry". So I was wondering what
you wanted to ask in your previous reply.
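
For illustration, a minimal sketch of how an empty partition shows up in
mapPartitionsWithIndex (assuming a SparkSession named spark, as in the shell):

// Parallelizing 3 elements over 6 slices leaves some partitions empty.
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3), numSlices = 6)

val perPartition = rdd.mapPartitionsWithIndex { (i, iter) =>
  // The function is still called for an empty partition, just with an empty
  // iterator, so returning an empty iterator of the right type handles it.
  if (iter.isEmpty) Iterator.empty else Iterator((i, iter.max))
}
perPartition.collect()  // only the non-empty partitions contribute results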



geoHeil wrote
> Thanks a lot, Holden.
> 
> @Liang-Chi Hsieh did you try to run
> https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2? For me
> it crashes at either line 51 or 58. Holden described the problem
> pretty well. Is it clear for you now?
> 
> Cheers,
> Georg
> 
> Holden Karau [via Apache Spark Developers List]
> <ml-node+s1001551n20516h45@.nabble> wrote on Mon, 9 Jan 2017 at 06:40:
> 
>> Hi Georg,
>>
>> Thanks for the question along with the code (as well as posting to Stack
>> Overflow). In general, if a question is well suited for Stack Overflow it's
>> probably better suited to the user@ list instead of the dev@ list, so I've
>> cc'd the user@ list for you.
>>
>> As far as handling empty partitions when working with mapPartitions (and
>> similar), the general approach is to return an empty iterator of the
>> correct type when you have an empty input iterator.
>>
>> It looks like your code is doing this, however it seems like you likely
>> have a bug in your application logic (namely it assumes that if a
>> partition
>> has a record missing a value it will either have had a previous row in
>> the
>> same partition which is good OR that the previous partition is not empty
>> and has a good row - which need not necessarily be the case). You've
>> partially fixed this problem by going through and for each partition
>> collecting the last previous good value, and then if you don't have a
>> good
>> value at the start of a partition look up the value in the collected
>> array.
>>
>> However, if this also happens at the same time the previous partition is
>> empty, you will need to go and lookup the previous previous partition
>> value
>> until you find the one you are looking for. (Note this assumes that the
>> first record in your dataset is valid, if it isn't your code will still
>> fail).
>>
>> Your solution is really close to working but just has some minor
>> assumptions which don't always necessarily hold.
>>
>> Cheers,
>>
>> Holden :)
>> On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh <[hidden email]> wrote:
>>
>>
>> Hi Georg,
>>
>> Can you describe your question more clearly?
>>
>> Actually, the example code you posted on Stack Overflow doesn't crash as
>> you said in the post.
>>
>>
>> geoHeil wrote
>> > I am working on building a custom ML pipeline-model / estimator to
>> impute
>> > missing values, e.g. I want to fill with last good known value.
>> > Using a window function is slow / will put the data into a single
>> > partition.
>> > I built some sample code using the RDD API; however, it has some
>> > None / null problems with empty partitions.
>> >
>> > How should this be implemented properly to handle such empty
>> partitions?
>> >
>> http://stackoverflow.com/questions/41474175/spark-mappartitionswithindex-handling-empty-partitions
>> >
>> > Kind regards,
>> > Georg
>>
>>
>>
>>
>>
>> -
>>
>>
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/
>>
>>
>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau