Re: Re: Selecting the top 100 records per group by?

2016-09-29 Thread Mariano Semelman
It's not Spark specific, but it answers your question:
https://blog.jooq.org/2014/08/12/the-difference-between-row_number-rank-and-dense_rank/
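
In short, ROW_NUMBER always produces a unique, gap-free sequence (ties are broken
arbitrarily), RANK gives tied rows the same value and leaves gaps, and DENSE_RANK
gives tied rows the same value without gaps. A minimal sketch (the column names,
sample values, and the assumption of a Spark shell whose sqlContext is a HiveContext,
which window functions need in Spark 1.6, are mine, not from the thread):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, row_number, rank, dense_rank}
import sqlContext.implicits._

val df = Seq(("Esso", 100), ("AVIVA", 100), ("BP", 90), ("IBM", 80)).toDF("security", "price")
val w = Window.orderBy(desc("price"))

df.select($"security", $"price",
  row_number().over(w).as("row_number"),   // 1, 2, 3, 4  -- always unique, arbitrary among ties
  rank().over(w).as("rank"),               // 1, 1, 3, 4  -- ties share a rank, gaps follow
  dense_rank().over(w).as("dense_rank")    // 1, 1, 2, 3  -- ties share a rank, no gaps
).show()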

On 12 September 2016 at 12:42, Mich Talebzadeh 
wrote:

> Hi,
>
> I don't understand why you need to add a row_number column when you can
> use rank or dense_rank?
>
> Why can one not use rank or dense_rank here?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 12 September 2016 at 15:37,  wrote:
>
>> hi kevin
>> window function is what you need, like below:
>> val hivetable = hc.sql("select * from house_sale_pv_location")
>> val overLocation = Window.partitionBy(hivetable.col("lp_location_id"))
>> val sortedDF = hivetable.withColumn("rowNumber",
>>   row_number().over(overLocation)).filter("rowNumber <= 50")
>>
>> Here I add a rowNumber column, partition the data by location, and keep
>> the top 50 rows per partition.
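
A self-contained version of that pattern (a sketch only: the "pv" ordering column
is an assumption, row_number() needs an ordered window to define "top", and Spark
1.6 needs a HiveContext, here hc, for window functions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val hivetable = hc.sql("select * from house_sale_pv_location")
val overLocation = Window
  .partitionBy(hivetable.col("lp_location_id"))
  .orderBy(hivetable.col("pv").desc)          // assumed column that defines "top"
val topPerLocation = hivetable
  .withColumn("rowNumber", row_number().over(overLocation))
  .filter("rowNumber <= 50")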
>>
>>
>>
>> 
>>
>> Thanks, best regards!
>> San.Luo
>>
>> ----- Original Message -----
>> From: Mich Talebzadeh 
>> To: "user @spark" 
>> Subject: Re: Selecting the top 100 records per group by?
>> Date: 2016-09-11 22:20
>>
>> You can of course do this using FP.
>>
>> val wSpec = Window.partitionBy('price).orderBy(desc("price"))
>> df2.filter('security > " ").select(dense_rank().over(wSpec).as("rank"),
>>   'TIMECREATED, 'SECURITY, substring('PRICE,1,7)).filter('rank <= 10).show
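
As written, the window is partitioned by the same column it orders by, so every
row lands in a single-price partition and gets rank 1; for the per-group top-N in
the thread's subject, one would normally partition by the grouping column instead.
A minimal sketch reusing df2, where treating 'security as the group is only an
assumption:

val wSpecGrouped = Window.partitionBy('security).orderBy(desc("price"))
df2.select('TIMECREATED, 'SECURITY, substring('PRICE,1,7),
  dense_rank().over(wSpecGrouped).as("rank")).filter('rank <= 10).show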
>>
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 11 September 2016 at 07:15, Mich Talebzadeh wrote:
>>
>> DENSE_RANK will give you ordering and a sequence within a particular
>> column. This example is in Hive SQL:
>>
>>  var sqltext = """
>>  | SELECT RANK, timecreated,security, price
>>  |  FROM (
>>  |SELECT timecreated,security, price,
>>  |   DENSE_RANK() OVER (ORDER BY price DESC ) AS RANK
>>  |  FROM test.prices
>>  |   ) tmp
>>  |  WHERE rank <= 10
>>  | """
>> sql(sqltext).collect.foreach(println)
>>
>> [1,2016-09-09 16:55:44,Esso,99.995]
>> [1,2016-09-09 21:22:52,AVIVA,99.995]
>> [1,2016-09-09 21:22:52,Barclays,99.995]
>> [1,2016-09-09 21:24:28,JPM,99.995]
>> [1,2016-09-09 21:30:38,Microsoft,99.995]
>> [1,2016-09-09 21:31:12,UNILEVER,99.995]
>> [2,2016-09-09 16:54:14,BP,99.99]
>> [2,2016-09-09 16:54:36,Tate & Lyle,99.99]
>> [2,2016-09-09 16:56:28,EASYJET,99.99]
>> [2,2016-09-09 16:59:28,IBM,99.99]
>> [2,2016-09-09 20:16:10,EXPERIAN,99.99]
>> [2,2016-09-09 22:25:20,Microsoft,99.99]
>> [2,2016-09-09 22:53:49,Tate & Lyle,99.99]
>> [3,2016-09-09 15:31:06,UNILEVER,99.985]
>>
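The query above ranks prices globally; for the per-group variant in the subject
line, the same SQL would add a PARTITION BY on the grouping column. A hedged
sketch (using security as the group is only an assumption):

val sqltext = """
SELECT rank, timecreated, security, price
  FROM (
        SELECT timecreated, security, price,
               DENSE_RANK() OVER (PARTITION BY security ORDER BY price DESC) AS rank
          FROM test.prices
       ) tmp
 WHERE rank <= 100
"""
sql(sqltext).collect.foreach(println)
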
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 11 September 2016 at 04:32, Kevin Burton  wrote:
>>
>> Looks like you can do it with dense_rank functions.
>>
>> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
>>
>> I set up some basic records and it seems like it did the right thing.
>>
>> Now time to throw 50TB and 100 spark nodes at this problem and see what
>> happens :)
>>
>> On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton  wrote:
>>
>> Ah.. might actually. I'll have to mess around with 

Re: Submit and Monitor standalone cluster application

2016-09-29 Thread Mariano Semelman
Sorry, my mistake (quick copy-paste): Livy doesn't let me submit
applications the classic way (with assembly jars) and forces me to change
all my current applications.

--

*Mariano Semelman*
P13N - IT
Av. Corrientes Nº 746 - piso 13 - C.A.B.A. (C1043AAU)
Teléfono (54) 11- *4894-3500*


*Despegar.com*
The best price for your trip.

This message is confidential and may contain information protected by
professional secrecy. If you have received this e-mail in error, please
notify us immediately by replying to this e-mail and then delete it from
your system. The contents of this message must not be copied or disclosed
to any person.

On 29 September 2016 at 01:08, Ofer Eliassaf <ofer.elias...@gmail.com>
wrote:

> Are you sure that Livy doesn't support standalone cluster mode?
>
> On Thu, Sep 29, 2016 at 1:42 AM, Mariano Semelman <
> mariano.semel...@despegar.com> wrote:
>
>> ​Hello everybody,
>>
>> I'm developing an application to submit batch and streaming apps in a
>> fault-tolerant fashion. For that I need a programmatic way to submit and
>> monitor my apps and relaunch them in case of failure. Right now I'm using
>> spark standalone (1.6.x) and submitting in cluster mode. The options I have
>> explored so far are:
>>
>> SparkLauncher.java [1]: It has two modes:
>> - 1) launch(): Doesn't give me the application-id needed for monitoring
>> (with the Spark master REST API). I would have to infer it from the
>> application name and startTime in api/v1/applications using the Spark
>> master API [9].
>> - 2) startApplication(...): Only works if submitted locally or in client
>> mode (BTW, the fact that it only works in client or local mode is not
>> documented in the package summary page [1], which led me to many, many
>> wasted hours)
>>
>> Spark-Jobserver [2]:
>> Doesn't support standalone cluster mode
>>
>> Livy [3]:
>> Doesn't support standalone cluster mode
>>
>> Spark Submission Rest API [4,5,6]:
>> It seems the sensible way, but it is black magic for the user. It's not
>> documented and there's no official client; there's only one unofficial
>> client [7]. It also occurred to me to copy the RestSubmissionClient [8]
>> into my own project.
>>
>>
>> I'm torn between using launch() and inferring the appId, or using the
>> Spark Submission REST API, but neither seems a proper way to solve this.
>> If someone could give me advice on how to approach this I would
>> appreciate it.
>>
>> Thanks in advance,
>>
>> Mariano
>>
>>
>> [1] https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/launcher/package-summary.html
>> [2] https://github.com/spark-jobserver/spark-jobserver
>> [3] http://livy.io/
>> [4] http://stackoverflow.com/questions/28992802/triggering-spark-jobs-with-rest (most voted answer)
>> [5] http://arturmkrtchyan.com/apache-spark-hidden-rest-api
>> [6] https://issues.apache.org/jira/browse/SPARK-5388
>> [7] https://github.com/ywilkof/spark-jobs-rest-client
>> [8] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
>> [9] http://spark.apache.org/docs/latest/monitoring.html
>>
>>
>>
>>
>
>
> --
> Regards,
> Ofer Eliassaf
>


Submit and Monitor standalone cluster application

2016-09-28 Thread Mariano Semelman
​Hello everybody,

I'm developing an application to submit batch and streaming apps in a fault
tolerant fashion. For that I need a programmatic way to submit and
monitor my apps and relaunch them in case of failure. Right now I'm using
spark standalone (1.6.x) and submitting in cluster mode. The options I have
explored so far are:

SparkLauncher.java [1]: It has two modes:
- 1) launch(): Doesn't give me the application-id needed for monitoring
(with the Spark master REST API). I would have to infer it from the
application name and startTime in api/v1/applications using the Spark
master API [9].
- 2) startApplication(...): Only works if submitted locally or in client
mode (a sketch follows after this list; BTW, the fact that it only works
in client or local mode is not documented in the package summary page [1],
which led me to many, many wasted hours)

Spark-Jobserver [2]:
Doesn't support standalone cluster mode

Livy [3]:
Doesn't support standalone cluster mode

Spark Submission Rest API [4,5,6]:
It seems the sensible way, but it is black magic for the user. It's not
documented and there's no official client; there's only one unofficial
client [7]. It also occurred to me to copy the RestSubmissionClient [8]
into my own project.
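
For reference, a minimal sketch of option 2) with a listener attached (the jar
path, main class and master URL are placeholders; as noted above, the handle only
reports events for local/client-mode submissions):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val handle = new SparkLauncher()
  .setAppResource("/path/to/my-assembly.jar")   // placeholder
  .setMainClass("com.example.MyApp")            // placeholder
  .setMaster("spark://master-host:7077")        // placeholder; legacy (non-REST) port
  .setDeployMode("client")
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit =
      println(s"state=${h.getState} appId=${h.getAppId}")
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })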


I'm torn between using launch() and inferring the appId, or using the Spark
Submission REST API (sketched below), but neither seems a proper way to solve
this. If someone could give me advice on how to approach this I would
appreciate it.
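
For completeness, a direct call to the hidden REST submission endpoint looks
roughly like this, going by the write-ups in [4,5] and RestSubmissionClient [8]
(a hedged sketch: host, port, jar path, class name and Spark version are
placeholders):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

val body =
  """{
    |  "action": "CreateSubmissionRequest",
    |  "appResource": "file:/path/to/my-assembly.jar",
    |  "mainClass": "com.example.MyApp",
    |  "appArgs": [],
    |  "clientSparkVersion": "1.6.1",
    |  "environmentVariables": { "SPARK_ENV_LOADED": "1" },
    |  "sparkProperties": {
    |    "spark.master": "spark://master-host:6066",
    |    "spark.app.name": "MyApp",
    |    "spark.jars": "file:/path/to/my-assembly.jar",
    |    "spark.submit.deployMode": "cluster"
    |  }
    |}""".stripMargin

val conn = new URL("http://master-host:6066/v1/submissions/create")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8")
conn.setDoOutput(true)
val out = conn.getOutputStream
out.write(body.getBytes(StandardCharsets.UTF_8))
out.close()
// The JSON response carries a submissionId, which can later be polled at
// /v1/submissions/status/<submissionId> to monitor the driver or detect failure.
println(scala.io.Source.fromInputStream(conn.getInputStream).mkString)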

Thanks in advance,

Mariano


[1] https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/launcher/package-summary.html
[2] https://github.com/spark-jobserver/spark-jobserver
[3] http://livy.io/
[4] http://stackoverflow.com/questions/28992802/triggering-spark-jobs-with-rest (most voted answer)
[5] http://arturmkrtchyan.com/apache-spark-hidden-rest-api
[6] https://issues.apache.org/jira/browse/SPARK-5388
[7] https://github.com/ywilkof/spark-jobs-rest-client
[8] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
[9] http://spark.apache.org/docs/latest/monitoring.html


Re: SparkLauncher not receiving events

2016-09-26 Thread Mariano Semelman
Solved.
tl;dr: I was using port 6066 instead of 7077.


I got confused because of this message in the log when I submitted to the
legacy port:

[info] - org.apache.spark.launcher.app.ActivitiesSortingAggregateJob -
16/09/26 11:43:27 WARN RestSubmissionClient: Unable to connect to server
spark://ponyo:7077.
[info] - org.apache.spark.launcher.app.ActivitiesSortingAggregateJob -
Warning: Master endpoint spark://ponyo:7077 was not a REST server. Falling
back to legacy submission gateway instead.

That's why I started using port 6066 (REST); that was some time ago.

But it seems that, for some reason unknown to me, SparkLauncher listeners
don't work with the REST port; none of the logs (i.e. client, master,
worker, driver) show anything at all.

Well, I hope I didn't make anyone spend too much time on this, and that
I've saved some other soul a few days.

Mariano


On 26 September 2016 at 11:37, Mariano Semelman <
mariano.semel...@despegar.com> wrote:

> Hello,
>
> I'm having problems receiving events from the submitted app. The app
> successfully submits, but the listener I'm passing to SparkLauncher is not
> receiving events.
>
> Spark Version: 1.6.1 (both client app and master)
>
> here are the relevant snippets I'm using in my code:
> https://gist.github.com/msemelman/d9d2b54ce0c01af8952bc298058dc0d5
>
> BTW, the logger is correctly configured, in case you ask. I even
> debugged the Spark code and put some breakpoints
> in org.apache.spark.launcher.LauncherServer.ServerConnection#handle to
> check if any messages were coming through, but nothing.
>
> Here is the verbose output of the submission:
> https://gist.github.com/msemelman/2367fe4899886e2f2e38c87edfe4e9a9
> And what the master shows:
> https://www.dropbox.com/s/afmhnn3ra9wndca/Screenshot%20Master.png?dl=1
>
>
> Any help would be much appreciated, even if you just point me to where to
> debug; I'm a bit lost here.
>
> Thanks in advance
>


SparkLauncher not receiving events

2016-09-26 Thread Mariano Semelman
Hello,

I'm having problems receiving events from the submitted app. The app
successfully submits, but the listener I'm passing to SparkLauncher is not
receiving events.

Spark Version: 1.6.1 (both client app and master)

here are the relevant snippets I'm using in my code:
https://gist.github.com/msemelman/d9d2b54ce0c01af8952bc298058dc0d5

BTW, the logger is correctly configured, in case you ask. I even debugged
the Spark code and put some breakpoints
in org.apache.spark.launcher.LauncherServer.ServerConnection#handle to
check if any messages were coming through, but nothing.

Here is the verbose output of the submission:
https://gist.github.com/msemelman/2367fe4899886e2f2e38c87edfe4e9a9
And what the master shows:
https://www.dropbox.com/s/afmhnn3ra9wndca/Screenshot%20Master.png?dl=1


Any help would be much appreciated, even if you just point me to where to
debug; I'm a bit lost here.

Thanks in advance


Re: Master OOM in "master-rebuild-ui-thread" while running stream app

2016-09-13 Thread Mariano Semelman
Thanks, I would go with disabling the event log.
BTW, the master crashed while the application was still running.
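
For the record, a minimal sketch of turning the event log off on the application
side, assuming it is currently enabled through SparkConf (the same property can
also be passed as --conf spark.eventLog.enabled=false to spark-submit or set in
spark-defaults.conf):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-streaming-app")              // placeholder
  .set("spark.eventLog.enabled", "false")      // stop writing the event log that the master replays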


On 13 September 2016 at 12:52, Bryan Cutler <cutl...@gmail.com> wrote:

> It looks like you have logging enabled and your application event log is
> too large for the master to build a web UI from it.  In spark 1.6.2 and
> earlier, when an application completes, the master rebuilds a web UI to
> view events after the fact.  This functionality was removed in spark 2.0
> and the history server should be used instead.  If you are unable to
> upgrade could you try disabling logging?
>
> On Sep 13, 2016 7:18 AM, "Mariano Semelman" <mariano.semel...@despegar.com>
> wrote:
>
>> Hello everybody,
>>
>> I am running a Spark Streaming app and I am planning to use it as a
>> long-running service. However, while trying the app in an RC environment I
>> got this exception in the master daemon after 1 hour of running:
>>
>> Exception in thread "master-rebuild-ui-thread"
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at java.util.regex.Pattern.compile(Pattern.java:1667)
>> at java.util.regex.Pattern.<init>(Pattern.java:1351)
>> at java.util.regex.Pattern.compile(Pattern.java:1054)
>> at java.lang.String.replace(String.java:2239)
>> at org.apache.spark.util.Utils$.getFormattedClassName(Utils.scala:1632)
>> at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:486)
>> at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>> at org.apache.spark.deploy.master.Master$$anonfun$17.apply(Master.scala:972)
>> at org.apache.spark.deploy.master.Master$$anonfun$17.apply(Master.scala:952)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> As a palliative measure I've increased the master memory to 1.5 GB.
>> My job is running with a batch interval of 5 seconds.
>> I'm using Spark version 1.6.2.
>>
>> I think it might be related to these issues:
>>
>> https://issues.apache.org/jira/browse/SPARK-6270
>> https://issues.apache.org/jira/browse/SPARK-12062
>> https://issues.apache.org/jira/browse/SPARK-12299
>>
>> But I don't see a clear way to solve this apart from upgrading Spark.
>> What would you recommend?
>>
>>
>> Thanks in advance
>> Mariano
>>
>>


Master OOM in "master-rebuild-ui-thread" while running stream app

2016-09-13 Thread Mariano Semelman
Hello everybody,

I am running a Spark Streaming app and I am planning to use it as a
long-running service. However, while trying the app in an RC environment I
got this exception in the master daemon after 1 hour of running:

​​Exception in thread "master-rebuild-ui-thread"
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.regex.Pattern.compile(Pattern.java:1667)
at java.util.regex.Pattern.<init>(Pattern.java:1351)
at java.util.regex.Pattern.compile(Pattern.java:1054)
at java.lang.String.replace(String.java:2239)
at org.apache.spark.util.Utils$.getFormattedClassName(Utils.scala:1632)
at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:486)
at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
at org.apache.spark.deploy.master.Master$$anonfun$17.apply(Master.scala:972)
at org.apache.spark.deploy.master.Master$$anonfun$17.apply(Master.scala:952)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


As a palliative measure I've increased the master memory to 1.5 GB.
My job is running with a batch interval of 5 seconds.
I'm using Spark version 1.6.2.

I think it might be related to these issues:

https://issues.apache.org/jira/browse/SPARK-6270
https://issues.apache.org/jira/browse/SPARK-12062
https://issues.apache.org/jira/browse/SPARK-12299

But I don't see a clear way to solve this apart from upgrading Spark.
What would you recommend?


Thanks in advance
Mariano


Q: Multiple spark streaming app, one kafka topic, same consumer group

2016-09-06 Thread Mariano Semelman
Hello everybody,

I am trying to understand how Kafka Direct Stream works. I'm interested in
having a production-ready Spark Streaming application that consumes a Kafka
topic. But I need to guarantee there's (almost) no downtime, especially
during deploys (and submits) of new versions. What seems to be the best
solution is to deploy and submit the new version without shutting down the
previous one, wait for the new application to start consuming events, and
then shut down the previous one.

What I would expect is that the events get distributed among the two
applications in a balanced fashion using the consumer group id,
split by the partition key that I've previously set on my Kafka
producer. However, I don't see that Kafka Direct Stream supports this
functionality.

I've achieved this with the receiver-based approach (BTW, I've used "kafka"
for the "offsets.storage" Kafka property [2]). However, this approach comes
with the technical difficulties described in the documentation [1] (i.e.
exactly-once semantics).

Anyway, not even this approach seems very failsafe. Does anyone know a way
to safely deploy new versions of a streaming application of this kind
without downtime?

Thanks in advance,

Mariano


[1] http://spark.apache.org/docs/latest/streaming-kafka-integration.html
[2] http://kafka.apache.org/documentation.html#oldconsumerconfigs