Re: Windows operation orderBy desc

2016-08-24 Thread Selvam Raman
Hi all,

I am using a window function with row_number to find the latest record.

The Hive table is partitioned.

When I run the function, it runs 545 tasks.

What is the reason for the 545 tasks?

Thanks,
selvam R

On Mon, Aug 1, 2016 at 8:09 PM, Mich Talebzadeh 
wrote:

> You need to get the position right
>
>
> val wSpec = Window.partitionBy("col1").orderBy(desc("col2"))
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 August 2016 at 14:56, Ashok Kumar 
> wrote:
>
>> Hi,
>>
>> in the following Window spec I want orderBy ("") to be displayed
>> in descending order please
>>
>> val W = Window.partitionBy("col1").orderBy("col2")
>>
>> If I Do
>>
>> val W = Window.partitionBy("col1").orderBy("col2".desc)
>>
>> It throws error
>>
>> <console>:26: error: value desc is not a member of String
>>
>> How can I achieve that?
>>
>> Thanking you
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"

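For reference, the descending window spec from the reply above, combined with row_number to keep the latest record per key, might look like the sketch below. It assumes Spark 1.6 or later on the classpath; the object name LatestRecord and the helper latestPerKey are illustrative names, not part of Spark.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

object LatestRecord {
  // Keep only the newest row per key: number the rows within each key,
  // newest first, and keep row number 1.
  def latestPerKey(df: DataFrame, keyCol: String, orderCol: String): DataFrame = {
    val wSpec = Window.partitionBy(keyCol).orderBy(desc(orderCol))
    df.withColumn("rn", row_number().over(wSpec))
      .filter(col("rn") === 1)
      .drop("rn")
  }
}
```

It would be called as LatestRecord.latestPerKey(df, "col1", "col2"). The attempt with "col2".desc fails because desc is defined on Column, not on String; col("col2").desc is an equivalent spelling of desc("col2"). If several rows tie on the ordering column, row_number keeps an arbitrary one of them.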

Re: spark 2.0.0 - when saving a model to S3 spark creates temporary files. Why?

2016-08-24 Thread Tal Grynbaum
I read somewhere that it's because S3 has to know the size of the file
upfront.
I don't really understand this: why is it OK not to know the size for the
temp files but not OK for the final files?
The delete permission is the minor disadvantage from my side. The worst
thing is that I have a cluster of 100 machines sitting idle for 15 minutes
waiting for the copy to end.

Any suggestions on how to avoid that?

On Thu, Aug 25, 2016, 08:21 Aseem Bansal  wrote:

> Hi
>
> When Spark saves anything to S3 it creates temporary files. Why? Asking
> this as this requires the access credentials to be given
> delete permissions along with write permissions.
>


Re: quick question

2016-08-24 Thread Sivakumaran S
You create a websocket object in your spark code and write your data to the 
socket. You create a websocket object in your dashboard code and receive the 
data in realtime and update the dashboard. You can use Node.js in your 
dashboard (socket.io). I am sure there are other ways too.

Does that help?

Sivakumaran S

> On 25-Aug-2016, at 6:30 AM, kant kodali  wrote:
> 
> so I would need to open a websocket connection from spark worker machine to 
> where?
> 
> 
> 
> 
> 
> On Wed, Aug 24, 2016 8:51 PM, Kevin Mellott kevin.r.mell...@gmail.com 
>  wrote:
> In the diagram you referenced, a real-time dashboard can be created using 
> WebSockets. This technology essentially allows your web page to keep an 
> active line of communication between the client and server, in which case you 
> can detect and display new information without requiring any user input or 
> page refreshes. The link below contains additional information on this 
> concept, as well as links to several different implementations (based on your 
> programming language preferences).
> 
> https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API 
> 
> 
> Hope this helps!
> - Kevin
> 
> On Wed, Aug 24, 2016 at 3:52 PM, kant kodali  > wrote:
> 
> -- Forwarded message --
> From: kant kodali >
> Date: Wed, Aug 24, 2016 at 1:49 PM
> Subject: quick question
> To: d...@spark.apache.org , 
> us...@spark.apache.org 
> 
> 
> 
> 
> In this picture what does "Dashboards" really mean? Is there an open source 
> project which can allow me to push the results back to Dashboards such that 
> Dashboards are always in sync with real time updates? (A push-based solution 
> is better than polling, but I am open to whatever is possible given the above 
> picture.)
> 



spark 2.0.0 - when saving a model to S3 spark creates temporary files. Why?

2016-08-24 Thread Aseem Bansal
Hi

When Spark saves anything to S3 it creates temporary files. Why? Asking
this as this requires the access credentials to be given
delete permissions along with write permissions.


Re: Spark MLlib:Collaborative Filtering

2016-08-24 Thread Devi P.V
Thanks a lot. I solved the problem using the string indexer.

On Wed, Aug 24, 2016 at 3:40 PM, Praveen Devarao 
wrote:

> You could use the string indexer to convert your string user IDs and
> product IDs to numeric values.
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
>
> Thanking You
> 
> -
> Praveen Devarao
> IBM India Software Labs
> 
> -
> "Courage doesn't always roar. Sometimes courage is the quiet voice at the
> end of the day saying I will try again"
>
>
>
> From:glen 
> To:"Devi P.V" 
> Cc:"user@spark.apache.org" 
> Date:24/08/2016 02:10 pm
> Subject:Re: Spark MLlib:Collaborative Filtering
> --
>
>
>
> Hash it to int
>
>
>
> On 2016-08-24 16:28 , *Devi P.V*  Wrote:
>
> Hi all,
> I am a newbie in collaborative filtering. I want to implement a collaborative
> filtering algorithm (need to find the top 10 recommended products) using Spark
> and Scala. I have a rating dataset where UserID & ProductID are String type.
>
> UserID   ProductID Rating
> b3a68043-c1  p1-160ff5fDS-f74   1
> b3a68043-c2  p5-160ff5fDS-f74   1
> b3a68043-c0  p9-160ff5fDS-f74   1
>
>
> I tried the ALS algorithm using Spark MLlib, but it supports only Integer
> type for userID & productID in a rating. How can I solve this problem?
>
> Thanks In Advance
>
>
>
>
>
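The string indexer approach that resolved this thread could be sketched roughly as follows. This is a minimal sketch assuming the spark.ml API (Spark 2.0 era) and the column names from the sample data; the object name StringIdAls and the output column names userIdx and productIdx are made up for illustration.

```scala
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.DataFrame

object StringIdAls {
  // Add numeric index columns next to the original string ID columns.
  def indexIds(ratings: DataFrame): DataFrame = {
    val userIndexer = new StringIndexer().setInputCol("UserID").setOutputCol("userIdx")
    val productIndexer = new StringIndexer().setInputCol("ProductID").setOutputCol("productIdx")
    val withUsers = userIndexer.fit(ratings).transform(ratings)
    productIndexer.fit(withUsers).transform(withUsers)
  }

  // ALS configured to read the indexed columns instead of the string IDs.
  def als(): ALS = new ALS()
    .setUserCol("userIdx")
    .setItemCol("productIdx")
    .setRatingCol("Rating")
}
```

A model would then be fitted with StringIdAls.als().fit(StringIdAls.indexIds(ratings)), provided the Rating column is numeric. Compared with hashing the IDs to int, indexing gives every distinct string its own number, so two different users cannot collide on the same ID.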


Re: Sqoop vs spark jdbc

2016-08-24 Thread ayan guha
Hi

Adding one more lens to it: if we are talking about a one-off migration use
case, or a weekly sync, Sqoop would be a better choice. If we are talking
about regular data feeding from DB to Hadoop, and doing some transformation
in the pipe, Spark will do better.

On Thu, Aug 25, 2016 at 2:08 PM, Ranadip Chatterjee 
wrote:

> This will depend on multiple factors. Assuming we are talking significant
> volumes of data, I'd prefer sqoop compared to spark on yarn, if ingestion
> performance is the sole consideration (which is true in many production use
> cases). Sqoop provides some potential optimisations, especially around using
> native database batch extraction tools, that Spark cannot take advantage of.
> The performance inefficiency of using MR (actually map-only) is
> insignificant over a large corpus of data. Further, in a shared cluster, if
> the data volume is skewed for the given partition key, Spark, without
> dynamic container allocation, can be significantly inefficient from a cluster
> resource usage perspective. With dynamic allocation enabled, it is less so
> but sqoop still has a slight edge due to the time Spark holds on to the
> resources before giving them up.
>
> If ingestion is part of a more complex DAG that relies on Spark cache (rdd
> / dataframe or dataset), then using Spark jdbc can have a significant
> advantage in being able to cache the data without persisting into hdfs
> first. But whether this will convert into an overall significantly better
> performance of the DAG or cluster will depend on the DAG stages and their
> performance. In general, if the ingestion stage is the significant
> bottleneck in the DAG, then the advantage will be significant.
>
> Hope this provides a general direction to consider in your case.
>
> On 25 Aug 2016 3:09 a.m., "Venkata Penikalapati" <
> mail.venkatakart...@gmail.com> wrote:
>
>> Team,
>> Please help me in choosing sqoop or spark jdbc to fetch data from rdbms.
>> Sqoop has lot of optimizations to fetch data does spark jdbc also has those
>> ?
>>
>> I'm performing few analytics using spark data for which data is residing
>> in rdbms.
>>
>> Please guide me with this.
>>
>>
>> Thanks
>> Venkata Karthik P
>>
>>


-- 
Best Regards,
Ayan Guha


Spark Logging : log4j.properties or log4j.xml

2016-08-24 Thread John Jacobs
One can specify "-Dlog4j.configuration=" or
"-Dlog4j.configuration=".
Is there any preference to using one over other?

All the spark documentation talks about using "log4j.properties" only (
http://spark.apache.org/docs/latest/configuration.html#configuring-logging).
So is only "log4j.properties" officially supported?


Re: Sqoop vs spark jdbc

2016-08-24 Thread Ranadip Chatterjee
This will depend on multiple factors. Assuming we are talking significant
volumes of data, I'd prefer sqoop compared to spark on yarn, if ingestion
performance is the sole consideration (which is true in many production use
cases). Sqoop provides some potential optimisations, especially around using
native database batch extraction tools, that Spark cannot take advantage of.
The performance inefficiency of using MR (actually map-only) is
insignificant over a large corpus of data. Further, in a shared cluster, if
the data volume is skewed for the given partition key, Spark, without
dynamic container allocation, can be significantly inefficient from a cluster
resource usage perspective. With dynamic allocation enabled, it is less so
but sqoop still has a slight edge due to the time Spark holds on to the
resources before giving them up.

If ingestion is part of a more complex DAG that relies on Spark cache (rdd
/ dataframe or dataset), then using Spark jdbc can have a significant
advantage in being able to cache the data without persisting into hdfs
first. But whether this will convert into an overall significantly better
performance of the DAG or cluster will depend on the DAG stages and their
performance. In general, if the ingestion stage is the significant
bottleneck in the DAG, then the advantage will be significant.

Hope this provides a general direction to consider in your case.

On 25 Aug 2016 3:09 a.m., "Venkata Penikalapati" <
mail.venkatakart...@gmail.com> wrote:

> Team,
> Please help me in choosing sqoop or spark jdbc to fetch data from rdbms.
> Sqoop has lot of optimizations to fetch data does spark jdbc also has those
> ?
>
> I'm performing few analytics using spark data for which data is residing
> in rdbms.
>
> Please guide me with this.
>
>
> Thanks
> Venkata Karthik P
>
>


Spark Streaming user function exceptions causing hangs

2016-08-24 Thread N B
Hello,

We have a Spark streaming application (running Spark 1.6.1) that consumes
data from a message queue. The application is running in local[*] mode so
driver and executors are in a single JVM.

The issue that we are dealing with these days is that if any of our lambda
functions throw any Exception, the entire processing hangs and the task
keeps getting retried ad infinitum. According to the documentation here :

http://spark.apache.org/docs/1.6.1/configuration.html

the default value of spark.task.maxFailures is set to 4 and we have not
changed that so I don't quite get why the tasks keep getting retried
forever.

How can we make sure that a single bad record causing a Runtime Exception
does not stop all the processing?

Till now I have tried to add try/catch blocks around the lambda functions
and returning null in case an exception does get thrown but that causes
NullPointerExceptions within Spark with the same net effect of processing
of the subsequent batches getting stopped.

Thanks in advance,
NB
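
One common way to keep a single bad record from surfacing as either an exception or a null is to have the user function return an Option and use flatMap instead of map. The sketch below shows the idea on a plain Scala collection; parse is a hypothetical stand-in for the real user function, and the same flatMap shape should carry over to RDDs and DStreams. It does not explain the endless retries themselves.

```scala
import scala.util.Try

object SafeParse {
  // Hypothetical user function: throws NumberFormatException on bad input.
  def parse(record: String): Int = record.trim.toInt

  // Wrap the call so a failure becomes None instead of an exception or a null.
  def safeParse(record: String): Option[Int] = Try(parse(record)).toOption

  // flatMap drops the None values, so bad records simply disappear from the
  // output instead of flowing downstream as nulls.
  def parseAll(records: Seq[String]): Seq[Int] = records.flatMap(r => safeParse(r))
}
```

For example, SafeParse.parseAll(Seq("1", "x", " 3 ")) keeps 1 and 3 and skips "x". In a real job it is usually worth logging or counting the failures rather than dropping them silently.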


Re: Incremental Updates and custom SQL via JDBC

2016-08-24 Thread Sascha Schmied
Thank you for your answer.

I'm using an ORC transactional table right now, but I'm not stuck with that. When 
I send an SQL statement like the following, where old_5sek_agg and new_5sek_agg 
are registered temp tables, I get an exception in Spark. The same happens without 
the subselect.

sqlContext.sql("DELETE FROM old_5sek_agg WHERE Sec in (SELECT Sec FROM 
new_5sek_agg)")

When I execute the statement directly in the Hive Ambari view, I don't get 
exceptions. Indeed, I get a success message, but the targeted row is not deleted, 
nor is it updated by an UPDATE statement.

I'm not familiar with your op_type and op_time approach and couldn't find any 
useful resources by quickly asking Google, but it sounds promising. 
Unfortunately your answer seems to be cut off in the middle of your example. 
Would you really update the value of those two additional columns, and if so, 
how would you do this when it's not an ORC transactional table?

Thanks again!

On 25.08.2016 at 01:24, Mich Talebzadeh wrote:

Dr Mich Talebzadeh



Re: Incremental Updates and custom SQL via JDBC

2016-08-24 Thread Mich Talebzadeh
So you want to push data from Spark streaming to both Hive and SAP HANA
tables.

Let us take one at a time.

Spark is writing to a Hive table, but you need to delete some rows from Hive
beforehand?

Have you defined your ORC table as ORC transactional, or are you just
marking rows as deleted with two additional columns, op_type and op_time,
keeping the data immutable?

Example op_type = 3 and op_time = cast(from_unixtime(unix_timestamp()) AS
timestamp) for deleted records and when

HTH

Dr Mich Talebzadeh




On 25 August 2016 at 00:08, Oldskoola  wrote:

> Hi,
>
> I'm building aggregates over Streaming Data. When new data affects
> previously processed aggregates, I'll need to update the affected rows or
> delete them before writing the newly processed aggregates back to HDFS (Hive
> Metastore) and a SAP HANA Table. How would you do this, when writing a
> complete dataframe every interval is not an option?
>
> Somewhat related is the question of custom JDBC SQL for writing to the SAP
> HANA DB. How would you implement SAP HANA specific commands if the built-in
> JDBC df writer is not sufficient for your needs? In this case I primarily
> want to do the incremental updates as described before and maybe also
> want to send specific CREATE TABLE syntax for columnar store and time
> table.
>
> Thank you very much in advance. I'm a little stuck on this one.
>
> Regards
> Sascha
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incremental-Updates-and-custom-SQL-via-JDBC-tp27598.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
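The op_type / op_time idea mentioned in the reply above (append change records and never modify stored rows) can be illustrated with a small plain-Scala sketch. Only "op_type = 3 means deleted" comes from the thread; the Change record layout, treating 1 and 2 as insert and update, and keying on Sec are assumptions for illustration.

```scala
object ChangeLog {
  // One immutable change record. opType == 3 marks a delete (from the thread);
  // using 1 for insert and 2 for update is an assumption for this sketch.
  final case class Change(sec: Long, value: Double, opType: Int, opTime: Long)

  val Deleted = 3

  // Current state = latest change per key, minus keys whose latest change
  // is a delete. Nothing already written is ever updated in place.
  def currentState(log: Seq[Change]): Map[Long, Double] =
    log.groupBy(_.sec)
      .map { case (sec, changes) => sec -> changes.maxBy(_.opTime) }
      .collect { case (sec, latest) if latest.opType != Deleted => sec -> latest.value }
}
```

The same "latest row per key, then drop deletes" logic can be expressed on a DataFrame with a window function ordered by op_time descending, which avoids needing DELETE or UPDATE support in the table format.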


Incremental Updates and custom SQL via JDBC

2016-08-24 Thread Oldskoola
Hi,

I'm building aggregates over Streaming Data. When new data affects
previously processed aggregates, I'll need to update the affected rows or
delete them before writing the newly processed aggregates back to HDFS (Hive
Metastore) and a SAP HANA Table. How would you do this, when writing a
complete dataframe every interval is not an option?

Somewhat related is the question of custom JDBC SQL for writing to the SAP
HANA DB. How would you implement SAP HANA specific commands if the built-in
JDBC df writer is not sufficient for your needs? In this case I primarily
want to do the incremental updates as described before and maybe also
want to send specific CREATE TABLE syntax for columnar store and time table. 

Thank you very much in advance. I'm a little stuck on this one. 

Regards
Sascha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incremental-Updates-and-custom-SQL-via-JDBC-tp27598.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Sqoop vs spark jdbc

2016-08-24 Thread Mich Talebzadeh
Personally I prefer Spark JDBC.

Both Sqoop and Spark rely on the same drivers.

I think Spark is faster and if you have many nodes you can partition your
incoming data and take advantage of Spark DAG + in memory offering.

By default Sqoop will use Map-reduce which is pretty slow.

Remember for Spark you will need to have sufficient memory

HTH

Dr Mich Talebzadeh




On 24 August 2016 at 22:39, Venkata Penikalapati <
mail.venkatakart...@gmail.com> wrote:

> Team,
> Please help me in choosing sqoop or spark jdbc to fetch data from rdbms.
> Sqoop has lot of optimizations to fetch data does spark jdbc also has those
> ?
>
> I'm performing few analytics using spark data for which data is residing
> in rdbms.
>
> Please guide me with this.
>
>
> Thanks
> Venkata Karthik P
>
>


How to compute a net (difference) given a bi-directional stream of numbers using spark streaming?

2016-08-24 Thread kant kodali

Hi Guys,
I am new to Spark and I am wondering how to compute the difference given a
bidirectional stream of numbers using Spark Streaming. To make it more concrete,
say Bank A is sending money to Bank B and Bank B is sending money to Bank A
throughout the day, such that at any given time we want to compute the difference
to find out who owes whom and how much.
The algorithm can simply be to break the stream into small intervals (which
Spark Streaming looks like it does), compute the difference in each
time interval, and sum the results of the intervals (assuming the
numbers in one interval don't overlap with other intervals).
I would like to know the best way to achieve this using Spark Streaming.
I am also open to any other suggestions for the algorithm.
Thanks!
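
The interval-based algorithm described above can be sketched in plain Scala, with each inner Seq standing in for one micro-batch. The Transfer type and the function names are made up for illustration; in Spark Streaming the running total would typically be kept with a stateful operation such as updateStateByKey.

```scala
object NetPosition {
  // A single payment; amounts are in cents to avoid floating-point drift.
  final case class Transfer(from: String, to: String, cents: Long)

  // Net flow from a to b within one batch: positive means a has sent more
  // to b than b has sent to a. Transfers between other parties are ignored.
  def batchNet(batch: Seq[Transfer], a: String, b: String): Long =
    batch.foldLeft(0L) { (net, t) =>
      if (t.from == a && t.to == b) net + t.cents
      else if (t.from == b && t.to == a) net - t.cents
      else net
    }

  // Running net after each batch: the per-interval results are simply summed.
  def runningNet(batches: Seq[Seq[Transfer]], a: String, b: String): Seq[Long] =
    batches.map(batchNet(_, a, b)).scanLeft(0L)(_ + _).tail
}
```

Because addition is associative and commutative, the per-interval nets can be computed independently and summed in any order, which is what makes the interval approach safe as long as each transfer lands in exactly one interval.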

Sqoop vs spark jdbc

2016-08-24 Thread Venkata Penikalapati
Team, please help me choose between Sqoop and Spark JDBC to fetch data from an RDBMS. 
Sqoop has a lot of optimizations for fetching data; does Spark JDBC also have those?
I'm performing some analytics using Spark, for which the data resides in an 
RDBMS. 
Please guide me with this. 

Thanks
Venkata Karthik P 


Re: Best way to calculate intermediate column statistics

2016-08-24 Thread Mich Talebzadeh
Hi Richard,

Can you use analytic functions on the DataFrame for this purpose?

HTH

Dr Mich Talebzadeh




On 24 August 2016 at 21:37, Richard Siebeling  wrote:

> Hi Mich,
>
> I'd like to gather several statistics per column in order to make
> analysing data easier. These two statistics are some examples; other
> statistics I'd like to gather are the variance, the median, several
> percentiles, etc. We are building a data analysis platform based on Spark.
>
> kind regards,
> Richard
>
> On Wed, Aug 24, 2016 at 6:52 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Richard,
>>
>> What is the business use case for such statistics?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>> On 24 August 2016 at 16:01, Bedrytski Aliaksandr 
>> wrote:
>>
>>> Hi Richard,
>>>
>>> should these intermediate statistics be calculated from the result of
>>> the calculation or during the aggregation?
>>> If they can be derived from the resulting dataframe, why not cache
>>> (persist) that result just after the calculation?
>>> Then you may aggregate statistics from the cached dataframe.
>>> This way it won't hit performance too much.
>>>
>>> Regards
>>> --
>>>   Bedrytski Aliaksandr
>>>   sp...@bedryt.ski
>>>
>>>
>>>
>>> On Wed, Aug 24, 2016, at 16:42, Richard Siebeling wrote:
>>>
>>> Hi,
>>>
>>> what is the best way to calculate intermediate column statistics, like
>>> the number of empty values and the number of distinct values of each column
>>> in a dataset, when aggregating or filtering data, next to the actual result
>>> of the aggregate or the filtered data?
>>>
>>> We are developing an application in which the user can slice-and-dice
>>> through the data and we would like to, next to the actual resulting data,
>>> get column statistics of each column in the resulting dataset. We prefer to
>>> calculate the column statistics on the same pass over the data as the
>>> actual aggregation or filtering, is that possible?
>>>
>>> We could sacrifice a little bit of performance (but not too much),
>>> that's why we prefer one pass...
>>>
>>> Is this possible in the standard Spark or would this mean modifying the
>>> source a little bit and recompiling? Is that feasible / wise to do?
>>>
>>> thanks in advance,
>>> Richard
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


What do I lose if I run spark without using HDFS or Zookeeper?

2016-08-24 Thread kant kodali

What do I lose if I run Spark without using HDFS or ZooKeeper? Which of them is
almost a must in practice?

Re: Best way to calculate intermediate column statistics

2016-08-24 Thread Richard Siebeling
Hi Mich,

I'd like to gather several statistics per column in order to make analysing
data easier. These two statistics are some examples; other statistics I'd
like to gather are the variance, the median, several percentiles, etc. We
are building a data analysis platform based on Spark.

kind regards,
Richard

On Wed, Aug 24, 2016 at 6:52 PM, Mich Talebzadeh 
wrote:

> Hi Richard,
>
> What is the business use case for such statistics?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
>
> On 24 August 2016 at 16:01, Bedrytski Aliaksandr  wrote:
>
>> Hi Richard,
>>
>> should these intermediate statistics be calculated from the result of the
>> calculation or during the aggregation?
>> If they can be derived from the resulting dataframe, why not cache
>> (persist) that result just after the calculation?
>> Then you may aggregate statistics from the cached dataframe.
>> This way it won't hit performance too much.
>>
>> Regards
>> --
>>   Bedrytski Aliaksandr
>>   sp...@bedryt.ski
>>
>>
>>
>> On Wed, Aug 24, 2016, at 16:42, Richard Siebeling wrote:
>>
>> Hi,
>>
>> what is the best way to calculate intermediate column statistics, like the
>> number of empty values and the number of distinct values of each column in a
>> dataset, when aggregating or filtering data, next to the actual result of the
>> aggregate or the filtered data?
>>
>> We are developing an application in which the user can slice-and-dice
>> through the data and we would like to, next to the actual resulting data,
>> get column statistics of each column in the resulting dataset. We prefer to
>> calculate the column statistics on the same pass over the data as the
>> actual aggregation or filtering, is that possible?
>>
>> We could sacrifice a little bit of performance (but not too much), that's
>> why we prefer one pass...
>>
>> Is this possible in the standard Spark or would this mean modifying the
>> source a little bit and recompiling? Is that feasible / wise to do?
>>
>> thanks in advance,
>> Richard
>>
>>
>>
>>
>>
>>
>
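The "cache the result, then aggregate statistics from it" suggestion quoted above might be sketched as follows, using only standard Spark SQL functions. It assumes Spark 2.0 or later, treats "empty" as null (an assumption), and the object name ColumnStats and the output column suffixes are illustrative.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, countDistinct, sum, when}

object ColumnStats {
  // Aggregate expressions for one column: null count and distinct count.
  private def statsFor(name: String): Seq[Column] = Seq(
    sum(when(col(name).isNull, 1).otherwise(0)).as(s"${name}_nulls"),
    countDistinct(col(name)).as(s"${name}_distinct")
  )

  // Cache the filtered or aggregated result, then compute the statistics
  // for every column with a single agg call over the cached data.
  def describeColumns(result: DataFrame): DataFrame = {
    val cached = result.cache()
    val exprs = cached.columns.toSeq.flatMap(statsFor)
    cached.agg(exprs.head, exprs.tail: _*)
  }
}
```

This runs as one extra Spark job over the cached result rather than inside the original aggregation, so it is not strictly a single pass. Variance can be added to statsFor in the same way; approximate percentiles are available separately through df.stat.approxQuantile.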


Re: Maelstrom: Kafka integration with Spark

2016-08-24 Thread Jeoffrey Lim
To clarify my earlier statement, I will continue working on Maelstrom
as an alternative to official Spark integration with Kafka and keep
the KafkaRDDs + Consumers as it is - until I find the official Spark Kafka
more stable and resilient to Kafka broker issues/failures (reason I have
infinite retry strategy on numerous places around Kafka related routines).

Not that I'm complaining or competing; at the end of the day, having
a Spark app that continues to work overnight gives the developer a good
night's sleep :)

On Thu, Aug 25, 2016 at 3:23 AM, Jeoffrey Lim  wrote:

> Hi Cody, thank you for pointing out sub-millisecond processing, it is
> an "exaggerated" term :D I simply got excited releasing this project, it
> should be: "millisecond stream processing at the spark level".
>
> Highly appreciate the info about latest Kafka consumer. Would need
> to get up to speed about the most recent improvements and new features
> of Kafka itself.
>
> I think with Spark's latest Kafka Integration 0.10 features, Maelstrom's
> upside would only be the simple APIs (developer friendly). I'll play
> around with Spark 2.0 kafka-010 KafkaRDD to see if this is feasible.
>
>
> On Wed, Aug 24, 2016 at 10:46 PM, Cody Koeninger 
> wrote:
>
>> Yes, spark-streaming-kafka-0-10 uses the new consumer.   Besides
>> pre-fetching messages, the big reason for that is that security
>> features are only available with the new consumer.
>>
>> The Kafka project is at release 0.10.0.1 now, they think most of the
>> issues with the new consumer have been ironed out.  You can track the
>> progress as to when they'll remove the "beta" label at
>> https://issues.apache.org/jira/browse/KAFKA-3283
>>
>> As far as I know, Kafka in general can't achieve sub-millisecond
>> end-to-end stream processing, so my guess is you need to be more
>> specific about your terms there.
>>
>> I promise I'm not trying to start a pissing contest :)  just wanted to
>> check if you were aware of the current state of the other consumers.
>> Collaboration is always welcome.
>>
>>
>> On Tue, Aug 23, 2016 at 10:18 PM, Jeoffrey Lim 
>> wrote:
>> > Apologies, I was not aware that Spark 2.0 has Kafka Consumer
>> > caching/pooling now.
>> > What I have checked is the latest Kafka Consumer, and I believe it is
>> > still in beta quality.
>> >
>> > https://kafka.apache.org/documentation.html#newconsumerconfigs
>> >
>> >> Since 0.9.0.0 we have been working on a replacement for our existing
>> >> simple and high-level consumers.
>> >> The code is considered beta quality.
>> >
>> > Not sure about this: does the Spark 2.0 Kafka 0.10 integration already
>> > use this one? Is it now stable?
>> > With this caching feature in Spark 2.0, could it achieve
>> > sub-millisecond stream processing now?
>> >
>> >
>> > Maelstrom still uses the old Kafka Simple Consumer. This library was
>> > made open source so that I could continue working on it for future
>> > updates & improvements, like when the latest Kafka Consumer gets a
>> > stable release.
>> >
>> > We have been using the Maelstrom "caching concept" for a long time now,
>> > as the Receiver-based Spark Kafka integration does not work for us.
>> > There were thoughts about using the Direct Kafka APIs, however Maelstrom
>> > has very simple APIs and just "simply works" even under unstable
>> > scenarios (e.g. advertised hostname failures on EMR).
>> >
>> > Maelstrom will work, I believe, even for Spark 1.3 and Kafka 0.8.2.1
>> > (and of course with the latest Kafka 0.10 as well).
>> >
>> >
>> > On Wed, Aug 24, 2016 at 9:49 AM, Cody Koeninger wrote:
>> >>
>> >> Were you aware that the spark 2.0 / kafka 0.10 integration also reuses
>> >> kafka consumer instances on the executors?
>> >>
>> >> On Tue, Aug 23, 2016 at 3:19 PM, Jeoffrey Lim wrote:
>> >> > Hi,
>> >> >
>> >> > I have released the first version of a new Kafka integration with
>> >> > Spark that we use in the company I work for: open sourced and named
>> >> > Maelstrom.
>> >> >
>> >> > It is unique compared to other solutions out there as it reuses the
>> >> > Kafka Consumer connection to achieve sub-milliseconds latency.
>> >> >
>> >> > This library has been running stable in production environment and
>> >> > has been proven to be resilient to numerous production issues.
>> >> >
>> >> >
>> >> > Please check out the project's page in github:
>> >> >
>> >> > https://github.com/jeoffreylim/maelstrom
>> >> >
>> >> >
>> >> > Contributors welcome!
>> >> >
>> >> >
>> >> > Cheers!
>> >> >
>> >> > Jeoffrey Lim
>> >> >
>> >> >
>> >> > P.S. I am also looking for a job opportunity, please look me up at
>> >> > Linked In
>> >
>> >
>>
>
>


Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-24 Thread Arun Luthra
Also for the record, turning on kryo was not able to help.

On Tue, Aug 23, 2016 at 12:58 PM, Arun Luthra  wrote:

> Splitting up the Maps to separate objects did not help.
>
> However, I was able to work around the problem by reimplementing it with
> RDD joins.
>
> On Aug 18, 2016 5:16 PM, "Arun Luthra"  wrote:
>
>> This might be caused by a few large Map objects that Spark is trying to
>> serialize. These are not broadcast variables or anything, they're just
>> regular objects.
>>
>> Would it help if I further indexed these maps into a two-level Map i.e.
>> Map[String, Map[String, Int]] ? Or would this still count against me?
>>
>> What if I manually split them up into numerous Map variables?
>>
>> On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra 
>> wrote:
>>
>>> I got this OOM error in Spark local mode. The error seems to have been
>>> at the start of a stage (all of the stages on the UI showed as complete,
>>> there were more stages to do but had not showed up on the UI yet).
>>>
>>> There appears to be ~100G of free memory at the time of the error.
>>>
>>> Spark 2.0.0
>>> 200G driver memory
>>> local[30]
>>> 8 /mntX/tmp directories for spark.local.dir
>>> "spark.sql.shuffle.partitions", "500"
>>> "spark.driver.maxResultSize","500"
>>> "spark.default.parallelism", "1000"
>>>
>>> The line number for the error is at an RDD map operation where there are
>>> some potentially large Map objects that are going to be accessed by each
>>> record. Does it matter if they are broadcast variables or not? I imagine
>>> not because its in local mode they should be available in memory to every
>>> executor/core.
>>>
>>> Possibly related:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError
>>> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>>> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>>> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>>> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
>>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>>> at org.apache.spark.rdd.RDD.map(RDD.scala:365)
>>> at abc.Abc$.main(abc.scala:395)
>>> at abc.Abc.main(abc.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>>
>>


Re: Maelstrom: Kafka integration with Spark

2016-08-24 Thread Jeoffrey Lim
Hi Cody, thank you for pointing out the sub-millisecond claim; it is
an "exaggerated" term :D I simply got excited about releasing this project.
It should read: "millisecond stream processing at the Spark level".

I highly appreciate the info about the latest Kafka consumer. I need
to get up to speed on the most recent improvements and new features
of Kafka itself.

I think that with the features of Spark's latest Kafka 0.10 integration,
Maelstrom's only upside would be its simple (developer-friendly) APIs. I'll
play around with the Spark 2.0 kafka-010 KafkaRDD to see if this is feasible.


On Wed, Aug 24, 2016 at 10:46 PM, Cody Koeninger  wrote:

> Yes, spark-streaming-kafka-0-10 uses the new consumer.   Besides
> pre-fetching messages, the big reason for that is that security
> features are only available with the new consumer.
>
> The Kafka project is at release 0.10.0.1 now, they think most of the
> issues with the new consumer have been ironed out.  You can track the
> progress as to when they'll remove the "beta" label at
> https://issues.apache.org/jira/browse/KAFKA-3283
>
> As far as I know, Kafka in general can't achieve sub-millisecond
> end-to-end stream processing, so my guess is you need to be more
> specific about your terms there.
>
> I promise I'm not trying to start a pissing contest :)  just wanted to
> check if you were aware of the current state of the other consumers.
> Collaboration is always welcome.
>
>
> On Tue, Aug 23, 2016 at 10:18 PM, Jeoffrey Lim wrote:
> > Apologies, I was not aware that Spark 2.0 has Kafka Consumer
> > caching/pooling now.
> > What I have checked is the latest Kafka Consumer, and I believe it is
> > still in beta quality.
> >
> > https://kafka.apache.org/documentation.html#newconsumerconfigs
> >
> >> Since 0.9.0.0 we have been working on a replacement for our existing
> >> simple and high-level consumers.
> >> The code is considered beta quality.
> >
> > Not sure about this, does Spark 2.0 Kafka 0.10 integration already uses
> > this one? Is it now stable?
> > With this caching feature in Spark 2.0 could it achieve sub-milliseconds
> > stream processing now?
> >
> >
> > Maelstrom still uses the old Kafka Simple Consumer, this library was made
> > open source so that I could continue working on it for future updates &
> > improvements like when the latest Kafka Consumer gets a stable release.
> >
> > We have been using Maelstrom "caching concept" for a long time now, as
> > Receiver based Spark Kafka integration does not work for us. There were
> > thoughts about using Direct Kafka APIs, however Maelstrom has very simple
> > APIs and just "simply works" even under unstable scenarios (e.g.
> > advertised hostname failures on EMR).
> >
> > Maelstrom will work I believe even for Spark 1.3 and Kafka 0.8.2.1 (and
> > of course with the latest Kafka 0.10 as well)
> >
> >
> > On Wed, Aug 24, 2016 at 9:49 AM, Cody Koeninger wrote:
> >>
> >> Were you aware that the spark 2.0 / kafka 0.10 integration also reuses
> >> kafka consumer instances on the executors?
> >>
> >> On Tue, Aug 23, 2016 at 3:19 PM, Jeoffrey Lim wrote:
> >> > Hi,
> >> >
> >> > I have released the first version of a new Kafka integration with Spark
> >> > that we use in the company I work for: open sourced and named Maelstrom.
> >> >
> >> > It is unique compared to other solutions out there as it reuses the
> >> > Kafka Consumer connection to achieve sub-milliseconds latency.
> >> >
> >> > This library has been running stable in production environment and has
> >> > been proven to be resilient to numerous production issues.
> >> >
> >> >
> >> > Please check out the project's page in github:
> >> >
> >> > https://github.com/jeoffreylim/maelstrom
> >> >
> >> >
> >> > Contributors welcome!
> >> >
> >> >
> >> > Cheers!
> >> >
> >> > Jeoffrey Lim
> >> >
> >> >
> >> > P.S. I am also looking for a job opportunity, please look me up at
> >> > Linked In
> >
> >
>


Re: Is "spark streaming" streaming or mini-batch?

2016-08-24 Thread Mich Talebzadeh
Is "spark streaming" streaming or mini-batch?

I look at something like Complex Event Processing (CEP), which is a leading
use case for data streaming (and I am experimenting with Spark for it), and
in the realm of CEP there is really no such thing as continuous data
streaming. The point is that when the source sends data out, it is never
truly continuous. What is happening is that "discrete digital messages" are
sent out. This is in contrast to an FM radio signal or sinusoidal waves,
which are continuous analog signals. In the world of CEP, however, the
digital data will always be sent as bytes, typically with the bytes grouped
into messages as an event-driven signal.

For certain streaming use cases, Spark is perfectly OK (setting aside Flink
and the other tools around).

HTH

Dr Mich Talebzadeh






On 24 August 2016 at 10:40, Steve Loughran  wrote:

>
> On 23 Aug 2016, at 17:58, Mich Talebzadeh 
> wrote:
>
> In general, depending on what you are doing, you can tighten the above
> parameters. For example, if you are using Spark Streaming for anti-fraud
> detection, you may stream data in at a 2 second batch interval, keep your
> window length at 4 seconds and your sliding interval at 2 seconds, which
> gives you a kind of tight streaming. You are aggregating data that you are
> collecting over the batch window.
>
>
> I should warn that in https://github.com/apache/spark/pull/14731 I've
> been trying to speed up input scanning against object stores, and
> collecting numbers on the way
>
> *if you are using the FileInputDStream to scan s3, azure (and presumably
> gcs) object stores for data, the time to scan a moderately complex
> directory tree is going to be measurable in seconds*
>
> It's going to depend on distance from the object store and number of
> files, but you'll probably need to use a bigger window
>
> (that patch for SPARK-17159 should improve things ... I'd love some people
> to help by testing it or emailing me directly with an (anonymised) listing
> of the directory structures they use in object store FileInputDStream
> streams, which I could regenerate for inclusion in some performance tests.)
>
>
>
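
As a rough illustration of the numbers in the quoted anti-fraud example (2 second batch interval, 4 second window, 2 second slide), the sketch below works out which micro-batches each window would cover. It is plain Python rather than Spark code, and the helper name `windows` is made up for this note; the only Spark behaviour it assumes is that the window length and slide interval must be multiples of the batch interval.

```python
def windows(batch_interval, window_length, slide_interval, num_batches):
    """Return, for each emitted window, the indices of the micro-batches it covers."""
    if window_length % batch_interval or slide_interval % batch_interval:
        raise ValueError("window and slide must be multiples of the batch interval")
    batches_per_window = window_length // batch_interval
    batches_per_slide = slide_interval // batch_interval
    covered = []
    # A window is emitted every `batches_per_slide` completed batches.
    for end in range(batches_per_slide, num_batches + 1, batches_per_slide):
        # Early windows are only partly filled, hence the max().
        start = max(0, end - batches_per_window)
        covered.append(list(range(start, end)))
    return covered


example = windows(batch_interval=2, window_length=4, slide_interval=2, num_batches=4)
print(example)
```

With these settings each micro-batch ends up in two consecutive windows, which is why the aggregation cost per window is roughly twice the cost of a single batch.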


Re: a question about LBFGS in Spark

2016-08-24 Thread DB Tsai
Hi Lingling,

I don't think you have properly subscribed to the mailing list yet, so I am
cc'ing the mailing list.

The mllib package is deprecated, and we no longer maintain it. It was
designed this way for backward compatibility. In the original design, the
updater also contains the step-size logic, which we don't use in LBFGS. The
code documents the math and why this works:

/**
* It will return the gradient part of regularization using updater.
*
* Given the input parameters, the updater basically does the following,
*
* w' = w - thisIterStepSize * (gradient + regGradient(w))
* Note that regGradient is function of w
*
* If we set gradient = 0, thisIterStepSize = 1, then
*
* regGradient(w) = w - w'
*
* TODO: We need to clean it up by separating the logic of regularization out
* from updater to regularizer.
*/
// The following gradientTotal is actually the regularization part of gradient.
// Will add the gradientSum computed from the data with weights in the next step.
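
To make the trick in that comment concrete, here is a small plain-Python sketch. The function `l2_updater` is a hypothetical stand-in modeled on an L2 updater (it is not the Spark class itself), and the argument order (weights, gradient, step size, iteration, regParam) is an assumption taken from the calls quoted in the question. It shows that a step size of 0 yields the regularization value at the current weights, while a zero gradient with step size 1 yields the regularization gradient as w - w'.

```python
import math


def l2_updater(weights, gradient, step_size, iteration, reg_param):
    """Toy L2 updater: w' = w - step * (gradient + reg_param * w).

    Returns (new_weights, regularization_value_at_new_weights).
    """
    this_iter_step = step_size / math.sqrt(iteration)
    new_w = [w * (1.0 - this_iter_step * reg_param) - this_iter_step * g
             for w, g in zip(weights, gradient)]
    reg_val = 0.5 * reg_param * sum(x * x for x in new_w)
    return new_w, reg_val


w = [1.0, -2.0, 0.5]
zeros = [0.0] * len(w)
reg_param = 0.1

# Step size 0: the weights are unchanged, so the second return value is
# the regularization value at the current w.
_, reg_val = l2_updater(w, zeros, 0.0, 1, reg_param)

# Gradient 0 and step size 1: w' = w - regGradient(w), so regGradient(w) = w - w'.
w_prime, _ = l2_updater(w, zeros, 1.0, 1, reg_param)
reg_gradient = [a - b for a, b in zip(w, w_prime)]

print(reg_val, reg_gradient)
```

For this toy L2 case the recovered gradient is simply reg_param * w, which is what one would expect from differentiating 0.5 * reg_param * ||w||^2.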


Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

>> On Wed, Aug 24, 2016 at 7:16 AM Lingling Li  wrote:
>>>
>>> Hi!
>>>
>>> Sorry for getting in touch. This is Ling Ling and I am now reading the
>>> LBFGS code in Spark.
>>>
>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
>>>
>>> I find that you are one of the contributors, so maybe you can help me
>>> out here? I appreciate it!
>>>
>>> In the CostFun:
>>> val regVal = updater.compute(w, Vectors.zeros(n), 0, 1, regParam)._2
>>> axpy(-1.0, updater.compute(w, Vectors.zeros(n), 1, 1, regParam)._1, gradientTotal)
>>>
>>> Why is the gradient in the updater being set to 0? And why is the step
>>> size 0 and 1 respectively?
>>>
>>> Thank you very much for your help!
>>>
>>> All the best,
>>> Ling Ling
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark-jdbc impala with kerberos using yarn-client

2016-08-24 Thread Marcelo Vanzin
I believe the Impala JDBC driver is mostly the same as the Hive
driver, but I could be wrong. In any case, the right place to ask that
question is the Impala groups (see http://impala.apache.org/).

On a side note, it is a little odd that you're trying to read data
from Impala using JDBC, instead of just telling Spark to read it
directly with its Hive support...


On Tue, Aug 23, 2016 at 2:12 PM, twisterius  wrote:
> I am trying to use the spark-jdbc package to access an Impala table via a
> Spark data frame. From my understanding
> (https://issues.apache.org/jira/browse/SPARK-12312), when loading DataFrames
> from a JDBC datasource with Kerberos authentication, remote executors
> (yarn-client/cluster etc. modes) fail to establish a connection due to the
> lack of a Kerberos ticket or the ability to generate one. I found a solution
> to this issue: creating a JDBC driver which properly handles Kerberos
> authentication
> (https://datamountaineer.com/2016/01/15/spark-jdbc-sql-server-kerberos/).
> However, I cannot find the source of the Impala JDBC driver online. Should I
> just use a Hive driver to enable Kerberos authentication for Impala, or is
> there a location where I can find the Impala JDBC driver source? Also, is the
> ticket listed above (SPARK-12312) accurate, or is there an out-of-the-box way
> for me to connect to a kerberized Impala using
> sqlContext.load("jdbc",options) without having to rewrite the Impala driver?
>
>
>
>



-- 
Marcelo




Re: Spark 2.0 with Kafka 0.10 exception

2016-08-24 Thread Srikanth
Thanks Cody. Setting poll timeout helped.
Our network is fine but brokers are not fully provisioned in test cluster.
But there isn't enough load to max out on broker capacity.
Curious that kafkacat running on the same node doesn't have any issues.

Srikanth

On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger  wrote:

> You can set that poll timeout higher with
>
> spark.streaming.kafka.consumer.poll.ms
>
> but half a second is fairly generous.  I'd try to take a look at
> what's going on with your network or kafka broker during that time.
>
> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth  wrote:
> > Hello,
> >
> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
> >
> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
> >> spark-executor-example mt_event 0 15782114
> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
> >> spark-executor-example.
> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
> >> java.lang.AssertionError: assertion failed: Failed to get records for
> >> spark-executor-example mt_event 0 15782114 after polling for 512
> >> at scala.Predef$.assert(Predef.scala:170)
> >> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >
> >
> > I get this error intermittently. Sometimes a few batches are scheduled and
> > run fine. Then I get this error.
> > kafkacat is able to fetch from this topic continuously.
> >
> > Full exception is here --
> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
> >
> > Srikanth
>


RE: spark.lapply in SparkR: Error in writeBin(batch, con, endian = "big")

2016-08-24 Thread Cinquegrana, Piero
Hi Spark experts,

I was able to get around the broadcast issue by using a global assignment '<<-' 
instead of reading the data locally. However, I still get the following error:

Error in writeBin(batch, con, endian = "big") :
  attempting to add too many elements to raw vector


Pseudo code:

scoreModel <- function(parameters){
   library(score)
   score(dat,parameters)
}

dat <<- read.csv('file.csv')
modelScores <- spark.lapply(parameterList, scoreModel)

From: Cinquegrana, Piero [mailto:piero.cinquegr...@neustar.biz]
Sent: Tuesday, August 23, 2016 2:39 PM
To: Felix Cheung ; user@spark.apache.org
Subject: RE: spark.lapply in SparkR: Error in writeBin(batch, con, endian = 
"big")

The output from score() is very small, just a float. The input, however, could 
be as big as several hundred MBs. I would like to broadcast the dataset to all 
executors.

Thanks,
Piero

From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Monday, August 22, 2016 10:48 PM
To: Cinquegrana, Piero; user@spark.apache.org
Subject: Re: spark.lapply in SparkR: Error in writeBin(batch, con, endian = "big")

How big is the output from score()?

Also could you elaborate on what you want to broadcast?


On Mon, Aug 22, 2016 at 11:58 AM -0700, "Cinquegrana, Piero" wrote:
Hello,

I am using the new R API in SparkR spark.lapply (spark 2.0). I am defining a 
complex function to be run across executors and I have to send the entire 
dataset, but there is no way (that I could find) to broadcast the variable 
in SparkR. I am thus reading the dataset in each executor from disk, but I 
am getting the following error:

Error in writeBin(batch, con, endian = "big") :
  attempting to add too many elements to raw vector

Any idea why this is happening?

Pseudo code:

scoreModel <- function(parameters){
   library(data.table)
   dat <- data.frame(fread("file.csv"))
   score(dat,parameters)
}

parameterList <- lapply(1:numModels, function(i) getParameters(i))

modelScores <- spark.lapply(parameterList, scoreModel)


Piero Cinquegrana
MarketShare: A Neustar Solution / Data Science
Mobile: +39.329.17.62.539 / www.neustar.biz




Re: GraphFrames 0.2.0 released

2016-08-24 Thread Maciej Bryński
Hi,
Do you plan to add a tag for this release on GitHub?
https://github.com/graphframes/graphframes/releases

Regards,
Maciek

2016-08-17 3:18 GMT+02:00 Jacek Laskowski :

> Hi Tim,
>
> AWESOME. Thanks a lot for releasing it. That makes me even more eager
> to see it in Spark's codebase (and replacing the current RDD-based
> API)!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Aug 16, 2016 at 9:32 AM, Tim Hunter wrote:
> > Hello all,
> > I have released version 0.2.0 of the GraphFrames package. Apart from a few
> > bug fixes, it is the first release published for Spark 2.0 and both scala
> > 2.10 and 2.11. Please let us know if you have any comment or questions.
> >
> > It is available as a Spark package:
> > https://spark-packages.org/package/graphframes/graphframes
> >
> > The source code is available as always at
> > https://github.com/graphframes/graphframes
> >
> >
> > What is GraphFrames?
> >
> > GraphFrames is a DataFrame-based graph engine Spark. In addition to the
> > algorithms available in GraphX, users can write highly expressive queries
> > by leveraging the DataFrame API, combined with a new API for motif
> > finding. The user also benefits from DataFrame performance optimizations
> > within the Spark SQL engine.
> >
> > Cheers
> >
> > Tim
> >
> >
> >
>
>
>


-- 
Maciek Bryński


Re: Best way to calculate intermediate column statistics

2016-08-24 Thread Mich Talebzadeh
Hi Richard,

What is the business use case for such statistics?

HTH

Dr Mich Talebzadeh






On 24 August 2016 at 16:01, Bedrytski Aliaksandr  wrote:

> Hi Richard,
>
> should these intermediate statistics be calculated from the result of the
> calculation or during the aggregation?
> If they can be derived from the resulting dataframe, why not cache
> (persist) that result just after the calculation?
> Then you can aggregate the statistics from the cached dataframe.
> This way it won't hit performance too much.
>
> Regards
> --
>   Bedrytski Aliaksandr
>   sp...@bedryt.ski
>
>
>
> On Wed, Aug 24, 2016, at 16:42, Richard Siebeling wrote:
>
> Hi,
>
> what is the best way to calculate intermediate column statistics, like the
> number of empty values and the number of distinct values of each column in a
> dataset, when aggregating or filtering data, next to the actual result of the
> aggregate or the filtered data?
>
> We are developing an application in which the user can slice-and-dice
> through the data and we would like to, next to the actual resulting data,
> get column statistics of each column in the resulting dataset. We prefer to
> calculate the column statistics on the same pass over the data as the
> actual aggregation or filtering, is that possible?
>
> We could sacrifice a little bit of performance (but not too much), that's
> why we prefer one pass...
>
> Is this possible in the standard Spark or would this mean modifying the
> source a little bit and recompiling? Is that feasible / wise to do?
>
> thanks in advance,
> Richard
>
>
>
>
>
>


Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Sun Rui
Before 2.0, Spark had built-in support for caching RDD data on
Tachyon (Alluxio), but that support was removed in 2.0. In either case, Spark
does not support writing shuffle data to Tachyon.

Since Alluxio has experimental support for FUSE
(http://www.alluxio.org/docs/master/en/Mounting-Alluxio-FS-with-FUSE.html),
you can try it and set spark.local.dir to point to the Alluxio FUSE
mount directory.

There is also an ongoing effort to take advantage of SSDs to improve
shuffle performance; see https://issues.apache.org/jira/browse/SPARK-12196.
The PR is ready but has not been merged yet. You may give it a try yourself.

> On Aug 24, 2016, at 22:30, tony@tendcloud.com wrote:
> 
> Hi, Saisai and Rui,
> Thanks a lot for your answer.  Alluxio tried to work as the middle layer 
> between storage and Spark, so is it possible to use Alluxio to resolve the 
> issue? We want to have 1 SSD for every datanode and use Alluxio to manage 
> mem,ssd and hdd. 
> 
> Thanks and Regards,
> Tony
> 
> tony@tendcloud.com
>
> From: Sun Rui
> Date: 2016-08-24 22:17
> To: Saisai Shao
> CC: tony@tendcloud.com; user
> Subject: Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?
> Yes, I also tried FUSE before, it is not stable and I don’t recommend it
>> On Aug 24, 2016, at 22:15, Saisai Shao wrote:
>>
>> Also fuse is another candidate (https://wiki.apache.org/hadoop/MountableHDFS),
>> but not so stable as I tried before.
>>
>> On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui wrote:
>> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the
>> stability, and also there is additional overhead of network I/O and replica
>> of HDFS files.
>>
>>> On Aug 24, 2016, at 21:02, Saisai Shao wrote:
>>> 
>>> Spark Shuffle uses Java File related API to create local dirs and R/W data, 
>>> so it can only be worked with OS supported FS. It doesn't leverage Hadoop 
>>> FileSystem API, so writing to Hadoop compatible FS is not worked.
>>> 
>>> Also it is not suitable to write temporary shuffle data into distributed 
>>> FS, this will bring unnecessary overhead. In you case if you have large 
>>> memory on each node, you could use ramfs instead to store shuffle data.
>>> 
>>> Thanks
>>> Saisai
>>> 
>>> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com wrote:
>>> Hi, All,
>>> When we run Spark on very large data, Spark shuffles and writes the shuffle
>>> data to local disk. Because local disk capacity is limited, the shuffle
>>> data fills up the local disk and the job then fails. So is there a way to
>>> write the shuffle spill data to HDFS? Or, if we introduce Alluxio into our
>>> system, can the shuffle data be written to Alluxio?
>>>
>>> Thanks and Regards,
>>>
>>> 阎志涛(Tony)
>>>
>>> 北京腾云天下科技有限公司
>>>
>>> Email: tony@tendcloud.com
>>> Phone: 13911815695
>>> WeChat: zhitao_yan
>>> QQ: 4707059
>>> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
>>> Postal code: 100027
>>>
>>> TalkingData.com - Let the data speak



Re: Best way to calculate intermediate column statistics

2016-08-24 Thread Bedrytski Aliaksandr
Hi Richard,

should these intermediate statistics be calculated from the result of
the calculation or during the aggregation?
If they can be derived from the resulting dataframe, why not cache
(persist) that result just after the calculation?
Then you can aggregate the statistics from the cached dataframe.
This way it won't hit performance too much.
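
To make the "one pass" idea from the question concrete, here is a plain-Python sketch (not Spark code) of the per-column accumulators that a single scan over the result needs: a counter for empty values and a set for distinct values. The function name `column_stats` and the sample rows are made up for illustration, and treating both None and the empty string as "empty" is an assumption.

```python
def column_stats(rows, columns):
    """Scan the rows once, tracking empty and distinct values per column."""
    acc = {c: {"empty": 0, "distinct": set()} for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            if value is None or value == "":
                acc[c]["empty"] += 1
            else:
                acc[c]["distinct"].add(value)
    # Collapse the distinct sets into counts for the final report.
    return {c: {"empty": a["empty"], "distinct": len(a["distinct"])}
            for c, a in acc.items()}


rows = [
    {"city": "Oslo", "age": 30},
    {"city": "", "age": 30},
    {"city": "Rome", "age": None},
    {"city": "Oslo", "age": 41},
]
result = column_stats(rows, ["city", "age"])
print(result)
```

Keeping an exact set per column can get expensive on wide or high-cardinality data, so an approximate distinct count may be the more practical choice at scale.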

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Wed, Aug 24, 2016, at 16:42, Richard Siebeling wrote:
> Hi,
>
> what is the best way to calculate intermediate column statistics, like
> the number of empty values and the number of distinct values of each
> column in a dataset, when aggregating or filtering data, next to the
> actual result of the aggregate or the filtered data?
>
> We are developing an application in which the user can slice-and-dice
> through the data and we would like to, next to the actual resulting
> data, get column statistics of each column in the resulting dataset.
> We prefer to calculate the column statistics on the same pass over the
> data as the actual aggregation or filtering, is that possible?
>
> We could sacrifice a little bit of performance (but not too much),
> that's why we prefer one pass...
>
> Is this possible in the standard Spark or would this mean
> modifying the source a little bit and recompiling? Is that
> feasible / wise to do?
>
> thanks in advance,
> Richard
>
>
>
>


Re: Dataframe write to DB , loosing primary key index & data types.

2016-08-24 Thread sujeet jog
There was an inherent bug in my code that caused this.

On Wed, Aug 24, 2016 at 8:07 PM, sujeet jog  wrote:

> Hi,
>
> I have a table defined as below. When I write any records to this
> table, the varchar(20) column gets changed to text, and the table also
> loses its primary key index.
> Any idea how to write data with Spark SQL without losing the primary key
> index and data types?
>
>
> MariaDB [analytics]> show columns from fcast;
> +--------------+-------------+------+-----+-------------------+-----------------------------+
> | Field        | Type        | Null | Key | Default           | Extra                       |
> +--------------+-------------+------+-----+-------------------+-----------------------------+
> | TimeSeriesID | varchar(20) | NO   | PRI |                   |                             |
> | TimeStamp    | timestamp   | NO   | PRI | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
> | Forecast     | double      | YES  |     | NULL              |                             |
> +--------------+-------------+------+-----+-------------------+-----------------------------+
>
> I'm just doing DF.write.mode("append").jdbc
>
> Thanks,
>
>


Re: Maelstrom: Kafka integration with Spark

2016-08-24 Thread Cody Koeninger
Yes, spark-streaming-kafka-0-10 uses the new consumer.   Besides
pre-fetching messages, the big reason for that is that security
features are only available with the new consumer.

The Kafka project is at release 0.10.0.1 now, they think most of the
issues with the new consumer have been ironed out.  You can track the
progress as to when they'll remove the "beta" label at
https://issues.apache.org/jira/browse/KAFKA-3283

As far as I know, Kafka in general can't achieve sub-millisecond
end-to-end stream processing, so my guess is you need to be more
specific about your terms there.

I promise I'm not trying to start a pissing contest :)  just wanted to
check if you were aware of the current state of the other consumers.
Collaboration is always welcome.


On Tue, Aug 23, 2016 at 10:18 PM, Jeoffrey Lim  wrote:
> Apologies, I was not aware that Spark 2.0 has Kafka Consumer caching/pooling
> now.
> What I have checked is the latest Kafka Consumer, and I believe it is still
> in beta quality.
>
> https://kafka.apache.org/documentation.html#newconsumerconfigs
>
>> Since 0.9.0.0 we have been working on a replacement for our existing
>> simple and high-level consumers.
>> The code is considered beta quality.
>
> Not sure about this, does Spark 2.0 Kafka 0.10 integration already uses this
> one? Is it now stable?
> With this caching feature in Spark 2.0 could it achieve sub-milliseconds
> stream processing now?
>
>
> Maelstrom still uses the old Kafka Simple Consumer, this library was made
> open source so that I
> could continue working on it for future updates & improvements like when the
> latest Kafka Consumer
> gets a stable release.
>
> We have been using Maelstrom "caching concept" for a long time now, as
> Receiver based Spark Kafka integration
> does not work for us. There were thoughts about using Direct Kafka APIs,
> however Maelstrom has
> very simple APIs and just "simply works" even under unstable scenarios (e.g.
> advertised hostname failures on EMR).
>
> Maelstrom will work I believe even for Spark 1.3 and Kafka 0.8.2.1 (and of
> course with the latest Kafka 0.10 as well)
>
>
> On Wed, Aug 24, 2016 at 9:49 AM, Cody Koeninger  wrote:
>>
>> Were you aware that the spark 2.0 / kafka 0.10 integration also reuses
>> kafka consumer instances on the executors?
>>
>> On Tue, Aug 23, 2016 at 3:19 PM, Jeoffrey Lim  wrote:
>> > Hi,
>> >
>> > I have released the first version of a new Kafka integration with Spark
>> > that we use in the company I work for: open sourced and named Maelstrom.
>> >
>> > It is unique compared to other solutions out there as it reuses the
>> > Kafka Consumer connection to achieve sub-milliseconds latency.
>> >
>> > This library has been running stable in production environment and has
>> > been proven to be resilient to numerous production issues.
>> >
>> >
>> > Please check out the project's page in github:
>> >
>> > https://github.com/jeoffreylim/maelstrom
>> >
>> >
>> > Contributors welcome!
>> >
>> >
>> > Cheers!
>> >
>> > Jeoffrey Lim
>> >
>> >
>> > P.S. I am also looking for a job opportunity, please look me up at
>> > Linked In
>
>




Best way to calculate intermediate column statistics

2016-08-24 Thread Richard Siebeling
Hi,

what is the best way to calculate intermediate column statistics, like the
number of empty values and the number of distinct values of each column in a
dataset, when aggregating or filtering data, next to the actual result of the
aggregate or the filtered data?

We are developing an application in which the user can slice-and-dice
through the data and we would like to, next to the actual resulting data,
get column statistics of each column in the resulting dataset. We prefer to
calculate the column statistics on the same pass over the data as the
actual aggregation or filtering, is that possible?

We could sacrifice a little bit of performance (but not too much), that's
why we prefer one pass...

Is this possible in the standard Spark or would this mean modifying the
source a little bit and recompiling? Is that feasible / wise to do?

thanks in advance,
Richard


Re: Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Saisai Shao
Based on my limited knowledge of Tachyon (Alluxio), it only provides a
Hadoop-compatible FileSystem API layer, which means it cannot be used as a
shuffle data store. If it can be mounted as an OS-supported FS layer, like
NFS or FUSE, then it can be used as a shuffle data store.

But never neglect the overhead and stability of a distributed FS (RPC
communication, network latency), since shuffle is quite critical.

On Wed, Aug 24, 2016 at 10:30 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, Saisai and Rui,
> Thanks a lot for your answer.  Alluxio tried to work as the middle layer
> between storage and Spark, so is it possible to use Alluxio to resolve the
> issue? We want to have 1 SSD for every datanode and use Alluxio to manage
> mem,ssd and hdd.
>
> Thanks and Regards,
> Tony
>
> --
> tony@tendcloud.com
>
>
> *From:* Sun Rui 
> *Date:* 2016-08-24 22:17
> *To:* Saisai Shao 
> *CC:* tony@tendcloud.com; user 
> *Subject:* Re: Can we redirect Spark shuffle spill data to HDFS or
> Alluxio?
> Yes, I also tried FUSE before, it is not stable and I don’t recommend it
>
> On Aug 24, 2016, at 22:15, Saisai Shao  wrote:
>
> Also fuse is another candidate (https://wiki.apache.org/
> hadoop/MountableHDFS), but not so stable as I tried before.
>
> On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui  wrote:
>
>> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the
>> stability, and also there is additional overhead of network I/O and replica
>> of HDFS files.
>>
>> On Aug 24, 2016, at 21:02, Saisai Shao  wrote:
>>
>> Spark Shuffle uses Java File related API to create local dirs and R/W
>> data, so it can only be worked with OS supported FS. It doesn't leverage
>> Hadoop FileSystem API, so writing to Hadoop compatible FS is not worked.
>>
>> Also it is not suitable to write temporary shuffle data into distributed
>> FS, this will bring unnecessary overhead. In you case if you have large
>> memory on each node, you could use ramfs instead to store shuffle data.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com <
>> tony@tendcloud.com> wrote:
>>
>>> Hi, All,
>>> When we run Spark on very large data, spark will do shuffle and the
>>> shuffle data will write to local disk. Because we have limited capacity at
>>> local disk, the shuffled data will occupied all of the local disk and then
>>> will be failed.  So is there a way we can write the shuffle spill data to
>>> HDFS? Or if we introduce alluxio in our system, can the shuffled data write
>>> to alluxio?
>>>
>>> Thanks and Regards,
>>>
>>> --
>>> 阎志涛(Tony)
>>>
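>>> 北京腾云天下科技有限公司
>>>
>>> Email: tony@tendcloud.com
>>> Phone: 13911815695
>>> WeChat: zhitao_yan
>>> QQ: 4707059
>>> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
>>> Postal code: 100027
>>>
>>> TalkingData.com - Let the data speak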
>>>
>>
>>
>>
>
>


Dataframe write to DB , loosing primary key index & data types.

2016-08-24 Thread sujeet jog
Hi,

I have a table with the definition below. When I write any records to this
table, the varchar(20) column gets changed to text, and the table also loses its
primary key index.
Any idea how to write data with Spark SQL without losing the primary key
index and data types?


MariaDB [analytics]> show columns from fcast;
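+--------------+-------------+------+-----+-------------------+-----------------------------+
| Field        | Type        | Null | Key | Default           | Extra                       |
+--------------+-------------+------+-----+-------------------+-----------------------------+
| TimeSeriesID | varchar(20) | NO   | PRI |                   |                             |
| TimeStamp    | timestamp   | NO   | PRI | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| Forecast     | double      | YES  |     | NULL              |                             |
+--------------+-------------+------+-----+-------------------+-----------------------------+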

I'm just doing DF.write.mode("append").jdbc

Thanks,


Re: Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread tony....@tendcloud.com
Hi, Saisai and Rui,
Thanks a lot for your answer. Alluxio tries to work as the middle layer
between storage and Spark, so is it possible to use Alluxio to resolve the
issue? We want to have 1 SSD for every datanode and use Alluxio to manage
memory, SSD and HDD.

Thanks and Regards,
Tony



tony@tendcloud.com
 
From: Sun Rui
Date: 2016-08-24 22:17
To: Saisai Shao
CC: tony@tendcloud.com; user
Subject: Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?
Yes, I also tried FUSE before, it is not stable and I don’t recommend it
On Aug 24, 2016, at 22:15, Saisai Shao  wrote:

Also fuse is another candidate (https://wiki.apache.org/hadoop/MountableHDFS), 
but not so stable as I tried before.

On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui  wrote:
For HDFS, maybe you can try mount HDFS as NFS. But not sure about the 
stability, and also there is additional overhead of network I/O and replica of 
HDFS files.

On Aug 24, 2016, at 21:02, Saisai Shao  wrote:

Spark Shuffle uses Java File related API to create local dirs and R/W data, so 
it can only be worked with OS supported FS. It doesn't leverage Hadoop 
FileSystem API, so writing to Hadoop compatible FS is not worked.

Also it is not suitable to write temporary shuffle data into distributed FS, 
this will bring unnecessary overhead. In you case if you have large memory on 
each node, you could use ramfs instead to store shuffle data.

Thanks
Saisai

On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com 
 wrote:
Hi, All,
When we run Spark on very large data, spark will do shuffle and the shuffle 
data will write to local disk. Because we have limited capacity at local disk, 
the shuffled data will occupied all of the local disk and then will be failed.  
So is there a way we can write the shuffle spill data to HDFS? Or if we 
introduce alluxio in our system, can the shuffled data write to alluxio?

Thanks and Regards,



阎志涛(Tony)

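北京腾云天下科技有限公司

Email: tony@tendcloud.com
Phone: 13911815695
WeChat: zhitao_yan
QQ: 4707059
Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
Postal code: 100027

TalkingData.com - Let the data speak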






Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Sun Rui
Yes, I also tried FUSE before; it is not stable and I don’t recommend it.
> On Aug 24, 2016, at 22:15, Saisai Shao  wrote:
> 
> Also fuse is another candidate (https://wiki.apache.org/hadoop/MountableHDFS 
> ), but not so stable as I tried 
> before.
> 
> On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui  > wrote:
> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the 
> stability, and also there is additional overhead of network I/O and replica 
> of HDFS files.
> 
>> On Aug 24, 2016, at 21:02, Saisai Shao > > wrote:
>> 
>> Spark Shuffle uses Java File related API to create local dirs and R/W data, 
>> so it can only be worked with OS supported FS. It doesn't leverage Hadoop 
>> FileSystem API, so writing to Hadoop compatible FS is not worked.
>> 
>> Also it is not suitable to write temporary shuffle data into distributed FS, 
>> this will bring unnecessary overhead. In you case if you have large memory 
>> on each node, you could use ramfs instead to store shuffle data.
>> 
>> Thanks
>> Saisai
>> 
>> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com 
>>  > > wrote:
>> Hi, All,
>> When we run Spark on very large data, spark will do shuffle and the shuffle 
>> data will write to local disk. Because we have limited capacity at local 
>> disk, the shuffled data will occupied all of the local disk and then will be 
>> failed.  So is there a way we can write the shuffle spill data to HDFS? Or 
>> if we introduce alluxio in our system, can the shuffled data write to 
>> alluxio?
>> 
>> Thanks and Regards,
>> 
>> 阎志涛(Tony)
>> 
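>> 北京腾云天下科技有限公司
>>
>> Email: tony@tendcloud.com
>> Phone: 13911815695
>> WeChat: zhitao_yan
>> QQ: 4707059
>> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
>> Postal code: 100027
>>
>> TalkingData.com - Let the data speak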
>> 
> 
> 



Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Saisai Shao
FUSE is another candidate (https://wiki.apache.org/hadoop/MountableHDFS),
but it was not very stable when I tried it before.

On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui  wrote:

> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the
> stability, and also there is additional overhead of network I/O and replica
> of HDFS files.
>
> On Aug 24, 2016, at 21:02, Saisai Shao  wrote:
>
> Spark Shuffle uses Java File related API to create local dirs and R/W
> data, so it can only be worked with OS supported FS. It doesn't leverage
> Hadoop FileSystem API, so writing to Hadoop compatible FS is not worked.
>
> Also it is not suitable to write temporary shuffle data into distributed
> FS, this will bring unnecessary overhead. In you case if you have large
> memory on each node, you could use ramfs instead to store shuffle data.
>
> Thanks
> Saisai
>
> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com <
> tony@tendcloud.com> wrote:
>
>> Hi, All,
>> When we run Spark on very large data, spark will do shuffle and the
>> shuffle data will write to local disk. Because we have limited capacity at
>> local disk, the shuffled data will occupied all of the local disk and then
>> will be failed.  So is there a way we can write the shuffle spill data to
>> HDFS? Or if we introduce alluxio in our system, can the shuffled data write
>> to alluxio?
>>
>> Thanks and Regards,
>>
>> --
>> 阎志涛(Tony)
>>
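>> 北京腾云天下科技有限公司
>>
>> Email: tony@tendcloud.com
>> Phone: 13911815695
>> WeChat: zhitao_yan
>> QQ: 4707059
>> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
>> Postal code: 100027
>>
>> TalkingData.com - Let the data speak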
>>
>
>
>


Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Sun Rui
For HDFS, maybe you can try mounting HDFS as NFS. But I am not sure about the
stability, and there is also the additional overhead of network I/O and of
replicating the HDFS files.
> On Aug 24, 2016, at 21:02, Saisai Shao  wrote:
> 
> Spark Shuffle uses Java File related API to create local dirs and R/W data, 
> so it can only be worked with OS supported FS. It doesn't leverage Hadoop 
> FileSystem API, so writing to Hadoop compatible FS is not worked.
> 
> Also it is not suitable to write temporary shuffle data into distributed FS, 
> this will bring unnecessary overhead. In you case if you have large memory on 
> each node, you could use ramfs instead to store shuffle data.
> 
> Thanks
> Saisai
> 
> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com 
>   > wrote:
> Hi, All,
> When we run Spark on very large data, spark will do shuffle and the shuffle 
> data will write to local disk. Because we have limited capacity at local 
> disk, the shuffled data will occupied all of the local disk and then will be 
> failed.  So is there a way we can write the shuffle spill data to HDFS? Or if 
> we introduce alluxio in our system, can the shuffled data write to alluxio?
> 
> Thanks and Regards,
> 
> 阎志涛(Tony)
> 
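> 北京腾云天下科技有限公司
>
> Email: tony@tendcloud.com
> Phone: 13911815695
> WeChat: zhitao_yan
> QQ: 4707059
> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
> Postal code: 100027
>
> TalkingData.com - Let the data speak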
> 



Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Saisai Shao
Spark shuffle uses the Java File API to create local dirs and to read/write
data, so it only works with file systems supported by the OS. It doesn't use the
Hadoop FileSystem API, so writing to a Hadoop-compatible FS does not work.

Also, it is not suitable to write temporary shuffle data into a distributed
FS; this will bring unnecessary overhead. In your case, if you have large
memory on each node, you could use ramfs instead to store shuffle data.
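
A minimal sketch of the point above, in plain Scala without Spark: java.io.File has no notion of URI schemes, so a Hadoop-style location is just read as an odd relative local path, while a directory on an OS-mounted file system behaves normally. The namenode address, the object name and the file name below are made up for illustration only.

```scala
import java.io.File
import java.nio.file.Files

object LocalDirSketch {
  // Hypothetical Hadoop-style URI; java.io.File does not understand the scheme.
  val hdfsStyle = new File("hdfs://namenode:8020/tmp/spark-shuffle")

  // A directory on a file system the OS has mounted (here: the JVM temp dir).
  val localDir: File = Files.createTempDirectory("spark-shuffle-sketch").toFile

  def main(args: Array[String]): Unit = {
    // The "URI" is treated as a relative local path, not as a location in HDFS.
    println(s"absolute? ${hdfsStyle.isAbsolute}, path seen by the JVM: ${hdfsStyle.getPath}")

    // Plain java.io / java.nio calls work on the OS-mounted directory.
    val block = new File(localDir, "shuffle_0_0_0.data")
    Files.write(block.toPath, Array[Byte](1, 2, 3))
    println(s"wrote ${block.length()} bytes to ${block.getAbsolutePath}")
  }
}
```

In Spark itself the local directories come from spark.local.dir (or, on YARN, from the NodeManager's local dirs), so a ramfs or tmpfs mount would be configured there rather than in application code.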

Thanks
Saisai

On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, All,
> When we run Spark on very large data, spark will do shuffle and the
> shuffle data will write to local disk. Because we have limited capacity at
> local disk, the shuffled data will occupied all of the local disk and then
> will be failed.  So is there a way we can write the shuffle spill data to
> HDFS? Or if we introduce alluxio in our system, can the shuffled data write
> to alluxio?
>
> Thanks and Regards,
>
> --
> 阎志涛(Tony)
>
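> 北京腾云天下科技有限公司
>
> Email: tony@tendcloud.com
> Phone: 13911815695
> WeChat: zhitao_yan
> QQ: 4707059
> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
> Postal code: 100027
>
> TalkingData.com - Let the data speak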
>


Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread tony....@tendcloud.com
Hi, All,
When we run Spark on very large data, Spark does a shuffle and the shuffle
data is written to local disk. Because we have limited capacity on local disk,
the shuffle data fills up the whole local disk and the job then fails.
So is there a way we can write the shuffle spill data to HDFS? Or, if we
introduce Alluxio into our system, can the shuffle data be written to Alluxio?

Thanks and Regards,



阎志涛(Tony)

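北京腾云天下科技有限公司

Email: tony@tendcloud.com
Phone: 13911815695
WeChat: zhitao_yan
QQ: 4707059
Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
Postal code: 100027

TalkingData.com - Let the data speak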


Re: Spark Streaming application failing with Token issue

2016-08-24 Thread Jacek Laskowski
Hi Steve,

Thanks a lot for such an elaborate email (though it brought more
questions than answers, but that's because I'm new to YARN in general and
Kerberos/tokens/tickets in particular).

Thanks also for liking my notes. I'm very honoured to hear it from
you. I value your work with Spark/YARN/Hadoop. I'm going to spend some
time on security stuff and Kerberos is on my list (to learn why YARN
could be a better option than Mesos). I'll ping you when I'm ready for
review. Thanks.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Aug 24, 2016 at 11:28 AM, Steve Loughran  wrote:
>
>> On 23 Aug 2016, at 11:26, Jacek Laskowski  wrote:
>>
>> Hi Steve,
>>
>> Could you share your opinion on whether the token gets renewed or not?
>> Is the token going to expire after 7 days anyway?
>
>
> There's Hadoop service tokens, and Kerberos tickets. They are similar-ish, 
> but not quite the same.
>
> -Kerberos "tickets" expire, you need to re-authenticate with a keytab or 
> user+password
> -Hadoop "Tokens" are more anonymous. A kerberos authenticated application has 
> to talk to the service to ask for a token (i.e. it uses a kerberos ticket to 
> say "I need a token for operation X for Y hours".
> -There are protocols for renewing tokens up to a time limit; can be done over 
> IPC mechanisms, or REST APIs using SASL
>
> I get a bit mixed up myself, and use "tickets and tokens" to allow myself to 
> get away with mistakes
>
> Things about kerberos you didn't want to know but will end up discovering in 
> stack traces anyway
>
> webinar: 
> http://hortonworks.com/webinar/hadoop-and-kerberos-the-madness-beyond-the-gate/
>
> and
>
> https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/
>
> YARN apps can run for a couple of days renewing tokens, but eventually the 
> time limit on token renewal is reached —they need to use a kerberos ticket to 
> request new tokens.
> If something times out after 7 days, I would guess that it's Kerberos ticket 
> expiry; a keytab needs to be passed to Spark for it to do the renewal
>
> The current YARN docs on this: 
> https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md
>
>
>
>
>
>>
>> Why is the change in
>> the recent version for token renewal? See
>> https://github.com/apache/spark/commit/ab648c0004cfb20d53554ab333dd2d198cb94ffa
>>
>
>
> That's designed to make it easy for a kerberos-authenticated client to get 
> tokens for more services. Before: hard coded support for HDFS, HBase, Hive. 
> After: anything which implements the same interface. This includes multiple 
> HBase servers, more than one Hive metastore, etc. It also stops the spark 
> client code needing lots of one-off classes, allows people to add their own 
> token fetching code for their own services.
>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>
>
> Like your e-book BTW
>
> If you plan to add a specific section of Spark & Kerberos, I'd gladly help 
> review it.




Re: work with russian letters

2016-08-24 Thread Jacek Laskowski
Hi Alex,

Mind showing the schema? Could you AS the columns right after you load
the dataset? What are the current problems you're dealing with?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Aug 24, 2016 at 11:37 AM, AlexModestov
 wrote:
>   Hello everybody,
>
>   I want to work with DataFrames where some columns have a string type. And
> there are russian letters.
>   Russian letters are incorrect in the text. Could you help me how I should
> work with them?
>
>   Thanks.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/work-with-russian-letters-tp27594.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



Re: Spark MLlib:Collaborative Filtering

2016-08-24 Thread Praveen Devarao
You could use the StringIndexer to convert your string user IDs and
product IDs to numeric values.
http://spark.apache.org/docs/latest/ml-features.html#stringindexer
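
StringIndexer itself needs a running Spark session, but the idea behind it can be sketched in plain Scala: give every distinct string a dense integer index and keep the reverse mapping so that recommendations can be translated back. This is only an illustration under stated assumptions: the object and helper names are made up, and indices are assigned in order of first appearance, whereas StringIndexer by default orders labels by frequency.

```scala
object IdIndexSketch {
  // (userId, productId, rating) rows shaped like the sample in the question.
  val ratings: Seq[(String, String, Double)] = Seq(
    ("b3a68043-c1", "p1-160ff5fDS-f74", 1.0),
    ("b3a68043-c2", "p5-160ff5fDS-f74", 1.0),
    ("b3a68043-c0", "p9-160ff5fDS-f74", 1.0)
  )

  /** Assigns each distinct string a dense Int index in order of first appearance. */
  def index(ids: Seq[String]): Map[String, Int] = ids.distinct.zipWithIndex.toMap

  val userIndex: Map[String, Int]    = index(ratings.map(_._1))
  val productIndex: Map[String, Int] = index(ratings.map(_._2))

  // Integer-keyed triples that an Int-only recommender can consume.
  val indexed: Seq[(Int, Int, Double)] =
    ratings.map { case (u, p, r) => (userIndex(u), productIndex(p), r) }

  // Reverse lookup to turn recommended product indices back into the original IDs.
  val productById: Map[Int, String] = productIndex.map(_.swap)
}
```

Unlike hashing, a dictionary like this cannot produce collisions, at the cost of having to build and keep the mapping.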

Thanking You
-
Praveen Devarao
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:   glen 
To: "Devi P.V" 
Cc: "user@spark.apache.org" 
Date:   24/08/2016 02:10 pm
Subject:Re: Spark MLlib:Collaborative Filtering



Hash it to int



On 2016-08-24 16:28 , Devi P.V Wrote: 

Hi all,
I am newbie in collaborative filtering.I want to implement collaborative 
filtering algorithm(need to find top 10 recommended products) using Spark 
and Scala.I have a rating dataset where userID & ProductID are String 
type.

UserID   ProductID Rating
b3a68043-c1  p1-160ff5fDS-f74   1
b3a68043-c2  p5-160ff5fDS-f74   1
b3a68043-c0  p9-160ff5fDS-f74   1


I tried ALS algorithm using spark MLlib.But it support rating userID & 
productID only Integer type.How can I solve this problem?

Thanks In Advance







Re: Is "spark streaming" streaming or mini-batch?

2016-08-24 Thread Steve Loughran

On 23 Aug 2016, at 17:58, Mich Talebzadeh wrote:

In general, depending on what you are doing, you can tighten the above parameters. For
example, if you are using Spark Streaming for anti-fraud detection, you may
stream data in at a 2-second batch interval, keep your window length at 4
seconds and set your sliding interval to 2 seconds, which gives you a kind of tight
streaming. You are aggregating the data that you are collecting over the batch
window.
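
As a rough illustration of those numbers, in plain Scala rather than the Spark Streaming API: with a 2-second batch interval, a 4-second window and a 2-second slide, each window covers two consecutive batches and a new window is produced after every batch. The sketch only models the steady state and ignores the partially filled first window; the object and method names are made up.

```scala
object WindowSketch {
  val batchIntervalSec = 2
  val windowLengthSec  = 4
  val slideIntervalSec = 2

  // Window length and slide interval must be whole multiples of the batch interval.
  val batchesPerWindow: Int = windowLengthSec / batchIntervalSec  // 2 batches per window
  val batchesPerSlide: Int  = slideIntervalSec / batchIntervalSec // advance by 1 batch

  /** Groups batch end times (in seconds) into steady-state windows. */
  def windows(batchEndTimes: List[Int]): List[List[Int]] =
    batchEndTimes.sliding(batchesPerWindow, batchesPerSlide).toList
}
```

For example, `WindowSketch.windows(List(2, 4, 6, 8))` groups the batches ending at 2s and 4s, then 4s and 6s, then 6s and 8s.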

I should warn that in https://github.com/apache/spark/pull/14731 I've been
trying to speed up input scanning against object stores, and collecting numbers
on the way.

*If you are using the FileInputDStream to scan S3, Azure (and presumably GCS)
object stores for data, the time to scan a moderately complex directory tree is
going to be measurable in seconds.*

It's going to depend on the distance from the object store and the number of files, but
you'll probably need to use a bigger window.

(That patch for SPARK-17159 should improve things. I'd love some people to
help by testing it, or by emailing me directly with an (anonymised) listing of the
directory structures they use in object store FileInputDStream streams, which I
could regenerate for inclusion in some performance tests.)




work with russian letters

2016-08-24 Thread AlexModestov
  Hello everybody,

  I want to work with DataFrames where some columns have a string type and
contain Russian letters.
  The Russian letters are displayed incorrectly in the text. Could you help me
work out how I should handle them?
  
  Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/work-with-russian-letters-tp27594.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Streaming application failing with Token issue

2016-08-24 Thread Steve Loughran

> On 23 Aug 2016, at 11:26, Jacek Laskowski  wrote:
> 
> Hi Steve,
> 
> Could you share your opinion on whether the token gets renewed or not?
> Is the token going to expire after 7 days anyway?


There are Hadoop service tokens, and Kerberos tickets. They are similar-ish, but
not quite the same.

- Kerberos "tickets" expire; you need to re-authenticate with a keytab or
  user+password.
- Hadoop "tokens" are more anonymous. A Kerberos-authenticated application has
  to talk to the service to ask for a token (i.e. it uses a Kerberos ticket to
  say "I need a token for operation X for Y hours").
- There are protocols for renewing tokens up to a time limit; this can be done over
  IPC mechanisms, or REST APIs using SASL.

I get a bit mixed up myself, and use "tickets and tokens" to allow myself to 
get away with mistakes

Things about kerberos you didn't want to know but will end up discovering in 
stack traces anyway

webinar: 
http://hortonworks.com/webinar/hadoop-and-kerberos-the-madness-beyond-the-gate/

and 

https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/

YARN apps can run for a couple of days renewing tokens, but eventually the time
limit on token renewal is reached; they then need to use a Kerberos ticket to
request new tokens.
If something times out after 7 days, I would guess that it's Kerberos ticket
expiry; a keytab needs to be passed to Spark for it to do the renewal.
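
To make "renewing tokens up to a time limit" concrete, here is a small plain-Scala model, not Hadoop code. It assumes a 24-hour renew interval and a 7-day maximum lifetime, which are the usual HDFS delegation-token defaults but may differ on any given cluster; the object, case class and method names are invented for the sketch.

```scala
import java.time.{Duration, Instant}

object TokenRenewalSketch {
  // Assumed values; real clusters configure these per service.
  val RenewInterval: Duration = Duration.ofHours(24)
  val MaxLifetime: Duration   = Duration.ofDays(7)

  final case class Token(expires: Instant, maxDate: Instant)

  private def earlier(a: Instant, b: Instant): Instant = if (a.isBefore(b)) a else b

  /** A fresh token is valid for one renew interval, capped by its maximum lifetime. */
  def issue(now: Instant): Token = {
    val maxDate = now.plus(MaxLifetime)
    Token(earlier(now.plus(RenewInterval), maxDate), maxDate)
  }

  /** Renewal extends the expiry, but never past maxDate; it is refused once the token
    * has expired or its maximum lifetime has been reached. */
  def renew(token: Token, now: Instant): Option[Token] =
    if (now.isAfter(token.expires) || !now.isBefore(token.maxDate)) None
    else Some(token.copy(expires = earlier(now.plus(RenewInterval), token.maxDate)))

  /** Renews every `every` until a renewal is refused; returns the number of successes. */
  def renewalsUntilRefused(start: Instant, every: Duration): Int = {
    @annotation.tailrec
    def loop(token: Token, now: Instant, count: Int): Int =
      renew(token, now) match {
        case Some(next) => loop(next, now.plus(every), count + 1)
        case None       => count
      }
    loop(issue(start), start.plus(every), 0)
  }
}
```

However often the application renews, the token in this model stops being renewable once the maximum lifetime is reached; at that point only a fresh Kerberos login (for example from a keytab) can obtain a new token.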

The current YARN docs on this: 
https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md





> 
> Why is the change in
> the recent version for token renewal? See
> https://github.com/apache/spark/commit/ab648c0004cfb20d53554ab333dd2d198cb94ffa
> 


That's designed to make it easy for a kerberos-authenticated client to get 
tokens for more services. Before: hard coded support for HDFS, HBase, Hive. 
After: anything which implements the same interface. This includes multiple 
HBase servers, more than one Hive metastore, etc. It also stops the spark 
client code needing lots of one-off classes, allows people to add their own 
token fetching code for their own services.

> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski


Like your e-book BTW

If you plan to add a specific section of Spark & Kerberos, I'd gladly help 
review it.

Best range of parameters for grid search?

2016-08-24 Thread Adamantios Corais
I would like to run a naive implementation of grid search with MLlib, but
I am a bit confused about choosing the 'best' range of parameters.
Obviously, I do not want to waste too many resources on a combination
of parameters that will probably not give a better model. Any suggestions
from your experience?


import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.optimization.{Gradient, HingeGradient, L1Updater,
  LeastSquaresGradient, LogisticGradient, SimpleUpdater, SquaredL2Updater, Updater}

// Candidate values for each hyperparameter.
val intercept   : List[Boolean]  = List(false)
val classes     : List[Int]      = List(2)
val validate    : List[Boolean]  = List(true)
val tolerance   : List[Double]   = List(0.001 , 0.01 , 0.1 , 0.0001 , 0.001 , 0.01 , 0.1 , 1.0)
val gradient    : List[Gradient] = List(new LogisticGradient() , new LeastSquaresGradient() , new HingeGradient())

val corrections : List[Int]      = List(5 , 10 , 15)
val iters       : List[Int]      = List(1 , 10 , 100 , 1000 , 1)
val regparam    : List[Double]   = List(0.0 , 0.0001 , 0.001 , 0.01 , 0.1 , 1.0 , 10.0 , 100.0)
val updater     : List[Updater]  = List(new SimpleUpdater() , new L1Updater() , new SquaredL2Updater())

// Cartesian product of all candidate values.
val combinations = for (a <- intercept;
                        b <- classes;
                        c <- validate;
                        d <- tolerance;
                        e <- gradient;
                        f <- corrections;
                        g <- iters;
                        h <- regparam;
                        i <- updater) yield (a,b,c,d,e,f,g,h,i)

// Configure one model per combination (only the first 3 combinations here).
for( ( interceptS , classesS , validateS , toleranceS , gradientS ,
       correctionsS , itersS , regParamS , updaterS ) <- combinations.take(3) ) {

  val lr : LogisticRegressionWithLBFGS = new LogisticRegressionWithLBFGS().
    setIntercept(addIntercept=interceptS).
    setNumClasses(numClasses=classesS).
    setValidateData(validateData=validateS)

  lr.
    optimizer.
    setConvergenceTol(tolerance=toleranceS).
    setGradient(gradient=gradientS).
    setNumCorrections(corrections=correctionsS).
    setNumIterations(iters=itersS).
    setRegParam(regParam=regParamS).
    setUpdater(updater=updaterS)

}
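
Before trimming the ranges, it may help to see how large the grid above actually is. The following plain-Scala sketch (no Spark needed; the object and value names are made up) multiplies the sizes of the candidate lists exactly as they appear in this archived message, and also shows the effect of dropping repeated values. The numeric literals may have been altered by the archive, so treat the counts as illustrative.

```scala
object GridSizeSketch {
  // Candidate lists copied as they appear in the message above.
  val tolerance: List[Double] = List(0.001, 0.01, 0.1, 0.0001, 0.001, 0.01, 0.1, 1.0)
  val iters: List[Int]        = List(1, 10, 100, 1000, 1)
  val regparam: List[Double]  = List(0.0, 0.0001, 0.001, 0.01, 0.1, 1.0, 10.0, 100.0)

  // Sizes of the remaining dimensions:
  // intercept, classes, validate, gradient, corrections, updater.
  val otherSizes: List[Int] = List(1, 1, 1, 3, 3, 3)

  /** Number of combinations in a full grid with the given dimension sizes. */
  def gridSize(dims: List[Int]): Long = dims.map(_.toLong).product

  // Every combination, including those built from repeated candidate values.
  val asWritten: Long =
    gridSize(otherSizes ++ List(tolerance.size, iters.size, regparam.size))

  // The same grid after removing repeated candidate values.
  val deduplicated: Long =
    gridSize(otherSizes ++ List(tolerance.distinct.size, iters.distinct.size, regparam.distinct.size))
}
```

Each extra candidate value multiplies the total, so removing duplicates or one value per dimension shrinks the number of training runs considerably.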





Re: dynamic allocation in Spark 2.0

2016-08-24 Thread Saisai Shao
This looks like the Spark application is running into an abnormal state. From
the stack trace, it means the driver could not send requests to the AM. Can you please
check whether the AM is reachable, or whether there are any other exceptions besides this one?

From my past tests, Spark's dynamic allocation may run into some corner
cases when the NM is gone or restarted. It would be better to check all the
logs (driver, AM and executors) to find some clues as to why you ran into this.
It is hard to tell the exact reason based only on the exception you pasted
above.

On Wed, Aug 24, 2016 at 3:16 PM, Shane Lee 
wrote:

> Hello all,
>
> I am running hadoop 2.6.4 with Spark 2.0 and I have been trying to get
> dynamic allocation to work without success. I was able to get it to work
> with Spark 1.6.1 however.
>
> When I issue the command
> spark-shell --master yarn --deploy-mode client
>
> this is the error I see:
>
> 16/08/24 00:05:40 WARN NettyRpcEndpointRef: Error sending message [message = RequestExecutors(1,0,Map())] in 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:128)
> at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:493)
> at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1482)
> at org.apache.spark.ExecutorAllocationManager.start(ExecutorAllocationManager.scala:235)
> at org.apache.spark.SparkContext$$anonfun$21.apply(SparkContext.scala:534)
> at org.apache.spark.SparkContext$$anonfun$21.apply(SparkContext.scala:534)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.SparkContext.(SparkContext.scala:534)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2256)
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
> at org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
> at $line3.$read$$iw$$iw.(:15)
> at $line3.$read$$iw.(:31)
> at $line3.$read.(:33)
> at $line3.$read$.(:37)
> at $line3.$read$.()
> at $line3.$eval$.$print$lzycompute(:7)
> at $line3.$eval$.$print(:6)
> at $line3.$eval.$print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
> at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
> at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
> at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
> at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
> at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
> at 

Re: Spark MLlib:Collaborative Filtering

2016-08-24 Thread glen
Hash it to int
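
A minimal sketch of what hashing string IDs to Int could look like in plain Scala, together with a check for the main risk of this approach: two different IDs can hash to the same Int and would then be treated as the same user or product. The object and helper names are made up, and MurmurHash3 from the Scala standard library is only one possible choice of hash function.

```scala
import scala.util.hashing.MurmurHash3

object HashIdSketch {
  /** Maps a string ID to an Int; distinct IDs may collide. */
  def toIntId(id: String): Int = MurmurHash3.stringHash(id)

  /** Returns the groups of distinct IDs that map to the same Int under `hash`. */
  def collisions(ids: Seq[String], hash: String => Int): Seq[Seq[String]] =
    ids.distinct.groupBy(hash).values.filter(_.size > 1).map(_.toSeq).toSeq
}
```

For instance, `HashIdSketch.collisions(Seq("Aa", "BB", "Cc"), _.hashCode)` reports "Aa" and "BB" as colliding, because those two strings share a hashCode. Running the same check over the real ID set with the chosen hash shows whether collisions matter for a given dataset.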




On 2016-08-24 16:28 , Devi P.V Wrote:


Hi all,
I am newbie in collaborative filtering.I want to implement collaborative 
filtering algorithm(need to find top 10 recommended products) using Spark and 
Scala.I have a rating dataset where userID & ProductID are String type.

UserID   ProductID Rating
b3a68043-c1  p1-160ff5fDS-f74   1
b3a68043-c2  p5-160ff5fDS-f74   1
b3a68043-c0  p9-160ff5fDS-f74   1


I tried ALS algorithm using spark MLlib.But it support rating userID & 
productID only Integer type.How can I solve this problem?


Thanks In Advance


Re: DataFrame Data Manipulation - Based on a timestamp column Not Working

2016-08-24 Thread Bedrytski Aliaksandr
Hi Subhajit,

you may try to use sql queries instead of helper methods:

> sales_order_base_dataFrame.registerTempTable("sales_orders")
>
> val result = sqlContext.sql("""
> SELECT *
> FROM sales_orders
> WHERE unix_timestamp(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >=
> unix_timestamp(demand_timefence_end_date ,'yyyy-MM-dd')
> """)

This assumes demand_timefence_end_date has the 'yyyy-MM-dd' date format.
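
Note that in the original post demand_timefence_end_date is a Scala value rather than a column, so it has to be interpolated into the expression as a quoted literal; an unquoted 2004-01-08 would be read as arithmetic. A minimal sketch of building such a filter string follows. It uses java.time instead of the Joda-Time classes from the post, and the object and method names are made up for illustration.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object TimeFenceSketch {
  /** End of the demand time fence, formatted as yyyy-MM-dd. */
  def endDate(planningStart: String, fenceDays: Int): String =
    LocalDate.parse(planningStart)
      .plusDays(fenceDays.toLong)
      .format(DateTimeFormatter.ISO_LOCAL_DATE)

  /** Filter expression with the date embedded as a quoted SQL literal. */
  def filterStmt(planningStart: String, fenceDays: Int): String =
    s"SCHEDULE_SHIP_DATE >= cast('${endDate(planningStart, fenceDays)}' as timestamp)"
}
```

The resulting string can then be passed to `sales_order_base_dataFrame.filter(...)` or embedded in the WHERE clause of the query above.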

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Wed, Aug 24, 2016, at 00:46, Subhajit Purkayastha wrote:
> Using spark 2.0  & scala 2.11.8, I have a DataFrame with a
> timestamp column
>
> root
> |-- ORG_ID: integer (nullable = true)
> |-- HEADER_ID: integer (nullable = true)
> |-- ORDER_NUMBER: integer (nullable = true)
> |-- LINE_ID: integer (nullable = true)
> |-- LINE_NUMBER: integer (nullable = true)
> |-- ITEM_TYPE_CODE: string (nullable = true)
> |-- ORGANIZATION_ID: integer (nullable = true)
> |-- INVENTORY_ITEM_ID: integer (nullable = true)
> |-- SCHEDULE_SHIP_DATE: timestamp (nullable = true)
> |-- ORDER_QUANTITY_UOM: string (nullable = true)
> |-- UNIT_SELLING_PRICE: double (nullable = true)
> |-- OPEN_QUANTITY: double (nullable = true)
>
> [204,94468,56721,197328,1,STANDARD,207,149,2004-01-08
> 23:59:59.0,Ea,1599.0,28.0]
> [204,94468,56721,197331,2,STANDARD,207,151,2004-01-08
> 23:59:59.0,Ea,1899.05,40.0]
> [204,94468,56721,197332,3,STANDARD,207,436,2004-01-08
> 23:59:59.0,Ea,300.0,24.0]
> [204,94468,56721,197335,4,STANDARD,207,3751,2004-01-08
> 23:59:59.0,Ea,380.0,24.0]
>
> I want to manipulate the dataframe data based on a parameter =
> demand_time_fence_date
>
> var demand_timefence_end_date_instance = new MutableDateTime(planning_start_date)
> var demand_timefence_days = demand_timefence_end_date_instance.addDays(demand_time_fence)
> val demand_timefence_end_date = ISODateTimeFormat.yearMonthDay().print(demand_timefence_end_date_instance)
>
> var filter_stmt = "from_unixtime(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >= "+ demand_timefence_end_date
>
> val sales_order_dataFrame = sales_order_base_dataFrame.filter(filter_stmt).limit(10)
>
> What is the correct syntax to pass the parameter value?
>
> The above filter statement is not working to restrict the dataset
>
> Thanks,
>
> Subhajit
>
>


Spark MLlib:Collaborative Filtering

2016-08-24 Thread Devi P.V
Hi all,
I am a newbie in collaborative filtering. I want to implement a collaborative
filtering algorithm (I need to find the top 10 recommended products) using Spark
and Scala. I have a rating dataset where UserID and ProductID are of String type.

UserID   ProductID Rating
b3a68043-c1  p1-160ff5fDS-f74   1
b3a68043-c2  p5-160ff5fDS-f74   1
b3a68043-c0  p9-160ff5fDS-f74   1


I tried the ALS algorithm from Spark MLlib, but it only supports userID and
productID of Integer type. How can I solve this problem?

Thanks In Advance


Future of GraphX

2016-08-24 Thread mas
Hi, 

I am wondering if there is any current work going on to optimize
GraphX.
I am aware of GraphFrames, which is built on DataFrames. However, is there any
plan to build a version of GraphX on the newer Spark APIs, i.e., Datasets or Spark
2.0?

Furthermore, is there any plan to incorporate graph streaming?

Thanks,






dynamic allocation in Spark 2.0

2016-08-24 Thread Shane Lee
Hello all,
I am running Hadoop 2.6.4 with Spark 2.0 and I have been trying to get dynamic
allocation to work, without success. I was able to get it to work with Spark
1.6.1, however.
When I issue the command

spark-shell --master yarn --deploy-mode client

this is the error I see:
16/08/24 00:05:40 WARN NettyRpcEndpointRef: Error sending message [message = RequestExecutors(1,0,Map())] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:128)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:493)
        at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1482)
        at org.apache.spark.ExecutorAllocationManager.start(ExecutorAllocationManager.scala:235)
        at org.apache.spark.SparkContext$$anonfun$21.apply(SparkContext.scala:534)
        at org.apache.spark.SparkContext$$anonfun$21.apply(SparkContext.scala:534)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:534)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2256)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
        at org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
        at $line3.$read$$iw$$iw.<init>(<console>:15)
        at $line3.$read$$iw.<init>(<console>:31)
        at $line3.$read.<init>(<console>:33)
        at $line3.$read$.<init>(<console>:37)
        at $line3.$read$.<clinit>(<console>)
        at $line3.$eval$.$print$lzycompute(<console>:7)
        at $line3.$eval$.$print(<console>:6)
        at $line3.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
        at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:94)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at