Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Denny Lee
Awesome - congrats Felix!

On Mon, Aug 8, 2016 at 9:44 PM Felix Cheung 
wrote:

> Thank you!
> Looking forward to working with you all!


Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Yanbo Liang
Congrats Felix!



Re: Source API requires unbounded distributed storage?

2016-08-08 Thread Fred Reiss
Created SPARK-16963 to cover this issue.

Fred

On Thu, Aug 4, 2016 at 4:52 PM, Michael Armbrust 
wrote:

> Yeah, this API is in the private execution package because we are planning
> to continue to iterate on it.  Today, we will only ever go back one batch,
> though that might change in the future if we do async checkpointing of
> internal state.
>
> You are totally right that we should relay this info back to the source.
> Opening a JIRA sounds like a good first step.
>
> On Thu, Aug 4, 2016 at 4:38 PM, Fred Reiss  wrote:
>
>> Hi,
>>
>> I've been looking over the Source API in 
>> org.apache.spark.sql.execution.streaming,
>> and I'm at a loss for how the current API can be implemented in a practical
>> way. The API defines a single getBatch() method for fetching records from
>> the source, with the following Scaladoc comments defining the semantics:
>>
>>
>> /**
>>  * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`
>>  * then the batch should begin with the first available record. This method must always
>>  * return the same data for a particular `start` and `end` pair.
>>  */
>> def getBatch(start: Option[Offset], end: Offset): DataFrame
>>
>> If I read the semantics described here correctly, a Source is required to
>> retain all past history for the stream that it backs. Further, a Source
>> is also required to retain this data across restarts of the process where
>> the Source is instantiated, even when the Source is restarted on a
>> different machine.
>>
>> The current implementation of FileStreamSource follows my reading of the
>> requirements above. FileStreamSource never deletes a file.
>>
>> I feel like this requirement for unbounded state retention must be a
>> mistake or misunderstanding of some kind. The scheduler is internally
>> maintaining a high water mark (StreamExecution.committedOffsets in
>> StreamExecution.scala) of data that has been successfully processed. There
>> must have been an intent to communicate that high water mark back to the
>> Source so that the Source can clean up its state. Indeed, the DataBricks
>> blog post from last week (https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html) says that "Only a
>> few minutes’ worth of data needs to be retained; Structured Streaming will
>> maintain its own internal state after that."
>>
>> But the code checked into git and shipped with Spark 2.0 does not have an
>> API call for the scheduler to tell a Source where the boundary of "only a
>> few minutes' worth of data" lies.
>>
>> Is there a JIRA that I'm not aware of to change the Source API? If not,
>> should we maybe open one?
>>
>> Fred
>>
>
>
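
For illustration only, a rough sketch of the kind of callback being asked for here; the trait name and the commit() method below are assumptions made for this sketch, not an API that exists in the Spark 2.0 release under discussion:

import org.apache.spark.sql.execution.streaming.{Offset, Source}

// Hypothetical extension point: the scheduler reports the committed high-water mark
// back to the source, so the source can drop its bookkeeping for data at or before it.
trait PurgeableSource extends Source {
  /** Called once every batch up to and including `end` has been committed. */
  def commit(end: Offset): Unit
}

With such a hook, a source like FileStreamSource could stop tracking files whose records fall entirely at or before the committed offset instead of retaining them forever.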


Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Kai Jiang
Congrats Felix!



Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Hyukjin Kwon
Congratulations!



Re: SASL Support

2016-08-08 Thread Reynold Xin
Please send a pull request to update the doc. Thanks.

On Tue, Aug 9, 2016 at 6:48 AM, Michael Gummelt 
wrote:

> I was checking if RPC calls can be encrypted, and I saw that the docs
> (http://spark.apache.org/docs/latest/configuration.html) say that SASL
> encryption is "currently only supported by the block transfer service."
>
> However, it seems that RPC can be SASL encrypted as well:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala#L64
>
> Is this accurate?  If so, I'll submit a PR to update the docs.
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


SASL Support

2016-08-08 Thread Michael Gummelt
I was checking if RPC calls can be encrypted, and I saw that the docs
(http://spark.apache.org/docs/latest/configuration.html) say that SASL
encryption is "currently only supported by the block transfer service."

However, it seems that RPC can be SASL encrypted as well:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala#L64

Is this accurate?  If so, I'll submit a PR to update the docs.

-- 
Michael Gummelt
Software Engineer
Mesosphere
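
For reference, a minimal configuration sketch; spark.authenticate and spark.authenticate.enableSaslEncryption are the standard security properties, and the application name and values here are placeholder assumptions (a shared secret still has to be distributed in whatever way the deployment supports):

import org.apache.spark.SparkConf

// Turn on SASL authentication and request SASL encryption where a component supports it.
// Whether the RPC layer honors the encryption flag is exactly the question raised above.
val conf = new SparkConf()
  .setAppName("sasl-example")
  .set("spark.authenticate", "true")
  .set("spark.authenticate.enableSaslEncryption", "true")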


Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Xiao Li
Congrats Felix!



Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Herman van Hövell tot Westerflier
Congrats Felix!



Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Reynold Xin
The show thing was the result of an optimization that short-circuited any
real Spark computation when the input is a local collection, and the result
was simply the first few rows. That's why it completed without serializing
anything.

It is somewhat inconsistent. One way to eliminate the inconsistency is to
always serialize the query plan even for local execution. We did that back
in the days for the RDD code path, and we can do similar things for the SQL
code path. However, serialization is not free and it will slow down the
execution by a small percentage.



On Tue, Aug 9, 2016 at 5:05 AM, Hao Ren  wrote:

> @Reynold
>
> Some questions to make things clear:
>
> 1. As nothing is really final in the JVM, is the generated code during
> the execution of `df.show()` different from the one of `df.filter($"key"
> === 2).show()` in my snippet ?
>
> 2. When `df.show()` is being executed, it seems that the 'notSer' object
> is not serialized (since there is no exception); instead, the Int value in it is
> serialized. Is this correct?
> To me, this behavior is counterintuitive.
> The analogical problem would be a `RDD.map` 's closure contains
> 'notSer.value'. For example:
> 
> rdd.map {
>   case (key, value) => value + notSer.value
> }
> rdd.count
> 
> It should throw a "Task not serializable" exception. But for a DataFrame,
> it is not the case because of reflection or unsafe.
>
> 3. I am wondering whether this "feature" of the Scala compiler makes the
> DataFrame API unpredictable? Any roadmap on this?
> As a user, I cannot expect that a `filter` call before `show` crashes,
> while a simple `show` on the original df works.
>
> The workaround I can imagine is just to cache and materialize `df` by
> `df.cache.count()`, and then call `df.filter(...).show()`.
> It should work, just a little bit tedious.
>
>
>
> On Mon, Aug 8, 2016 at 10:00 PM, Reynold Xin  wrote:
>
>> That is unfortunately the way how Scala compiler captures (and defines)
>> closures. Nothing is really final in the JVM. You can always use reflection
>> or unsafe to modify the value of fields.
>>

Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread dhruve ashar
Congrats Felix!



-- 
-Dhruve Ashar


Re: Scaling partitioned Hive table support

2016-08-08 Thread Michael Allman
Hi Eric,

Thanks for your feedback. I'm rebasing my code for the first approach on a more 
recent Spark master and am resolving some conflicts. I'll have a better 
understanding of the relationship to your PR once my rebase is complete.

Cheers,

Michael




Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Reynold Xin
That is unfortunately the way how Scala compiler captures (and defines)
closures. Nothing is really final in the JVM. You can always use reflection
or unsafe to modify the value of fields.

On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott 
wrote:

> But does the “notSer” object have to be serialized?
>
>
>
> The object is immutable by the definition of A, so the only thing that
> needs to be serialized is the (immutable) Int value? And Ints are
> serializable?
>
>
>
> Just thinking out loud
>
>
>
> Simon Scott
>
>
>
> Research Developer @ viavisolutions.com
>


Re: Scaling partitioned Hive table support

2016-08-08 Thread Eric Liang
I like the former approach -- it seems more generally applicable to other
catalogs and IIUC would let you defer pruning until execution time. Pruning
is work that should be done by the catalog anyways, as is the case when
querying over an (unconverted) hive table.

You might also want to look at https://github.com/apache/spark/pull/14241 ,
which refactors some of the file scan execution to defer pruning.



Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Tarun Kumar
Congrats Felix!

Tarun


Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Timothy Chen
Congrats Felix!

Tim




Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Suresh Thalamati
Congratulations, Felix!






Scaling partitioned Hive table support

2016-08-08 Thread Michael Allman
Hello,

I'd like to propose a modification in the way Hive table partition metadata are 
loaded and cached. Currently, when a user reads from a partitioned Hive table 
whose metadata are not cached (and for which Hive table conversion is enabled 
and supported), all partition metadata is fetched from the metastore:

https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260

This is highly inefficient in some scenarios. In the most extreme case, a user 
starts a new Spark app, runs a query which reads from a single partition in a 
table with a large number of partitions and terminates their app. All partition 
metadata are loaded and their files' schema are merged, but only a single 
partition is read. Instead, I propose we load and cache partition metadata 
on-demand, as needed to build query plans.

We've long encountered this performance problem at VideoAmp and have taken 
different approaches to address it. In addition to the load time, we've found 
that loading all of a table's partition metadata can require a significant 
amount of JVM heap space. Our largest tables OOM our Spark drivers unless we 
allocate several GB of heap space.

Certainly one could argue that our situation is pathological and rare, and that 
the problem in our scenario is with the design of our tables—not Spark. 
However, even in tables with more modest numbers of partitions, loading only 
the necessary partition metadata and file schema can significantly reduce the 
query planning time, and is definitely more memory efficient.

I've written POCs for a couple of different implementation approaches. Though 
incomplete, both have been successful in their basic goal. The first extends 
`org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such is more 
general. It requires some new abstractions and refactoring of 
`HadoopFsRelation` and `FileCatalog`, among others. It places a greater burden 
on other implementations of `ExternalCatalog`. Currently the only other 
implementation of `ExternalCatalog` is `InMemoryCatalog`, and my code throws an 
`UnsupportedOperationException` on that implementation.

The other approach is simpler and only touches code in the codebase's `hive` 
project. Basically, conversion of `MetastoreRelation` to `HadoopFsRelation` is 
deferred to physical planning when the metastore relation is partitioned. 
During physical planning, the partition pruning filters in a logical query plan 
are used to select the required partition metadata and a `HadoopFsRelation` is 
built from those. The new logical plan is then re-injected into the planner.

I'd like to get the community's thoughts on my proposal and implementation 
approaches.

Thanks!

Michael
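
To make the first approach a little more concrete, a minimal sketch of the kind of catalog hook it implies; the trait and method names are illustrative assumptions for this proposal, not an existing Spark API:

import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, ExternalCatalog}
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical extension point: return only the partitions whose values satisfy the
// pushed-down partition-pruning predicates, instead of all partition metadata at once.
trait PruningExternalCatalog extends ExternalCatalog {
  def listPartitionsByFilter(
      db: String,
      table: String,
      predicates: Seq[Expression]): Seq[CatalogTablePartition]
}

As described above, `InMemoryCatalog` could initially throw an `UnsupportedOperationException` here, while the Hive-backed catalog translates the predicates into a metastore partition filter.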

Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Nicholas Chammas


Do we now have 2 SparkR-focused committers (Shivaram + Felix)? Or are there
more?

Nick



Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Dongjoon Hyun
Congratulations, Felix!

Bests,
Dongjoon.



Re: Welcoming Felix Cheung as a committer

2016-08-08 Thread Ted Yu
Congratulations, Felix.



Welcoming Felix Cheung as a committer

2016-08-08 Thread Matei Zaharia
Hi all,

The PMC recently voted to add Felix Cheung as a committer. Felix has been a 
major contributor to SparkR and we're excited to have him join officially. 
Congrats and welcome, Felix!

Matei



Re: Kafka Support new topic subscriptions without requiring restart of the streaming context

2016-08-08 Thread Cody Koeninger
The Kafka 0.10 support in spark 2.0 allows for pattern based topic
subscription
On Aug 8, 2016 1:12 AM, "r7raul1...@163.com"  wrote:

> How to add new topic to kafka without requiring restart of the streaming
> context?
>
> --
> r7raul1...@163.com
>
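
To illustrate the pattern-based subscription Cody mentions, a minimal sketch using the spark-streaming-kafka-0-10 integration; the broker address, group id, and topic pattern are placeholder assumptions:

import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Topics created later that match the pattern are picked up as the consumer's metadata
// refreshes, without restarting the streaming context.
val sparkConf = new SparkConf().setAppName("pattern-subscribe").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pattern-subscribe-example"
)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams)
)
stream.map(record => (record.key, record.value)).print()
ssc.start()
ssc.awaitTermination()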


Spark 2.0 sql module empty columns in result over parquet tables

2016-08-08 Thread ekass
I ran into a very strange issue. After loading Parquet tables and running an
SQL query with the SQL module, the results are not correct with Spark 2.0,
although over the exact same dataset the Spark 1.6 results are correct. With
text files, however, both versions of Spark work as expected. I haven't
noticed anything strange in the logs; stages, tasks and shuffle data are
almost the same. How can I debug the query execution to find out why some
columns in the result are null, or why the result set is empty? Could this be
related to the datatype casting that is required in Spark 2.0 queries?

Thank you in advance






Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Hao Ren
Yes, it is.
You can define a udf like that.
Basically, it's a udf Int => Int which is a closure contains a non
serializable object.
The latter should cause Task not serializable exception.

Hao

On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar  wrote:

> Hello Hao Ren,
>
> Doesn't the code...
>
> val add = udf {
>   (a: Int) => a + notSer.value
> }
> Mean UDF function that Int => Int ?
>
> Thanks,
> Muthu
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren  wrote:
>
>> I am playing with spark 2.0
>> What I tried to test is:
>>
>> Create a UDF in which there is a non serializable object.
>> What I expected is that when this UDF is called while materializing the
>> dataFrame where the UDF is used in "select", a Task not serializable
>> exception should be thrown.
>> It depends also which "action" is called on that dataframe.
>>
>> Here is the code for reproducing the pb:
>>
>> 
>> object DataFrameSerDeTest extends App {
>>
>>   class A(val value: Int) // It is not serializable
>>
>>   def run() = {
>> val spark = SparkSession
>>   .builder()
>>   .appName("DataFrameSerDeTest")
>>   .master("local[*]")
>>   .getOrCreate()
>>
>> import org.apache.spark.sql.functions.udf
>> import spark.sqlContext.implicits._
>>
>> val notSer = new A(2)
>> val add = udf {
>>   (a: Int) => a + notSer.value
>> }
>> val df = spark.createDataFrame(Seq(
>>   (1, 2),
>>   (2, 2),
>>   (3, 2),
>>   (4, 2)
>> )).toDF("key", "value")
>>   .select($"key", add($"value").as("added"))
>>
>> df.show() // It should not work because the udf contains a
>> non-serializable object, but it works
>>
>> df.filter($"key" === 2).show() // It does not work as expected
>> (org.apache.spark.SparkException: Task not serializable)
>>   }
>>
>>   run()
>> }
>> 
>>
>> Also, I tried collect(), count(), first(), limit(). All of them worked
>> without non-serializable exceptions.
>> It seems only filter() throws the exception. (feature or bug ?)
>>
>> Any ideas ? Or I just messed things up ?
>> Any help is highly appreciated.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France
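
As an aside, a common way to sidestep the capture question entirely is to copy the needed field into a local value before defining the UDF, so the closure references only the Int and never the non-serializable instance. A minimal sketch that reuses the `A` class from the snippet above (illustrative, not the only fix):

import org.apache.spark.sql.functions.udf

val notSer = new A(2)
val captured: Int = notSer.value            // capture only the Int, not the A instance
val add = udf { (a: Int) => a + captured }  // this closure serializes cleanly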


Kafka Support new topic subscriptions without requiring restart of the streaming context

2016-08-08 Thread r7raul1...@163.com
How to add new topic to kafka without requiring restart of the streaming 
context?



r7raul1...@163.com