Re: [Spark R]: dapply only works for very small datasets
You can find more discussion in https://issues.apache.org/jira/browse/SPARK-18924 and https://issues.apache.org/jira/browse/SPARK-17634

I suspect the cost is linear, so partitioning the data into smaller chunks, with more executors (one core each) running in parallel, would probably help a bit. Unfortunately this is an area that could really use some improvement, and I think it *should* be possible (hmm, https://databricks.com/blog/2017/10/06/accelerating-r-workflows-on-databricks.html ;)

From: Kunft, Andreas
Sent: Tuesday, November 28, 2017 3:11 AM
Subject: AW: [Spark R]: dapply only works for very small datasets
To: Felix Cheung

Thanks for the fast reply.

I tried it locally, with 1 - 8 slots on an 8-core machine with 25 GB memory, as well as on 4 nodes with the same specifications. When I shrink the data to around 100 MB, it runs in about 1 hour with 1 core and about 6 minutes with 8 cores. I'm aware that the SerDe takes time, but it seems there must be something else off, considering these numbers.

From: Felix Cheung
Sent: Monday, November 27, 2017 20:20
To: Kunft, Andreas; user@spark.apache.org
Subject: Re: [Spark R]: dapply only works for very small datasets

What's the number of executors and/or number of partitions you are working with? I'm afraid most of the problem is the serialization/deserialization overhead between the JVM and R...

From: Kunft, Andreas
Sent: Monday, November 27, 2017 10:27:33 AM
To: user@spark.apache.org
Subject: [Spark R]: dapply only works for very small datasets

Hello,

I tried to execute some user-defined functions with R using the airline arrival performance dataset. While the apply examples from the documentation work perfectly fine on a dataset of ~9 GB, the `dapply` operator fails to finish even after ~4 hours.
I'm using a function similar to the one from the documentation:

df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stack Overflow and even asked the question there as well, but so far the only answer I got was: "Avoid using dapply, gapply."

So, am I missing some parameters, or is there a general limitation? I'm using Spark 2.2.0, read the data from HDFS 2.7.1, and played with several degrees of parallelism.

Best
Andreas
Re: NLTK with Spark Streaming
Depending on your needs, it's fairly easy to write a lightweight Python wrapper around the Databricks spark-corenlp library: https://github.com/databricks/spark-corenlp

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Sun, Nov 26, 2017 at 8:19 AM, ashish rawat wrote:
> Thanks Holden and Chetan.
>
> Holden - Have you tried it out? Do you know the right way to do it?
> Chetan - Yes, if we use a Java NLP library, there should not be any issue in
> integrating it with Spark Streaming, but as I pointed out earlier, we want to
> give data scientists the flexibility to use the language and library of
> their choice, instead of restricting them to a library of our choice.
>
> On Sun, Nov 26, 2017 at 9:42 PM, Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>
>> But you can still use the Stanford NLP library and distribute it through Spark,
>> right?
>>
>> On Sun, Nov 26, 2017 at 3:31 PM, Holden Karau wrote:
>>
>>> So it's certainly doable (it's not super easy, mind you), but until the
>>> Arrow UDF release goes out it will be rather slow.
>>>
>>> On Sun, Nov 26, 2017 at 8:01 AM ashish rawat wrote:
>>> Hi,
>>>
>>> Has someone tried running NLTK (Python) with Spark Streaming (Scala)? I was
>>> wondering if this is a good idea and what the right Spark operators are to
>>> do this. The reason we want to try this combination is that we don't want
>>> to run our transformations in Python (PySpark), but after the
>>> transformations we need to run some natural language processing operations,
>>> and we don't want to restrict the functions data scientists can use to
>>> Spark's natural language library. So Spark Streaming with NLTK looks like
>>> the right option, from the perspective of fast data processing and data
>>> science flexibility.
>>>
>>> Regards,
>>> Ashish
>>> --
>>> Twitter: https://twitter.com/holdenkarau
Re: Writing custom Structured Streaming receiver
Cool. Thanks nezhazheng. I will give it a shot. -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Structured Streaming: emitted record count
In Structured Streaming, the QueryProgressEvent does not seem to include the final count of records emitted to the destination; I only see the number of input rows. I was trying to use count (an additional action after persisting the dataset), but I get the exception below when calling persist or count on the dataset before the query is started. Sample code follows; please suggest how to get the query running and obtain the final count.

"Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"

Dataset data = transform(kafkaTopic, SPECIFIC_AVRO, kafkaStreamSet,
    UserSessionEventJoin.class, PropertyPageView.class,
    (Function & Serializable) (UserSessionEventJoin userSessionEventJoin) -> {
        UserEvent userEvent = userSessionEventJoin.getUserEvent();
        if (userEvent != null && TYPE_PAGE_VIEW.equalsIgnoreCase(userEvent.getType())) {
            if (userEvent.getPayloadMap() != null) {
                return PAGE_TYPE_PROPERTY.equalsIgnoreCase(
                    userEvent.getPayloadMap().get(PAGE_TYPE));
            }
        }
        return false;
    });

data.persist(StorageLevel.MEMORY_AND_DISK());
log.info("dataset persisted");
long emittedCount = data.count();

Map metricTags = new HashMap<>();
metricTags.put("source", kafkaTopic);
metricTags.put("destination", sinkPath);
DataMonitorMetric recordsWrittenMetric = dataMonitorUtils
    .buildDataMonitorMetricWithValue(null, System.currentTimeMillis(),
        "numOutputRows", metricTags, Aspect.EMITTED, emittedCount);
dataMonitorUtils.sendMetric(recordsWrittenMetric);

StreamingQuery streamingQuery = data.writeStream().outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "file:///Users/asethurathnam/Downloads/parquet/checkpoint")
    .trigger(Trigger.ProcessingTime(1000, TimeUnit.MILLISECONDS))
    .partitionBy("eventDate")
    .start("file:///Users/asethurathnam/Downloads/parquet/output-parquet");

data.unpersist();
log.info("dataset unpersisted");
streamingQuery.awaitTermination();
Re: does "Deep Learning Pipelines" scale out linearly?
Hello Andy,

Regarding your question, this will depend a lot on the specific task:

- For tasks that are "easy" to distribute, such as inference (scoring), hyper-parameter tuning, or cross-validation, the work will take full advantage of the cluster and performance should improve more or less linearly.
- For training the same model across multiple machines on a distributed dataset, you are currently better off with a dedicated solution such as TensorFlowOnSpark or dist-keras. We are working on addressing this in a future release.

Also, we opened a mailing list dedicated to Deep Learning Pipelines, to which I will copy this answer. Feel free to reply there: https://groups.google.com/forum/#!forum/dl-pipelines-users/

Tim

On November 22, 2017 at 10:02:59 AM, Andy Davidson (a...@santacruzintegration.com) wrote:
> I am starting a new deep learning project. Currently we do all of our work on
> a single machine using a combination of Keras and TensorFlow.
> https://databricks.github.io/spark-deep-learning/site/index.html looks very
> promising. Any idea how performance is likely to improve as I add machines
> to my cluster?
>
> Kind regards
>
> Andy
>
> P.S. Is user@spark.apache.org the best place to ask questions about this
> package?
Re: Spark Data Frame. PreSorted partitions
I'm not sure, other than retrieving from a Hive table that is already sorted. This sounds cool, though; I would be interested to know this as well.

On Nov 28, 2017 10:40 AM, "Николай Ижиков" wrote:
> Hello, guys!
>
> I am working on an implementation of a custom DataSource for the Spark Data
> Frame API and have a question:
>
> If I have a `SELECT * FROM table1 ORDER BY some_column` query, I can sort
> the data inside a partition in my data source.
>
> Is there a built-in option to tell Spark that the data in each partition is
> already sorted?
>
> It seems that Spark could benefit from the use of already-sorted partitions,
> for example by using a distributed merge-sort algorithm.
>
> Does it make sense to you?
Spark Data Frame. PreSorted partitions
Hello, guys!

I am working on an implementation of a custom DataSource for the Spark Data Frame API and have a question:

If I have a `SELECT * FROM table1 ORDER BY some_column` query, I can sort the data inside a partition in my data source.

Is there a built-in option to tell Spark that the data in each partition is already sorted?

It seems that Spark could benefit from the use of already-sorted partitions, for example by using a distributed merge-sort algorithm.

Does it make sense to you?