Re: [Spark R]: dapply only works for very small datasets

2017-11-28 Thread Felix Cheung
You can find more discussions in
https://issues.apache.org/jira/browse/SPARK-18924
And
https://issues.apache.org/jira/browse/SPARK-17634

I suspect the cost is linear - so partitioning the data into smaller chunks 
with more executors (one core each) running in parallel would probably help a 
bit.

Unfortunately this is an area that could really use some improvements, and I 
think it *should* be possible (hmm: 
https://databricks.com/blog/2017/10/06/accelerating-r-workflows-on-databricks.html ;)

_
From: Kunft, Andreas 
Sent: Tuesday, November 28, 2017 3:11 AM
Subject: Re: [Spark R]: dapply only works for very small datasets
To: Felix Cheung , 



Thanks for the fast reply.


I tried it locally, with 1-8 slots on an 8-core machine w/ 25 GB of memory, as 
well as on 4 nodes with the same specifications.

When I shrink the data to around 100 MB, it runs in about 1 hour with 1 core and 
about 6 minutes with 8 cores.

I'm aware that the SerDe takes time, but it seems there must be something else 
off, considering these numbers.



From: Felix Cheung 
Sent: Monday, 27 November 2017 20:20
To: Kunft, Andreas; user@spark.apache.org
Subject: Re: [Spark R]: dapply only works for very small datasets

What's the number of executors and/or the number of partitions you are working with?

I'm afraid most of the problem is with the serialization/deserialization 
overhead between the JVM and R...


From: Kunft, Andreas 
Sent: Monday, November 27, 2017 10:27:33 AM
To: user@spark.apache.org
Subject: [Spark R]: dapply only works for very small datasets


Hello,


I tried to execute some user-defined functions with R using the airline arrival 
performance dataset.

While the examples from the documentation for the `<-` apply operator work 
perfectly fine on ~9 GB of data, the `dapply` operator fails to finish even 
after ~4 hours.


I'm using a function similar to the one from the documentation:


df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stack Overflow and even asked the question there as well, but so far 
the only answer I got was:
"Avoid using dapply, gapply"

So, am I missing some parameters, or is there a general limitation?
I'm using Spark 2.2.0, read the data from HDFS 2.7.1, and have played with 
several degrees of parallelism (DOPs).

Best
Andreas





Re: NLTK with Spark Streaming

2017-11-28 Thread Nicholas Hakobian
Depending on your needs, it's fairly easy to write a lightweight Python
wrapper around the Databricks spark-corenlp library:
https://github.com/databricks/spark-corenlp
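
Alternatively, if the goal is to keep using NLTK itself, here is a minimal
PySpark sketch of running it at the partition level. It assumes nltk (plus its
"punkt" tokenizer data, via nltk.download("punkt")) is available on every
executor; the column names and sample data are illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nltk-sketch").getOrCreate()

def tokenize_partition(rows):
    # Import inside the function so each worker process loads NLTK itself.
    from nltk.tokenize import word_tokenize
    for row in rows:
        yield (row.id, word_tokenize(row.text))

df = spark.createDataFrame(
    [(1, "Spark Streaming with NLTK looks like the right option.")],
    ["id", "text"])

tokens = df.rdd.mapPartitions(tokenize_partition).toDF(["id", "tokens"])
tokens.show(truncate=False)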


Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Sun, Nov 26, 2017 at 8:19 AM, ashish rawat  wrote:

> Thanks Holden and Chetan.
>
> Holden - Have you tried it out? Do you know the right way to do it?
> Chetan - Yes, if we use a Java NLP library, there should not be any issue in
> integrating it with Spark Streaming, but as I pointed out earlier, we want to
> give data scientists the flexibility to use the language and library of their
> choice, instead of restricting them to a library of our choice.
>
> On Sun, Nov 26, 2017 at 9:42 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> But you can still use the Stanford NLP library and distribute it through
>> Spark, right?
>>
>> On Sun, Nov 26, 2017 at 3:31 PM, Holden Karau 
>> wrote:
>>
>>> So it's certainly doable (it's not super easy, mind you), but until the
>>> Arrow UDF release goes out it will be rather slow.
>>>
>>> On Sun, Nov 26, 2017 at 8:01 AM ashish rawat 
>>> wrote:
>>>
 Hi,

 Has someone tried running NLTK (Python) with Spark Streaming (Scala)? I
 was wondering if this is a good idea, and what are the right Spark operators
 to do this? The reason we want to try this combination is that we don't
 want to run our transformations in Python (PySpark), but after the
 transformations we need to run some natural language processing operations,
 and we don't want to restrict the functions data scientists can use to a
 Spark natural language library. So, Spark Streaming with NLTK looks like
 the right option, from the perspective of fast data processing and data
 science flexibility.

 Regards,
 Ashish

>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>


Re: Writing custom Structured Streaming receiver

2017-11-28 Thread Hien Luu
Cool.  Thanks nezhazheng.  I will give it a shot.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Structured Streaming: emitted record count

2017-11-28 Thread aravias
In structured streaming, the QueryProgressEvent does not seem to contain the
final count of records emitted to the destination; I see only the number of
input rows. I was trying to use count() (as an additional action after
persisting the dataset), but I get the exception below when calling persist or
count on the dataset before the query is started. Sample code is below; please
suggest how to get the query running and obtain the final count.

"Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming
sources must be executed with writeStream.start();;" 


Dataset data = transform(kafkaTopic, SPECIFIC_AVRO, kafkaStreamSet,
    UserSessionEventJoin.class, PropertyPageView.class,
    (Function<UserSessionEventJoin, Boolean> & Serializable)
        (UserSessionEventJoin userSessionEventJoin) -> {
            UserEvent userEvent = userSessionEventJoin.getUserEvent();
            if (userEvent != null
                    && TYPE_PAGE_VIEW.equalsIgnoreCase(userEvent.getType())) {
                if (userEvent.getPayloadMap() != null) {
                    return PAGE_TYPE_PROPERTY.equalsIgnoreCase(
                            userEvent.getPayloadMap().get(PAGE_TYPE));
                }
            }
            return false;
        }
);

data.persist(StorageLevel.MEMORY_AND_DISK());
log.info("dataset persisted");

long emittedCount = data.count();

Map<String, String> metricTags = new HashMap<>();
metricTags.put("source", kafkaTopic);
metricTags.put("destination", sinkPath);
DataMonitorMetric recordsWrittenMetric = dataMonitorUtils
    .buildDataMonitorMetricWithValue(null, System.currentTimeMillis(),
        "numOutputRows", metricTags, Aspect.EMITTED, emittedCount);
dataMonitorUtils.sendMetric(recordsWrittenMetric);

StreamingQuery streamingQuery = data.writeStream().outputMode("append")
    .format("parquet")
    .option("checkpointLocation",
        "file:///Users/asethurathnam/Downloads/parquet/checkpoint")
    .trigger(Trigger.ProcessingTime(1000, TimeUnit.MILLISECONDS))
    .partitionBy("eventDate")
    .start("file:///Users/asethurathnam/Downloads/parquet/output-parquet");

data.unpersist();
log.info("dataset unpersisted");

streamingQuery.awaitTermination();
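
For reference, a minimal sketch of the pattern the exception points to: start
the query first, then read per-trigger metrics from its progress instead of
calling persist()/count() on the streaming Dataset. The sketch is PySpark
rather than Java, and the rate test source and /tmp paths are illustrative
assumptions; as noted above, only input row counts are reported here, not the
emitted count.

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("progress-metrics").getOrCreate()

stream = (spark.readStream
          .format("rate")                  # built-in test source
          .option("rowsPerSecond", 10)
          .load())

query = (stream.writeStream
         .outputMode("append")
         .format("parquet")
         .option("checkpointLocation", "/tmp/progress-metrics/checkpoint")
         .option("path", "/tmp/progress-metrics/output")
         .trigger(processingTime="1 second")
         .start())

time.sleep(5)
progress = query.lastProgress        # dict form of QueryProgressEvent.progress
if progress is not None:
    print("input rows in last trigger:", progress["numInputRows"])
query.stop()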






--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: does "Deep Learning Pipelines" scale out linearly?

2017-11-28 Thread Tim Hunter
Hello Andy,
regarding your question, this will depend a lot on the specific task:
 - for tasks that are "easy" to distribute, such as inference (scoring),
hyper-parameter tuning, or cross-validation, these tasks will take full
advantage of the cluster and performance should improve more or less linearly
(a sketch of the scoring case follows below)
 - for training the same model across multiple machines on a distributed
dataset, you are currently better off with a dedicated solution such as
TensorFlowOnSpark or dist-keras. We are working on addressing this in a
future release.
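
A hedged sketch of the first, "easy" case -- scoring a dataset in parallel with
an already-trained single-machine model -- using plain PySpark rather than the
Deep Learning Pipelines API; the model path, feature columns, and Keras usage
are illustrative assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parallel-scoring").getOrCreate()

# A model file every executor can read (e.g. on shared storage) -- illustrative path.
MODEL_PATH = "/mnt/models/keras_model.h5"

def score_partition(rows):
    # Load the model once per partition, then score each row in that partition.
    import numpy as np
    from keras.models import load_model
    model = load_model(MODEL_PATH)
    for row in rows:
        features = np.array([row.features])
        yield (row.id, float(model.predict(features)[0][0]))

df = spark.read.parquet("/mnt/data/features.parquet")   # columns: id, features
scored = df.rdd.mapPartitions(score_partition).toDF(["id", "score"])
scored.write.parquet("/mnt/data/scores.parquet")

Each partition is scored independently, so adding executors should increase
throughput roughly in proportion, which is the "more or less linearly" behavior
described above.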

Also, we opened a mailing list dedicated to Deep Learning Pipelines,
to which I will copy this answer. Feel free to answer there:

https://groups.google.com/forum/#!forum/dl-pipelines-users/


Tim


On November 22, 2017 at 10:02:59 AM, Andy Davidson
(a...@santacruzintegration.com) wrote:
> I am starting a new deep learning project; currently we do all of our work on
> a single machine using a combination of Keras and TensorFlow.
> https://databricks.github.io/spark-deep-learning/site/index.html looks very
> promising. Any idea how performance is likely to improve as I add machines
> to my cluster?
>
> Kind regards
>
> Andy
>
>
> P.s. Is user@spark.apache.org the best place to ask questions about this
> package?
>
>
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Data Frame. PreSorted partitions

2017-11-28 Thread Michael Artz
I'm not sure of a way other than retrieving from a Hive table that is already
sorted. This sounds cool though; I would be interested to know this as well.
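
For the Hive-table case, a minimal PySpark sketch (table name, column name, and
bucket count are illustrative) of persisting data bucketed and sorted so that
later reads can exploit that layout:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("presorted-table").getOrCreate()

df = spark.range(0, 1000000).withColumnRenamed("id", "some_column")

(df.write
   .bucketBy(8, "some_column")     # hash rows into 8 buckets by some_column
   .sortBy("some_column")          # sort rows within each bucket file
   .saveAsTable("table1_bucketed"))

# The bucketing and sort specification is recorded in the catalog, and query
# plans over the bucketing column can take advantage of it (for example,
# avoiding a shuffle for aggregations or joins on some_column).
spark.sql("SELECT some_column, count(*) FROM table1_bucketed "
          "GROUP BY some_column").explain()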

On Nov 28, 2017 10:40 AM, "Николай Ижиков"  wrote:

> Hello, guys!
>
> I am working on an implementation of a custom DataSource for the Spark Data
> Frame API and have a question:
>
> If I have a `SELECT * FROM table1 ORDER BY some_column` query, I can sort the
> data inside a partition in my data source.
>
> Is there a built-in option to tell Spark that the data from each partition is
> already sorted?
>
> It seems that Spark could benefit from already sorted partitions, for example
> by using a distributed merge sort algorithm.
>
> Does this make sense to you?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark Data Frame. PreSorted partitions

2017-11-28 Thread Николай Ижиков

Hello, guys!

I am working on an implementation of a custom DataSource for the Spark Data Frame 
API and have a question:

If I have a `SELECT * FROM table1 ORDER BY some_column` query, I can sort the data 
inside a partition in my data source.

Is there a built-in option to tell Spark that the data from each partition is 
already sorted?

It seems that Spark could benefit from already sorted partitions, for example by 
using a distributed merge sort algorithm.

Does this make sense to you?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark R]: dapply only works for very small datasets

2017-11-28 Thread Kunft, Andreas
Thanks for the fast reply.


I tried it locally, with 1-8 slots on an 8-core machine w/ 25 GB of memory, as 
well as on 4 nodes with the same specifications.

When I shrink the data to around 100 MB, it runs in about 1 hour with 1 core and 
about 6 minutes with 8 cores.

I'm aware that the SerDe takes time, but it seems there must be something else 
off, considering these numbers.



From: Felix Cheung 
Sent: Monday, 27 November 2017 20:20
To: Kunft, Andreas; user@spark.apache.org
Subject: Re: [Spark R]: dapply only works for very small datasets

What's the number of executors and/or the number of partitions you are working with?

I'm afraid most of the problem is with the serialization/deserialization 
overhead between the JVM and R...


From: Kunft, Andreas 
Sent: Monday, November 27, 2017 10:27:33 AM
To: user@spark.apache.org
Subject: [Spark R]: dapply only works for very small datasets


Hello,


I tried to execute some user-defined functions with R using the airline arrival 
performance dataset.

While the examples from the documentation for the `<-` apply operator work 
perfectly fine on ~9 GB of data, the `dapply` operator fails to finish even 
after ~4 hours.


I'm using a function similar to the one from the documentation:


df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stack Overflow and even asked the question there as well, but so far 
the only answer I got was:
"Avoid using dapply, gapply"

So, am I missing some parameters, or is there a general limitation?
I'm using Spark 2.2.0, read the data from HDFS 2.7.1, and have played with 
several degrees of parallelism (DOPs).

Best
Andreas