Re: GraphX: New graph operator
I think it would be good to have more basic operators like union or difference, as long as they have an efficient distributed implementation and are plausibly useful. If they can be written in terms of the existing GraphX API, it would be best to put them into GraphOps to keep the core GraphX implementation small. The `mask` operation should actually be in GraphOps -- it's only in GraphImpl for historical reasons. On the other hand, `subgraph` needs to be in GraphImpl for performance: it accesses EdgeRDDImpl#filter(epred, vpred), which can't be a public EdgeRDD method because its semantics rely on an implementation detail (vertex replication). Ankur http://www.ankurdave.com/ On Mon, Jun 1, 2015 at 8:54 AM, Tarek Auel tarek.a...@gmail.com wrote: Hello, Someone proposed in a Jira issue to implement new graph operations. Sean Owen recommended to check first with the mailing list, if this is interesting or not. So I would like to know, if it is interesting for GraphX to implement the operators like: http://en.wikipedia.org/wiki/Graph_operations and/or http://techieme.in/complex-graph-operations/ If yes, should they be integrated into GraphImpl (like mask, subgraph etc.) or as external library? My feeling is that they are similar to mask. Because of consistency they should be part of the graph implementation itself. What do you guys think? I really would like to bring GraphX forward and help to implement some of these. Looking forward to hear your opinions Tarek
Re: [VOTE] Release Apache Spark 1.4.0 (RC3)
Hi everyone, I think there's a blocker on PySpark: the when functions in Python seem to be broken, but the Scala API seems fine. Here's a snippet demonstrating that with Spark 1.4.0 RC3: In [1]: df = sqlCtx.createDataFrame([(1, 1), (2, 2), (1, 2), (1, 2)], ["key", "value"]) In [2]: from pyspark.sql import functions as F In [8]: df.select(df.key, F.when(df.key > 1, 0).when(df.key == 0, 2).otherwise(1)).show() +---+-+ | key |CASE WHEN (key = 0) THEN 2 ELSE 1| +---+-+ | 1| 1| | 2| 1| | 1| 1| | 1| 1| +---+-+ Whereas in Scala I get the expected expression and behaviour: scala> val df = sqlContext.createDataFrame(List((1, 1), (2, 2), (1, 2), (1, 2))).toDF("key", "value") scala> import org.apache.spark.sql.functions._ scala> df.select(df("key"), when(df("key") > 1, 0).when(df("key") === 2, 2).otherwise(1)).show() +---+---+ |key|CASE WHEN (key > 1) THEN 0 WHEN (key = 2) THEN 2 ELSE 1| +---+---+ | 1| 1| | 2| 0| | 1| 1| | 1| 1| +---+---+ I've opened the Jira (https://issues.apache.org/jira/browse/SPARK-8038) and fixed it here: https://github.com/apache/spark/pull/6580 Regards, Olivier. On Tue, Jun 2, 2015 at 07:34, Bobby Chowdary bobby.chowdar...@gmail.com wrote: Hi Patrick, Thanks for clarifying. No issues with functionality. +1 (non-binding) Thanks Bobby On Mon, Jun 1, 2015 at 9:41 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Bobby, Those are generic warnings that the hadoop libraries throw. If you are using MapRFS they shouldn't matter since you are using the MapR client and not the default hadoop client. Do you have any issues with functionality... or was it just seeing the warnings that was the concern? Thanks for helping test! - Patrick On Mon, Jun 1, 2015 at 5:18 PM, Bobby Chowdary bobby.chowdar...@gmail.com wrote: Hive Context works on RC3 for MapR after adding spark.sql.hive.metastore.sharedPrefixes as suggested in SPARK-7819. However, there still seem to be some other issues with native libraries; I get the warning below: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. I tried even after adding SPARK_LIBRARYPATH and --driver-library-path, with no luck. Built on Mac OS X and running on CentOS 7, JDK 1.6 and JDK 1.8 (tried both): make-distribution.sh --tgz --skip-java-test -Phive -Phive-0.13.1 -Pmapr4 -Pnetlib-lgpl -Phive-thriftserver. C On Mon, Jun 1, 2015 at 3:05 PM, Sean Owen so...@cloudera.com wrote: I get a bunch of failures in VersionSuite with build/test params -Pyarn -Phive -Phadoop-2.6: - success sanity check *** FAILED *** java.lang.RuntimeException: [download failed: org.jboss.netty#netty;3.2.2.Final!netty.jar(bundle), download failed: commons-net#commons-net;3.1!commons-net.jar] at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:978) ... but maybe I missed the memo about how to build for Hive? Do I still need another Hive profile? Other tests, signatures, etc. look good. On Sat, May 30, 2015 at 12:40 AM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.0! The tag to be voted on is v1.4.0-rc3 (commit dd109a8): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=dd109a8746ec07c7c83995890fc2c0cd7a693730 The release files, including signatures, digests, etc.
can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-bin/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: [published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-1109/ [published as version: 1.4.0-rc3] https://repository.apache.org/content/repositories/orgapachespark-1110/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-docs/ Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Tuesday, June 02, at 00:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.4.0 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/ == What has changed since RC1 == Below is a list of bug fixes that went into this RC: http://s.apache.org/vN == How can I help test this release? == If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload
about Spark MLlib StandardScaler's Implementation
Hi, When I was trying to add a test case for ML's StandardScaler, I found that MLlib's StandardScaler's output differs from R's with params (withMean = false, withScale = true), because in R's scale function the columns are divided by the root-mean-square rather than the standard deviation. I'm confused about Spark MLlib's implementation. Can anybody give me a hand? Thanks. -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/about-Spark-MLlib-StandardScaler-s-Implementation-tp12554.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
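For illustration only, here is a minimal sketch (plain Scala with made-up numbers, no MLlib involved) of the two scalings being compared above: dividing a column by its sample standard deviation versus dividing it by the root-mean-square, which is what R's scale() uses when centering is disabled. It only shows why the two outputs diverge when the mean is not subtracted; it is not the MLlib or R implementation.
```
object ScalingComparison {
  def main(args: Array[String]): Unit = {
    val col = Array(1.0, 2.0, 3.0, 4.0)   // toy column
    val n = col.length
    val mean = col.sum / n

    // Sample standard deviation: deviations from the mean, n - 1 denominator.
    val stddev = math.sqrt(col.map(x => math.pow(x - mean, 2)).sum / (n - 1))

    // Root-mean-square as used by R's scale() when center = FALSE:
    // deviations from zero, also with an n - 1 denominator.
    val rms = math.sqrt(col.map(x => x * x).sum / (n - 1))

    println(col.map(_ / stddev).mkString("divided by stddev: ", ", ", ""))
    println(col.map(_ / rms).mkString("divided by rms:    ", ", ", ""))
  }
}
```
The two results only coincide when the column mean is zero, which is why the discrepancy shows up with withMean = false.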
Re: GraphX: New graph operator
Okay, thanks for your feedback. What is the expected behavior of union? Like UNION and/or UNION ALL in SQL? Union all would be more or less trivial if we just concatenate the vertices and edges (vertex ID conflicts have to be resolved). Should union look for duplicates on the actual attribute (VD) or just the vertex ID? If it compares the attribute, it might be necessary to change the ID of some vertices in order to resolve conflicts. Thanks in advance for your input! On Mon, 1 Jun 2015 at 11:55 pm, Ankur Dave ankurd...@gmail.com wrote: I think it would be good to have more basic operators like union or difference, as long as they have an efficient distributed implementation and are plausibly useful. If they can be written in terms of the existing GraphX API, it would be best to put them into GraphOps to keep the core GraphX implementation small. The `mask` operation should actually be in GraphOps -- it's only in GraphImpl for historical reasons. On the other hand, `subgraph` needs to be in GraphImpl for performance: it accesses EdgeRDDImpl#filter(epred, vpred), which can't be a public EdgeRDD method because its semantics rely on an implementation detail (vertex replication). Ankur http://www.ankurdave.com/ On Mon, Jun 1, 2015 at 8:54 AM, Tarek Auel tarek.a...@gmail.com wrote: Hello, Someone proposed in a Jira issue to implement new graph operations. Sean Owen recommended to check first with the mailing list, if this is interesting or not. So I would like to know, if it is interesting for GraphX to implement the operators like: http://en.wikipedia.org/wiki/Graph_operations and/or http://techieme.in/complex-graph-operations/ If yes, should they be integrated into GraphImpl (like mask, subgraph etc.) or as external library? My feeling is that they are similar to mask. Because of consistency they should be part of the graph implementation itself. What do you guys think? I really would like to bring GraphX forward and help to implement some of these. Looking forward to hear your opinions Tarek
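To make the discussion concrete, a rough sketch of what a UNION ALL-style union could look like if written only against the existing public GraphX API (and therefore a candidate for GraphOps, per Ankur's note). The helper name and the duplicate-ID policy here are placeholders for illustration, not a proposed design:
```
import org.apache.spark.graphx.Graph
import scala.reflect.ClassTag

// Hypothetical helper, not part of GraphX: naive "union all" of two graphs.
// Duplicate vertex IDs are resolved by arbitrarily keeping one attribute;
// duplicate edges are kept, as in SQL's UNION ALL.
object GraphUnionSketch {
  def unionAll[VD: ClassTag, ED: ClassTag](g1: Graph[VD, ED],
                                           g2: Graph[VD, ED]): Graph[VD, ED] = {
    val vertices = g1.vertices.union(g2.vertices)
      .reduceByKey((a, _) => a)            // collapse duplicate vertex IDs
    val edges = g1.edges.union(g2.edges)   // keep all edges
    Graph(vertices, edges)
  }
}
```
A UNION (distinct) variant would additionally have to decide whether "duplicate" means same ID or same (ID, attribute), which is exactly the open question above.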
Re: CSV Support in SparkR
Hi Alek As Burak said, you can already use spark-csv with SparkR in the 1.4 release. So right now I use it with something like this # Launch SparkR ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3 df <- read.df(sqlContext, "./nycflights13.csv", "com.databricks.spark.csv", header="true") You can also pass in other options to the spark-csv package as arguments to `read.df`. Let us know if this works Thanks Shivaram On Tue, Jun 2, 2015 at 12:03 PM, Burak Yavuz brk...@gmail.com wrote: Hi, cc'ing Shivaram here, because he worked on this yesterday. If I'm not mistaken, you can use the following workflow: ```./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3``` and then ```df <- read.df(sqlContext, "/data", "csv", header = "true")``` Best, Burak On Tue, Jun 2, 2015 at 11:52 AM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Are there any intentions to provide first-class support for CSV files as one of the loadable file types in SparkR? Databricks' spark-csv API [1] has support for SQL, Python, and Java/Scala, and implements most of the arguments of R's read.table API [2], but currently there is no way to load CSV data in SparkR (1.4.0) besides separating our headers from the data, loading into an RDD, splitting by our delimiter, and then converting to a SparkR DataFrame with a vector of the columns gathered from the header. Regards, Alek Eskilson [1] -- https://github.com/databricks/spark-csv [2] -- http://www.inside-r.org/r-doc/utils/read.table CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
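For readers coming from Scala, the same 1.4 data source API that backs read.df can be used directly; a rough equivalent of the call above (the path and options are illustrative, and `sc` is assumed to be an existing SparkContext, e.g. in spark-shell) looks like:
```
import org.apache.spark.sql.SQLContext

// Assumes the shell/app was launched with
// --packages com.databricks:spark-csv_2.10:1.0.3; the file path is a placeholder.
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")   // extra spark-csv options map to read.df(...) arguments
  .load("./nycflights13.csv")
df.printSchema()
```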
DataFrame.withColumn very slow when used iteratively?
Hey, I'm seeing extreme slowness in withColumn when it's used in a loop. I'm running this code: for (int i = 0; i < NUM_ITERATIONS; ++i) { df = df.withColumn("col" + i, new Column(new Literal(i, DataTypes.IntegerType))); } where df is initially a trivial dataframe. Here are the results of running with different values of NUM_ITERATIONS: 25 iterations: 3s, 50: 11s, 75: 31s, 100: 76s, 125: 159s, 150: 283s. When I update the DataFrame by manually copying/appending to the column array and using DataFrame.select, it runs in about half the time, but this is still untenable at any significant number of iterations. Any insight? -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrame-withColumn-very-slow-when-used-iteratively-tp12562.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Possible space improvements to shuffle
The relevant JIRA that springs to mind is https://issues.apache.org/jira/browse/SPARK-2926 If an aggregator and ordering are both defined, then the map side of sort-based shuffle will sort based on the key ordering so that map-side spills can be efficiently merged. We do not currently do a sort-based merge on the reduce side; implementing this is a little tricky because it will require more map partitions' output to be buffered on the reduce side. I think that SPARK-2926 has some proposals of how to deal with this, including hierarchical merging of reduce outputs. RE: ExternalSorter#partitionedIterator, I don't think it's safe to do !ordering.isDefined && !aggregator.isDefined. If an aggregator is defined but we don't have an ordering, then I don't think it makes sense to sort the keys based on their hashcodes or some default ordering, since hashcode collisions would lead to incorrect results for sort-based aggregation. On Tue, Jun 2, 2015 at 1:50 PM, John Carrino john.carr...@gmail.com wrote: One thing I have noticed with ExternalSorter is that if an ordering is not defined, it does the sort using only the partition_id, instead of (partition_id, hash). This means that on the reduce side you need to pull the entire dataset into memory before you can begin iterating over the results. I figure since we are doing a sort of the data anyway, it doesn't seem more expensive to sort by (partition_id, hash). That way the reducer can do a merge and only has to hold in memory the data for a single int hashCode before it can combine them and start returning results from the iterator. Has this already been discussed? If so, can someone point me in the right direction to find out more? Thanks for any help! -jc p.s. I am using Spark version 1.3.1. The code I am looking at below is from ExternalSorter#partitionedIterator. I think maybe !ordering.isDefined should also include !aggregator.isDefined if (spills.isEmpty && partitionWriters == null) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID if (!ordering.isDefined) { // The user hasn't requested sorted keys, so only sort by partition ID, not key groupByPartition(collection.destructiveSortedIterator(partitionComparator)) } else { // We do need to sort by both partition ID and key groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator)) }
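For reference, a standalone sketch of the composite ordering being discussed (this is illustrative code, not Spark's ExternalSorter): records compared by partition ID first, then by the key's hash code. As Josh points out, hash collisions make this unsafe as a basis for sort-based aggregation, because unequal keys can compare as equal.
```
object CompositeOrderingSketch {
  // Orders (partitionId, key) pairs by partition ID, then by key hash code.
  def partitionThenHash[K]: Ordering[(Int, K)] = new Ordering[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      val byPartition = java.lang.Integer.compare(a._1, b._1)
      if (byPartition != 0) byPartition
      // Colliding hash codes compare as equal even for distinct keys,
      // which is exactly the hazard Josh describes for aggregation.
      else java.lang.Integer.compare(a._2.hashCode(), b._2.hashCode())
    }
  }
}
```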
Re: DataFrame.withColumn very slow when used iteratively?
We improved this in 1.4. Adding 100 columns took 4s on my laptop. https://issues.apache.org/jira/browse/SPARK-7276 Still not the fastest, but much faster. scala> Seq((1, 2)).toDF("a", "b") res6: org.apache.spark.sql.DataFrame = [a: int, b: int] scala> val start = System.nanoTime start: Long = 1433274299441224000 scala> for (i <- 1 to 100) { | df = df.withColumn("n" + i, org.apache.spark.sql.functions.lit(0)) | } scala> val end = System.nanoTime end: Long = 1433274303250091000 scala> println((end - start) / 1000 / 1000 / 1000) 3 On Tue, Jun 2, 2015 at 12:34 PM, zsampson zsamp...@palantir.com wrote: Hey, I'm seeing extreme slowness in withColumn when it's used in a loop. I'm running this code: for (int i = 0; i < NUM_ITERATIONS; ++i) { df = df.withColumn("col" + i, new Column(new Literal(i, DataTypes.IntegerType))); } where df is initially a trivial dataframe. Here are the results of running with different values of NUM_ITERATIONS: 25 iterations: 3s, 50: 11s, 75: 31s, 100: 76s, 125: 159s, 150: 283s. When I update the DataFrame by manually copying/appending to the column array and using DataFrame.select, it runs in about half the time, but this is still untenable at any significant number of iterations. Any insight? -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrame-withColumn-very-slow-when-used-iteratively-tp12562.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: CSV Support in SparkR
Ah, alright, cool. I’ll rebuild and let you know. Thanks again, Alek From: Shivaram Venkataraman shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Date: Tuesday, June 2, 2015 at 2:57 PM To: Aleksander Eskilson alek.eskil...@cerner.commailto:alek.eskil...@cerner.com Cc: shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu, Burak Yavuz brk...@gmail.commailto:brk...@gmail.com, dev@spark.apache.orgmailto:dev@spark.apache.org dev@spark.apache.orgmailto:dev@spark.apache.org Subject: Re: CSV Support in SparkR There was a bug in the SparkContext creation that I fixed yesterday. https://github.com/apache/spark/commit/6b44278ef7cd2a278dfa67e8393ef30775c72726https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_commit_6b44278ef7cd2a278dfa67e8393ef30775c72726d=AwMFaQc=NRtzTzKNaCCmhN_9N2YJR-XrNU1huIgYP99yDsEzaJor=0vZw1rBdgaYvDJYLyKglbrax9kvQfRPdzxLUyWSyxPMm=kO95UBEkBrQwNCQwa2x0MOiUxhLQvBQ1B2q5EDG_bt4s=UjoHyjJhx1vf6fqNiq3P-MqcvN2FnssT16FJ8o98pF4e= If you build from master it should be fixed. Also I think we might have a rc4 which should have this Thanks Shivaram On Tue, Jun 2, 2015 at 12:56 PM, Eskilson,Aleksander alek.eskil...@cerner.commailto:alek.eskil...@cerner.com wrote: Hey, that’s pretty convenient. Unfortunately, although the package seems to pull fine into the session, I’m getting class not found exceptions with: Caused by: org.apache.spark.SparkExcetion: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$anonfun$buildScan$1 Which smells like a path issue to me, and I made sure the ivy repo was part of my PATH, but functions like showDF() still fail with that error. Did I miss a setting, or should the package inclusion in the sparkR execution load that in? I’ve run df - read.df(sqlCtx, “./data.csv”, “com.databricks.spark.csv”, header=“true”, delimiter=“|”) showDF(df, 10) (my data is pipeline delimited, and the default SQL context is sqlCtx) Thanks, Alek From: Shivaram Venkataraman shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Date: Tuesday, June 2, 2015 at 2:08 PM To: Burak Yavuz brk...@gmail.commailto:brk...@gmail.com Cc: Aleksander Eskilson alek.eskil...@cerner.commailto:alek.eskil...@cerner.com, dev@spark.apache.orgmailto:dev@spark.apache.org dev@spark.apache.orgmailto:dev@spark.apache.org, Shivaram Venkataraman shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Subject: Re: CSV Support in SparkR Hi Alek As Burak said, you can already use the spark-csv with SparkR in the 1.4 release. So right now I use it with something like this # Launch SparkR ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3 df - read.df(sqlContext, ./nycflights13.csv, com.databricks.spark.csv, header=true) You can also pass in other options to the spark csv as arguments to `read.df`. Let us know if this works Thanks Shivaram On Tue, Jun 2, 2015 at 12:03 PM, Burak Yavuz brk...@gmail.commailto:brk...@gmail.com wrote: Hi, cc'ing Shivaram here, because he worked on this yesterday. 
If I'm not mistaken, you can use the following workflow: ```./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3``` and then ```df - read.df(sqlContext, /data, csv, header = true)``` Best, Burak On Tue, Jun 2, 2015 at 11:52 AM, Eskilson,Aleksander alek.eskil...@cerner.commailto:alek.eskil...@cerner.com wrote: Are there any intentions to provide first class support for CSV files as one of the loadable file types in SparkR? Data brick’s spark-csv API [1] has support for SQL, Python, and Java/Scala, and implements most of the arguments of R’s read.table API [2], but currently there is no way to load CSV data in SparkR (1.4.0) besides separating our headers from the data, loading into an RDD, splitting by our delimiter, and then converting to a SparkR Data Frame with a vector of the columns gathered from the header. Regards, Alek Eskilson [1] -- https://github.com/databricks/spark-csvhttps://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_databricks_spark-2Dcsvd=AwMFaQc=NRtzTzKNaCCmhN_9N2YJR-XrNU1huIgYP99yDsEzaJor=0vZw1rBdgaYvDJYLyKglbrax9kvQfRPdzxLUyWSyxPMm=mPtlFYdyx5Rp7pZr-bQ15QMIrq4qE26ECfJCzoMwYhIs=wT5PU54lVmR2R_o3GidPhDQD9kMMNVYotZEqCd4ASm4e= [2] --
Re: CSV Support in SparkR
Hey, that’s pretty convenient. Unfortunately, although the package seems to pull fine into the session, I’m getting class-not-found exceptions with: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$anonfun$buildScan$1 Which smells like a path issue to me, and I made sure the ivy repo was part of my PATH, but functions like showDF() still fail with that error. Did I miss a setting, or should the package inclusion in the sparkR execution load that in? I’ve run df <- read.df(sqlCtx, "./data.csv", "com.databricks.spark.csv", header="true", delimiter="|") showDF(df, 10) (my data is pipe-delimited, and the default SQL context is sqlCtx) Thanks, Alek From: Shivaram Venkataraman shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edu Date: Tuesday, June 2, 2015 at 2:08 PM To: Burak Yavuz brk...@gmail.com Cc: Aleksander Eskilson alek.eskil...@cerner.com, dev@spark.apache.org, Shivaram Venkataraman shiva...@eecs.berkeley.edu Subject: Re: CSV Support in SparkR Hi Alek As Burak said, you can already use spark-csv with SparkR in the 1.4 release. So right now I use it with something like this # Launch SparkR ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3 df <- read.df(sqlContext, "./nycflights13.csv", "com.databricks.spark.csv", header="true") You can also pass in other options to the spark-csv package as arguments to `read.df`. Let us know if this works Thanks Shivaram On Tue, Jun 2, 2015 at 12:03 PM, Burak Yavuz brk...@gmail.com wrote: Hi, cc'ing Shivaram here, because he worked on this yesterday. If I'm not mistaken, you can use the following workflow: ```./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3``` and then ```df <- read.df(sqlContext, "/data", "csv", header = "true")``` Best, Burak On Tue, Jun 2, 2015 at 11:52 AM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Are there any intentions to provide first-class support for CSV files as one of the loadable file types in SparkR? Databricks' spark-csv API [1] has support for SQL, Python, and Java/Scala, and implements most of the arguments of R's read.table API [2], but currently there is no way to load CSV data in SparkR (1.4.0) besides separating our headers from the data, loading into an RDD, splitting by our delimiter, and then converting to a SparkR DataFrame with a vector of the columns gathered from the header.
Regards, Alek Eskilson [1] -- https://github.com/databricks/spark-csv [2] -- http://www.inside-r.org/r-doc/utils/read.table CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
Possible space improvements to shuffle
One thing I have noticed with ExternalSorter is that if an ordering is not defined, it does the sort using only the partition_id, instead of (partition_id, hash). This means that on the reduce side you need to pull the entire dataset into memory before you can begin iterating over the results. I figure since we are doing a sort of the data anyway, it doesn't seem more expensive to sort by (partition_id, hash). That way the reducer can do a merge and only has to hold in memory the data for a single int hashCode before it can combine them and start returning results from the iterator. Has this already been discussed? If so, can someone point me in the right direction to find out more? Thanks for any help! -jc p.s. I am using Spark version 1.3.1. The code I am looking at below is from ExternalSorter#partitionedIterator. I think maybe !ordering.isDefined should also include !aggregator.isDefined if (spills.isEmpty && partitionWriters == null) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID if (!ordering.isDefined) { // The user hasn't requested sorted keys, so only sort by partition ID, not key groupByPartition(collection.destructiveSortedIterator(partitionComparator)) } else { // We do need to sort by both partition ID and key groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator)) }
Re: DataFrame.withColumn very slow when used iteratively?
Would it be valuable to create a .withColumns([colName], [ColumnObject]) method that adds in bulk rather than iteratively? Alternatively effort might be better spent in making .withColumn() singular faster. On Tue, Jun 2, 2015 at 3:46 PM, Reynold Xin r...@databricks.com wrote: We improved this in 1.4. Adding 100 columns took 4s on my laptop. https://issues.apache.org/jira/browse/SPARK-7276 Still not the fastest, but much faster. scala Seq((1, 2)).toDF(a, b) res6: org.apache.spark.sql.DataFrame = [a: int, b: int] scala scala val start = System.nanoTime start: Long = 1433274299441224000 scala for (i - 1 to 100) { | df = df.withColumn(n + i, org.apache.spark.sql.functions.lit(0)) | } scala val end = System.nanoTime end: Long = 1433274303250091000 scala scala println((end - start) / 1000 / 1000 / 1000) 3 On Tue, Jun 2, 2015 at 12:34 PM, zsampson zsamp...@palantir.com wrote: Hey, I'm seeing extreme slowness in withColumn when it's used in a loop. I'm running this code: for (int i = 0; i NUM_ITERATIONS ++i) { df = df.withColumn(col+i, new Column(new Literal(i, DataTypes.IntegerType))); } where df is initially a trivial dataframe. Here are the results of running with different values of NUM_ITERATIONS: iterations time 25 3s 50 11s 75 31s 100 76s 125 159s 150 283s When I update the DataFrame by manually copying/appending to the column array and using DataFrame.select, it runs in about half the time, but this is still untenable at any significant number of iterations. Any insight? -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrame-withColumn-very-slow-when-used-iteratively-tp12562.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
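For illustration, a rough sketch of the "bulk add" idea mentioned in the thread: build all the new columns up front and issue a single select, rather than calling withColumn in a loop. The helper name, column names, and literal values are placeholders; this is not a proposed withColumns API, just the workaround pattern described above, written against the public 1.4 DataFrame API.
```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Adds n constant integer columns in one select instead of n withColumn calls.
def addConstantColumns(df: DataFrame, n: Int): DataFrame = {
  val existing = df.columns.map(col)                    // keep all current columns
  val added = (1 to n).map(i => lit(i).as("col" + i))   // new literal columns
  df.select(existing ++ added: _*)                      // single analysis pass
}
```
The point of the single select is that the plan is analyzed once rather than once per added column, which is where the iterative withColumn cost appears to come from.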
Re: Possible space improvements to shuffle
Yes, I think that bug is what I want. Thank you. So I guess the current reason is that we don't want to buffer up numMapper incoming streams. So we just iterate through each and transfer it over in full because that is more network efficient? I'm not sure I understand why you wouldn't want to sort on the composite (parition_id, hash). I think using the partitionKeyComparator should be ok, because the other case of merging with spilled files uses it and that works out ok. The aggregation I am doing basically has as many output rows as input rows so I am seeing a lot of memory pressure on the reduce side but it doesn't have the same ability to spill like map does. -jc On Tue, Jun 2, 2015 at 2:08 PM, Josh Rosen rosenvi...@gmail.com wrote: The relevant JIRA that springs to mind is https://issues.apache.org/jira/browse/SPARK-2926 If an aggregator and ordering are both defined, then the map side of sort-based shuffle will sort based on the key ordering so that map-side spills can be efficiently merged. We do not currently do a sort-based merge on the reduce side; implementing this is a little tricky because it will require more map partitions' output to be buffered on the reduce side. I think that SPARK-2926 has some proposals of how to deal with this, including hierarchical merging of reduce outputs. RE: ExternalSorter#partitionedIterator, I don't think it's safe to do !ordering.isDefined !aggregator.isDefined. If an aggregator is defined but we don't have an ordering, then I don't think it makes sense to sort the keys based on their hashcodes or some default ordering, since hashcode collisions would lead to incorrect results for sort-based aggregation. On Tue, Jun 2, 2015 at 1:50 PM, John Carrino john.carr...@gmail.com wrote: One thing I have noticed with ExternalSorter is that if an ordering is not defined, it does the sort using only the partition_id, instead of (parition_id, hash). This means that on the reduce side you need to pull the entire dataset into memory before you can begin iterating over the results. I figure since we are doing a sort of the data anyway it doesn't seem more expensive to sort by (parition, hash). That way the reducer can do a merge and only has the hold in memory the data for a single int hashCode before it can combine then and start returning results form the iterator. Has this already been discussed? If so, can someone point me in the right direction to find out more? Thanks for any help! -jc p.s. I am using spark version 1.3.1. The code I am looking at below is from ExternalSorter#partitionedIterator. I think maybe !ordering.isDefined should also include !aggregator.isDefined if (spills.isEmpty partitionWriters == null) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID if (!ordering.isDefined) { // The user hasn't requested sorted keys, so only sort by partition ID, not key groupByPartition(collection.destructiveSortedIterator(partitionComparator)) } else { // We do need to sort by both partition ID and key groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator)) }
Re: CSV Support in SparkR
Seems to work great in the master build. It’s really good to have this functionality. Regards, Alek Eskilson From: Eskilson, Aleksander Eskilson alek.eskil...@cerner.commailto:alek.eskil...@cerner.com Date: Tuesday, June 2, 2015 at 2:59 PM To: shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Cc: Burak Yavuz brk...@gmail.commailto:brk...@gmail.com, dev@spark.apache.orgmailto:dev@spark.apache.org dev@spark.apache.orgmailto:dev@spark.apache.org Subject: Re: CSV Support in SparkR Ah, alright, cool. I’ll rebuild and let you know. Thanks again, Alek From: Shivaram Venkataraman shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Date: Tuesday, June 2, 2015 at 2:57 PM To: Aleksander Eskilson alek.eskil...@cerner.commailto:alek.eskil...@cerner.com Cc: shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu, Burak Yavuz brk...@gmail.commailto:brk...@gmail.com, dev@spark.apache.orgmailto:dev@spark.apache.org dev@spark.apache.orgmailto:dev@spark.apache.org Subject: Re: CSV Support in SparkR There was a bug in the SparkContext creation that I fixed yesterday. https://github.com/apache/spark/commit/6b44278ef7cd2a278dfa67e8393ef30775c72726https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_commit_6b44278ef7cd2a278dfa67e8393ef30775c72726d=AwMFaQc=NRtzTzKNaCCmhN_9N2YJR-XrNU1huIgYP99yDsEzaJor=0vZw1rBdgaYvDJYLyKglbrax9kvQfRPdzxLUyWSyxPMm=kO95UBEkBrQwNCQwa2x0MOiUxhLQvBQ1B2q5EDG_bt4s=UjoHyjJhx1vf6fqNiq3P-MqcvN2FnssT16FJ8o98pF4e= If you build from master it should be fixed. Also I think we might have a rc4 which should have this Thanks Shivaram On Tue, Jun 2, 2015 at 12:56 PM, Eskilson,Aleksander alek.eskil...@cerner.commailto:alek.eskil...@cerner.com wrote: Hey, that’s pretty convenient. Unfortunately, although the package seems to pull fine into the session, I’m getting class not found exceptions with: Caused by: org.apache.spark.SparkExcetion: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$anonfun$buildScan$1 Which smells like a path issue to me, and I made sure the ivy repo was part of my PATH, but functions like showDF() still fail with that error. Did I miss a setting, or should the package inclusion in the sparkR execution load that in? I’ve run df - read.df(sqlCtx, “./data.csv”, “com.databricks.spark.csv”, header=“true”, delimiter=“|”) showDF(df, 10) (my data is pipeline delimited, and the default SQL context is sqlCtx) Thanks, Alek From: Shivaram Venkataraman shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Date: Tuesday, June 2, 2015 at 2:08 PM To: Burak Yavuz brk...@gmail.commailto:brk...@gmail.com Cc: Aleksander Eskilson alek.eskil...@cerner.commailto:alek.eskil...@cerner.com, dev@spark.apache.orgmailto:dev@spark.apache.org dev@spark.apache.orgmailto:dev@spark.apache.org, Shivaram Venkataraman shiva...@eecs.berkeley.edumailto:shiva...@eecs.berkeley.edu Subject: Re: CSV Support in SparkR Hi Alek As Burak said, you can already use the spark-csv with SparkR in the 1.4 release. 
So right now I use it with something like this # Launch SparkR ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3 df - read.df(sqlContext, ./nycflights13.csv, com.databricks.spark.csv, header=true) You can also pass in other options to the spark csv as arguments to `read.df`. Let us know if this works Thanks Shivaram On Tue, Jun 2, 2015 at 12:03 PM, Burak Yavuz brk...@gmail.commailto:brk...@gmail.com wrote: Hi, cc'ing Shivaram here, because he worked on this yesterday. If I'm not mistaken, you can use the following workflow: ```./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3``` and then ```df - read.df(sqlContext, /data, csv, header = true)``` Best, Burak On Tue, Jun 2, 2015 at 11:52 AM, Eskilson,Aleksander alek.eskil...@cerner.commailto:alek.eskil...@cerner.com wrote: Are there any intentions to provide first class support for CSV files as one of the loadable file types in SparkR? Data brick’s spark-csv API [1] has support for SQL, Python, and Java/Scala, and implements most of the arguments of R’s read.table API [2], but currently there is no way to load CSV data in SparkR (1.4.0) besides separating our headers from the data, loading into an RDD, splitting by our delimiter, and then converting to a SparkR Data Frame with a vector of the columns gathered from the header. Regards, Alek Eskilson [1] --
createDataframe from s3 results in error
I've run into an error when trying to create a dataframe. Here's the code: -- from pyspark import StorageLevel from pyspark.sql import Row table = 'blah' ssc = HiveContext(sc) data = sc.textFile('s3://bucket/some.tsv') def deserialize(s): p = s.strip().split('\t') p[-1] = float(p[-1]) return Row(normalized_page_sha1=p[0], name=p[1], phrase=p[2], created_at=p[3], layer_id=p[4], score=p[5]) blah = data.map(deserialize) df = sqlContext.inferSchema(blah) --- I've also tried s3n and using createDataFrame. Our setup is on EMR instances, using the setup script Amazon provides. After lots of debugging, I suspect there'll be a problem with this setup. What's weird is that if I run this on pyspark shell, and re-run the last line (inferSchema/createDataFrame), it actually works. We're getting warnings like this: http://pastebin.ca/3016476 Here's the actual error: http://www.pastebin.ca/3016473 Any help would be greatly appreciated. Thanks, Ignacio
Re: createDataframe from s3 results in error
Maybe an incompatible Hive package or Hive metastore? On Tue, Jun 2, 2015 at 3:25 PM, Ignacio Zendejas i...@node.io wrote: From RELEASE: Spark 1.3.1 built for Hadoop 2.4.0 Build flags: -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests -Pkinesis-asl -Pspark-ganglia-lgpl -Phadoop-provided -Phive -Phive-thriftserver And this stacktrace may be more useful: http://pastebin.ca/3016483 On Tue, Jun 2, 2015 at 3:13 PM, Ignacio Zendejas i...@node.io wrote: I've run into an error when trying to create a dataframe. Here's the code: -- from pyspark import StorageLevel from pyspark.sql import Row table = 'blah' ssc = HiveContext(sc) data = sc.textFile('s3://bucket/some.tsv') def deserialize(s): p = s.strip().split('\t') p[-1] = float(p[-1]) return Row(normalized_page_sha1=p[0], name=p[1], phrase=p[2], created_at=p[3], layer_id=p[4], score=p[5]) blah = data.map(deserialize) df = sqlContext.inferSchema(blah) --- I've also tried s3n and using createDataFrame. Our setup is on EMR instances, using the setup script Amazon provides. After lots of debugging, I suspect there'll be a problem with this setup. What's weird is that if I run this on pyspark shell, and re-run the last line (inferSchema/createDataFrame), it actually works. We're getting warnings like this: http://pastebin.ca/3016476 Here's the actual error: http://www.pastebin.ca/3016473 Any help would be greatly appreciated. Thanks, Ignacio
Re: [SQL] Write parquet files under partition directories?
Almost all dataframe stuff is tracked by this umbrella ticket: https://issues.apache.org/jira/browse/SPARK-6116 For the reader/writer interface, it's here: https://issues.apache.org/jira/browse/SPARK-7654 https://github.com/apache/spark/pull/6175 On Tue, Jun 2, 2015 at 3:57 PM, Matt Cheah mch...@palantir.com wrote: Excellent! Where can I find the code, pull request, and Spark ticket where this was introduced? Thanks, -Matt Cheah From: Reynold Xin r...@databricks.com Date: Monday, June 1, 2015 at 10:25 PM To: Matt Cheah mch...@palantir.com Cc: dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com Subject: Re: [SQL] Write parquet files under partition directories? There will be in 1.4. df.write.partitionBy("year", "month", "day").parquet("/path/to/output") On Mon, Jun 1, 2015 at 10:21 PM, Matt Cheah mch...@palantir.com wrote: Hi there, I noticed in the latest Spark SQL programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html, there is support for optimized reading of partitioned Parquet files that have a particular directory structure (year=1/month=10/day=3, for example). However, I see no analogous way to write DataFrames as Parquet files with similar directory structures based on user-provided partitioning. Generally, is it possible to write DataFrames as partitioned Parquet files that downstream partition discovery can take advantage of later? I considered extending the Parquet output format, but it looks like ParquetTableOperations.scala has fixed the output format to AppendingParquetOutputFormat. Also, I was wondering if it would be valuable to contribute writing Parquet in partition directories as a PR. Thanks, -Matt Cheah
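A short sketch tying the two halves together, using the 1.4 API referenced above; the path and column names are placeholders, and `df`/`sqlContext` are assumed to already exist. It writes a DataFrame into year=/month=/day= directories and then reads it back relying on partition discovery, so the filter can prune directories instead of scanning everything.
```
// Write partitioned Parquet (creates .../year=2015/month=6/day=2/ style directories).
df.write.partitionBy("year", "month", "day").parquet("/path/to/output")

// Read it back; partition discovery turns the directory names back into columns,
// and the filter on partition columns can prune whole directories.
val readBack = sqlContext.read.parquet("/path/to/output")
readBack.filter("year = 2015 AND month = 6").show()
```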
Re: createDataframe from s3 results in error
From RELEASE: Spark 1.3.1 built for Hadoop 2.4.0 Build flags: -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests -Pkinesis-asl -Pspark-ganglia-lgpl -Phadoop-provided -Phive -Phive-thriftserver And this stacktrace may be more useful: http://pastebin.ca/3016483 On Tue, Jun 2, 2015 at 3:13 PM, Ignacio Zendejas i...@node.io wrote: I've run into an error when trying to create a dataframe. Here's the code: -- from pyspark import StorageLevel from pyspark.sql import Row table = 'blah' ssc = HiveContext(sc) data = sc.textFile('s3://bucket/some.tsv') def deserialize(s): p = s.strip().split('\t') p[-1] = float(p[-1]) return Row(normalized_page_sha1=p[0], name=p[1], phrase=p[2], created_at=p[3], layer_id=p[4], score=p[5]) blah = data.map(deserialize) df = sqlContext.inferSchema(blah) --- I've also tried s3n and using createDataFrame. Our setup is on EMR instances, using the setup script Amazon provides. After lots of debugging, I suspect there'll be a problem with this setup. What's weird is that if I run this on pyspark shell, and re-run the last line (inferSchema/createDataFrame), it actually works. We're getting warnings like this: http://pastebin.ca/3016476 Here's the actual error: http://www.pastebin.ca/3016473 Any help would be greatly appreciated. Thanks, Ignacio
Re: CSV Support in SparkR
Thanks for testing. We should probably include a section for this in the SparkR programming guide given how popular CSV files are in R. Feel free to open a PR for that if you get a chance. Shivaram On Tue, Jun 2, 2015 at 2:20 PM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Seems to work great in the master build. It’s really good to have this functionality. Regards, Alek Eskilson From: Eskilson, Aleksander Eskilson alek.eskil...@cerner.com Date: Tuesday, June 2, 2015 at 2:59 PM To: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu Cc: Burak Yavuz brk...@gmail.com, dev@spark.apache.org dev@spark.apache.org Subject: Re: CSV Support in SparkR Ah, alright, cool. I’ll rebuild and let you know. Thanks again, Alek From: Shivaram Venkataraman shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu Date: Tuesday, June 2, 2015 at 2:57 PM To: Aleksander Eskilson alek.eskil...@cerner.com Cc: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu, Burak Yavuz brk...@gmail.com, dev@spark.apache.org dev@spark.apache.org Subject: Re: CSV Support in SparkR There was a bug in the SparkContext creation that I fixed yesterday. https://github.com/apache/spark/commit/6b44278ef7cd2a278dfa67e8393ef30775c72726 https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_commit_6b44278ef7cd2a278dfa67e8393ef30775c72726d=AwMFaQc=NRtzTzKNaCCmhN_9N2YJR-XrNU1huIgYP99yDsEzaJor=0vZw1rBdgaYvDJYLyKglbrax9kvQfRPdzxLUyWSyxPMm=kO95UBEkBrQwNCQwa2x0MOiUxhLQvBQ1B2q5EDG_bt4s=UjoHyjJhx1vf6fqNiq3P-MqcvN2FnssT16FJ8o98pF4e= If you build from master it should be fixed. Also I think we might have a rc4 which should have this Thanks Shivaram On Tue, Jun 2, 2015 at 12:56 PM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Hey, that’s pretty convenient. Unfortunately, although the package seems to pull fine into the session, I’m getting class not found exceptions with: Caused by: org.apache.spark.SparkExcetion: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$anonfun$buildScan$1 Which smells like a path issue to me, and I made sure the ivy repo was part of my PATH, but functions like showDF() still fail with that error. Did I miss a setting, or should the package inclusion in the sparkR execution load that in? I’ve run df - read.df(sqlCtx, “./data.csv”, “com.databricks.spark.csv”, header=“true”, delimiter=“|”) showDF(df, 10) (my data is pipeline delimited, and the default SQL context is sqlCtx) Thanks, Alek From: Shivaram Venkataraman shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu Date: Tuesday, June 2, 2015 at 2:08 PM To: Burak Yavuz brk...@gmail.com Cc: Aleksander Eskilson alek.eskil...@cerner.com, dev@spark.apache.org dev@spark.apache.org, Shivaram Venkataraman shiva...@eecs.berkeley.edu Subject: Re: CSV Support in SparkR Hi Alek As Burak said, you can already use the spark-csv with SparkR in the 1.4 release. So right now I use it with something like this # Launch SparkR ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3 df - read.df(sqlContext, ./nycflights13.csv, com.databricks.spark.csv, header=true) You can also pass in other options to the spark csv as arguments to `read.df`. Let us know if this works Thanks Shivaram On Tue, Jun 2, 2015 at 12:03 PM, Burak Yavuz brk...@gmail.com wrote: Hi, cc'ing Shivaram here, because he worked on this yesterday. 
If I'm not mistaken, you can use the following workflow: ```./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3``` and then ```df - read.df(sqlContext, /data, csv, header = true)``` Best, Burak On Tue, Jun 2, 2015 at 11:52 AM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Are there any intentions to provide first class support for CSV files as one of the loadable file types in SparkR? Data brick’s spark-csv API [1] has support for SQL, Python, and Java/Scala, and implements most of the arguments of R’s read.table API [2], but currently there is no way to load CSV data in SparkR (1.4.0) besides separating our headers from the data, loading into an RDD, splitting by our delimiter, and then converting to a SparkR Data Frame with a vector of the columns gathered from the header. Regards, Alek Eskilson [1] -- https://github.com/databricks/spark-csv https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_databricks_spark-2Dcsvd=AwMFaQc=NRtzTzKNaCCmhN_9N2YJR-XrNU1huIgYP99yDsEzaJor=0vZw1rBdgaYvDJYLyKglbrax9kvQfRPdzxLUyWSyxPMm=mPtlFYdyx5Rp7pZr-bQ15QMIrq4qE26ECfJCzoMwYhIs=wT5PU54lVmR2R_o3GidPhDQD9kMMNVYotZEqCd4ASm4e= [2] -- http://www.inside-r.org/r-doc/utils/read.table
Re: createDataframe from s3 results in error
What version of Spark is this? On Tue, Jun 2, 2015 at 3:13 PM, Ignacio Zendejas i...@node.io wrote: I've run into an error when trying to create a dataframe. Here's the code: -- from pyspark import StorageLevel from pyspark.sql import Row table = 'blah' ssc = HiveContext(sc) data = sc.textFile('s3://bucket/some.tsv') def deserialize(s): p = s.strip().split('\t') p[-1] = float(p[-1]) return Row(normalized_page_sha1=p[0], name=p[1], phrase=p[2], created_at=p[3], layer_id=p[4], score=p[5]) blah = data.map(deserialize) df = sqlContext.inferSchema(blah) --- I've also tried s3n and using createDataFrame. Our setup is on EMR instances, using the setup script Amazon provides. After lots of debugging, I suspect there'll be a problem with this setup. What's weird is that if I run this on pyspark shell, and re-run the last line (inferSchema/createDataFrame), it actually works. We're getting warnings like this: http://pastebin.ca/3016476 Here's the actual error: http://www.pastebin.ca/3016473 Any help would be greatly appreciated. Thanks, Ignacio
Re: [SQL] Write parquet files under partition directories?
Excellent! Where can I find the code, pull request, and Spark ticket where this was introduced? Thanks, -Matt Cheah From: Reynold Xin r...@databricks.com Date: Monday, June 1, 2015 at 10:25 PM To: Matt Cheah mch...@palantir.com Cc: dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com Subject: Re: [SQL] Write parquet files under partition directories? There will be in 1.4. df.write.partitionBy("year", "month", "day").parquet("/path/to/output") On Mon, Jun 1, 2015 at 10:21 PM, Matt Cheah mch...@palantir.com wrote: Hi there, I noticed in the latest Spark SQL programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html, there is support for optimized reading of partitioned Parquet files that have a particular directory structure (year=1/month=10/day=3, for example). However, I see no analogous way to write DataFrames as Parquet files with similar directory structures based on user-provided partitioning. Generally, is it possible to write DataFrames as partitioned Parquet files that downstream partition discovery can take advantage of later? I considered extending the Parquet output format, but it looks like ParquetTableOperations.scala has fixed the output format to AppendingParquetOutputFormat. Also, I was wondering if it would be valuable to contribute writing Parquet in partition directories as a PR. Thanks, -Matt Cheah
[RESULT] [VOTE] Release Apache Spark 1.4.0 (RC3)
This vote is cancelled in favor of RC4. Thanks everyone for the thorough testing of this RC. We are really close, but there were a few blockers found. I've cut a new RC to incorporate those issues. The following patches were merged during the RC3 testing period: (blockers) 4940630 [SPARK-8020] [SQL] Spark SQL conf in spark-defaults.conf make metadataHive get constructed too early 6b0f615 [SPARK-8038] [SQL] [PYSPARK] fix Column.when() and otherwise() 78a6723 [SPARK-7978] [SQL] [PYSPARK] DecimalType should not be singleton (other fixes) 9d6475b [SPARK-6917] [SQL] DecimalType is not read back when non-native type exists 97d4cd0 [SPARK-8049] [MLLIB] drop tmp col from OneVsRest output cbaf595 [SPARK-8014] [SQL] Avoid premature metadata discovery when writing a HadoopFsRelation with a save mode other than Append fa292dc [SPARK-8015] [FLUME] Remove Guava dependency from flume-sink. f71a09d [SPARK-8037] [SQL] Ignores files whose name starts with dot in HadoopFsRelation 292ee1a [SPARK-8021] [SQL] [PYSPARK] make Python read/write API consistent with Scala 87941ff [SPARK-8023][SQL] Add deterministic attribute to Expression to avoid collapsing nondeterministic projects. e6d5895 [SPARK-7965] [SPARK-7972] [SQL] Handle expressions containing multiple window expressions and make parser match window frames in case insensitive way 8ac2376 [SPARK-8026][SQL] Add Column.alias to Scala/Java DataFrame API efc0e05 [SPARK-7982][SQL] DataFrame.stat.crosstab should use 0 instead of null for pairs that don't appear cbfb682a [SPARK-8028] [SPARKR] Use addJar instead of setJars in SparkR a7c8b00 [SPARK-7958] [STREAMING] Handled exception in StreamingContext.start() to prevent leaking of actors a76c2e1 [SPARK-7899] [PYSPARK] Fix Python 3 pyspark/sql/types module conflict f1d4e7e [SPARK-7227] [SPARKR] Support fillna / dropna in R DataFrame. 01f38f7 [SPARK-7979] Enforce structural type checker. 2c45009 [SPARK-7459] [MLLIB] ElementwiseProduct Java example 8938a74 [SPARK-7962] [MESOS] Fix master url parsing in rest submission client. 1513cff [SPARK-7957] Preserve partitioning when using randomSplit 9a88be1 [SPARK-6013] [ML] Add more Python ML examples for spark.ml 2bd4460 [SPARK-7954] [SPARKR] Create SparkContext in sparkRSQL init On Fri, May 29, 2015 at 4:40 PM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.0! The tag to be voted on is v1.4.0-rc3 (commit dd109a8): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=dd109a8746ec07c7c83995890fc2c0cd7a693730 The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-bin/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: [published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-1109/ [published as version: 1.4.0-rc3] https://repository.apache.org/content/repositories/orgapachespark-1110/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-docs/ Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Tuesday, June 02, at 00:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.4.0 [ ] -1 Do not release this package because ... 
To learn more about Apache Spark, please see http://spark.apache.org/ == What has changed since RC1 == Below is a list of bug fixes that went into this RC: http://s.apache.org/vN == How can I help test this release? == If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload and running on this release candidate, then reporting any regressions. == What justifies a -1 vote for this release? == This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.X, minor regressions, or bugs related to new features will not block this release. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
[VOTE] Release Apache Spark 1.4.0 (RC4)
Please vote on releasing the following candidate as Apache Spark version 1.4.0! The tag to be voted on is v1.4.0-rc4 (commit 22596c5): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=22596c534a38cfdda91aef18aa9037ab101e4251 The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc4-bin/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: [published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-/ [published as version: 1.4.0-rc4] https://repository.apache.org/content/repositories/orgapachespark-1112/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc4-docs/ Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Saturday, June 06, at 05:00 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.4.0 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/ == What has changed since RC3 == In addition to many smaller fixes, three blocker issues were fixed: 4940630 [SPARK-8020] [SQL] Spark SQL conf in spark-defaults.conf make metadataHive get constructed too early 6b0f615 [SPARK-8038] [SQL] [PYSPARK] fix Column.when() and otherwise() 78a6723 [SPARK-7978] [SQL] [PYSPARK] DecimalType should not be singleton == How can I help test this release? == If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload and running on this release candidate, then reporting any regressions. == What justifies a -1 vote for this release? == This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.X, minor regressions, or bugs related to new features will not block this release. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: DataFrame.withColumn very slow when used iteratively?
.select itself is the bulk add right? On Tue, Jun 2, 2015 at 5:32 PM, Andrew Ash and...@andrewash.com wrote: Would it be valuable to create a .withColumns([colName], [ColumnObject]) method that adds in bulk rather than iteratively? Alternatively effort might be better spent in making .withColumn() singular faster. On Tue, Jun 2, 2015 at 3:46 PM, Reynold Xin r...@databricks.com wrote: We improved this in 1.4. Adding 100 columns took 4s on my laptop. https://issues.apache.org/jira/browse/SPARK-7276 Still not the fastest, but much faster. scala Seq((1, 2)).toDF(a, b) res6: org.apache.spark.sql.DataFrame = [a: int, b: int] scala scala val start = System.nanoTime start: Long = 1433274299441224000 scala for (i - 1 to 100) { | df = df.withColumn(n + i, org.apache.spark.sql.functions.lit(0)) | } scala val end = System.nanoTime end: Long = 1433274303250091000 scala scala println((end - start) / 1000 / 1000 / 1000) 3 On Tue, Jun 2, 2015 at 12:34 PM, zsampson zsamp...@palantir.com wrote: Hey, I'm seeing extreme slowness in withColumn when it's used in a loop. I'm running this code: for (int i = 0; i NUM_ITERATIONS ++i) { df = df.withColumn(col+i, new Column(new Literal(i, DataTypes.IntegerType))); } where df is initially a trivial dataframe. Here are the results of running with different values of NUM_ITERATIONS: iterations time 25 3s 50 11s 75 31s 100 76s 125 159s 150 283s When I update the DataFrame by manually copying/appending to the column array and using DataFrame.select, it runs in about half the time, but this is still untenable at any significant number of iterations. Any insight? -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrame-withColumn-very-slow-when-used-iteratively-tp12562.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Unit tests can generate spurious shutdown messages
Can you submit a pull request for it? Thanks.

On Tue, Jun 2, 2015 at 4:25 AM, Mick Davies michael.belldav...@gmail.com wrote: If I write unit tests that indirectly initialize org.apache.spark.util.Utils, for example by using SQL types, but produce no logging, I get the following unpleasant stack trace in my test output. This is caused by the Utils class adding a shutdown hook which logs the message logDebug("Shutdown hook called"). We are using log4j 2 for logging, and if there has been no logging before this point, then the static initialization of log4j 2 tries to add a shutdown hook itself but can't, because the JVM is already in shutdown. It's only slightly annoying, but it could easily be 'fixed' by adding a line like logDebug("Adding shutdown hook") to Utils before adding the shutdown hook, thus ensuring logging is always initialized. I am happy to make this change, unless there is a better approach or it is considered too trivial.

ERROR StatusLogger catching java.lang.IllegalStateException: Shutdown in progress
 at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
 at java.lang.Runtime.addShutdownHook(Runtime.java:211)
 at org.apache.logging.log4j.core.util.DefaultShutdownCallbackRegistry.addShutdownHook(DefaultShutdownCallbackRegistry.java:136)
 at org.apache.logging.log4j.core.util.DefaultShutdownCallbackRegistry.start(DefaultShutdownCallbackRegistry.java:125)
 at org.apache.logging.log4j.core.impl.Log4jContextFactory.initializeShutdownCallbackRegistry(Log4jContextFactory.java:123)
 at org.apache.logging.log4j.core.impl.Log4jContextFactory.<init>(Log4jContextFactory.java:89)
 at org.apache.logging.log4j.core.impl.Log4jContextFactory.<init>(Log4jContextFactory.java:54)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
 at java.lang.Class.newInstance(Class.java:438)
 at org.apache.logging.log4j.LogManager.<clinit>(LogManager.java:96)
 at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:102)
 at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:43)
 at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:42)
 at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:29)
 at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
 at org.apache.spark.Logging$class.log(Logging.scala:52)
 at org.apache.spark.util.Utils$.log(Utils.scala:62)
 at org.apache.spark.Logging$class.initializeLogging(Logging.scala:138)
 at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
 at org.apache.spark.Logging$class.log(Logging.scala:51)
 at org.apache.spark.util.Utils$.log(Utils.scala:62)
 at org.apache.spark.Logging$class.logDebug(Logging.scala:63)
 at org.apache.spark.util.Utils$.logDebug(Utils.scala:62)
 at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:178)
 at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
 at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
 at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)

--
View this message in context:
http://apache-spark-developers-list.1001551.n3.nabble.com/Unit-tests-can-generate-spurious-shutdown-messages-tp12557.html
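For concreteness, a minimal, self-contained sketch of the workaround Mick describes -- touching the logger once before registering the shutdown hook, so that log4j 2's static initialization (including registration of its own shutdown callback) happens while the JVM is still live -- could look like this. The object and method names below are illustrative and are not the actual org.apache.spark.util.Utils code:

import org.slf4j.LoggerFactory

object ShutdownHookLogging {
  private val log = LoggerFactory.getLogger(getClass)

  def installHook(): Unit = {
    // Force the logging backend to initialize now, outside of JVM shutdown,
    // so log4j 2 can register its own shutdown callback without hitting
    // IllegalStateException: Shutdown in progress.
    log.debug("Adding shutdown hook")

    Runtime.getRuntime.addShutdownHook(new Thread("shutdown-hook-logger") {
      // By the time this runs, logging is already initialized, so no new
      // shutdown hook has to be registered from inside JVM shutdown.
      override def run(): Unit = log.debug("Shutdown hook called")
    })
  }

  def main(args: Array[String]): Unit = {
    installHook()
    log.info("Doing test work, then exiting") // hook fires at JVM exit
  }
}

The first debug call is what triggers initialization of the logging framework; when the hook later fires during shutdown, log4j 2 is already set up and no longer attempts to add a callback of its own while hooks can no longer be registered.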
Re: [VOTE] Release Apache Spark 1.4.0 (RC4)
Hi all - a tiny nit from the last e-mail: the tag is v1.4.0-rc4. The exact commit and all other information is correct. (Thanks to Shivaram, who pointed this out.)

On Tue, Jun 2, 2015 at 8:53 PM, Patrick Wendell pwend...@gmail.com wrote:

Please vote on releasing the following candidate as Apache Spark version 1.4.0! The tag to be voted on is v1.4.0-rc3 (commit 22596c5): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=22596c534a38cfdda91aef18aa9037ab101e4251

The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc4-bin/

Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
[published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-/
[published as version: 1.4.0-rc4] https://repository.apache.org/content/repositories/orgapachespark-1112/

The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc4-docs/

Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Saturday, June 06, at 05:00 UTC and passes if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.4.0
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see http://spark.apache.org/

== What has changed since RC3 ==
In addition to many smaller fixes, three blocker issues were fixed:
4940630 [SPARK-8020] [SQL] Spark SQL conf in spark-defaults.conf make metadataHive get constructed too early
6b0f615 [SPARK-8038] [SQL] [PYSPARK] fix Column.when() and otherwise()
78a6723 [SPARK-7978] [SQL] [PYSPARK] DecimalType should not be singleton

== How can I help test this release? ==
If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload, running it on this release candidate, and reporting any regressions.

== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.X, minor regressions, or bugs related to new features will not block this release.
Unit tests can generate spurious shutdown messages
If I write unit tests that indirectly initialize org.apache.spark.util.Utils, for example by using SQL types, but produce no logging, I get the following unpleasant stack trace in my test output. This is caused by the Utils class adding a shutdown hook which logs the message logDebug("Shutdown hook called"). We are using log4j 2 for logging, and if there has been no logging before this point, then the static initialization of log4j 2 tries to add a shutdown hook itself but can't, because the JVM is already in shutdown. It's only slightly annoying, but it could easily be 'fixed' by adding a line like logDebug("Adding shutdown hook") to Utils before adding the shutdown hook, thus ensuring logging is always initialized. I am happy to make this change, unless there is a better approach or it is considered too trivial.

ERROR StatusLogger catching java.lang.IllegalStateException: Shutdown in progress
 at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
 at java.lang.Runtime.addShutdownHook(Runtime.java:211)
 at org.apache.logging.log4j.core.util.DefaultShutdownCallbackRegistry.addShutdownHook(DefaultShutdownCallbackRegistry.java:136)
 at org.apache.logging.log4j.core.util.DefaultShutdownCallbackRegistry.start(DefaultShutdownCallbackRegistry.java:125)
 at org.apache.logging.log4j.core.impl.Log4jContextFactory.initializeShutdownCallbackRegistry(Log4jContextFactory.java:123)
 at org.apache.logging.log4j.core.impl.Log4jContextFactory.<init>(Log4jContextFactory.java:89)
 at org.apache.logging.log4j.core.impl.Log4jContextFactory.<init>(Log4jContextFactory.java:54)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
 at java.lang.Class.newInstance(Class.java:438)
 at org.apache.logging.log4j.LogManager.<clinit>(LogManager.java:96)
 at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:102)
 at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:43)
 at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:42)
 at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:29)
 at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
 at org.apache.spark.Logging$class.log(Logging.scala:52)
 at org.apache.spark.util.Utils$.log(Utils.scala:62)
 at org.apache.spark.Logging$class.initializeLogging(Logging.scala:138)
 at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
 at org.apache.spark.Logging$class.log(Logging.scala:51)
 at org.apache.spark.util.Utils$.log(Utils.scala:62)
 at org.apache.spark.Logging$class.logDebug(Logging.scala:63)
 at org.apache.spark.util.Utils$.logDebug(Utils.scala:62)
 at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:178)
 at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
 at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
 at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Unit-tests-can-generate-spurious-shutdown-messages-tp12557.html