Re: Aggregating over sorted data

2016-12-19 Thread Koert Kuipers
take a look at:
https://issues.apache.org/jira/browse/SPARK-15798


On Dec 19, 2016 00:17, "Robin East"  wrote:

This is also a feature we need for our time-series processing



> On 19 Dec 2016, at 04:07, Liang-Chi Hsieh  wrote:
>
>
> Hi,
>
> As far as I know, Spark SQL doesn't provide native support for this feature now.
> After searching, I found that only a few database systems support it, e.g.,
> PostgreSQL.
>
> Actually, based on Spark SQL's aggregate system, I think it is not very
> difficult to add support for this feature. The problem is how frequently
> this feature is needed by Spark SQL users and whether it is worth adding,
> because as far as I can see, this feature is not very common.
>
> A possible alternative to achieve this in current Spark SQL is to use an
> Aggregator with the Dataset API. You can write your custom Aggregator which has
> a user-defined JVM object as a buffer to hold the input data for your
> aggregate function. But you may need to write the necessary encoder for the
> buffer object.
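
For illustration, here is a minimal sketch of such an Aggregator, assuming the
Spark 2.x Dataset API; the Event type, its fields, and the choice of Kryo encoders
are illustrative assumptions, not code from this thread.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical input record: a sort key (ts) and a value to aggregate in order.
case class Event(ts: Long, value: Double)

// Buffers all inputs in a plain JVM object (a List), sorts by ts in finish(), and
// returns the values in timestamp order. As the mail notes, the buffer needs an
// encoder; Kryo is the simplest choice for an arbitrary JVM buffer object.
object SortedValues extends Aggregator[Event, List[Event], Seq[Double]] {
  def zero: List[Event] = Nil
  def reduce(buf: List[Event], e: Event): List[Event] = e :: buf
  def merge(b1: List[Event], b2: List[Event]): List[Event] = b1 ::: b2
  def finish(buf: List[Event]): Seq[Double] = buf.sortBy(_.ts).map(_.value)
  def bufferEncoder: Encoder[List[Event]] = Encoders.kryo[List[Event]]
  def outputEncoder: Encoder[Seq[Double]] = Encoders.kryo[Seq[Double]]
}

// Example usage over a Dataset[Event]:
//   ds.select(SortedValues.toColumn).show()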
>
> If you really need this feature, you may open a Jira to ask others' opinion
> about this feature.
>
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Aggregating-over-sorted-data-tp1p20273.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Mehdi Meziane
We would be interested in the results if you give dynamic allocation with 
Mesos a try! 



- Original Mail - 
From: "Michael Gummelt"  
To: "Sumit Chawla"  
Cc: u...@mesos.apache.org, d...@mesos.apache.org, "User" 
, dev@spark.apache.org 
Sent: Monday, 19 December 2016 22:42:55 GMT +01:00 Amsterdam / Berlin / Berne / 
Rome / Stockholm / Vienna 
Subject: Re: Mesos Spark Fine Grained Execution - CPU count 



> Is this problem of idle executors sticking around solved in Dynamic Resource 
> Allocation? Is there some timeout after which Idle executors can just 
> shutdown and cleanup its resources. 

Yes, that's exactly what dynamic allocation does. But again I have no idea what 
the state of dynamic allocation + mesos is. 



On Mon, Dec 19, 2016 at 1:32 PM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: 



Great. Makes much better sense now. What would be the reason to have 
spark.mesos.mesosExecutor.cores more than 1, since this number doesn't include 
the number of cores for tasks? 


So in my case it seems like 30 CPUs are allocated to executors. And there are 
48 tasks, so 48 + 30 = 78 CPUs. And I am noticing that this gap of 30 is maintained 
till the last task exits. This explains the gap. Thanks everyone. I am still 
not sure how this number 30 is calculated. (Is it dynamic based on current 
resources, or is it some configuration? I have 32 nodes in my cluster.) 


Is this problem of idle executors sticking around solved in Dynamic Resource 
Allocation? Is there some timeout after which idle executors can just shut down 
and clean up their resources? 





Regards 
Sumit Chawla 





On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt < mgumm...@mesosphere.io > 
wrote: 





> I should preassume that No of executors should be less than number of tasks. 

No. Each executor runs 0 or more tasks. 

Each executor consumes 1 CPU, and each task running on that executor consumes 
another CPU. You can customize this via spark.mesos.mesosExecutor.cores ( 
https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md ) and 
spark.task.cpus ( 
https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md ) 
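
For reference, a minimal sketch of how these two settings are supplied, assuming a
Spark 1.6 application running against Mesos in fine-grained mode; the master URL and
the values are illustrative only, not recommendations.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values; spark.mesos.coarse=false selects the (deprecated) fine-grained mode.
val conf = new SparkConf()
  .setAppName("fine-grained-cpu-accounting")
  .setMaster("mesos://zk://zk-host:2181/mesos")  // assumed Mesos master URL
  .set("spark.mesos.coarse", "false")
  .set("spark.mesos.mesosExecutor.cores", "1")   // CPUs held by each executor itself
  .set("spark.task.cpus", "1")                   // additional CPUs consumed per running task

val sc = new SparkContext(conf)

With these defaults, 48 concurrently running tasks plus 30 executors account for the
48 + 30 = 78 CPUs observed above.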





On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit < sumitkcha...@gmail.com > 
wrote: 



Ah, thanks. Looks like I skipped reading this: "Neither will executors terminate 
when they’re idle." 


So in my job scenario, I should presume that the number of executors should be less 
than the number of tasks; ideally one executor should execute 1 or more tasks. But 
I am observing something strange instead. I start my job with 48 partitions for 
a Spark job. In the Mesos UI I see that the number of tasks is 48, but the no. of CPUs 
is 78, which is way more than 48. Here I am assuming that 1 CPU is 1 executor. I am 
not specifying any configuration to set the number of cores per executor. 



Regards 
Sumit Chawla 





On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere < jo...@mesosphere.io > 
wrote: 



That makes sense. From the documentation it looks like the executors are not 
supposed to terminate: 
http://spark.apache.org/docs/latest/running-on-mesos.html#fine-grained-deprecated
 


Note that while Spark tasks in fine-grained will relinquish cores as they 
terminate, they will not relinquish memory, as the JVM does not give memory 
back to the Operating System. Neither will executors terminate when they’re 
idle. 


I suppose your task to executor CPU ratio is low enough that it looks like most 
of the resources are not being reclaimed. If your tasks were using 
significantly more CPU the amortized cost of the idle executors would not be 
such a big deal. 






— 
Joris Van Remoortere 
Mesosphere 

On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen < tnac...@gmail.com > wrote: 


Hi Chawla, 

One possible reason is that Mesos fine grain mode also takes up cores 
to run the executor per host, so if you have 20 agents running Fine 
grained executor it will take up 20 cores while it's still running. 

Tim 

On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: 


> Hi 
> 
> I am using Spark 1.6. I have one query about the Fine Grained model in Spark. 
> I have a simple Spark application which transforms A -> B. It's a single 
> stage application. To begin with, the program starts with 48 partitions. 
> When the program starts running, the Mesos UI shows 48 tasks and 48 CPUs 
> allocated to the job. Now as the tasks get done, the number of active tasks 
> starts decreasing. However, the number of CPUs does not decrease 
> proportionally. When the job was about to finish, there was a single 
> remaining task, but the CPU count was still 20. 
> 
> My question is why there is no one-to-one mapping between tasks and CPUs 
> in fine-grained mode? How can these CPUs be released when the job is done, so 
> that other jobs can start? 
> 
> 
> Regards 
> Sumit Chawla 





-- 







Michael Gummelt 
Software Engineer 
Mesosphere 




-- 







Michael 

Re: [VOTE] Apache Spark 2.1.0 (RC5)

2016-12-19 Thread Reynold Xin
The vote passed with the following +1 and -1:


+1

Reynold Xin*
Sean Owen*
Dongjoon Hyun
Xiao Li
Herman van Hövell tot Westerflier
Joseph Bradley*
Liwei Lin
Denny Lee
Holden Karau
Adam Roberts
vaquar khan


0/+1 (not sure what this means but putting it here just in case)
Felix Cheung

-1
Franklyn D'souza (due to a bug that's not a regression)


I will work on packaging the release.
















On Thu, Dec 15, 2016 at 9:16 PM, Reynold Xin  wrote:

> Please vote on releasing the following candidate as Apache Spark version
> 2.1.0. The vote is open until Sun, December 18, 2016 at 21:30 PT and passes
> if a majority of at least 3 +1 PMC votes are cast.
>
> [ ] +1 Release this package as Apache Spark 2.1.0
> [ ] -1 Do not release this package because ...
>
>
> To learn more about Apache Spark, please see http://spark.apache.org/
>
> The tag to be voted on is v2.1.0-rc5 (cd0a08361e2526519e7c131c42116b
> f56fa62c76)
>
> List of JIRA tickets resolved are:  https://issues.apache.org/
> jira/issues/?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0
>
> The release files, including signatures, digests, etc. can be found at:
> http://home.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-bin/
>
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1223/
>
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-docs/
>
>
> *FAQ*
>
> *How can I help test this release?*
>
> If you are a Spark user, you can help us test this release by taking an
> existing Spark workload and running on this release candidate, then
> reporting any regressions.
>
> *What should happen to JIRA tickets still targeting 2.1.0?*
>
> Committers should look at those and triage. Extremely important bug fixes,
> documentation, and API tweaks that impact compatibility should be worked on
> immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>
> *What happened to RC3/RC4?*
>
> They had issues with the release packaging and as a result were skipped.
>
>


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Chawla,Sumit
Tim,

We will try to run the application in coarse grain mode, and share the
findings with you.

Regards
Sumit Chawla


On Mon, Dec 19, 2016 at 3:11 PM, Timothy Chen  wrote:

> Dynamic allocation works with Coarse grain mode only; we weren't aware of
> a need for Fine grain mode after we enabled dynamic allocation support
> on the coarse grain mode.
>
> What's the reason you're running fine grain mode instead of coarse
> grain + dynamic allocation?
>
> Tim
>
> On Mon, Dec 19, 2016 at 2:45 PM, Mehdi Meziane
>  wrote:
> > We will be interested by the results if you give a try to Dynamic
> allocation
> > with mesos !
> >
> >
> > - Original Mail -
> > From: "Michael Gummelt" 
> > To: "Sumit Chawla" 
> > Cc: u...@mesos.apache.org, d...@mesos.apache.org, "User"
> > , dev@spark.apache.org
> > Sent: Monday, 19 December 2016 22:42:55 GMT +01:00 Amsterdam / Berlin /
> > Berne / Rome / Stockholm / Vienna
> > Subject: Re: Mesos Spark Fine Grained Execution - CPU count
> >
> >
> >> Is this problem of idle executors sticking around solved in Dynamic
> >> Resource Allocation?  Is there some timeout after which Idle executors
> can
> >> just shutdown and cleanup its resources.
> >
> > Yes, that's exactly what dynamic allocation does.  But again I have no
> idea
> > what the state of dynamic allocation + mesos is.
> >
> > On Mon, Dec 19, 2016 at 1:32 PM, Chawla,Sumit 
> > wrote:
> >>
> >> Great.  Makes much better sense now.  What will be reason to have
> >> spark.mesos.mesosExecutor.cores more than 1, as this number doesn't
> include
> >> the number of cores for tasks.
> >>
> >> So in my case it seems like 30 CPUs are allocated to executors.  And
> there
> >> are 48 tasks so 48 + 30 =  78 CPUs.  And i am noticing this gap of 30 is
> >> maintained till the last task exits.  This explains the gap.   Thanks
> >> everyone.  I am still not sure how this number 30 is calculated.  ( Is
> it
> >> dynamic based on current resources, or is it some configuration.  I
> have 32
> >> nodes in my cluster).
> >>
> >> Is this problem of idle executors sticking around solved in Dynamic
> >> Resource Allocation?  Is there some timeout after which Idle executors
> can
> >> just shutdown and cleanup its resources.
> >>
> >>
> >> Regards
> >> Sumit Chawla
> >>
> >>
> >> On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt <
> mgumm...@mesosphere.io>
> >> wrote:
> >>>
> >>> >  I should preassume that No of executors should be less than number
> of
> >>> > tasks.
> >>>
> >>> No.  Each executor runs 0 or more tasks.
> >>>
> >>> Each executor consumes 1 CPU, and each task running on that executor
> >>> consumes another CPU.  You can customize this via
> >>> spark.mesos.mesosExecutor.cores
> >>> (https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md)
> and
> >>> spark.task.cpus
> >>> (https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md)
> >>>
> >>> On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit  >
> >>> wrote:
> 
>  Ah thanks. looks like i skipped reading this "Neither will executors
>  terminate when they’re idle."
> 
>  So in my job scenario,  I should preassume that No of executors should
>  be less than number of tasks. Ideally one executor should execute 1
> or more
>  tasks.  But i am observing something strange instead.  I start my job
> with
>  48 partitions for a spark job. In mesos ui i see that number of tasks
> is 48,
>  but no. of CPUs is 78 which is way more than 48.  Here i am assuming
> that 1
>  CPU is 1 executor.   I am not specifying any configuration to set
> number of
>  cores per executor.
> 
>  Regards
>  Sumit Chawla
> 
> 
>  On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere
>   wrote:
> >
> > That makes sense. From the documentation it looks like the executors
> > are not supposed to terminate:
> >
> > http://spark.apache.org/docs/latest/running-on-mesos.html#
> fine-grained-deprecated
> >>
> >> Note that while Spark tasks in fine-grained will relinquish cores as
> >> they terminate, they will not relinquish memory, as the JVM does
> not give
> >> memory back to the Operating System. Neither will executors
> terminate when
> >> they’re idle.
> >
> >
> > I suppose your task to executor CPU ratio is low enough that it looks
> > like most of the resources are not being reclaimed. If your tasks
> were using
> > significantly more CPU the amortized cost of the idle executors
> would not be
> > such a big deal.
> >
> >
> > —
> > Joris Van Remoortere
> > Mesosphere
> >
> > On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen 
> > wrote:
> >>
> >> Hi Chawla,
> >>
> >> One possible reason is that Mesos fine grain mode also 

Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Timothy Chen
Dynamic allocation works with Coarse grain mode only; we weren't aware of
a need for Fine grain mode after we enabled dynamic allocation support
on the coarse grain mode.

What's the reason you're running fine grain mode instead of coarse
grain + dynamic allocation?

Tim

On Mon, Dec 19, 2016 at 2:45 PM, Mehdi Meziane
 wrote:
> We would be interested in the results if you give dynamic allocation
> with Mesos a try!
>
>
> - Original Mail -
> From: "Michael Gummelt" 
> To: "Sumit Chawla" 
> Cc: u...@mesos.apache.org, d...@mesos.apache.org, "User"
> , dev@spark.apache.org
> Sent: Monday, 19 December 2016 22:42:55 GMT +01:00 Amsterdam / Berlin /
> Berne / Rome / Stockholm / Vienna
> Subject: Re: Mesos Spark Fine Grained Execution - CPU count
>
>
>> Is this problem of idle executors sticking around solved in Dynamic
>> Resource Allocation?  Is there some timeout after which Idle executors can
>> just shutdown and cleanup its resources.
>
> Yes, that's exactly what dynamic allocation does.  But again I have no idea
> what the state of dynamic allocation + mesos is.
>
> On Mon, Dec 19, 2016 at 1:32 PM, Chawla,Sumit 
> wrote:
>>
>> Great.  Makes much better sense now.  What will be reason to have
>> spark.mesos.mesosExecutor.cores more than 1, as this number doesn't include
>> the number of cores for tasks.
>>
>> So in my case it seems like 30 CPUs are allocated to executors.  And there
>> are 48 tasks so 48 + 30 =  78 CPUs.  And i am noticing this gap of 30 is
>> maintained till the last task exits.  This explains the gap.   Thanks
>> everyone.  I am still not sure how this number 30 is calculated.  ( Is it
>> dynamic based on current resources, or is it some configuration.  I have 32
>> nodes in my cluster).
>>
>> Is this problem of idle executors sticking around solved in Dynamic
>> Resource Allocation?  Is there some timeout after which Idle executors can
>> just shutdown and cleanup its resources.
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt 
>> wrote:
>>>
>>> >  I should preassume that No of executors should be less than number of
>>> > tasks.
>>>
>>> No.  Each executor runs 0 or more tasks.
>>>
>>> Each executor consumes 1 CPU, and each task running on that executor
>>> consumes another CPU.  You can customize this via
>>> spark.mesos.mesosExecutor.cores
>>> (https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md) and
>>> spark.task.cpus
>>> (https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md)
>>>
>>> On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit 
>>> wrote:

 Ah thanks. looks like i skipped reading this "Neither will executors
 terminate when they’re idle."

 So in my job scenario,  I should preassume that No of executors should
 be less than number of tasks. Ideally one executor should execute 1 or more
 tasks.  But i am observing something strange instead.  I start my job with
 48 partitions for a spark job. In mesos ui i see that number of tasks is 
 48,
 but no. of CPUs is 78 which is way more than 48.  Here i am assuming that 1
 CPU is 1 executor.   I am not specifying any configuration to set number of
 cores per executor.

 Regards
 Sumit Chawla


 On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere
  wrote:
>
> That makes sense. From the documentation it looks like the executors
> are not supposed to terminate:
>
> http://spark.apache.org/docs/latest/running-on-mesos.html#fine-grained-deprecated
>>
>> Note that while Spark tasks in fine-grained will relinquish cores as
>> they terminate, they will not relinquish memory, as the JVM does not give
>> memory back to the Operating System. Neither will executors terminate 
>> when
>> they’re idle.
>
>
> I suppose your task to executor CPU ratio is low enough that it looks
> like most of the resources are not being reclaimed. If your tasks were 
> using
> significantly more CPU the amortized cost of the idle executors would not 
> be
> such a big deal.
>
>
> —
> Joris Van Remoortere
> Mesosphere
>
> On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen 
> wrote:
>>
>> Hi Chawla,
>>
>> One possible reason is that Mesos fine grain mode also takes up cores
>> to run the executor per host, so if you have 20 agents running Fine
>> grained executor it will take up 20 cores while it's still running.
>>
>> Tim
>>
>> On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
>> wrote:
>> > Hi
>> >
>> > I am using Spark 1.6. I have one query about Fine Grained model in
>> > Spark.
>> > I have a simple Spark application which 

Re: Kafka Spark structured streaming latency benchmark.

2016-12-19 Thread Shixiong(Ryan) Zhu
Hey Prashant. Thanks for your code. I did some investigation and it turned
out that ContextCleaner is too slow and its "referenceQueue" keeps growing.
My hunch is that cleaning broadcasts is very slow, since it's a blocking call.
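
If blocking broadcast cleanup does turn out to be the bottleneck, one knob worth
knowing about is spark.cleaner.referenceTracking.blocking. A hedged sketch follows;
this is an internal, mostly undocumented setting, and the default of true exists
for a reason, so treat it as something to experiment with rather than a fix.

import org.apache.spark.SparkConf

// Make ContextCleaner's reference cleanup non-blocking so its referenceQueue can
// drain faster; test carefully before relying on this in production.
val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking.blocking", "false")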

On Mon, Dec 19, 2016 at 12:50 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> Hey, Prashant. Could you track the GC root of byte arrays in the heap?
>
> On Sat, Dec 17, 2016 at 10:04 PM, Prashant Sharma 
> wrote:
>
>> Furthermore, I ran the same thing with 26 GB as the memory, which would
>> mean 1.3GB per thread of memory. My jmap
>> 
>> results and jstat
>> 
>> results collected after running the job for more than 11h, again show a
>> memory constraint. The same gradual slowdown, but a bit more gradual as
>> memory is considerably more than the previous runs.
>>
>>
>>
>>
>> This situation sounds like a memory leak, as the byte array objects take up
>> more than 13GB and are not garbage collected.
>>
>> --Prashant
>>
>>
>> On Sun, Dec 18, 2016 at 8:49 AM, Prashant Sharma 
>> wrote:
>>
>>> Hi,
>>>
>>> The goal of my benchmark is to arrive at an end-to-end latency lower than 100ms
>>> and sustain it over time, by consuming from a Kafka topic and writing
>>> back to another Kafka topic using Spark. Since the job does not do
>>> aggregation and does constant-time processing on each message, it
>>> appeared to me to be an achievable target. But then there are some surprising
>>> and interesting patterns to observe.
>>>
>>>  Basically, it has four components namely,
>>> 1) kafka
>>> 2) Long running kafka producer, rate limited to 1000 msgs/sec, with each
>>> message of about 1KB.
>>> 3) Spark  job subscribed to `test` topic and writes out to another topic
>>> `output`.
>>> 4) A Kafka consumer, reading from the `output` topic.
>>>
>>> How was the latency measured?
>>>
>>> While sending messages from the Kafka producer, each message has embedded in it
>>> the timestamp at which it is pushed to the Kafka `test` topic. Spark receives
>>> each message and writes it out to the `output` topic as is. When these
>>> messages arrive at the Kafka consumer, their embedded time is subtracted from
>>> the time of arrival at the consumer, and a scatter plot of the same is
>>> attached.
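
A minimal sketch of the consumer-side latency computation described above, assuming a
plain kafka-clients consumer and that each message value is prefixed with its
producer-side send time in milliseconds; the exact message format and broker address
are assumptions, not taken from the benchmark code linked below.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object LatencyConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
    props.put("group.id", "latency-measurer")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("output"))

    while (true) {
      for (record <- consumer.poll(1000).asScala) {
        // Assumed format: "<sendTimeMillis>|<payload>", embedded by the producer.
        val sentAt = record.value.takeWhile(_ != '|').toLong
        val latencyMs = System.currentTimeMillis() - sentAt
        println(s"end-to-end latency: ${latencyMs} ms")
      }
    }
  }
}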
>>>
>>> The scatter plots sample only 10 minutes of data received during initial
>>> one hour and then again 10 minutes of data received after 2 hours of run.
>>>
>>>
>>>
>>> These plots indicate a significant slowdown in latency: the later
>>> scatter plot indicates that almost all the messages were received with a delay
>>> larger than 2 seconds, whereas the first plot shows that most messages arrived
>>> with less than 100ms latency. The two samples were taken with a time difference
>>> of approximately 2 hours.
>>>
>>> After running the test for 24 hours, the jstat
>>> 
>>> and jmap
>>> 
>>>  output
>>> for the jobs indicates the possibility of memory constraints. To be clearer, the
>>> job was run with local[20] and memory of 5GB (spark.driver.memory). The job
>>> is straightforward and located here: https://github.com/ScrapCodes/
>>> KafkaProducer/blob/master/src/main/scala/com/github/scrapcod
>>> es/kafka/SparkSQLKafkaConsumer.scala .
>>>
>>>
>>> What is causing the gradual slowdown? I need help in diagnosing the
>>> problem.
>>>
>>> Thanks,
>>>
>>> --Prashant
>>>
>>>
>>
>


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Michael Gummelt
> Is this problem of idle executors sticking around solved in Dynamic
Resource Allocation?  Is there some timeout after which Idle executors can
just shutdown and cleanup its resources.

Yes, that's exactly what dynamic allocation does.  But again I have no idea
what the state of dynamic allocation + mesos is.

On Mon, Dec 19, 2016 at 1:32 PM, Chawla,Sumit 
wrote:

> Great.  Makes much better sense now.  What will be reason to have
> spark.mesos.mesosExecutor.cores more than 1, as this number doesn't
> include the number of cores for tasks.
>
> So in my case it seems like 30 CPUs are allocated to executors.  And there
> are 48 tasks so 48 + 30 =  78 CPUs.  And i am noticing this gap of 30 is
> maintained till the last task exits.  This explains the gap.   Thanks
> everyone.  I am still not sure how this number 30 is calculated.  ( Is it
> dynamic based on current resources, or is it some configuration.  I have 32
> nodes in my cluster).
>
> Is this problem of idle executors sticking around solved in Dynamic
> Resource Allocation?  Is there some timeout after which Idle executors can
> just shutdown and cleanup its resources.
>
>
> Regards
> Sumit Chawla
>
>
> On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt 
> wrote:
>
>> >  I should preassume that No of executors should be less than number of
>> tasks.
>>
>> No.  Each executor runs 0 or more tasks.
>>
>> Each executor consumes 1 CPU, and each task running on that executor
>> consumes another CPU.  You can customize this via
>> spark.mesos.mesosExecutor.cores (https://github.com/apache/spa
>> rk/blob/v1.6.3/docs/running-on-mesos.md) and spark.task.cpus (
>> https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md)
>>
>> On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit 
>> wrote:
>>
>>> Ah thanks. looks like i skipped reading this *"Neither will executors
>>> terminate when they’re idle."*
>>>
>>> So in my job scenario,  I should preassume that No of executors should
>>> be less than number of tasks. Ideally one executor should execute 1 or more
>>> tasks.  But i am observing something strange instead.  I start my job with
>>> 48 partitions for a spark job. In mesos ui i see that number of tasks is
>>> 48, but no. of CPUs is 78 which is way more than 48.  Here i am assuming
>>> that 1 CPU is 1 executor.   I am not specifying any configuration to set
>>> number of cores per executor.
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>> On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere <
>>> jo...@mesosphere.io> wrote:
>>>
 That makes sense. From the documentation it looks like the executors
 are not supposed to terminate:
 http://spark.apache.org/docs/latest/running-on-mesos.html#fi
 ne-grained-deprecated

> Note that while Spark tasks in fine-grained will relinquish cores as
> they terminate, they will not relinquish memory, as the JVM does not give
> memory back to the Operating System. Neither will executors terminate when
> they’re idle.


 I suppose your task to executor CPU ratio is low enough that it looks
 like most of the resources are not being reclaimed. If your tasks were
 using significantly more CPU the amortized cost of the idle executors would
 not be such a big deal.


 —
 *Joris Van Remoortere*
 Mesosphere

 On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen 
 wrote:

> Hi Chawla,
>
> One possible reason is that Mesos fine grain mode also takes up cores
> to run the executor per host, so if you have 20 agents running Fine
> grained executor it will take up 20 cores while it's still running.
>
> Tim
>
> On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
> wrote:
> > Hi
> >
> > I am using Spark 1.6. I have one query about Fine Grained model in
> Spark.
> > I have a simple Spark application which transforms A -> B.  Its a
> single
> > stage application.  To begin the program, It starts with 48
> partitions.
> > When the program starts running, in mesos UI it shows 48 tasks and
> 48 CPUs
> > allocated to job.  Now as the tasks get done, the number of active
> tasks
> > number starts decreasing.  How ever, the number of CPUs does not
> decrease
> > propotionally.  When the job was about to finish, there was a single
> > remaininig task, however CPU count was still 20.
> >
> > My questions, is why there is no one to one mapping between tasks
> and cpus
> > in Fine grained?  How can these CPUs be released when the job is
> done, so
> > that other jobs can start.
> >
> >
> > Regards
> > Sumit Chawla
>


>>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Chawla,Sumit
Great.  Makes much better sense now.  What would be the reason to have
spark.mesos.mesosExecutor.cores more than 1, since this number doesn't include
the number of cores for tasks?

So in my case it seems like 30 CPUs are allocated to executors.  And there
are 48 tasks, so 48 + 30 = 78 CPUs.  And I am noticing that this gap of 30 is
maintained till the last task exits.  This explains the gap.  Thanks
everyone.  I am still not sure how this number 30 is calculated.  (Is it
dynamic based on current resources, or is it some configuration?  I have 32
nodes in my cluster.)

Is this problem of idle executors sticking around solved in Dynamic
Resource Allocation?  Is there some timeout after which idle executors can
just shut down and clean up their resources?


Regards
Sumit Chawla


On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt 
wrote:

> >  I should preassume that No of executors should be less than number of
> tasks.
>
> No.  Each executor runs 0 or more tasks.
>
> Each executor consumes 1 CPU, and each task running on that executor
> consumes another CPU.  You can customize this via 
> spark.mesos.mesosExecutor.cores
> (https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md)
> and spark.task.cpus (https://github.com/apache/spark/blob/v1.6.3/docs/
> configuration.md)
>
> On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit 
> wrote:
>
>> Ah thanks. looks like i skipped reading this *"Neither will executors
>> terminate when they’re idle."*
>>
>> So in my job scenario,  I should preassume that No of executors should be
>> less than number of tasks. Ideally one executor should execute 1 or more
>> tasks.  But i am observing something strange instead.  I start my job with
>> 48 partitions for a spark job. In mesos ui i see that number of tasks is
>> 48, but no. of CPUs is 78 which is way more than 48.  Here i am assuming
>> that 1 CPU is 1 executor.   I am not specifying any configuration to set
>> number of cores per executor.
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere <
>> jo...@mesosphere.io> wrote:
>>
>>> That makes sense. From the documentation it looks like the executors are
>>> not supposed to terminate:
>>> http://spark.apache.org/docs/latest/running-on-mesos.html#fi
>>> ne-grained-deprecated
>>>
 Note that while Spark tasks in fine-grained will relinquish cores as
 they terminate, they will not relinquish memory, as the JVM does not give
 memory back to the Operating System. Neither will executors terminate when
 they’re idle.
>>>
>>>
>>> I suppose your task to executor CPU ratio is low enough that it looks
>>> like most of the resources are not being reclaimed. If your tasks were
>>> using significantly more CPU the amortized cost of the idle executors would
>>> not be such a big deal.
>>>
>>>
>>> —
>>> *Joris Van Remoortere*
>>> Mesosphere
>>>
>>> On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen 
>>> wrote:
>>>
 Hi Chawla,

 One possible reason is that Mesos fine grain mode also takes up cores
 to run the executor per host, so if you have 20 agents running Fine
 grained executor it will take up 20 cores while it's still running.

 Tim

 On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
 wrote:
 > Hi
 >
 > I am using Spark 1.6. I have one query about Fine Grained model in
 Spark.
 > I have a simple Spark application which transforms A -> B.  Its a
 single
 > stage application.  To begin the program, It starts with 48
 partitions.
 > When the program starts running, in mesos UI it shows 48 tasks and 48
 CPUs
 > allocated to job.  Now as the tasks get done, the number of active
 tasks
 > number starts decreasing.  How ever, the number of CPUs does not
 decrease
 > propotionally.  When the job was about to finish, there was a single
 > remaininig task, however CPU count was still 20.
 >
 > My questions, is why there is no one to one mapping between tasks and
 cpus
 > in Fine grained?  How can these CPUs be released when the job is
 done, so
 > that other jobs can start.
 >
 >
 > Regards
 > Sumit Chawla

>>>
>>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Joris Van Remoortere
That makes sense. From the documentation it looks like the executors are
not supposed to terminate:
http://spark.apache.org/docs/latest/running-on-mesos.html#fine-grained-deprecated

> Note that while Spark tasks in fine-grained will relinquish cores as they
> terminate, they will not relinquish memory, as the JVM does not give memory
> back to the Operating System. Neither will executors terminate when they’re
> idle.


I suppose your task to executor CPU ratio is low enough that it looks like
most of the resources are not being reclaimed. If your tasks were using
significantly more CPU the amortized cost of the idle executors would not
be such a big deal.


—
*Joris Van Remoortere*
Mesosphere

On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen  wrote:

> Hi Chawla,
>
> One possible reason is that Mesos fine grain mode also takes up cores
> to run the executor per host, so if you have 20 agents running Fine
> grained executor it will take up 20 cores while it's still running.
>
> Tim
>
> On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
> wrote:
> > Hi
> >
> > I am using Spark 1.6. I have one query about Fine Grained model in Spark.
> > I have a simple Spark application which transforms A -> B.  Its a single
> > stage application.  To begin the program, It starts with 48 partitions.
> > When the program starts running, in mesos UI it shows 48 tasks and 48
> CPUs
> > allocated to job.  Now as the tasks get done, the number of active tasks
> > number starts decreasing.  How ever, the number of CPUs does not decrease
> > propotionally.  When the job was about to finish, there was a single
> > remaininig task, however CPU count was still 20.
> >
> > My questions, is why there is no one to one mapping between tasks and
> cpus
> > in Fine grained?  How can these CPUs be released when the job is done, so
> > that other jobs can start.
> >
> >
> > Regards
> > Sumit Chawla
>


Re: Kafka Spark structured streaming latency benchmark.

2016-12-19 Thread Shixiong(Ryan) Zhu
Hey, Prashant. Could you track the GC root of byte arrays in the heap?

On Sat, Dec 17, 2016 at 10:04 PM, Prashant Sharma 
wrote:

> Furthermore, I ran the same thing with 26 GB as the memory, which would
> mean 1.3GB per thread of memory. My jmap
> 
> results and jstat
> 
> results collected after running the job for more than 11h, again show a
> memory constraint. The same gradual slowdown, but a bit more gradual as
> memory is considerably more than the previous runs.
>
>
>
>
> This situation sounds like a memory leak, as the byte array objects take up
> more than 13GB and are not garbage collected.
>
> --Prashant
>
>
> On Sun, Dec 18, 2016 at 8:49 AM, Prashant Sharma 
> wrote:
>
>> Hi,
>>
>> The goal of my benchmark is to arrive at an end-to-end latency lower than 100ms
>> and sustain it over time, by consuming from a Kafka topic and writing
>> back to another Kafka topic using Spark. Since the job does not do
>> aggregation and does constant-time processing on each message, it
>> appeared to me to be an achievable target. But then there are some surprising
>> and interesting patterns to observe.
>>
>>  Basically, it has four components namely,
>> 1) kafka
>> 2) Long running kafka producer, rate limited to 1000 msgs/sec, with each
>> message of about 1KB.
>> 3) Spark  job subscribed to `test` topic and writes out to another topic
>> `output`.
>> 4) A Kafka consumer, reading from the `output` topic.
>>
>> How was the latency measured?
>>
>> While sending messages from the Kafka producer, each message has embedded in it
>> the timestamp at which it is pushed to the Kafka `test` topic. Spark receives
>> each message and writes it out to the `output` topic as is. When these
>> messages arrive at the Kafka consumer, their embedded time is subtracted from
>> the time of arrival at the consumer, and a scatter plot of the same is
>> attached.
>>
>> The scatter plots sample only 10 minutes of data received during initial
>> one hour and then again 10 minutes of data received after 2 hours of run.
>>
>>
>>
>> These plots indicate a significant slowdown in latency: the later
>> scatter plot indicates that almost all the messages were received with a delay
>> larger than 2 seconds, whereas the first plot shows that most messages arrived
>> with less than 100ms latency. The two samples were taken with a time difference
>> of approximately 2 hours.
>>
>> After running the test for 24 hours, the jstat
>> 
>> and jmap
>> 
>>  output
>> for the jobs indicates the possibility of memory constraints. To be clearer, the
>> job was run with local[20] and memory of 5GB (spark.driver.memory). The job
>> is straightforward and located here: https://github.com/ScrapCodes/
>> KafkaProducer/blob/master/src/main/scala/com/github/scrapcod
>> es/kafka/SparkSQLKafkaConsumer.scala .
>>
>>
>> What is causing the gradual slowdown? I need help in diagnosing the
>> problem.
>>
>> Thanks,
>>
>> --Prashant
>>
>>
>


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Michael Gummelt
>  I should preassume that No of executors should be less than number of
tasks.

No.  Each executor runs 0 or more tasks.

Each executor consumes 1 CPU, and each task running on that executor
consumes another CPU.  You can customize this via
spark.mesos.mesosExecutor.cores (
https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md) and
spark.task.cpus (
https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md)

On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit 
wrote:

> Ah thanks. looks like i skipped reading this *"Neither will executors
> terminate when they’re idle."*
>
> So in my job scenario,  I should preassume that No of executors should be
> less than number of tasks. Ideally one executor should execute 1 or more
> tasks.  But i am observing something strange instead.  I start my job with
> 48 partitions for a spark job. In mesos ui i see that number of tasks is
> 48, but no. of CPUs is 78 which is way more than 48.  Here i am assuming
> that 1 CPU is 1 executor.   I am not specifying any configuration to set
> number of cores per executor.
>
> Regards
> Sumit Chawla
>
>
> On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere <
> jo...@mesosphere.io> wrote:
>
>> That makes sense. From the documentation it looks like the executors are
>> not supposed to terminate:
>> http://spark.apache.org/docs/latest/running-on-mesos.html#fi
>> ne-grained-deprecated
>>
>>> Note that while Spark tasks in fine-grained will relinquish cores as
>>> they terminate, they will not relinquish memory, as the JVM does not give
>>> memory back to the Operating System. Neither will executors terminate when
>>> they’re idle.
>>
>>
>> I suppose your task to executor CPU ratio is low enough that it looks
>> like most of the resources are not being reclaimed. If your tasks were
>> using significantly more CPU the amortized cost of the idle executors would
>> not be such a big deal.
>>
>>
>> —
>> *Joris Van Remoortere*
>> Mesosphere
>>
>> On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen  wrote:
>>
>>> Hi Chawla,
>>>
>>> One possible reason is that Mesos fine grain mode also takes up cores
>>> to run the executor per host, so if you have 20 agents running Fine
>>> grained executor it will take up 20 cores while it's still running.
>>>
>>> Tim
>>>
>>> On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
>>> wrote:
>>> > Hi
>>> >
>>> > I am using Spark 1.6. I have one query about Fine Grained model in
>>> Spark.
>>> > I have a simple Spark application which transforms A -> B.  Its a
>>> single
>>> > stage application.  To begin the program, It starts with 48 partitions.
>>> > When the program starts running, in mesos UI it shows 48 tasks and 48
>>> CPUs
>>> > allocated to job.  Now as the tasks get done, the number of active
>>> tasks
>>> > number starts decreasing.  How ever, the number of CPUs does not
>>> decrease
>>> > propotionally.  When the job was about to finish, there was a single
>>> > remaininig task, however CPU count was still 20.
>>> >
>>> > My questions, is why there is no one to one mapping between tasks and
>>> cpus
>>> > in Fine grained?  How can these CPUs be released when the job is done,
>>> so
>>> > that other jobs can start.
>>> >
>>> >
>>> > Regards
>>> > Sumit Chawla
>>>
>>
>>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


java.lang.AssertionError: assertion failed

2016-12-19 Thread samkum
I am using Apache Spark 2.0.2 and facing the following issue while using a
cartesian product in the Spark Streaming module.

I am using snappy as the compression codec but face the same issue with the
default one (LZ4); I am also using Kryo for serialization.

I also see ample memory available in the executor section.

Please find the stacktrace below:-

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.forceSpill(ExternalAppendOnlyMap.scala:195)
    at org.apache.spark.util.collection.Spillable.spill(Spillable.scala:111)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:150)
    at org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:147)
    at org.apache.spark.util.collection.Spillable.maybeSpill(Spillable.scala:86)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:160)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)
    at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:96)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)
    at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)



--
View this message in context: 

Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Mehdi Meziane
I think that what you are looking for is Dynamic resource allocation: 
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
 


Spark provides a mechanism to dynamically adjust the resources your application 
occupies based on the workload. This means that your application may give 
resources back to the cluster if they are no longer used and request them again 
later when there is demand. This feature is particularly useful if multiple 
applications share resources in your Spark cluster. 
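
For concreteness, a minimal sketch of the settings involved, assuming an external
shuffle service is available on the agents; the values are illustrative only, not
recommendations.

import org.apache.spark.SparkConf

// Illustrative dynamic allocation settings; requires the external shuffle service.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "48")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")  // idle executors released after this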

- Original Mail - 
From: "Sumit Chawla"  
To: "Michael Gummelt"  
Cc: u...@mesos.apache.org, "Dev" , "User" 
, "dev"  
Sent: Monday, 19 December 2016 19:35:51 GMT +01:00 Amsterdam / Berlin / Berne / 
Rome / Stockholm / Vienna 
Subject: Re: Mesos Spark Fine Grained Execution - CPU count 


But coarse grained does the exact same thing that I am trying to avert here: 
in exchange for lower startup overhead, it keeps the resources reserved for the entire 
duration of the job. 



Regards 
Sumit Chawla 



On Mon, Dec 19, 2016 at 10:06 AM, Michael Gummelt < mgumm...@mesosphere.io > 
wrote: 




Hi 

I don't have a lot of experience with the fine-grained scheduler. It's 
deprecated and fairly old now. CPUs should be relinquished as tasks complete, 
so I'm not sure why you're seeing what you're seeing. There have been a few 
discussions on the spark list regarding deprecating the fine-grained scheduler, 
and no one seemed too dead-set on keeping it. I'd recommend you move over to 
coarse-grained. 





On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: 



Hi 


I am using Spark 1.6. I have one query about the Fine Grained model in Spark. I 
have a simple Spark application which transforms A -> B. It's a single stage 
application. To begin with, the program starts with 48 partitions. When the 
program starts running, the Mesos UI shows 48 tasks and 48 CPUs allocated to 
the job. Now as the tasks get done, the number of active tasks starts 
decreasing. However, the number of CPUs does not decrease proportionally. When 
the job was about to finish, there was a single remaining task, but the CPU 
count was still 20. 


My question is why there is no one-to-one mapping between tasks and CPUs in 
fine-grained mode? How can these CPUs be released when the job is done, so that 
other jobs can start? 






Regards 
Sumit Chawla 




-- 







Michael Gummelt 
Software Engineer 
Mesosphere 



Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Timothy Chen
Hi Chawla,

One possible reason is that Mesos fine grain mode also takes up cores
to run the executor per host, so if you have 20 agents running Fine
grained executor it will take up 20 cores while it's still running.

Tim

On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit  wrote:
> Hi
>
> I am using Spark 1.6. I have one query about the Fine Grained model in Spark.
> I have a simple Spark application which transforms A -> B.  It's a single
> stage application.  To begin with, the program starts with 48 partitions.
> When the program starts running, the Mesos UI shows 48 tasks and 48 CPUs
> allocated to the job.  Now as the tasks get done, the number of active tasks
> starts decreasing.  However, the number of CPUs does not decrease
> proportionally.  When the job was about to finish, there was a single
> remaining task, but the CPU count was still 20.
>
> My question is why there is no one-to-one mapping between tasks and CPUs
> in fine-grained mode?  How can these CPUs be released when the job is done, so
> that other jobs can start?
>
>
> Regards
> Sumit Chawla

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Michael Gummelt
Yea, the idea is to use dynamic allocation.  I can't speak to how well it
works with Mesos, though.

On Mon, Dec 19, 2016 at 11:01 AM, Mehdi Meziane 
wrote:

> I think that what you are looking for is Dynamic resource allocation:
> http://spark.apache.org/docs/latest/job-scheduling.html#
> dynamic-resource-allocation
>
> Spark provides a mechanism to dynamically adjust the resources your
> application occupies based on the workload. This means that your
> application may give resources back to the cluster if they are no longer
> used and request them again later when there is demand. This feature is
> particularly useful if multiple applications share resources in your Spark
> cluster.
>
> - Original Mail -
> From: "Sumit Chawla" 
> To: "Michael Gummelt" 
> Cc: u...@mesos.apache.org, "Dev" , "User" <
> u...@spark.apache.org>, "dev" 
> Sent: Monday, 19 December 2016 19:35:51 GMT +01:00 Amsterdam / Berlin /
> Berne / Rome / Stockholm / Vienna
> Subject: Re: Mesos Spark Fine Grained Execution - CPU count
>
>
> But coarse grained does the exact same thing that I am trying to avert
> here: in exchange for lower startup overhead, it keeps the resources reserved for
> the entire duration of the job.
>
> Regards
> Sumit Chawla
>
>
> On Mon, Dec 19, 2016 at 10:06 AM, Michael Gummelt 
> wrote:
>
>> Hi
>>
>> I don't have a lot of experience with the fine-grained scheduler.  It's
>> deprecated and fairly old now.  CPUs should be relinquished as tasks
>> complete, so I'm not sure why you're seeing what you're seeing.  There have
>> been a few discussions on the spark list regarding deprecating the
>> fine-grained scheduler, and no one seemed too dead-set on keeping it.  I'd
>> recommend you move over to coarse-grained.
>>
>> On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
>> wrote:
>>
>>> Hi
>>>
>>> I am using Spark 1.6. I have one query about Fine Grained model in
>>> Spark.  I have a simple Spark application which transforms A -> B.  Its a
>>> single stage application.  To begin the program, It starts with 48
>>> partitions.  When the program starts running, in mesos UI it shows 48 tasks
>>> and 48 CPUs allocated to job.  Now as the tasks get done, the number of
>>> active tasks number starts decreasing.  How ever, the number of CPUs does
>>> not decrease propotionally.  When the job was about to finish, there was a
>>> single remaininig task, however CPU count was still 20.
>>>
>>> My questions, is why there is no one to one mapping between tasks and
>>> cpus in Fine grained?  How can these CPUs be released when the job is done,
>>> so that other jobs can start.
>>>
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: [VOTE] Apache Spark 2.1.0 (RC5)

2016-12-19 Thread Nicholas Chammas
Since it’s not a regression from 2.0 (I believe the same issue affects both
2.0 and 2.1) it doesn’t merit a -1 vote according to the voting guidelines.

Of course, it would be nice if we could fix the various optimizer issues
that all seem to have a workaround that involves persist() (another one is
SPARK-18492 ) but I
don’t think this should block the release.
​

On Mon, Dec 19, 2016 at 12:36 PM Franklyn D'souza <
franklyn.dso...@shopify.com> wrote:

> -1 https://issues.apache.org/jira/browse/SPARK-18589 hasn't been resolved
> by this release and is a blocker in our adoption of spark 2.0. I've updated
> the issue with some steps to reproduce the error.
>
> On Mon, Dec 19, 2016 at 4:37 AM, Sean Owen  wrote:
>
> PS, here are the open issues for 2.1.0. Forgot this one. No Blockers, but
> one "Critical":
>
> SPARK-16845
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
> grows beyond 64 KB
>
> SPARK-18669 Update Apache docs regard watermarking in Structured Streaming
>
> SPARK-18894 Event time watermark delay threshold specified in months or
> years gives incorrect results
>
> SPARK-18899 append data to a bucketed table with mismatched bucketing
> should fail
>
> SPARK-18909 The error message in `ExpressionEncoder.toRow` and `fromRow`
> is too verbose
>
> SPARK-18912 append to a non-file-based data source table should detect
> columns number mismatch
>
> SPARK-18913 append to a table with special column names should work
>
> SPARK-18921 check database existence with Hive.databaseExists instead of
> getDatabase
>
>
> On Fri, Dec 16, 2016 at 5:17 AM Reynold Xin  wrote:
>
> Please vote on releasing the following candidate as Apache Spark version
> 2.1.0. The vote is open until Sun, December 18, 2016 at 21:30 PT and passes
> if a majority of at least 3 +1 PMC votes are cast.
>
> [ ] +1 Release this package as Apache Spark 2.1.0
> [ ] -1 Do not release this package because ...
>
>
> To learn more about Apache Spark, please see http://spark.apache.org/
>
> The tag to be voted on is v2.1.0-rc5
> (cd0a08361e2526519e7c131c42116bf56fa62c76)
>
> List of JIRA tickets resolved are:
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0
>
> The release files, including signatures, digests, etc. can be found at:
> http://home.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-bin/
>
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1223/
>
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-docs/
>
>
> *FAQ*
>
> *How can I help test this release?*
>
> If you are a Spark user, you can help us test this release by taking an
> existing Spark workload and running on this release candidate, then
> reporting any regressions.
>
> *What should happen to JIRA tickets still targeting 2.1.0?*
>
> Committers should look at those and triage. Extremely important bug fixes,
> documentation, and API tweaks that impact compatibility should be worked on
> immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>
> *What happened to RC3/RC4?*
>
> They had issues with the release packaging and as a result were skipped.
>
>
>


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Chawla,Sumit
But coarse grained does the exact same thing that I am trying to avert
here: in exchange for lower startup overhead, it keeps the resources reserved for
the entire duration of the job.

Regards
Sumit Chawla


On Mon, Dec 19, 2016 at 10:06 AM, Michael Gummelt 
wrote:

> Hi
>
> I don't have a lot of experience with the fine-grained scheduler.  It's
> deprecated and fairly old now.  CPUs should be relinquished as tasks
> complete, so I'm not sure why you're seeing what you're seeing.  There have
> been a few discussions on the spark list regarding deprecating the
> fine-grained scheduler, and no one seemed too dead-set on keeping it.  I'd
> recommend you move over to coarse-grained.
>
> On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
> wrote:
>
>> Hi
>>
>> I am using Spark 1.6. I have one query about Fine Grained model in
>> Spark.  I have a simple Spark application which transforms A -> B.  Its a
>> single stage application.  To begin the program, It starts with 48
>> partitions.  When the program starts running, in mesos UI it shows 48 tasks
>> and 48 CPUs allocated to job.  Now as the tasks get done, the number of
>> active tasks number starts decreasing.  How ever, the number of CPUs does
>> not decrease propotionally.  When the job was about to finish, there was a
>> single remaininig task, however CPU count was still 20.
>>
>> My questions, is why there is no one to one mapping between tasks and
>> cpus in Fine grained?  How can these CPUs be released when the job is done,
>> so that other jobs can start.
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Michael Gummelt
Hi

I don't have a lot of experience with the fine-grained scheduler.  It's
deprecated and fairly old now.  CPUs should be relinquished as tasks
complete, so I'm not sure why you're seeing what you're seeing.  There have
been a few discussions on the spark list regarding deprecating the
fine-grained scheduler, and no one seemed too dead-set on keeping it.  I'd
recommend you move over to coarse-grained.

On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit 
wrote:

> Hi
>
> I am using Spark 1.6. I have one query about the Fine Grained model in Spark.
> I have a simple Spark application which transforms A -> B.  It's a single
> stage application.  To begin with, the program starts with 48 partitions.
> When the program starts running, the Mesos UI shows 48 tasks and 48 CPUs
> allocated to the job.  Now as the tasks get done, the number of active tasks
> starts decreasing.  However, the number of CPUs does not decrease
> proportionally.  When the job was about to finish, there was a single
> remaining task, but the CPU count was still 20.
>
> My question is why there is no one-to-one mapping between tasks and CPUs
> in fine-grained mode?  How can these CPUs be released when the job is done, so
> that other jobs can start?
>
>
> Regards
> Sumit Chawla
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: [VOTE] Apache Spark 2.1.0 (RC5)

2016-12-19 Thread Franklyn D'souza
-1. https://issues.apache.org/jira/browse/SPARK-18589 hasn't been resolved
in this release and is a blocker for our adoption of Spark 2.0. I've updated
the issue with steps to reproduce the error.

On Mon, Dec 19, 2016 at 4:37 AM, Sean Owen  wrote:

> PS, here are the open issues for 2.1.0. Forgot this one. No Blockers, but
> one "Critical":
>
> SPARK-16845 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
> grows beyond 64 KB
>
> SPARK-18669 Update Apache docs regard watermarking in Structured Streaming
>
> SPARK-18894 Event time watermark delay threshold specified in months or
> years gives incorrect results
>
> SPARK-18899 append data to a bucketed table with mismatched bucketing
> should fail
>
> SPARK-18909 The error message in `ExpressionEncoder.toRow` and `fromRow`
> is too verbose
>
> SPARK-18912 append to a non-file-based data source table should detect
> columns number mismatch
>
> SPARK-18913 append to a table with special column names should work
>
> SPARK-18921 check database existence with Hive.databaseExists instead of
> getDatabase
>
>
> On Fri, Dec 16, 2016 at 5:17 AM Reynold Xin  wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 2.1.0. The vote is open until Sun, December 18, 2016 at 21:30 PT and passes
>> if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 2.1.0
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is v2.1.0-rc5 (cd0a08361e2526519e7c131c42116bf56fa62c76)
>>
>> List of JIRA tickets resolved are:
>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://home.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-bin/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1223/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-docs/
>>
>>
>> *FAQ*
>>
>> *How can I help test this release?*
>>
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
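
For instance, a minimal build.sbt sketch for pointing an existing workload at
the staging repository listed above (assuming the staged artifacts use the
2.1.0 version string; swap in the modules your workload actually uses):

// build.sbt -- resolve Spark from the RC5 staging repository instead of Maven Central.
resolvers += "Apache Spark 2.1.0 RC5 staging" at
  "https://repository.apache.org/content/repositories/orgapachespark-1223/"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided"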
>>
>> *What should happen to JIRA tickets still targeting 2.1.0?*
>>
>> Committers should look at those and triage. Extremely important bug
>> fixes, documentation, and API tweaks that impact compatibility should be
>> worked on immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>>
>> *What happened to RC3/RC4?*
>>
>> They had issues with the release packaging and as a result were skipped.
>>
>>


stratified sampling scales poorly

2016-12-19 Thread Martin Le
Hi all,

I perform sampling on a DStream by taking samples from RDDs in the DStream.
I have used two sampling mechanisms: simple random sampling and stratified
sampling.

Simple random sampling: inputStream.transform(x => x.sample(false,
fraction)).

Stratified sampling: inputStream.transform(x => x.sampleByKeyExact(false,
fractions))

where fractions = Map("key1" -> fraction, "key2" -> fraction, ..., "keyn" ->
fraction).
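
For reference, a self-contained sketch of the two mechanisms above (the
socket source, key extraction, and fraction values are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SamplingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sampling-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder source: turn each text line into a (key, line) pair.
    val inputStream = ssc.socketTextStream("localhost", 9999)
      .map(line => (line.split(",")(0), line))

    // Simple random sampling: one pass, each element kept with probability `fraction`.
    val fraction = 0.1
    val simple = inputStream.transform(rdd => rdd.sample(withReplacement = false, fraction))

    // Stratified sampling: sampleByKeyExact makes additional passes over each RDD
    // to hit the exact per-key sample sizes given by `fractions`.
    val fractions = Map("key1" -> 0.1, "key2" -> 0.2)
    val stratified = inputStream.transform(rdd =>
      rdd.sampleByKeyExact(withReplacement = false, fractions))

    simple.print()
    stratified.print()

    ssc.start()
    ssc.awaitTermination()
  }
}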

My question is: why does stratified sampling scale poorly with different
sampling fractions in this context, while simple random sampling scales well
with different sampling fractions (I ran the experiments on a 4-node
cluster)?

Thank you,

Martin


Re: [VOTE] Apache Spark 2.1.0 (RC5)

2016-12-19 Thread Sean Owen
PS, here are the open issues for 2.1.0. Forgot this one. No Blockers, but
one "Critical":

SPARK-16845
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
grows beyond 64 KB

SPARK-18669 Update Apache docs regard watermarking in Structured Streaming

SPARK-18894 Event time watermark delay threshold specified in months or
years gives incorrect results

SPARK-18899 append data to a bucketed table with mismatched bucketing
should fail

SPARK-18909 The error message in `ExpressionEncoder.toRow` and `fromRow` is
too verbose

SPARK-18912 append to a non-file-based data source table should detect
columns number mismatch

SPARK-18913 append to a table with special column names should work

SPARK-18921 check database existence with Hive.databaseExists instead of
getDatabase


On Fri, Dec 16, 2016 at 5:17 AM Reynold Xin  wrote:

> Please vote on releasing the following candidate as Apache Spark version
> 2.1.0. The vote is open until Sun, December 18, 2016 at 21:30 PT and passes
> if a majority of at least 3 +1 PMC votes are cast.
>
> [ ] +1 Release this package as Apache Spark 2.1.0
> [ ] -1 Do not release this package because ...
>
>
> To learn more about Apache Spark, please see http://spark.apache.org/
>
> The tag to be voted on is v2.1.0-rc5
> (cd0a08361e2526519e7c131c42116bf56fa62c76)
>
> List of JIRA tickets resolved are:
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0
>
> The release files, including signatures, digests, etc. can be found at:
> http://home.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-bin/
>
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1223/
>
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc5-docs/
>
>
> *FAQ*
>
> *How can I help test this release?*
>
> If you are a Spark user, you can help us test this release by taking an
> existing Spark workload and running on this release candidate, then
> reporting any regressions.
>
> *What should happen to JIRA tickets still targeting 2.1.0?*
>
> Committers should look at those and triage. Extremely important bug fixes,
> documentation, and API tweaks that impact compatibility should be worked on
> immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>
> *What happened to RC3/RC4?*
>
> They had issues with the release packaging and as a result were skipped.
>
>


Re: Aggregating over sorted data

2016-12-19 Thread Robin East
This is also a feature we need for our time-series processing 



> On 19 Dec 2016, at 04:07, Liang-Chi Hsieh  wrote:
> 
> 
> Hi,
> 
> As I know, Spark SQL doesn't provide native support for this feature now.
> After searching, I found only few database systems support it, e.g.,
> PostgreSQL.
> 
> Actually based on the Spark SQL's aggregate system, I think it is not very
> difficult to add the support for this feature. The problem is how frequently
> this feature is needed for Spark SQL users and if it is worth adding this,
> because as I see, this feature is not very common.
> 
> Alternative possible to achieve this in current Spark SQL, is to use
> Aggregator with Dataset API. You can write your custom Aggregator which has
> an user-defined JVM object as buffer to hold the input data into your
> aggregate function. But you may need to write necessary encoder for the
> buffer object.
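
For illustration, a minimal sketch of such an Aggregator: the buffer collects
each group's rows so finish() can sort them before folding. The Event class,
the timestamp-ordered sum, and all names below are hypothetical, not an API
the thread defines:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical input record: a key, an ordering column, and a value.
case class Event(key: String, ts: Long, value: Double)

// Buffers every (ts, value) pair for a group, then folds over them in ts order.
object SortedSum extends Aggregator[Event, Seq[(Long, Double)], Double] {
  def zero: Seq[(Long, Double)] = Seq.empty
  def reduce(buf: Seq[(Long, Double)], e: Event): Seq[(Long, Double)] = buf :+ (e.ts -> e.value)
  def merge(b1: Seq[(Long, Double)], b2: Seq[(Long, Double)]): Seq[(Long, Double)] = b1 ++ b2
  def finish(buf: Seq[(Long, Double)]): Double =
    buf.sortBy(_._1).map(_._2).sum  // replace .sum with any order-sensitive fold
  def bufferEncoder: Encoder[Seq[(Long, Double)]] = Encoders.kryo[Seq[(Long, Double)]]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object SortedAggregationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sorted-agg-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val events = Seq(Event("a", 2L, 1.0), Event("a", 1L, 2.0), Event("b", 1L, 3.0)).toDS()
    events.groupByKey(_.key).agg(SortedSum.toColumn).show()

    spark.stop()
  }
}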
> 
> If you really need this feature, you may open a Jira to ask others' opinion
> about this feature.
> 
> 
> 
> 
> 
> 
> -
> Liang-Chi Hsieh | @viirya 
> Spark Technology Center 
> http://www.spark.tc/ 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Aggregating-over-sorted-data-tp1p20273.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org