spark git commit: [SPARK-16257][BUILD] Update spark_ec2.py to support Spark 1.6.2 and 1.6.3.
Repository: spark Updated Branches: refs/heads/branch-1.6 1ac830aca -> ccc7fa357 [SPARK-16257][BUILD] Update spark_ec2.py to support Spark 1.6.2 and 1.6.3. ## What changes were proposed in this pull request? - Adds 1.6.2 and 1.6.3 as supported Spark versions within the bundled spark-ec2 script. - Makes the default Spark version 1.6.3 to keep in sync with the upcoming release. - Does not touch the newer spark-ec2 scripts in the separate amplab repository. ## How was this patch tested? - Manual script execution: export AWS_SECRET_ACCESS_KEY=_snip_ export AWS_ACCESS_KEY_ID=_snip_ $SPARK_HOME/ec2/spark-ec2 \ --key-pair=_snip_ \ --identity-file=_snip_ \ --region=us-east-1 \ --vpc-id=_snip_ \ --slaves=1 \ --instance-type=t1.micro \ --spark-version=1.6.2 \ --hadoop-major-version=yarn \ launch test-cluster - Result: Successful creation of a 1.6.2-based Spark cluster. This contribution is my original work and I license the work to the project under the project's open source license. Author: Brian Uri Closes #13947 from briuri/branch-1.6-bug-spark-16257. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccc7fa35 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccc7fa35 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccc7fa35 Branch: refs/heads/branch-1.6 Commit: ccc7fa357099e0f621cfc02448ba20d3f6fabc14 Parents: 1ac830a Author: Brian Uri Authored: Thu Jun 30 07:52:28 2016 +0100 Committer: Sean Owen Committed: Thu Jun 30 07:52:28 2016 +0100 -- ec2/spark_ec2.py | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ccc7fa35/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 76c09f0..b28b4c5 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -51,7 +51,7 @@ else: raw_input = input xrange = range -SPARK_EC2_VERSION = "1.6.1" +SPARK_EC2_VERSION = "1.6.3" SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__)) VALID_SPARK_VERSIONS = set([ @@ -77,6 +77,8 @@ VALID_SPARK_VERSIONS = set([ "1.5.2", "1.6.0", "1.6.1", +"1.6.2", +"1.6.3", ]) SPARK_TACHYON_MAP = { @@ -96,6 +98,8 @@ SPARK_TACHYON_MAP = { "1.5.2": "0.7.1", "1.6.0": "0.8.2", "1.6.1": "0.8.2", +"1.6.2": "0.8.2", +"1.6.3": "0.8.2", } DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION @@ -103,7 +107,7 @@ DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"; # Default location to get the spark-ec2 scripts (and ami-list) from DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/amplab/spark-ec2"; -DEFAULT_SPARK_EC2_BRANCH = "branch-1.5" +DEFAULT_SPARK_EC2_BRANCH = "branch-1.6" def setup_external_libs(libs):
spark git commit: [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide
Repository: spark Updated Branches: refs/heads/branch-2.0 3134f116a -> c8a7c2305 [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide Author: Tathagata Das Closes #13978 from tdas/SPARK-16256-1. (cherry picked from commit 2c3d96134dcc0428983eea087db7e91072215aea) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8a7c230 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8a7c230 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8a7c230 Branch: refs/heads/branch-2.0 Commit: c8a7c23054209db5474d96de2a7e2d8a6f8cc0da Parents: 3134f11 Author: Tathagata Das Authored: Wed Jun 29 23:38:19 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:38:35 2016 -0700 -- docs/structured-streaming-programming-guide.md | 44 +++-- 1 file changed, 23 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c8a7c230/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 9ed06be..5932566 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -459,7 +459,7 @@ val csvDF = spark .readStream .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory")// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark .readStream() .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory");// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory");// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -513,7 +513,7 @@ csvDF = spark \ .readStream() \ .option("sep", ";") \ .schema(userSchema) \ -.csv("/path/to/directory")# Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")# Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -522,10 +522,10 @@ csvDF = spark \ These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document. ## Operations on streaming DataFrames/Datasets -You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let's take a look at a few example operations that you can use. +You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](sql-programming-guide.html) for more details. Let's take a look at a few example operations that you can use.
### Basic Operations - Selection, Projection, Aggregation -Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section. +Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are [discussed later](#unsupported-operations) in this section. @@ -618,7 +618,7 @@ df.groupBy("type").count() ### Window Operations on Event Time -Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of, window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. +Aggregations over a sliding event-time window are straightforward with Structured Streaming. Th
spark git commit: [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide
Repository: spark Updated Branches: refs/heads/master dedbceec1 -> 2c3d96134 [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide Author: Tathagata Das Closes #13978 from tdas/SPARK-16256-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c3d9613 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c3d9613 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c3d9613 Branch: refs/heads/master Commit: 2c3d96134dcc0428983eea087db7e91072215aea Parents: dedbcee Author: Tathagata Das Authored: Wed Jun 29 23:38:19 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:38:19 2016 -0700 -- docs/structured-streaming-programming-guide.md | 44 +++-- 1 file changed, 23 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c3d9613/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 9ed06be..5932566 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -459,7 +459,7 @@ val csvDF = spark .readStream .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory")// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark .readStream() .option("sep", ";") .schema(userSchema) // Specify schema of the parquet files -.csv("/path/to/directory");// Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory");// Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -513,7 +513,7 @@ csvDF = spark \ .readStream() \ .option("sep", ";") \ .schema(userSchema) \ -.csv("/path/to/directory")# Equivalent to format("cv").load("/path/to/directory") +.csv("/path/to/directory")# Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} @@ -522,10 +522,10 @@ csvDF = spark \ These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document. ## Operations on streaming DataFrames/Datasets -You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let's take a look at a few example operations that you can use. +You can apply all kinds of operations on streaming DataFrames/Datasets - ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming guide](sql-programming-guide.html) for more details. Let's take a look at a few example operations that you can use. ### Basic Operations - Selection, Projection, Aggregation -Most of the common operations on DataFrame/Dataset are supported for streaming.
The few operations that are not supported are discussed later in this section. +Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are [discussed later](#unsupported-operations) in this section. @@ -618,7 +618,7 @@ df.groupBy("type").count() ### Window Operations on Event Time -Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of, window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. +Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea to understand about window-based aggregations are very similar to grouped aggregations. In a gro
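For readers following the corrected guide snippets above, here is a minimal, self-contained Scala sketch of the pattern they describe: a CSV file source feeding a sliding event-time window aggregation. The schema columns (`timestamp`, `word`), the input path, and the console sink are illustrative assumptions, not taken from the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object StreamingCsvWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StreamingCsvWindowSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical schema for the incoming CSV files.
    val userSchema = new StructType()
      .add("timestamp", TimestampType)
      .add("word", StringType)

    // Same pattern as the corrected snippet: .csv(dir) is equivalent to format("csv").load(dir).
    val csvDF = spark.readStream
      .option("sep", ";")
      .schema(userSchema)
      .csv("/path/to/directory")

    // Sliding event-time window as described under "Window Operations on Event Time":
    // one running count per (10-minute window sliding every 5 minutes, word) pair.
    val windowedCounts = csvDF
      .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
      .count()

    // Write the running counts to the console for inspection.
    val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```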
[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
Repository: spark Updated Branches: refs/heads/branch-2.0 a54852350 -> 3134f116a http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java new file mode 100644 index 000..aba45f5 --- /dev/null +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final Map kafkaParams = new HashMap(); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe.apply(topics, kafkaParams, offsets); +final ConsumerStrategy sub1 = + Subscribe.apply(sTopics, sKafkaParams, sOffsets); +final ConsumerStrategy sub2 = + Subscribe.apply(sTopics, sKafkaParams); +final ConsumerStrategy sub3 = + Subscribe.create(topics, kafkaParams, offsets); +final ConsumerStrategy sub4 = + Subscribe.create(topics, kafkaParams); + +Assert.assertEquals( + sub1.executorKafkaParams().get("bootstrap.servers"), + sub3.executorKafkaParams().get("bootstrap.servers")); + +final ConsumerStrategy asn0 = + Assign.apply(parts, kafkaParams, offsets); +final ConsumerStrategy asn1 = + Assign.apply(sParts, sKafkaParams, sOffsets); +final ConsumerStrategy asn2 = + Assign.apply(sParts, sKafkaParams); +final ConsumerStrategy asn3 = + Assign.create(parts, kafkaParams, offsets); +final 
ConsumerStrategy asn4 = + Assign.create(parts, kafkaParams); + +Assert.assertEquals( + asn1.executorKafkaParams().get("bootstrap.servers"), + asn3.executorKafkaParams().get("bootstrap.servers")); + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java new file mode 100644 index 000..e57ede7 --- /dev/null +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you
[2/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API ## What changes were proposed in this pull request? New Kafka consumer api for the released 0.10 version of Kafka ## How was this patch tested? Unit tests, manual tests Author: cody koeninger Closes #11863 from koeninger/kafka-0.9. (cherry picked from commit dedbceec1ef33ccd88101016de969a1ef3e3e142) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3134f116 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3134f116 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3134f116 Branch: refs/heads/branch-2.0 Commit: 3134f116a3565c3a299fa2e7094acd7304d64280 Parents: a548523 Author: cody koeninger Authored: Wed Jun 29 23:21:03 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:21:15 2016 -0700 -- external/kafka-0-10-assembly/pom.xml| 176 ++ external/kafka-0-10/pom.xml | 98 +++ .../kafka010/CachedKafkaConsumer.scala | 189 ++ .../streaming/kafka010/ConsumerStrategy.scala | 314 ++ .../kafka010/DirectKafkaInputDStream.scala | 318 ++ .../spark/streaming/kafka010/KafkaRDD.scala | 232 +++ .../streaming/kafka010/KafkaRDDPartition.scala | 45 ++ .../streaming/kafka010/KafkaTestUtils.scala | 277 + .../spark/streaming/kafka010/KafkaUtils.scala | 175 ++ .../streaming/kafka010/LocationStrategy.scala | 77 +++ .../spark/streaming/kafka010/OffsetRange.scala | 153 + .../spark/streaming/kafka010/package-info.java | 21 + .../spark/streaming/kafka010/package.scala | 23 + .../kafka010/JavaConsumerStrategySuite.java | 84 +++ .../kafka010/JavaDirectKafkaStreamSuite.java| 180 ++ .../streaming/kafka010/JavaKafkaRDDSuite.java | 122 .../kafka010/JavaLocationStrategySuite.java | 58 ++ .../src/test/resources/log4j.properties | 28 + .../kafka010/DirectKafkaStreamSuite.scala | 612 +++ .../streaming/kafka010/KafkaRDDSuite.scala | 169 + pom.xml | 2 + project/SparkBuild.scala| 12 +- 22 files changed, 3359 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10-assembly/pom.xml -- diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml new file mode 100644 index 000..f2468d1 --- /dev/null +++ b/external/kafka-0-10-assembly/pom.xml @@ -0,0 +1,176 @@ + + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + 4.0.0 + +org.apache.spark +spark-parent_2.11 +2.0.0-SNAPSHOT +../../pom.xml + + + org.apache.spark + spark-streaming-kafka-0-10-assembly_2.11 + jar + Spark Integration for Kafka 0.10 Assembly + http://spark.apache.org/ + + +streaming-kafka-0-10-assembly + + + + + org.apache.spark + spark-streaming-kafka-0-10_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + + commons-codec + commons-codec + provided + + + commons-lang + commons-lang + provided + + + com.google.protobuf + protobuf-java + provided + + + net.jpountz.lz4 + lz4 + provided + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.avro + avro-mapred + ${avro.mapred.classifier} + provided + + + org.apache.curator + curator-recipes + provided + + + org.apache.zookeeper + zookeeper + provided + + + log4j + log4j + provided + + + net.java.dev.jets3t + jets3t + provided + + + org.scala-lang + scala-library + provided + + + org.slf4j + 
slf4j-api + provided + + + org.slf4j + slf4j-log4j12 + provided + + + org.xerial.snappy + snappy-java + provided + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + +false + + +*:* + + + + +*:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA +
[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
Repository: spark Updated Branches: refs/heads/master bde1d6a61 -> dedbceec1 http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java new file mode 100644 index 000..aba45f5 --- /dev/null +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final Map kafkaParams = new HashMap(); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe.apply(topics, kafkaParams, offsets); +final ConsumerStrategy sub1 = + Subscribe.apply(sTopics, sKafkaParams, sOffsets); +final ConsumerStrategy sub2 = + Subscribe.apply(sTopics, sKafkaParams); +final ConsumerStrategy sub3 = + Subscribe.create(topics, kafkaParams, offsets); +final ConsumerStrategy sub4 = + Subscribe.create(topics, kafkaParams); + +Assert.assertEquals( + sub1.executorKafkaParams().get("bootstrap.servers"), + sub3.executorKafkaParams().get("bootstrap.servers")); + +final ConsumerStrategy asn0 = + Assign.apply(parts, kafkaParams, offsets); +final ConsumerStrategy asn1 = + Assign.apply(sParts, sKafkaParams, sOffsets); +final ConsumerStrategy asn2 = + Assign.apply(sParts, sKafkaParams); +final ConsumerStrategy asn3 = + Assign.create(parts, kafkaParams, offsets); +final 
ConsumerStrategy asn4 = + Assign.create(parts, kafkaParams); + +Assert.assertEquals( + asn1.executorKafkaParams().get("bootstrap.servers"), + asn3.executorKafkaParams().get("bootstrap.servers")); + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java -- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java new file mode 100644 index 000..e57ede7 --- /dev/null +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may
[2/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API ## What changes were proposed in this pull request? New Kafka consumer api for the released 0.10 version of Kafka ## How was this patch tested? Unit tests, manual tests Author: cody koeninger Closes #11863 from koeninger/kafka-0.9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dedbceec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dedbceec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dedbceec Branch: refs/heads/master Commit: dedbceec1ef33ccd88101016de969a1ef3e3e142 Parents: bde1d6a Author: cody koeninger Authored: Wed Jun 29 23:21:03 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 23:21:03 2016 -0700 -- external/kafka-0-10-assembly/pom.xml| 176 ++ external/kafka-0-10/pom.xml | 98 +++ .../kafka010/CachedKafkaConsumer.scala | 189 ++ .../streaming/kafka010/ConsumerStrategy.scala | 314 ++ .../kafka010/DirectKafkaInputDStream.scala | 318 ++ .../spark/streaming/kafka010/KafkaRDD.scala | 232 +++ .../streaming/kafka010/KafkaRDDPartition.scala | 45 ++ .../streaming/kafka010/KafkaTestUtils.scala | 277 + .../spark/streaming/kafka010/KafkaUtils.scala | 175 ++ .../streaming/kafka010/LocationStrategy.scala | 77 +++ .../spark/streaming/kafka010/OffsetRange.scala | 153 + .../spark/streaming/kafka010/package-info.java | 21 + .../spark/streaming/kafka010/package.scala | 23 + .../kafka010/JavaConsumerStrategySuite.java | 84 +++ .../kafka010/JavaDirectKafkaStreamSuite.java| 180 ++ .../streaming/kafka010/JavaKafkaRDDSuite.java | 122 .../kafka010/JavaLocationStrategySuite.java | 58 ++ .../src/test/resources/log4j.properties | 28 + .../kafka010/DirectKafkaStreamSuite.scala | 612 +++ .../streaming/kafka010/KafkaRDDSuite.scala | 169 + pom.xml | 2 + project/SparkBuild.scala| 12 +- 22 files changed, 3359 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10-assembly/pom.xml -- diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml new file mode 100644 index 000..f2468d1 --- /dev/null +++ b/external/kafka-0-10-assembly/pom.xml @@ -0,0 +1,176 @@ + + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + 4.0.0 + +org.apache.spark +spark-parent_2.11 +2.0.0-SNAPSHOT +../../pom.xml + + + org.apache.spark + spark-streaming-kafka-0-10-assembly_2.11 + jar + Spark Integration for Kafka 0.10 Assembly + http://spark.apache.org/ + + +streaming-kafka-0-10-assembly + + + + + org.apache.spark + spark-streaming-kafka-0-10_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + + commons-codec + commons-codec + provided + + + commons-lang + commons-lang + provided + + + com.google.protobuf + protobuf-java + provided + + + net.jpountz.lz4 + lz4 + provided + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.avro + avro-mapred + ${avro.mapred.classifier} + provided + + + org.apache.curator + curator-recipes + provided + + + org.apache.zookeeper + zookeeper + provided + + + log4j + log4j + provided + + + net.java.dev.jets3t + jets3t + provided + + + org.scala-lang + scala-library + provided + + + org.slf4j + slf4j-api + provided + + + org.slf4j + slf4j-log4j12 + provided + + + org.xerial.snappy + snappy-java + 
provided + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + +false + + +*:* + + + + +*:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + +shade +
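The two test suites above only exercise the new `ConsumerStrategy` constructors from Java. As a rough orientation, the following Scala sketch shows how the pieces introduced by this patch (`KafkaUtils.createDirectStream`, the `Subscribe` consumer strategy, and the `PreferConsistent` location strategy) are meant to compose; the broker address, topic name, group id, and exact import paths are assumptions for illustration, not taken from the diff.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{KafkaUtils, PreferConsistent, Subscribe}

object Kafka010DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka010DirectStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Parameters for the new Kafka 0.10 consumer; all values here are placeholders.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest"
    )
    val topics = List("topic1")

    // Direct stream backed by the 0.10 consumer API: executors read through a
    // cached KafkaConsumer, and PreferConsistent spreads partitions evenly.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```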
spark git commit: [SPARK-16294][SQL] Labelling support for the include_example Jekyll plugin
Repository: spark Updated Branches: refs/heads/branch-2.0 b52bd8070 -> a54852350 [SPARK-16294][SQL] Labelling support for the include_example Jekyll plugin ## What changes were proposed in this pull request? This PR adds labelling support for the `include_example` Jekyll plugin, so that we may split a single source file into multiple line blocks with different labels, and include them in multiple code snippets in the generated HTML page. ## How was this patch tested? Manually tested. https://cloud.githubusercontent.com/assets/230655/16451099/66a76db2-3e33-11e6-84fb-63104c2f0688.png";> Author: Cheng Lian Closes #13972 from liancheng/include-example-with-labels. (cherry picked from commit bde1d6a61593aeb62370f526542cead94919b0c0) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5485235 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5485235 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5485235 Branch: refs/heads/branch-2.0 Commit: a54852350346cacae61d851d796bc3a7abd3a048 Parents: b52bd80 Author: Cheng Lian Authored: Wed Jun 29 22:50:53 2016 -0700 Committer: Xiangrui Meng Committed: Wed Jun 29 22:51:04 2016 -0700 -- docs/_plugins/include_example.rb| 25 +--- docs/sql-programming-guide.md | 41 +++- .../apache/spark/examples/sql/JavaSparkSQL.java | 5 +++ examples/src/main/python/sql.py | 5 +++ .../apache/spark/examples/sql/RDDRelation.scala | 10 - 5 files changed, 43 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5485235/docs/_plugins/include_example.rb -- diff --git a/docs/_plugins/include_example.rb b/docs/_plugins/include_example.rb index f748582..306 100644 --- a/docs/_plugins/include_example.rb +++ b/docs/_plugins/include_example.rb @@ -32,8 +32,18 @@ module Jekyll @code_dir = File.join(site.source, config_dir) clean_markup = @markup.strip - @file = File.join(@code_dir, clean_markup) - @lang = clean_markup.split('.').last + + parts = clean_markup.strip.split(' ') + if parts.length > 1 then +@snippet_label = ':' + parts[0] +snippet_file = parts[1] + else +@snippet_label = '' +snippet_file = parts[0] + end + + @file = File.join(@code_dir, snippet_file) + @lang = snippet_file.split('.').last code = File.open(@file).read.encode("UTF-8") code = select_lines(code) @@ -41,7 +51,7 @@ module Jekyll rendered_code = Pygments.highlight(code, :lexer => @lang) hint = "Find full example code at " \ -"\"examples/src/main/#{clean_markup}\" in the Spark repo." +"\"examples/src/main/#{snippet_file}\" in the Spark repo." rendered_code + hint end @@ -66,13 +76,13 @@ module Jekyll # Select the array of start labels from code. startIndices = lines .each_with_index -.select { |l, i| l.include? "$example on$" } +.select { |l, i| l.include? "$example on#{@snippet_label}$" } .map { |l, i| i } # Select the array of end labels from code. endIndices = lines .each_with_index -.select { |l, i| l.include? "$example off$" } +.select { |l, i| l.include? "$example off#{@snippet_label}$" } .map { |l, i| i } raise "Start indices amount is not equal to end indices amount, see #{@file}." \ @@ -92,7 +102,10 @@ module Jekyll if start == endline lastIndex = endline range = Range.new(start + 1, endline - 1) -result += trim_codeblock(lines[range]).join +trimmed = trim_codeblock(lines[range]) +# Filter out possible example tags of overlapped labels. +taggs_filtered = trimmed.select { |l| !l.include? 
'$example ' } +result += taggs_filtered.join result += "\n" end result http://git-wip-us.apache.org/repos/asf/spark/blob/a5485235/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 6c6bc8d..68419e1 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -63,52 +63,23 @@ Throughout this document, we will often refer to Scala/Java Datasets of `Row`s a -The entry point into all functionality in Spark is the [`SparkSession`](api/scala/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.build()`: - -{% highlight scala %} -import org.apache.spark.sql.SparkSession - -val spark = SparkSession.build() - .master("local") - .appNam
spark git commit: [SPARK-16294][SQL] Labelling support for the include_example Jekyll plugin
Repository: spark Updated Branches: refs/heads/master d3af6731f -> bde1d6a61 [SPARK-16294][SQL] Labelling support for the include_example Jekyll plugin ## What changes were proposed in this pull request? This PR adds labelling support for the `include_example` Jekyll plugin, so that we may split a single source file into multiple line blocks with different labels, and include them in multiple code snippets in the generated HTML page. ## How was this patch tested? Manually tested. https://cloud.githubusercontent.com/assets/230655/16451099/66a76db2-3e33-11e6-84fb-63104c2f0688.png";> Author: Cheng Lian Closes #13972 from liancheng/include-example-with-labels. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bde1d6a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bde1d6a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bde1d6a6 Branch: refs/heads/master Commit: bde1d6a61593aeb62370f526542cead94919b0c0 Parents: d3af673 Author: Cheng Lian Authored: Wed Jun 29 22:50:53 2016 -0700 Committer: Xiangrui Meng Committed: Wed Jun 29 22:50:53 2016 -0700 -- docs/_plugins/include_example.rb| 25 +--- docs/sql-programming-guide.md | 41 +++- .../apache/spark/examples/sql/JavaSparkSQL.java | 5 +++ examples/src/main/python/sql.py | 5 +++ .../apache/spark/examples/sql/RDDRelation.scala | 10 - 5 files changed, 43 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bde1d6a6/docs/_plugins/include_example.rb -- diff --git a/docs/_plugins/include_example.rb b/docs/_plugins/include_example.rb index f748582..306 100644 --- a/docs/_plugins/include_example.rb +++ b/docs/_plugins/include_example.rb @@ -32,8 +32,18 @@ module Jekyll @code_dir = File.join(site.source, config_dir) clean_markup = @markup.strip - @file = File.join(@code_dir, clean_markup) - @lang = clean_markup.split('.').last + + parts = clean_markup.strip.split(' ') + if parts.length > 1 then +@snippet_label = ':' + parts[0] +snippet_file = parts[1] + else +@snippet_label = '' +snippet_file = parts[0] + end + + @file = File.join(@code_dir, snippet_file) + @lang = snippet_file.split('.').last code = File.open(@file).read.encode("UTF-8") code = select_lines(code) @@ -41,7 +51,7 @@ module Jekyll rendered_code = Pygments.highlight(code, :lexer => @lang) hint = "Find full example code at " \ -"\"examples/src/main/#{clean_markup}\" in the Spark repo." +"\"examples/src/main/#{snippet_file}\" in the Spark repo." rendered_code + hint end @@ -66,13 +76,13 @@ module Jekyll # Select the array of start labels from code. startIndices = lines .each_with_index -.select { |l, i| l.include? "$example on$" } +.select { |l, i| l.include? "$example on#{@snippet_label}$" } .map { |l, i| i } # Select the array of end labels from code. endIndices = lines .each_with_index -.select { |l, i| l.include? "$example off$" } +.select { |l, i| l.include? "$example off#{@snippet_label}$" } .map { |l, i| i } raise "Start indices amount is not equal to end indices amount, see #{@file}." \ @@ -92,7 +102,10 @@ module Jekyll if start == endline lastIndex = endline range = Range.new(start + 1, endline - 1) -result += trim_codeblock(lines[range]).join +trimmed = trim_codeblock(lines[range]) +# Filter out possible example tags of overlapped labels. +taggs_filtered = trimmed.select { |l| !l.include? 
'$example ' } +result += taggs_filtered.join result += "\n" end result http://git-wip-us.apache.org/repos/asf/spark/blob/bde1d6a6/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 6c6bc8d..68419e1 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -63,52 +63,23 @@ Throughout this document, we will often refer to Scala/Java Datasets of `Row`s a -The entry point into all functionality in Spark is the [`SparkSession`](api/scala/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.build()`: - -{% highlight scala %} -import org.apache.spark.sql.SparkSession - -val spark = SparkSession.build() - .master("local") - .appName("Word Count") - .config("spark.some.config.option", "some-value") - .getOrCreate() - -// this is used to
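To make the new labelling concrete: after this change, `{% include_example some_label path/to/File.scala %}` pulls in only the region of the source file bracketed by `$example on:some_label$` / `$example off:some_label$` markers, while the unlabelled form keeps working against the plain `$example on$` markers. Below is a hedged Scala sketch of such a source file; the label names, file path, and query are illustrative, not the ones used in this PR.

```scala
// Hypothetical examples/src/main/scala/.../IncludeExampleDemo.scala.
// In the docs, each labelled region can be included independently, e.g.:
//   {% include_example create_session scala/org/apache/spark/examples/sql/IncludeExampleDemo.scala %}
//   {% include_example run_query scala/org/apache/spark/examples/sql/IncludeExampleDemo.scala %}

import org.apache.spark.sql.SparkSession

object IncludeExampleDemo {
  def main(args: Array[String]): Unit = {
    // $example on:create_session$
    val spark = SparkSession.builder()
      .appName("include_example label demo")
      .getOrCreate()
    // $example off:create_session$

    import spark.implicits._

    // $example on:run_query$
    val df = Seq((1, "a"), (2, "b")).toDF("key", "value")
    df.createOrReplaceTempView("records")
    spark.sql("SELECT key, value FROM records WHERE key > 1").show()
    // $example off:run_query$

    spark.stop()
  }
}
```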
spark git commit: [SPARK-16274][SQL] Implement xpath_boolean
Repository: spark Updated Branches: refs/heads/master 831a04f5d -> d3af6731f [SPARK-16274][SQL] Implement xpath_boolean ## What changes were proposed in this pull request? This patch implements xpath_boolean expression for Spark SQL, a xpath function that returns true or false. The implementation is modelled after Hive's xpath_boolean, except that how the expression handles null inputs. Hive throws a NullPointerException at runtime if either of the input is null. This implementation returns null if either of the input is null. ## How was this patch tested? Created two new test suites. One for unit tests covering the expression, and the other for end-to-end test in SQL. Author: petermaxlee Closes #13964 from petermaxlee/SPARK-16274. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3af6731 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3af6731 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3af6731 Branch: refs/heads/master Commit: d3af6731fa270842818ed91d6b4d14708ddae2db Parents: 831a04f Author: petermaxlee Authored: Thu Jun 30 09:27:48 2016 +0800 Committer: Wenchen Fan Committed: Thu Jun 30 09:27:48 2016 +0800 -- .../catalyst/analysis/FunctionRegistry.scala| 2 + .../catalyst/expressions/xml/XPathBoolean.scala | 58 +++ .../expressions/xml/XPathExpressionSuite.scala | 61 .../apache/spark/sql/XmlFunctionsSuite.scala| 32 ++ .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 5 files changed, 154 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3af6731/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 0bde48c..3f9227a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.xml._ import org.apache.spark.sql.catalyst.util.StringKeyHashMap @@ -301,6 +302,7 @@ object FunctionRegistry { expression[UnBase64]("unbase64"), expression[Unhex]("unhex"), expression[Upper]("upper"), +expression[XPathBoolean]("xpath_boolean"), // datetime functions expression[AddMonths]("add_months"), http://git-wip-us.apache.org/repos/asf/spark/blob/d3af6731/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala new file mode 100644 index 000..2a5256c --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.xml + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String + + +@ExpressionDescription( + usage = "_FUNC_(xml, xpath) - Evaluates a boolean xpath expression.", + extended = "> SELECT _FUNC_('1','a/b');\ntrue") +case class XPathBoolean(xml: Expression, path: Expr
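Once this patch is in, the function is reachable from SQL under the registered name `xpath_boolean`. A minimal Scala sketch of calling it through `spark.sql` follows; the XML literals and the null-input probe are illustrative, chosen to match the behaviour described in the PR (null in, null out, rather than Hive's NullPointerException).

```scala
import org.apache.spark.sql.SparkSession

object XPathBooleanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("XPathBooleanSketch")
      .master("local[*]")
      .getOrCreate()

    // True: the xpath 'a/b' matches the document.
    spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/b') AS has_b").show()

    // False: no matching node.
    spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/c') AS has_c").show()

    // Null input yields null instead of a runtime exception.
    spark.sql("SELECT xpath_boolean(CAST(NULL AS STRING), 'a/b') AS null_xml").show()

    spark.stop()
  }
}
```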
spark git commit: [SPARK-16267][TEST] Replace deprecated `CREATE TEMPORARY TABLE ... USING` from testsuites.
Repository: spark Updated Branches: refs/heads/branch-2.0 e1bdf1e02 -> b52bd8070 [SPARK-16267][TEST] Replace deprecated `CREATE TEMPORARY TABLE ... USING` from testsuites. ## What changes were proposed in this pull request? After SPARK-15674, `DDLStrategy` prints out the following deprecation messages in the testsuites. ``` 12:10:53.284 WARN org.apache.spark.sql.execution.SparkStrategies$DDLStrategy: CREATE TEMPORARY TABLE normal_orc_source USING... is deprecated, please use CREATE TEMPORARY VIEW viewName USING... instead ``` Total : 40 - JDBCWriteSuite: 14 - DDLSuite: 6 - TableScanSuite: 6 - ParquetSourceSuite: 5 - OrcSourceSuite: 2 - SQLQuerySuite: 2 - HiveCommandSuite: 2 - JsonSuite: 1 - PrunedScanSuite: 1 - FilteredScanSuite 1 This PR replaces `CREATE TEMPORARY TABLE` with `CREATE TEMPORARY VIEW` in order to remove the deprecation messages in the above testsuites except `DDLSuite`, `SQLQuerySuite`, `HiveCommandSuite`. The Jenkins results shows only remaining 10 messages. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61422/consoleFull ## How was this patch tested? This is a testsuite-only change. Author: Dongjoon Hyun Closes #13956 from dongjoon-hyun/SPARK-16267. (cherry picked from commit 831a04f5d152d1839c0edfdf65bb728aa5957f16) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b52bd807 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b52bd807 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b52bd807 Branch: refs/heads/branch-2.0 Commit: b52bd8070dc852b419283f8a14595e42c179d3d0 Parents: e1bdf1e Author: Dongjoon Hyun Authored: Wed Jun 29 17:29:17 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 17:29:23 2016 -0700 -- .../sql/execution/datasources/json/JsonSuite.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 4 ++-- .../apache/spark/sql/sources/FilteredScanSuite.scala| 2 +- .../org/apache/spark/sql/sources/PrunedScanSuite.scala | 2 +- .../org/apache/spark/sql/sources/TableScanSuite.scala | 12 ++-- .../org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/parquetSuites.scala | 10 +- 7 files changed, 18 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b52bd807/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 9f35c02..6c72019 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -847,7 +847,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { sql( s""" -|CREATE TEMPORARY TABLE jsonTableSQL +|CREATE TEMPORARY VIEW jsonTableSQL |USING org.apache.spark.sql.json |OPTIONS ( | path '$path' http://git-wip-us.apache.org/repos/asf/spark/blob/b52bd807/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 48fa5f9..ff66f53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -57,14 
+57,14 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { sql( s""" -|CREATE TEMPORARY TABLE PEOPLE +|CREATE OR REPLACE TEMPORARY VIEW PEOPLE |USING org.apache.spark.sql.jdbc |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass') """.stripMargin.replaceAll("\n", " ")) sql( s""" -|CREATE TEMPORARY TABLE PEOPLE1 +|CREATE OR REPLACE TEMPORARY VIEW PEOPLE1 |USING org.apache.spark.sql.jdbc |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass') """.stripMargin.replaceAll("\n", " ")) http://git-wip-us.apache.org/repos/asf/spark/blob/b52bd807/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/source
spark git commit: [SPARK-16267][TEST] Replace deprecated `CREATE TEMPORARY TABLE ... USING` from testsuites.
Repository: spark Updated Branches: refs/heads/master d063898be -> 831a04f5d [SPARK-16267][TEST] Replace deprecated `CREATE TEMPORARY TABLE ... USING` from testsuites. ## What changes were proposed in this pull request? After SPARK-15674, `DDLStrategy` prints out the following deprecation messages in the testsuites. ``` 12:10:53.284 WARN org.apache.spark.sql.execution.SparkStrategies$DDLStrategy: CREATE TEMPORARY TABLE normal_orc_source USING... is deprecated, please use CREATE TEMPORARY VIEW viewName USING... instead ``` Total : 40 - JDBCWriteSuite: 14 - DDLSuite: 6 - TableScanSuite: 6 - ParquetSourceSuite: 5 - OrcSourceSuite: 2 - SQLQuerySuite: 2 - HiveCommandSuite: 2 - JsonSuite: 1 - PrunedScanSuite: 1 - FilteredScanSuite 1 This PR replaces `CREATE TEMPORARY TABLE` with `CREATE TEMPORARY VIEW` in order to remove the deprecation messages in the above testsuites except `DDLSuite`, `SQLQuerySuite`, `HiveCommandSuite`. The Jenkins results shows only remaining 10 messages. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61422/consoleFull ## How was this patch tested? This is a testsuite-only change. Author: Dongjoon Hyun Closes #13956 from dongjoon-hyun/SPARK-16267. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/831a04f5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/831a04f5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/831a04f5 Branch: refs/heads/master Commit: 831a04f5d152d1839c0edfdf65bb728aa5957f16 Parents: d063898 Author: Dongjoon Hyun Authored: Wed Jun 29 17:29:17 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 17:29:17 2016 -0700 -- .../sql/execution/datasources/json/JsonSuite.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 4 ++-- .../apache/spark/sql/sources/FilteredScanSuite.scala| 2 +- .../org/apache/spark/sql/sources/PrunedScanSuite.scala | 2 +- .../org/apache/spark/sql/sources/TableScanSuite.scala | 12 ++-- .../org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/parquetSuites.scala | 10 +- 7 files changed, 18 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/831a04f5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 9f35c02..6c72019 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -847,7 +847,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { sql( s""" -|CREATE TEMPORARY TABLE jsonTableSQL +|CREATE TEMPORARY VIEW jsonTableSQL |USING org.apache.spark.sql.json |OPTIONS ( | path '$path' http://git-wip-us.apache.org/repos/asf/spark/blob/831a04f5/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 48fa5f9..ff66f53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -57,14 +57,14 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { sql( s""" -|CREATE 
TEMPORARY TABLE PEOPLE +|CREATE OR REPLACE TEMPORARY VIEW PEOPLE |USING org.apache.spark.sql.jdbc |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass') """.stripMargin.replaceAll("\n", " ")) sql( s""" -|CREATE TEMPORARY TABLE PEOPLE1 +|CREATE OR REPLACE TEMPORARY VIEW PEOPLE1 |USING org.apache.spark.sql.jdbc |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass') """.stripMargin.replaceAll("\n", " ")) http://git-wip-us.apache.org/repos/asf/spark/blob/831a04f5/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 45e737f..be56c96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sq
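For anyone updating their own tests, the substitution is mechanical. A small Scala sketch of the new DDL form is below; the data source and path are placeholders, not taken from the suites.

```scala
import org.apache.spark.sql.SparkSession

object TemporaryViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TemporaryViewSketch")
      .master("local[*]")
      .getOrCreate()

    // Deprecated form (still accepted, but triggers the DDLStrategy warning):
    //   CREATE TEMPORARY TABLE jsonTable USING org.apache.spark.sql.json OPTIONS (path '/tmp/people.json')

    // Replacement used throughout the updated suites:
    spark.sql(
      """
        |CREATE OR REPLACE TEMPORARY VIEW jsonTable
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/tmp/people.json')
      """.stripMargin)

    spark.sql("SELECT * FROM jsonTable").show()

    spark.stop()
  }
}
```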
spark git commit: Revert "[SPARK-16134][SQL] optimizer rules for typed filter"
Repository: spark Updated Branches: refs/heads/branch-2.0 8da431473 -> e1bdf1e02 Revert "[SPARK-16134][SQL] optimizer rules for typed filter" This reverts commit 8da4314735ed55f259642e2977d8d7bf2212474f. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1bdf1e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1bdf1e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1bdf1e0 Branch: refs/heads/branch-2.0 Commit: e1bdf1e02483bf513b6e012e8921d440a5efbc11 Parents: 8da4314 Author: Cheng Lian Authored: Thu Jun 30 08:17:43 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 08:17:43 2016 +0800 -- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +- .../expressions/ReferenceToExpressions.scala| 1 - .../sql/catalyst/optimizer/Optimizer.scala | 98 +++- .../sql/catalyst/plans/logical/object.scala | 47 +- .../TypedFilterOptimizationSuite.scala | 86 - .../scala/org/apache/spark/sql/Dataset.scala| 12 ++- .../spark/sql/execution/SparkStrategies.scala | 2 - .../scala/org/apache/spark/sql/QueryTest.scala | 1 - 8 files changed, 91 insertions(+), 162 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 84c9cc8..2ca990d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -293,7 +293,11 @@ package object dsl { def where(condition: Expression): LogicalPlan = Filter(condition, logicalPlan) - def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan) + def filter[T : Encoder](func: T => Boolean): LogicalPlan = { +val deserialized = logicalPlan.deserialize[T] +val condition = expressions.callFunction(func, BooleanType, deserialized.output.head) +Filter(condition, deserialized).serialize[T] + } def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala index 127797c..502d791 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala @@ -45,7 +45,6 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression]) var maxOrdinal = -1 result foreach { case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal - case _ => } if (maxOrdinal > children.length) { return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, but " + http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index aa90735..f24f8b7 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -21,7 +21,6 @@ import scala.annotation.tailrec import scala.collection.immutable.HashSet import scala.collection.mutable.ArrayBuffer -import org.apache.spark.api.java.function.FilterFunction import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} @@ -110,7 +109,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) Batch("Decimal Optimizations", fixedPoint, DecimalAggregates) :: Batch("Typed Filter Optimization", fixedPoint, - CombineTypedFilters) :: + EmbedSerializerInFilter, + RemoveAliasOnlyProject) :: Batch("Loca
spark git commit: [SPARK-16134][SQL] optimizer rules for typed filter
Repository: spark Updated Branches: refs/heads/branch-2.0 011befd20 -> 8da431473 [SPARK-16134][SQL] optimizer rules for typed filter ## What changes were proposed in this pull request? This PR adds 3 optimizer rules for typed filter: 1. push typed filter down through `SerializeFromObject` and eliminate the deserialization in filter condition. 2. pull typed filter up through `SerializeFromObject` and eliminate the deserialization in filter condition. 3. combine adjacent typed filters and share the deserialized object among all the condition expressions. This PR also adds `TypedFilter` logical plan, to separate it from normal filter, so that the concept is more clear and it's easier to write optimizer rules. ## How was this patch tested? `TypedFilterOptimizationSuite` Author: Wenchen Fan Closes #13846 from cloud-fan/filter. (cherry picked from commit d063898bebaaf4ec2aad24c3ac70aabdbf97a190) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8da43147 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8da43147 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8da43147 Branch: refs/heads/branch-2.0 Commit: 8da4314735ed55f259642e2977d8d7bf2212474f Parents: 011befd Author: Wenchen Fan Authored: Thu Jun 30 08:15:08 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 08:15:50 2016 +0800 -- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +- .../expressions/ReferenceToExpressions.scala| 1 + .../sql/catalyst/optimizer/Optimizer.scala | 98 +--- .../sql/catalyst/plans/logical/object.scala | 47 +- .../TypedFilterOptimizationSuite.scala | 86 + .../scala/org/apache/spark/sql/Dataset.scala| 12 +-- .../spark/sql/execution/SparkStrategies.scala | 2 + .../scala/org/apache/spark/sql/QueryTest.scala | 1 + 8 files changed, 162 insertions(+), 91 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 2ca990d..84c9cc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -293,11 +293,7 @@ package object dsl { def where(condition: Expression): LogicalPlan = Filter(condition, logicalPlan) - def filter[T : Encoder](func: T => Boolean): LogicalPlan = { -val deserialized = logicalPlan.deserialize[T] -val condition = expressions.callFunction(func, BooleanType, deserialized.output.head) -Filter(condition, deserialized).serialize[T] - } + def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan) def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala index 502d791..127797c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala @@ -45,6 +45,7 @@ 
case class ReferenceToExpressions(result: Expression, children: Seq[Expression]) var maxOrdinal = -1 result foreach { case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal + case _ => } if (maxOrdinal > children.length) { return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, but " + http://git-wip-us.apache.org/repos/asf/spark/blob/8da43147/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f24f8b7..aa90735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
spark git commit: [SPARK-16134][SQL] optimizer rules for typed filter
Repository: spark Updated Branches: refs/heads/master 2eaabfa41 -> d063898be [SPARK-16134][SQL] optimizer rules for typed filter ## What changes were proposed in this pull request? This PR adds 3 optimizer rules for typed filter: 1. push typed filter down through `SerializeFromObject` and eliminate the deserialization in filter condition. 2. pull typed filter up through `SerializeFromObject` and eliminate the deserialization in filter condition. 3. combine adjacent typed filters and share the deserialized object among all the condition expressions. This PR also adds `TypedFilter` logical plan, to separate it from normal filter, so that the concept is more clear and it's easier to write optimizer rules. ## How was this patch tested? `TypedFilterOptimizationSuite` Author: Wenchen Fan Closes #13846 from cloud-fan/filter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d063898b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d063898b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d063898b Branch: refs/heads/master Commit: d063898bebaaf4ec2aad24c3ac70aabdbf97a190 Parents: 2eaabfa Author: Wenchen Fan Authored: Thu Jun 30 08:15:08 2016 +0800 Committer: Cheng Lian Committed: Thu Jun 30 08:15:08 2016 +0800 -- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +- .../expressions/ReferenceToExpressions.scala| 1 + .../sql/catalyst/optimizer/Optimizer.scala | 98 +--- .../sql/catalyst/plans/logical/object.scala | 47 +- .../TypedFilterOptimizationSuite.scala | 86 + .../scala/org/apache/spark/sql/Dataset.scala| 12 +-- .../spark/sql/execution/SparkStrategies.scala | 2 + .../scala/org/apache/spark/sql/QueryTest.scala | 1 + 8 files changed, 162 insertions(+), 91 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 2ca990d..84c9cc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -293,11 +293,7 @@ package object dsl { def where(condition: Expression): LogicalPlan = Filter(condition, logicalPlan) - def filter[T : Encoder](func: T => Boolean): LogicalPlan = { -val deserialized = logicalPlan.deserialize[T] -val condition = expressions.callFunction(func, BooleanType, deserialized.output.head) -Filter(condition, deserialized).serialize[T] - } + def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan) def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala index 502d791..127797c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala @@ -45,6 +45,7 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression]) var maxOrdinal = -1 
result foreach { case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal + case _ => } if (maxOrdinal > children.length) { return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, but " + http://git-wip-us.apache.org/repos/asf/spark/blob/d063898b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9bc8cea..842d6bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -21,6 +21,7 @@ import scala.annotation.tailrec import scala.collection.immutable.HashSet import sca
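A hedged usage sketch (not part of the patch) of the query shape these rules target: two adjacent typed filters on a Dataset, which are now planned as `TypedFilter` nodes so the optimizer can push them through `SerializeFromObject` and combine them. The class, data, and session settings below are illustrative.

```scala
import org.apache.spark.sql.SparkSession

case class Event(id: Long, value: Double)

object TypedFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-filter-sketch").getOrCreate()
    import spark.implicits._

    val events = Seq(Event(1L, 0.3), Event(2L, 0.9), Event(3L, 0.5)).toDS()

    // Two adjacent typed filters: with this patch each becomes a TypedFilter plan, so the
    // optimizer can combine them and deserialize the Event object once for both predicates.
    val survivors = events.filter(_.value > 0.4).filter(_.id < 3L)

    survivors.explain(true) // inspect the optimized plan for the combined filter
    survivors.show()

    spark.stop()
  }
}
```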
spark git commit: [SPARK-16228][SQL] HiveSessionCatalog should return `double`-param functions for decimal param lookups
Repository: spark Updated Branches: refs/heads/branch-2.0 c4cebd572 -> 011befd20 [SPARK-16228][SQL] HiveSessionCatalog should return `double`-param functions for decimal param lookups ## What changes were proposed in this pull request? This PR supports a fallback lookup by casting `DecimalType` into `DoubleType` for the external functions with `double`-type parameter. **Reported Error Scenarios** ```scala scala> sql("select percentile(value, 0.5) from values 1,2,3 T(value)") org.apache.spark.sql.AnalysisException: ... No matching method for class org.apache.hadoop.hive.ql.udf.UDAFPercentile with (int, decimal(38,18)). Possible choices: _FUNC_(bigint, array) _FUNC_(bigint, double) ; line 1 pos 7 scala> sql("select percentile_approx(value, 0.5) from values 1.0,2.0,3.0 T(value)") org.apache.spark.sql.AnalysisException: ... Only a float/double or float/double array argument is accepted as parameter 2, but decimal(38,18) was passed instead.; line 1 pos 7 ``` ## How was this patch tested? Pass the Jenkins tests (including a new testcase). Author: Dongjoon Hyun Closes #13930 from dongjoon-hyun/SPARK-16228. (cherry picked from commit 2eaabfa4142d4050be2b45fd277ff5c7fa430581) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/011befd2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/011befd2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/011befd2 Branch: refs/heads/branch-2.0 Commit: 011befd2098bf78979cc8e00de1576bf339583b2 Parents: c4cebd5 Author: Dongjoon Hyun Authored: Wed Jun 29 16:08:10 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 16:08:19 2016 -0700 -- .../apache/spark/sql/hive/HiveSessionCatalog.scala | 16 +++- .../spark/sql/hive/execution/HiveUDFSuite.scala | 7 +++ 2 files changed, 22 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/011befd2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 8a47dcf..2589b9d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -30,12 +30,13 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DecimalType, DoubleType} import org.apache.spark.util.Utils @@ -163,6 +164,19 @@ private[sql] class HiveSessionCatalog( } override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = { +try { + lookupFunction0(name, children) +} catch { + case NonFatal(_) => +// SPARK-16228 ExternalCatalog may recognize `double`-type only. 
+val newChildren = children.map { child => + if (child.dataType.isInstanceOf[DecimalType]) Cast(child, DoubleType) else child +} +lookupFunction0(name, newChildren) +} + } + + private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = { // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to // if (super.functionExists(name)) { // super.lookupFunction(name, children) http://git-wip-us.apache.org/repos/asf/spark/blob/011befd2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 0f56b2c..def4601 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -142,6 +142,13 @@ class HiveUDFSuite extends QueryTest with T
spark git commit: [SPARK-16228][SQL] HiveSessionCatalog should return `double`-param functions for decimal param lookups
Repository: spark Updated Branches: refs/heads/master 23c58653f -> 2eaabfa41 [SPARK-16228][SQL] HiveSessionCatalog should return `double`-param functions for decimal param lookups ## What changes were proposed in this pull request? This PR supports a fallback lookup by casting `DecimalType` into `DoubleType` for the external functions with `double`-type parameter. **Reported Error Scenarios** ```scala scala> sql("select percentile(value, 0.5) from values 1,2,3 T(value)") org.apache.spark.sql.AnalysisException: ... No matching method for class org.apache.hadoop.hive.ql.udf.UDAFPercentile with (int, decimal(38,18)). Possible choices: _FUNC_(bigint, array) _FUNC_(bigint, double) ; line 1 pos 7 scala> sql("select percentile_approx(value, 0.5) from values 1.0,2.0,3.0 T(value)") org.apache.spark.sql.AnalysisException: ... Only a float/double or float/double array argument is accepted as parameter 2, but decimal(38,18) was passed instead.; line 1 pos 7 ``` ## How was this patch tested? Pass the Jenkins tests (including a new testcase). Author: Dongjoon Hyun Closes #13930 from dongjoon-hyun/SPARK-16228. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2eaabfa4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2eaabfa4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2eaabfa4 Branch: refs/heads/master Commit: 2eaabfa4142d4050be2b45fd277ff5c7fa430581 Parents: 23c5865 Author: Dongjoon Hyun Authored: Wed Jun 29 16:08:10 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 16:08:10 2016 -0700 -- .../apache/spark/sql/hive/HiveSessionCatalog.scala | 16 +++- .../spark/sql/hive/execution/HiveUDFSuite.scala | 7 +++ 2 files changed, 22 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2eaabfa4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 8a47dcf..2589b9d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -30,12 +30,13 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DecimalType, DoubleType} import org.apache.spark.util.Utils @@ -163,6 +164,19 @@ private[sql] class HiveSessionCatalog( } override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = { +try { + lookupFunction0(name, children) +} catch { + case NonFatal(_) => +// SPARK-16228 ExternalCatalog may recognize `double`-type only. 
+val newChildren = children.map { child => + if (child.dataType.isInstanceOf[DecimalType]) Cast(child, DoubleType) else child +} +lookupFunction0(name, newChildren) +} + } + + private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = { // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to // if (super.functionExists(name)) { // super.lookupFunction(name, children) http://git-wip-us.apache.org/repos/asf/spark/blob/2eaabfa4/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 0f56b2c..def4601 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -142,6 +142,13 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { sql("SELECT array(max(key), max(key)) FROM src").collect().toS
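A hedged usage sketch of the behavior this fallback enables, taken from the reported scenario above: it assumes a SparkSession built with Hive support (so the Hive `percentile` UDAF is registered) and introduces no new API.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// The literal 0.5 is parsed as decimal(38,18). When the direct lookup fails, the catalog now
// retries with the decimal argument cast to double, so the double-typed UDAF signature matches.
spark.sql("select percentile(value, 0.5) from values 1,2,3 T(value)").show()
```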
spark git commit: [SPARK-16238] Metrics for generated method and class bytecode size
Repository: spark Updated Branches: refs/heads/master 9b1b3ae77 -> 23c58653f [SPARK-16238] Metrics for generated method and class bytecode size ## What changes were proposed in this pull request? This extends SPARK-15860 to include metrics for the actual bytecode size of janino-generated methods. They can be accessed in the same way as any other codahale metric, e.g. ``` scala> org.apache.spark.metrics.source.CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.getSnapshot().getValues() res7: Array[Long] = Array(532, 532, 532, 542, 1479, 2670, 3585, 3585) scala> org.apache.spark.metrics.source.CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.getSnapshot().getValues() res8: Array[Long] = Array(5, 5, 5, 5, 10, 10, 10, 10, 15, 15, 15, 38, 63, 79, 88, 94, 94, 94, 132, 132, 165, 165, 220, 220) ``` ## How was this patch tested? Small unit test, also verified manually that the performance impact is minimal (<10%). hvanhovell Author: Eric Liang Closes #13934 from ericl/spark-16238. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23c58653 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23c58653 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23c58653 Branch: refs/heads/master Commit: 23c58653f900bfb71ef2b3186a95ad2562c33969 Parents: 9b1b3ae Author: Eric Liang Authored: Wed Jun 29 15:07:32 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 15:07:32 2016 -0700 -- .../spark/metrics/source/StaticSources.scala| 12 ++ .../expressions/codegen/CodeGenerator.scala | 40 +++- .../expressions/CodeGenerationSuite.scala | 4 ++ 3 files changed, 55 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23c58653/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala index 6819222..6bba259 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -47,4 +47,16 @@ object CodegenMetrics extends Source { * Histogram of the time it took to compile source code text (in milliseconds). */ val METRIC_COMPILATION_TIME = metricRegistry.histogram(MetricRegistry.name("compilationTime")) + + /** + * Histogram of the bytecode size of each class generated by CodeGenerator. + */ + val METRIC_GENERATED_CLASS_BYTECODE_SIZE = +metricRegistry.histogram(MetricRegistry.name("generatedClassSize")) + + /** + * Histogram of the bytecode size of each method in classes generated by CodeGenerator. 
+ */ + val METRIC_GENERATED_METHOD_BYTECODE_SIZE = +metricRegistry.histogram(MetricRegistry.name("generatedMethodSize")) } http://git-wip-us.apache.org/repos/asf/spark/blob/23c58653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 6392ff4..16fb1f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -17,11 +17,16 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import java.io.ByteArrayInputStream +import java.util.{Map => JavaMap} + +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.cache.{CacheBuilder, CacheLoader} -import org.codehaus.janino.ClassBodyEvaluator +import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler} +import org.codehaus.janino.util.ClassFile import scala.language.existentials import org.apache.spark.SparkEnv @@ -876,6 +881,7 @@ object CodeGenerator extends Logging { try { evaluator.cook("generated.java", code.body) + recordCompilationStats(evaluator) } catch { case e: Exception => val msg = s"failed to compile: $e\n$formatted" @@ -886,6 +892,38 @@ object CodeGenerator extends Logging { } /** + * Records the generated class and method bytecode sizes by inspecting janino private fields. + */ + private def recordCompilationStats(evaluator: ClassBodyEvaluator): Unit = { +// First retrieve the generated classes. +val classes = { + val result
spark git commit: [SPARK-16238] Metrics for generated method and class bytecode size
Repository: spark Updated Branches: refs/heads/branch-2.0 ef0253ff6 -> c4cebd572 [SPARK-16238] Metrics for generated method and class bytecode size ## What changes were proposed in this pull request? This extends SPARK-15860 to include metrics for the actual bytecode size of janino-generated methods. They can be accessed in the same way as any other codahale metric, e.g. ``` scala> org.apache.spark.metrics.source.CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.getSnapshot().getValues() res7: Array[Long] = Array(532, 532, 532, 542, 1479, 2670, 3585, 3585) scala> org.apache.spark.metrics.source.CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.getSnapshot().getValues() res8: Array[Long] = Array(5, 5, 5, 5, 10, 10, 10, 10, 15, 15, 15, 38, 63, 79, 88, 94, 94, 94, 132, 132, 165, 165, 220, 220) ``` ## How was this patch tested? Small unit test, also verified manually that the performance impact is minimal (<10%). hvanhovell Author: Eric Liang Closes #13934 from ericl/spark-16238. (cherry picked from commit 23c58653f900bfb71ef2b3186a95ad2562c33969) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4cebd57 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4cebd57 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4cebd57 Branch: refs/heads/branch-2.0 Commit: c4cebd5725e6d8ade8c0a02652e251d04903da72 Parents: ef0253f Author: Eric Liang Authored: Wed Jun 29 15:07:32 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 15:07:38 2016 -0700 -- .../spark/metrics/source/StaticSources.scala| 12 ++ .../expressions/codegen/CodeGenerator.scala | 40 +++- .../expressions/CodeGenerationSuite.scala | 4 ++ 3 files changed, 55 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c4cebd57/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala index 6819222..6bba259 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -47,4 +47,16 @@ object CodegenMetrics extends Source { * Histogram of the time it took to compile source code text (in milliseconds). */ val METRIC_COMPILATION_TIME = metricRegistry.histogram(MetricRegistry.name("compilationTime")) + + /** + * Histogram of the bytecode size of each class generated by CodeGenerator. + */ + val METRIC_GENERATED_CLASS_BYTECODE_SIZE = +metricRegistry.histogram(MetricRegistry.name("generatedClassSize")) + + /** + * Histogram of the bytecode size of each method in classes generated by CodeGenerator. 
+ */ + val METRIC_GENERATED_METHOD_BYTECODE_SIZE = +metricRegistry.histogram(MetricRegistry.name("generatedMethodSize")) } http://git-wip-us.apache.org/repos/asf/spark/blob/c4cebd57/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 6392ff4..16fb1f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -17,11 +17,16 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import java.io.ByteArrayInputStream +import java.util.{Map => JavaMap} + +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.cache.{CacheBuilder, CacheLoader} -import org.codehaus.janino.ClassBodyEvaluator +import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler} +import org.codehaus.janino.util.ClassFile import scala.language.existentials import org.apache.spark.SparkEnv @@ -876,6 +881,7 @@ object CodeGenerator extends Logging { try { evaluator.cook("generated.java", code.body) + recordCompilationStats(evaluator) } catch { case e: Exception => val msg = s"failed to compile: $e\n$formatted" @@ -886,6 +892,38 @@ object CodeGenerator extends Logging { } /** + * Records the generated class and method bytecode sizes by inspecting janino private fields. + */ + private def recordCompilationStats(evaluator: ClassBod
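A hedged follow-on sketch: the new histograms are ordinary Codahale metrics, so besides dumping the raw values you can read summary statistics from their snapshots, e.g. in spark-shell after running a few queries.

```scala
import org.apache.spark.metrics.source.CodegenMetrics

val methodSizes = CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE
println(s"methods compiled: ${methodSizes.getCount}, " +
  s"median size: ${methodSizes.getSnapshot.getMedian}, " +
  s"p99 size: ${methodSizes.getSnapshot.get99thPercentile}")

val classSizes = CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.getSnapshot
println(s"largest generated class: ${classSizes.getMax} bytes")
```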
spark git commit: [SPARK-16006][SQL] Attempting to write empty DataFrame with no fields throws non-intuitive exception
Repository: spark Updated Branches: refs/heads/branch-2.0 a7f66ef62 -> ef0253ff6 [SPARK-16006][SQL] Attemping to write empty DataFrame with no fields throws non-intuitive exception ## What changes were proposed in this pull request? This PR allows `emptyDataFrame.write` since the user didn't specify any partition columns. **Before** ```scala scala> spark.emptyDataFrame.write.parquet("/tmp/t1") org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns; scala> spark.emptyDataFrame.write.csv("/tmp/t1") org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns; ``` After this PR, there occurs no exceptions and the created directory has only one file, `_SUCCESS`, as expected. ## How was this patch tested? Pass the Jenkins tests including updated test cases. Author: Dongjoon Hyun Closes #13730 from dongjoon-hyun/SPARK-16006. (cherry picked from commit 9b1b3ae771babf127f64898d5dc110721597a760) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef0253ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef0253ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef0253ff Branch: refs/heads/branch-2.0 Commit: ef0253ff6d7fb9bf89ef023f2d5864c70d9d792d Parents: a7f66ef Author: Dongjoon Hyun Authored: Wed Jun 29 15:00:41 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 15:00:47 2016 -0700 -- .../spark/sql/execution/datasources/PartitioningUtils.scala | 2 +- .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala| 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef0253ff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 388df70..c356109 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -351,7 +351,7 @@ private[sql] object PartitioningUtils { } } -if (partitionColumns.size == schema.fields.size) { +if (partitionColumns.nonEmpty && partitionColumns.size == schema.fields.length) { throw new AnalysisException(s"Cannot use all columns for partition columns") } } http://git-wip-us.apache.org/repos/asf/spark/blob/ef0253ff/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index ebbcc1d..7308f85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -246,8 +246,9 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) } intercept[AnalysisException] { - spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) + spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path) } + spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path) } } - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16006][SQL] Attempting to write empty DataFrame with no fields throws non-intuitive exception
Repository: spark Updated Branches: refs/heads/master 8b5a8b25b -> 9b1b3ae77 [SPARK-16006][SQL] Attemping to write empty DataFrame with no fields throws non-intuitive exception ## What changes were proposed in this pull request? This PR allows `emptyDataFrame.write` since the user didn't specify any partition columns. **Before** ```scala scala> spark.emptyDataFrame.write.parquet("/tmp/t1") org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns; scala> spark.emptyDataFrame.write.csv("/tmp/t1") org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns; ``` After this PR, there occurs no exceptions and the created directory has only one file, `_SUCCESS`, as expected. ## How was this patch tested? Pass the Jenkins tests including updated test cases. Author: Dongjoon Hyun Closes #13730 from dongjoon-hyun/SPARK-16006. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b1b3ae7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b1b3ae7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b1b3ae7 Branch: refs/heads/master Commit: 9b1b3ae771babf127f64898d5dc110721597a760 Parents: 8b5a8b2 Author: Dongjoon Hyun Authored: Wed Jun 29 15:00:41 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 15:00:41 2016 -0700 -- .../spark/sql/execution/datasources/PartitioningUtils.scala | 2 +- .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala| 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b1b3ae7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 388df70..c356109 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -351,7 +351,7 @@ private[sql] object PartitioningUtils { } } -if (partitionColumns.size == schema.fields.size) { +if (partitionColumns.nonEmpty && partitionColumns.size == schema.fields.length) { throw new AnalysisException(s"Cannot use all columns for partition columns") } } http://git-wip-us.apache.org/repos/asf/spark/blob/9b1b3ae7/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 58b1d56..d454100 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -246,8 +246,9 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) } intercept[AnalysisException] { - spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) + spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path) } + spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
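A hedged sketch of the behavior after this change, mirroring the updated test; it assumes a local SparkSession and scratch paths. The empty write succeeds (producing only `_SUCCESS`), while partitioning a non-empty schema by all of its columns is still rejected.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// No fields and no partition columns: no longer raises "Cannot use all columns for partition columns".
spark.emptyDataFrame.write.mode("overwrite").parquet("/tmp/empty-df")

// Using every column of a real schema as partition columns still fails analysis.
try {
  spark.range(10).write.mode("overwrite").partitionBy("id").parquet("/tmp/all-partition-cols")
} catch {
  case e: AnalysisException => println(s"still rejected: ${e.getMessage}")
}
```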
spark git commit: [SPARK-16301] [SQL] The analyzer rule for resolving using joins should respect the case sensitivity setting.
Repository: spark Updated Branches: refs/heads/branch-2.0 809af6d9d -> a7f66ef62 [SPARK-16301] [SQL] The analyzer rule for resolving using joins should respect the case sensitivity setting. ## What changes were proposed in this pull request? The analyzer rule for resolving using joins should respect the case sensitivity setting. ## How was this patch tested? New tests in ResolveNaturalJoinSuite Author: Yin Huai Closes #13977 from yhuai/SPARK-16301. (cherry picked from commit 8b5a8b25b9d29b7d0949d5663c7394b26154a836) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7f66ef6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7f66ef6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7f66ef6 Branch: refs/heads/branch-2.0 Commit: a7f66ef62b94cdcf65c3043406fd5fd8d6a584c1 Parents: 809af6d Author: Yin Huai Authored: Wed Jun 29 14:42:58 2016 -0700 Committer: Davies Liu Committed: Wed Jun 29 14:43:08 2016 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 26 - .../analysis/ResolveNaturalJoinSuite.scala | 30 2 files changed, 49 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a7f66ef6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 96f2e38..d1d2c59 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1836,13 +1836,25 @@ class Analyzer( } private def commonNaturalJoinProcessing( - left: LogicalPlan, - right: LogicalPlan, - joinType: JoinType, - joinNames: Seq[String], - condition: Option[Expression]) = { -val leftKeys = joinNames.map(keyName => left.output.find(_.name == keyName).get) -val rightKeys = joinNames.map(keyName => right.output.find(_.name == keyName).get) + left: LogicalPlan, + right: LogicalPlan, + joinType: JoinType, + joinNames: Seq[String], + condition: Option[Expression]) = { +val leftKeys = joinNames.map { keyName => + val joinColumn = left.output.find(attr => resolver(attr.name, keyName)) + assert( +joinColumn.isDefined, +s"$keyName should exist in ${left.output.map(_.name).mkString(",")}") + joinColumn.get +} +val rightKeys = joinNames.map { keyName => + val joinColumn = right.output.find(attr => resolver(attr.name, keyName)) + assert( +joinColumn.isDefined, +s"$keyName should exist in ${right.output.map(_.name).mkString(",")}") + joinColumn.get +} val joinPairs = leftKeys.zip(rightKeys) val newCondition = (condition ++ joinPairs.map(EqualTo.tupled)).reduceOption(And) http://git-wip-us.apache.org/repos/asf/spark/blob/a7f66ef6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala index 748579d..100ec4d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala @@ -113,4 +113,34 @@ class ResolveNaturalJoinSuite extends AnalysisTest { assert(error.message.contains( "using columns ['d] 
can not be resolved given input columns: [b, a, c]")) } + + test("using join with a case sensitive analyzer") { +val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c) + +{ + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None) + checkAnalysis(usingPlan, expected, caseSensitive = true) +} + +{ + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None) + assertAnalysisError( +usingPlan, +Seq("using columns ['A] can not be resolved given input columns: [b, a, c, a]")) +} + } + + test("using join with a case insensitive analyzer") { +val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c) + +{ + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None) + checkAnalysis(usingPlan, expected, caseSensitive = false) +} + +{ +
spark git commit: [SPARK-16301] [SQL] The analyzer rule for resolving using joins should respect the case sensitivity setting.
Repository: spark Updated Branches: refs/heads/master d8a87a3ed -> 8b5a8b25b [SPARK-16301] [SQL] The analyzer rule for resolving using joins should respect the case sensitivity setting. ## What changes were proposed in this pull request? The analyzer rule for resolving using joins should respect the case sensitivity setting. ## How was this patch tested? New tests in ResolveNaturalJoinSuite Author: Yin Huai Closes #13977 from yhuai/SPARK-16301. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b5a8b25 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b5a8b25 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b5a8b25 Branch: refs/heads/master Commit: 8b5a8b25b9d29b7d0949d5663c7394b26154a836 Parents: d8a87a3 Author: Yin Huai Authored: Wed Jun 29 14:42:58 2016 -0700 Committer: Davies Liu Committed: Wed Jun 29 14:42:58 2016 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 26 - .../analysis/ResolveNaturalJoinSuite.scala | 30 2 files changed, 49 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b5a8b25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 96f2e38..d1d2c59 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1836,13 +1836,25 @@ class Analyzer( } private def commonNaturalJoinProcessing( - left: LogicalPlan, - right: LogicalPlan, - joinType: JoinType, - joinNames: Seq[String], - condition: Option[Expression]) = { -val leftKeys = joinNames.map(keyName => left.output.find(_.name == keyName).get) -val rightKeys = joinNames.map(keyName => right.output.find(_.name == keyName).get) + left: LogicalPlan, + right: LogicalPlan, + joinType: JoinType, + joinNames: Seq[String], + condition: Option[Expression]) = { +val leftKeys = joinNames.map { keyName => + val joinColumn = left.output.find(attr => resolver(attr.name, keyName)) + assert( +joinColumn.isDefined, +s"$keyName should exist in ${left.output.map(_.name).mkString(",")}") + joinColumn.get +} +val rightKeys = joinNames.map { keyName => + val joinColumn = right.output.find(attr => resolver(attr.name, keyName)) + assert( +joinColumn.isDefined, +s"$keyName should exist in ${right.output.map(_.name).mkString(",")}") + joinColumn.get +} val joinPairs = leftKeys.zip(rightKeys) val newCondition = (condition ++ joinPairs.map(EqualTo.tupled)).reduceOption(And) http://git-wip-us.apache.org/repos/asf/spark/blob/8b5a8b25/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala index 748579d..100ec4d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala @@ -113,4 +113,34 @@ class ResolveNaturalJoinSuite extends AnalysisTest { assert(error.message.contains( "using columns ['d] can not be resolved given input columns: [b, a, c]")) } + + test("using join with a case sensitive 
analyzer") { +val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c) + +{ + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None) + checkAnalysis(usingPlan, expected, caseSensitive = true) +} + +{ + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None) + assertAnalysisError( +usingPlan, +Seq("using columns ['A] can not be resolved given input columns: [b, a, c, a]")) +} + } + + test("using join with a case insensitive analyzer") { +val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c) + +{ + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None) + checkAnalysis(usingPlan, expected, caseSensitive = false) +} + +{ + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None) + checkAna
spark git commit: [TRIVIAL] [PYSPARK] Clean up orc compression option as well
Repository: spark Updated Branches: refs/heads/branch-2.0 3cc258efb -> 809af6d9d [TRIVIAL] [PYSPARK] Clean up orc compression option as well ## What changes were proposed in this pull request? This PR corrects ORC compression option for PySpark as well. I think this was missed mistakenly in https://github.com/apache/spark/pull/13948. ## How was this patch tested? N/A Author: hyukjinkwon Closes #13963 from HyukjinKwon/minor-orc-compress. (cherry picked from commit d8a87a3ed211dd08f06eeb9560661b8f11ce82fa) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/809af6d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/809af6d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/809af6d9 Branch: refs/heads/branch-2.0 Commit: 809af6d9d7df17f5889ebd8640c189e8d1e143a8 Parents: 3cc258e Author: hyukjinkwon Authored: Wed Jun 29 13:32:03 2016 -0700 Committer: Davies Liu Committed: Wed Jun 29 13:32:35 2016 -0700 -- python/pyspark/sql/readwriter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/809af6d9/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 44bf744..78d992e 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -695,8 +695,7 @@ class DataFrameWriter(OptionUtils): self.mode(mode) if partitionBy is not None: self.partitionBy(partitionBy) -if compression is not None: -self.option("compression", compression) +self._set_opts(compression=compression) self._jwrite.orc(path) @since(1.4) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [TRIVIAL] [PYSPARK] Clean up orc compression option as well
Repository: spark Updated Branches: refs/heads/master 64132a14f -> d8a87a3ed [TRIVIAL] [PYSPARK] Clean up orc compression option as well ## What changes were proposed in this pull request? This PR corrects ORC compression option for PySpark as well. I think this was missed mistakenly in https://github.com/apache/spark/pull/13948. ## How was this patch tested? N/A Author: hyukjinkwon Closes #13963 from HyukjinKwon/minor-orc-compress. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8a87a3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8a87a3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8a87a3e Branch: refs/heads/master Commit: d8a87a3ed211dd08f06eeb9560661b8f11ce82fa Parents: 64132a1 Author: hyukjinkwon Authored: Wed Jun 29 13:32:03 2016 -0700 Committer: Davies Liu Committed: Wed Jun 29 13:32:03 2016 -0700 -- python/pyspark/sql/readwriter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8a87a3e/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 44bf744..78d992e 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -695,8 +695,7 @@ class DataFrameWriter(OptionUtils): self.mode(mode) if partitionBy is not None: self.partitionBy(partitionBy) -if compression is not None: -self.option("compression", compression) +self._set_opts(compression=compression) self._jwrite.orc(path) @since(1.4) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
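For context, a hedged sketch of the option being plumbed through: the PySpark `compression` keyword ends up as the same writer option shown here via the Scala API. It assumes a spark-shell session with Hive support (the ORC source in 2.0 requires it); the path and codec are illustrative.

```scala
val df = spark.range(100).toDF("id")

// Equivalent of df.write.orc(path, compression='snappy') on the PySpark side after this cleanup.
df.write.mode("overwrite").option("compression", "snappy").orc("/tmp/orc-with-snappy")
```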
spark git commit: [SPARK-16044][SQL] Backport input_file_name() for data sources based on NewHadoopRDD to branch 1.6
Repository: spark Updated Branches: refs/heads/branch-1.6 0cb06c993 -> 1ac830aca [SPARK-16044][SQL] Backport input_file_name() for data source based on NewHadoopRDD to branch 1.6 ## What changes were proposed in this pull request? This PR backports https://github.com/apache/spark/pull/13759. (`SqlNewHadoopRDDState` was renamed to `InputFileNameHolder` and `spark` API does not exist in branch 1.6) ## How was this patch tested? Unit tests in `ColumnExpressionSuite`. Author: hyukjinkwon Closes #13806 from HyukjinKwon/backport-SPARK-16044. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ac830ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ac830ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ac830ac Branch: refs/heads/branch-1.6 Commit: 1ac830aca089e9f0b9b0bf367236ffc1184eae7e Parents: 0cb06c9 Author: hyukjinkwon Authored: Wed Jun 29 13:11:56 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 13:11:56 2016 -0700 -- .../org/apache/spark/rdd/NewHadoopRDD.scala | 7 .../spark/sql/ColumnExpressionSuite.scala | 39 ++-- 2 files changed, 42 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1ac830ac/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index c8b4f30..46fe1ba 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -134,6 +134,12 @@ class NewHadoopRDD[K, V]( val inputMetrics = context.taskMetrics .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + // Sets the thread local variable for the file's name + split.serializableHadoopSplit.value match { +case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString) +case _ => SqlNewHadoopRDDState.unsetInputFileName() + } + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { @@ -190,6 +196,7 @@ class NewHadoopRDD[K, V]( private def close() { if (reader != null) { + SqlNewHadoopRDDState.unsetInputFileName() // Close the reader and release it. Note: it's very important that we don't close the // reader more than once, since that exposes us to MAPREDUCE-5918 when running against // Hadoop 1.x and older Hadoop 2.x releases. 
That bug can lead to non-deterministic http://git-wip-us.apache.org/repos/asf/spark/blob/1ac830ac/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 38c0eb5..52b3d60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} import org.scalatest.Matchers._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.execution.Project import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -591,15 +593,44 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("InputFileName") { + test("InputFileName - SqlNewHadoopRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) - val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(inputFileName()) + val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(input_file_name()) .head.getString(0) assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(inputFileName()).limit(1), Row("")) + checkAnswer(data.select(input_file_name()).limit(1), Row("")) +} + } + + test("input_file_name - HadoopRDD") { +withTempPath { dir => + val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() + data.write.text(dir.getCanonicalPath) + val df =
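A hedged usage sketch against the 1.6 shell (where `sqlContext` is predefined): after this backport, `input_file_name()` is also populated for sources that read through `NewHadoopRDD`; the path is a placeholder.

```scala
import org.apache.spark.sql.functions.input_file_name

val df = sqlContext.read.text("/tmp/some-text-dir")
df.select(input_file_name().alias("file"), df("value")).show(5, false)
```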
spark git commit: [SPARK-16256][SQL][STREAMING] Added Structured Streaming Programming Guide
Repository: spark Updated Branches: refs/heads/branch-2.0 edd1905c0 -> 3cc258efb [SPARK-16256][SQL][STREAMING] Added Structured Streaming Programming Guide Title defines all. Author: Tathagata Das Closes #13945 from tdas/SPARK-16256. (cherry picked from commit 64132a14fb7a7255feeb5847a54f541fe551bf23) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cc258ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cc258ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cc258ef Branch: refs/heads/branch-2.0 Commit: 3cc258efb14ee9a35163daa3fa8f4724507ac4af Parents: edd1905 Author: Tathagata Das Authored: Wed Jun 29 11:45:57 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 11:47:14 2016 -0700 -- docs/_layouts/global.html |1 + docs/img/structured-streaming-example-model.png | Bin 0 -> 125504 bytes docs/img/structured-streaming-late-data.png | Bin 0 -> 138931 bytes docs/img/structured-streaming-model.png | Bin 0 -> 66098 bytes .../structured-streaming-stream-as-a-table.png | Bin 0 -> 82251 bytes docs/img/structured-streaming-window.png| Bin 0 -> 128930 bytes docs/img/structured-streaming.pptx | Bin 0 -> 1105315 bytes docs/structured-streaming-programming-guide.md | 1156 ++ 8 files changed, 1157 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/_layouts/global.html -- diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index d493f62..2d0c3fd 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -73,6 +73,7 @@ Spark Streaming DataFrames, Datasets and SQL +Structured Streaming MLlib (Machine Learning) GraphX (Graph Processing) SparkR (R on Spark) http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/img/structured-streaming-example-model.png -- diff --git a/docs/img/structured-streaming-example-model.png b/docs/img/structured-streaming-example-model.png new file mode 100644 index 000..af98765 Binary files /dev/null and b/docs/img/structured-streaming-example-model.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/img/structured-streaming-late-data.png -- diff --git a/docs/img/structured-streaming-late-data.png b/docs/img/structured-streaming-late-data.png new file mode 100644 index 000..5276b47 Binary files /dev/null and b/docs/img/structured-streaming-late-data.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/img/structured-streaming-model.png -- diff --git a/docs/img/structured-streaming-model.png b/docs/img/structured-streaming-model.png new file mode 100644 index 000..2061aae Binary files /dev/null and b/docs/img/structured-streaming-model.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/img/structured-streaming-stream-as-a-table.png -- diff --git a/docs/img/structured-streaming-stream-as-a-table.png b/docs/img/structured-streaming-stream-as-a-table.png new file mode 100644 index 000..8181216 Binary files /dev/null and b/docs/img/structured-streaming-stream-as-a-table.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/img/structured-streaming-window.png -- diff --git a/docs/img/structured-streaming-window.png b/docs/img/structured-streaming-window.png new file mode 100644 index 000..be9d3fb Binary files /dev/null and b/docs/img/structured-streaming-window.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/img/structured-streaming.pptx -- diff --git 
a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx new file mode 100644 index 000..c278323 Binary files /dev/null and b/docs/img/structured-streaming.pptx differ http://git-wip-us.apache.org/repos/asf/spark/blob/3cc258ef/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md new file mod
spark git commit: [SPARK-16256][SQL][STREAMING] Added Structured Streaming Programming Guide
Repository: spark Updated Branches: refs/heads/master cb1b9d34f -> 64132a14f [SPARK-16256][SQL][STREAMING] Added Structured Streaming Programming Guide Title defines all. Author: Tathagata Das Closes #13945 from tdas/SPARK-16256. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64132a14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64132a14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64132a14 Branch: refs/heads/master Commit: 64132a14fb7a7255feeb5847a54f541fe551bf23 Parents: cb1b9d3 Author: Tathagata Das Authored: Wed Jun 29 11:45:57 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 11:45:57 2016 -0700 -- docs/_layouts/global.html |1 + docs/img/structured-streaming-example-model.png | Bin 0 -> 125504 bytes docs/img/structured-streaming-late-data.png | Bin 0 -> 138931 bytes docs/img/structured-streaming-model.png | Bin 0 -> 66098 bytes .../structured-streaming-stream-as-a-table.png | Bin 0 -> 82251 bytes docs/img/structured-streaming-window.png| Bin 0 -> 128930 bytes docs/img/structured-streaming.pptx | Bin 0 -> 1105315 bytes docs/structured-streaming-programming-guide.md | 1156 ++ 8 files changed, 1157 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/_layouts/global.html -- diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index d493f62..2d0c3fd 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -73,6 +73,7 @@ Spark Streaming DataFrames, Datasets and SQL +Structured Streaming MLlib (Machine Learning) GraphX (Graph Processing) SparkR (R on Spark) http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/img/structured-streaming-example-model.png -- diff --git a/docs/img/structured-streaming-example-model.png b/docs/img/structured-streaming-example-model.png new file mode 100644 index 000..af98765 Binary files /dev/null and b/docs/img/structured-streaming-example-model.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/img/structured-streaming-late-data.png -- diff --git a/docs/img/structured-streaming-late-data.png b/docs/img/structured-streaming-late-data.png new file mode 100644 index 000..5276b47 Binary files /dev/null and b/docs/img/structured-streaming-late-data.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/img/structured-streaming-model.png -- diff --git a/docs/img/structured-streaming-model.png b/docs/img/structured-streaming-model.png new file mode 100644 index 000..2061aae Binary files /dev/null and b/docs/img/structured-streaming-model.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/img/structured-streaming-stream-as-a-table.png -- diff --git a/docs/img/structured-streaming-stream-as-a-table.png b/docs/img/structured-streaming-stream-as-a-table.png new file mode 100644 index 000..8181216 Binary files /dev/null and b/docs/img/structured-streaming-stream-as-a-table.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/img/structured-streaming-window.png -- diff --git a/docs/img/structured-streaming-window.png b/docs/img/structured-streaming-window.png new file mode 100644 index 000..be9d3fb Binary files /dev/null and b/docs/img/structured-streaming-window.png differ http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/img/structured-streaming.pptx -- diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx new file mode 100644 index 000..c278323 Binary files /dev/null 
and b/docs/img/structured-streaming.pptx differ http://git-wip-us.apache.org/repos/asf/spark/blob/64132a14/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md new file mode 100644 index 000..9ed06be --- /dev/null +++ b/docs/structured-streaming-programming-guide.md @@ -0,0 +
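The patch itself only adds documentation and images, but for orientation, here is a minimal hedged sketch of the kind of query the new guide walks through, using the Spark 2.0 streaming Dataset API; the host and port are placeholders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("streaming-sketch").getOrCreate()
import spark.implicits._

// Read lines from a socket, split them into words, and maintain a running count.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```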
spark git commit: [SPARK-14480][SQL] Remove meaningless StringIteratorReader for CSV data source.
Repository: spark Updated Branches: refs/heads/master 39f2eb1da -> cb1b9d34f [SPARK-14480][SQL] Remove meaningless StringIteratorReader for CSV data source. ## What changes were proposed in this pull request? This PR removes meaningless `StringIteratorReader` for CSV data source. In `CSVParser.scala`, there is an `Reader` wrapping `Iterator` but there are two problems by this. Firstly, it was actually not faster than processing line by line with Iterator due to additional logics to wrap `Iterator` to `Reader`. Secondly, this brought a bit of complexity because it needs additional logics to allow every line to be read bytes by bytes. So, it was pretty difficult to figure out issues about parsing, (eg. SPARK-14103). A benchmark was performed manually and the results were below: - Original codes with Reader wrapping Iterator |End-to-end (ns) | Parse Time (ns) | |---|| |14116265034 |2008277960| - New codes with Iterator |End-to-end (ns) | Parse Time (ns) | |---|| |13451699644 | 1549050564 | For the details for the environment, dataset and methods, please refer the JIRA ticket. ## How was this patch tested? Existing tests should cover this. Author: hyukjinkwon Closes #13808 from HyukjinKwon/SPARK-14480-small. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb1b9d34 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb1b9d34 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb1b9d34 Branch: refs/heads/master Commit: cb1b9d34f37a5574de43f61e7036c4b8b81defbf Parents: 39f2eb1 Author: hyukjinkwon Authored: Wed Jun 29 11:42:51 2016 -0700 Committer: Reynold Xin Committed: Wed Jun 29 11:42:51 2016 -0700 -- .../datasources/csv/CSVFileFormat.scala | 12 +- .../execution/datasources/csv/CSVParser.scala | 168 ++- .../execution/datasources/csv/CSVRelation.scala | 19 ++- .../datasources/csv/CSVParserSuite.scala| 125 -- 4 files changed, 33 insertions(+), 291 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb1b9d34/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 12e19f9..1bf5788 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -56,7 +56,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString) val rdd = baseRdd(sparkSession, csvOptions, paths) val firstLine = findFirstLine(csvOptions, rdd) -val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine) +val firstRow = new CsvReader(csvOptions).parseLine(firstLine) val header = if (csvOptions.headerFlag) { firstRow.zipWithIndex.map { case (value, index) => @@ -103,6 +103,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { val csvOptions = new CSVOptions(options) +val commentPrefix = csvOptions.comment.toString val headers = requiredSchema.fields.map(_.name) val broadcastedHadoopConf = @@ -118,7 +119,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { CSVRelation.dropHeaderLine(file, lineIterator, 
csvOptions) - val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers) + val csvParser = new CsvReader(csvOptions) + val tokenizedIterator = lineIterator.filter { line => +line.trim.nonEmpty && !line.startsWith(commentPrefix) + }.map { line => +csvParser.parseLine(line) + } val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions) var numMalformedRecords = 0 tokenizedIterator.flatMap { recordTokens => @@ -146,7 +152,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val rdd = baseRdd(sparkSession, options, inputPaths) // Make sure firstLine is materialized before sending to executors val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null -CSVRelation.univocityTokenizer(rdd, header, firstLine, options) +CSVRelation.univocityTokenizer(rdd, firstLine, opti
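The core of the change is the iterator pipeline visible in the diff: instead of wrapping the line `Iterator` in a `Reader`, blank and comment lines are filtered out and each remaining line is tokenized directly. A minimal, self-contained sketch of that pattern follows; here `parseLine` is a simplified stand-in for the internal `CsvReader.parseLine`, which actually handles quoting and escaping.

```scala
// Sketch only: a naive split-based parseLine stands in for the internal
// univocity-backed CsvReader.parseLine used by the real code.
object LineByLineCsvSketch {
  def parseLine(line: String, sep: Char = ','): Array[String] =
    line.split(sep).map(_.trim)

  // Mirrors the diff: skip empty and commented-out lines, then tokenize each remaining line.
  def tokenize(lines: Iterator[String], commentPrefix: String = "#"): Iterator[Array[String]] =
    lines
      .filter(line => line.trim.nonEmpty && !line.startsWith(commentPrefix))
      .map(line => parseLine(line))

  def main(args: Array[String]): Unit = {
    val input = Iterator("a,b,c", "", "# just a comment", "1, 2, 3")
    tokenize(input).foreach(tokens => println(tokens.mkString("|")))
  }
}
```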
spark git commit: [SPARK-16236][SQL][FOLLOWUP] Add Path Option back to Load API in DataFrameReader
Repository: spark Updated Branches: refs/heads/master 8c9cd0a7a -> 39f2eb1da [SPARK-16236][SQL][FOLLOWUP] Add Path Option back to Load API in DataFrameReader What changes were proposed in this pull request? In Python API, we have the same issue. Thanks for identifying this issue, zsxwing ! Below is an example: ```Python spark.read.format('json').load('python/test_support/sql/people.json') ``` How was this patch tested? Existing test cases cover the changes by this PR Author: gatorsmile Closes #13965 from gatorsmile/optionPaths. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39f2eb1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39f2eb1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39f2eb1d Branch: refs/heads/master Commit: 39f2eb1da34f26bf68c535c8e6b796d71a37a651 Parents: 8c9cd0a Author: gatorsmile Authored: Wed Jun 29 11:30:49 2016 -0700 Committer: Shixiong Zhu Committed: Wed Jun 29 11:30:49 2016 -0700 -- python/pyspark/sql/readwriter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/39f2eb1d/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 10f307b..44bf744 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -143,7 +143,9 @@ class DataFrameReader(OptionUtils): if schema is not None: self.schema(schema) self.options(**options) -if path is not None: +if isinstance(path, basestring): +return self._df(self._jreader.load(path)) +elif path is not None: if type(path) != list: path = [path] return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path))) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
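For reference, the Scala `DataFrameReader` exposes both a single-path `load(path)` and a varargs `load(paths: _*)`, and the Python change above routes a plain string through the single-path form so that the `path` option is preserved. A hedged sketch of the two call shapes (the file locations are placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Placeholder paths; the point is only the single-path vs. multi-path load overloads
// that the Python fix above distinguishes between.
object LoadPathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("LoadPathSketch").master("local[*]").getOrCreate()

    // One path: a plain string, analogous to the basestring branch added in readwriter.py.
    val single = spark.read.format("json").load("examples/src/main/resources/people.json")

    // Several paths: goes through the varargs overload, analogous to the list branch.
    val multi = spark.read.format("json").load(
      "examples/src/main/resources/people.json",
      "examples/src/main/resources/people.json")

    println(single.count())
    println(multi.count())
    spark.stop()
  }
}
```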
spark git commit: [SPARK-16236][SQL][FOLLOWUP] Add Path Option back to Load API in DataFrameReader
Repository: spark Updated Branches: refs/heads/branch-2.0 1cde325e2 -> edd1905c0 [SPARK-16236][SQL][FOLLOWUP] Add Path Option back to Load API in DataFrameReader What changes were proposed in this pull request? In Python API, we have the same issue. Thanks for identifying this issue, zsxwing ! Below is an example: ```Python spark.read.format('json').load('python/test_support/sql/people.json') ``` How was this patch tested? Existing test cases cover the changes by this PR Author: gatorsmile Closes #13965 from gatorsmile/optionPaths. (cherry picked from commit 39f2eb1da34f26bf68c535c8e6b796d71a37a651) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edd1905c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edd1905c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edd1905c Branch: refs/heads/branch-2.0 Commit: edd1905c0fde69025cb6d8d8f15d13d6a6da0e3b Parents: 1cde325 Author: gatorsmile Authored: Wed Jun 29 11:30:49 2016 -0700 Committer: Shixiong Zhu Committed: Wed Jun 29 11:30:57 2016 -0700 -- python/pyspark/sql/readwriter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/edd1905c/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 10f307b..44bf744 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -143,7 +143,9 @@ class DataFrameReader(OptionUtils): if schema is not None: self.schema(schema) self.options(**options) -if path is not None: +if isinstance(path, basestring): +return self._df(self._jreader.load(path)) +elif path is not None: if type(path) != list: path = [path] return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path))) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16140][MLLIB][SPARKR][DOCS] Group k-means method in generated R doc
Repository: spark Updated Branches: refs/heads/branch-2.0 d96e8c2dd -> 1cde325e2 [SPARK-16140][MLLIB][SPARKR][DOCS] Group k-means method in generated R doc https://issues.apache.org/jira/browse/SPARK-16140 ## What changes were proposed in this pull request? Group the R doc of spark.kmeans, predict(KM), summary(KM), read/write.ml(KM) under Rd spark.kmeans. The example code was updated. ## How was this patch tested? Tested on my local machine And on my laptop `jekyll build` is failing to build API docs, so here I can only show you the html I manually generated from Rd files, with no CSS applied, but the doc content should be there. ![screenshotkmeans](https://cloud.githubusercontent.com/assets/3925641/16403203/c2c9ca1e-3ca7-11e6-9e29-f2164aee75fc.png) Author: Xin Ren Closes #13921 from keypointt/SPARK-16140. (cherry picked from commit 8c9cd0a7a719ce4286f77f35bb787e2b626a472e) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cde325e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cde325e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cde325e Branch: refs/heads/branch-2.0 Commit: 1cde325e29286a8c6631b0b32351994aad7db567 Parents: d96e8c2 Author: Xin Ren Authored: Wed Jun 29 11:25:00 2016 -0700 Committer: Xiangrui Meng Committed: Wed Jun 29 11:25:07 2016 -0700 -- R/pkg/R/generics.R | 2 ++ R/pkg/R/mllib.R| 72 +++-- 2 files changed, 35 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1cde325e/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 27dfd67..0e4350f 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1247,6 +1247,7 @@ setGeneric("spark.glm", function(data, formula, ...) { standardGeneric("spark.gl #' @export setGeneric("glm") +#' predict #' @rdname predict #' @export setGeneric("predict", function(object, ...) { standardGeneric("predict") }) @@ -1271,6 +1272,7 @@ setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("s #' @export setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") }) +#' write.ml #' @rdname write.ml #' @export setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") }) http://git-wip-us.apache.org/repos/asf/spark/blob/1cde325e/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 897a376..4fe7367 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -267,9 +267,10 @@ setMethod("summary", signature(object = "NaiveBayesModel"), return(list(apriori = apriori, tables = tables)) }) -#' Fit a k-means model +#' K-Means Clustering Model #' -#' Fit a k-means model, similarly to R's kmeans(). +#' Fits a k-means clustering model against a Spark DataFrame, similarly to R's kmeans(). +#' Users can print, make predictions on the produced model and save the model to the input path. #' #' @param data SparkDataFrame for training #' @param formula A symbolic description of the model to be fitted. 
Currently only a few formula @@ -278,14 +279,32 @@ setMethod("summary", signature(object = "NaiveBayesModel"), #' @param k Number of centers #' @param maxIter Maximum iteration number #' @param initMode The initialization algorithm choosen to fit the model -#' @return A fitted k-means model +#' @return \code{spark.kmeans} returns a fitted k-means model #' @rdname spark.kmeans +#' @name spark.kmeans #' @export #' @examples #' \dontrun{ -#' model <- spark.kmeans(data, ~ ., k = 4, initMode = "random") +#' sparkR.session() +#' data(iris) +#' df <- createDataFrame(iris) +#' model <- spark.kmeans(df, Sepal_Length ~ Sepal_Width, k = 4, initMode = "random") +#' summary(model) +#' +#' # fitted values on training data +#' fitted <- predict(model, df) +#' head(select(fitted, "Sepal_Length", "prediction")) +#' +#' # save fitted model to input path +#' path <- "path/to/model" +#' write.ml(model, path) +#' +#' # can also read back the saved model and print +#' savedModel <- read.ml(path) +#' summary(savedModel) #' } #' @note spark.kmeans since 2.0.0 +#' @seealso \link{predict}, \link{read.ml}, \link{write.ml} setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random")) { formula <- paste(deparse(formula), collapse = "") @@ -301,7 +320,7 @@ setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula" #' Note: A saved-loaded model does not support this method. #' #' @param obje
spark git commit: [SPARK-16140][MLLIB][SPARKR][DOCS] Group k-means method in generated R doc
Repository: spark Updated Branches: refs/heads/master c6a220d75 -> 8c9cd0a7a [SPARK-16140][MLLIB][SPARKR][DOCS] Group k-means method in generated R doc https://issues.apache.org/jira/browse/SPARK-16140 ## What changes were proposed in this pull request? Group the R doc of spark.kmeans, predict(KM), summary(KM), read/write.ml(KM) under Rd spark.kmeans. The example code was updated. ## How was this patch tested? Tested on my local machine And on my laptop `jekyll build` is failing to build API docs, so here I can only show you the html I manually generated from Rd files, with no CSS applied, but the doc content should be there. ![screenshotkmeans](https://cloud.githubusercontent.com/assets/3925641/16403203/c2c9ca1e-3ca7-11e6-9e29-f2164aee75fc.png) Author: Xin Ren Closes #13921 from keypointt/SPARK-16140. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c9cd0a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c9cd0a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c9cd0a7 Branch: refs/heads/master Commit: 8c9cd0a7a719ce4286f77f35bb787e2b626a472e Parents: c6a220d Author: Xin Ren Authored: Wed Jun 29 11:25:00 2016 -0700 Committer: Xiangrui Meng Committed: Wed Jun 29 11:25:00 2016 -0700 -- R/pkg/R/generics.R | 2 ++ R/pkg/R/mllib.R| 72 +++-- 2 files changed, 35 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c9cd0a7/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 27dfd67..0e4350f 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1247,6 +1247,7 @@ setGeneric("spark.glm", function(data, formula, ...) { standardGeneric("spark.gl #' @export setGeneric("glm") +#' predict #' @rdname predict #' @export setGeneric("predict", function(object, ...) { standardGeneric("predict") }) @@ -1271,6 +1272,7 @@ setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("s #' @export setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") }) +#' write.ml #' @rdname write.ml #' @export setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") }) http://git-wip-us.apache.org/repos/asf/spark/blob/8c9cd0a7/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 897a376..4fe7367 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -267,9 +267,10 @@ setMethod("summary", signature(object = "NaiveBayesModel"), return(list(apriori = apriori, tables = tables)) }) -#' Fit a k-means model +#' K-Means Clustering Model #' -#' Fit a k-means model, similarly to R's kmeans(). +#' Fits a k-means clustering model against a Spark DataFrame, similarly to R's kmeans(). +#' Users can print, make predictions on the produced model and save the model to the input path. #' #' @param data SparkDataFrame for training #' @param formula A symbolic description of the model to be fitted. 
Currently only a few formula @@ -278,14 +279,32 @@ setMethod("summary", signature(object = "NaiveBayesModel"), #' @param k Number of centers #' @param maxIter Maximum iteration number #' @param initMode The initialization algorithm choosen to fit the model -#' @return A fitted k-means model +#' @return \code{spark.kmeans} returns a fitted k-means model #' @rdname spark.kmeans +#' @name spark.kmeans #' @export #' @examples #' \dontrun{ -#' model <- spark.kmeans(data, ~ ., k = 4, initMode = "random") +#' sparkR.session() +#' data(iris) +#' df <- createDataFrame(iris) +#' model <- spark.kmeans(df, Sepal_Length ~ Sepal_Width, k = 4, initMode = "random") +#' summary(model) +#' +#' # fitted values on training data +#' fitted <- predict(model, df) +#' head(select(fitted, "Sepal_Length", "prediction")) +#' +#' # save fitted model to input path +#' path <- "path/to/model" +#' write.ml(model, path) +#' +#' # can also read back the saved model and print +#' savedModel <- read.ml(path) +#' summary(savedModel) #' } #' @note spark.kmeans since 2.0.0 +#' @seealso \link{predict}, \link{read.ml}, \link{write.ml} setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random")) { formula <- paste(deparse(formula), collapse = "") @@ -301,7 +320,7 @@ setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula" #' Note: A saved-loaded model does not support this method. #' #' @param object A fitted k-means model -#' @return SparkDataFrame containing fitted values +#' @return \code{fitted} retu
spark git commit: [MINOR][SPARKR] Fix arguments of survreg in SparkR
Repository: spark Updated Branches: refs/heads/branch-2.0 ba71cf451 -> d96e8c2dd [MINOR][SPARKR] Fix arguments of survreg in SparkR ## What changes were proposed in this pull request? Fix wrong arguments description of ```survreg``` in SparkR. ## How was this patch tested? ```Arguments``` section of ```survreg``` doc before this PR (with wrong description for ```path``` and missing ```overwrite```): ![image](https://cloud.githubusercontent.com/assets/1962026/16447548/fe7a5ed4-3da1-11e6-8b96-b5bf2083b07e.png) After this PR: ![image](https://cloud.githubusercontent.com/assets/1962026/16447617/368e0b18-3da2-11e6-8277-45640fb11859.png) Author: Yanbo Liang Closes #13970 from yanboliang/spark-16143-followup. (cherry picked from commit c6a220d756f23ee89a0d1366b20259890c9d67c9) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d96e8c2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d96e8c2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d96e8c2d Branch: refs/heads/branch-2.0 Commit: d96e8c2dd0a9949751d3074b6ab61eee12f5d622 Parents: ba71cf4 Author: Yanbo Liang Authored: Wed Jun 29 11:20:35 2016 -0700 Committer: Xiangrui Meng Committed: Wed Jun 29 11:20:41 2016 -0700 -- R/pkg/R/mllib.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d96e8c2d/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 8e6c2dd..897a376 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -442,11 +442,11 @@ setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"), # Saves the AFT survival regression model to the input path. -#' @param path The directory where the model is savedist containing the model's coefficien +#' @param path The directory where the model is saved +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE #' which means throw exception if the output path exists. #' #' @rdname spark.survreg -#' @name write.ml #' @export #' @note write.ml(AFTSurvivalRegressionModel, character) since 2.0.0 #' @seealso \link{read.ml} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][SPARKR] Fix arguments of survreg in SparkR
Repository: spark Updated Branches: refs/heads/master 272a2f78f -> c6a220d75 [MINOR][SPARKR] Fix arguments of survreg in SparkR ## What changes were proposed in this pull request? Fix wrong arguments description of ```survreg``` in SparkR. ## How was this patch tested? ```Arguments``` section of ```survreg``` doc before this PR (with wrong description for ```path``` and missing ```overwrite```): ![image](https://cloud.githubusercontent.com/assets/1962026/16447548/fe7a5ed4-3da1-11e6-8b96-b5bf2083b07e.png) After this PR: ![image](https://cloud.githubusercontent.com/assets/1962026/16447617/368e0b18-3da2-11e6-8277-45640fb11859.png) Author: Yanbo Liang Closes #13970 from yanboliang/spark-16143-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6a220d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6a220d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6a220d7 Branch: refs/heads/master Commit: c6a220d756f23ee89a0d1366b20259890c9d67c9 Parents: 272a2f7 Author: Yanbo Liang Authored: Wed Jun 29 11:20:35 2016 -0700 Committer: Xiangrui Meng Committed: Wed Jun 29 11:20:35 2016 -0700 -- R/pkg/R/mllib.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c6a220d7/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 8e6c2dd..897a376 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -442,11 +442,11 @@ setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"), # Saves the AFT survival regression model to the input path. -#' @param path The directory where the model is savedist containing the model's coefficien +#' @param path The directory where the model is saved +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE #' which means throw exception if the output path exists. #' #' @rdname spark.survreg -#' @name write.ml #' @export #' @note write.ml(AFTSurvivalRegressionModel, character) since 2.0.0 #' @seealso \link{read.ml} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15990][YARN] Add rolling log aggregation support for Spark on yarn
Repository: spark Updated Branches: refs/heads/master 393db655c -> 272a2f78f [SPARK-15990][YARN] Add rolling log aggregation support for Spark on yarn ## What changes were proposed in this pull request? YARN has supported rolling log aggregation since 2.6. Previously, logs were only aggregated to HDFS after the application finished, which is quite painful for long-running applications like Spark Streaming or the Thrift server; out-of-disk problems can also occur when the log file grows too large. So here we propose to add support for rolling log aggregation for Spark on YARN. One limitation is that log4j should be changed to a file appender: Spark itself uses a console appender by default, and with that the log file will not be created again once it has been removed after aggregation. But lots of production users have likely changed their log4j configuration from the default, so this is not a big problem. ## How was this patch tested? Manually verified with Hadoop 2.7.1. Author: jerryshao Closes #13712 from jerryshao/SPARK-15990. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/272a2f78 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/272a2f78 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/272a2f78 Branch: refs/heads/master Commit: 272a2f78f3ff801b94a81fa8fcc6633190eaa2f4 Parents: 393db65 Author: jerryshao Authored: Wed Jun 29 08:17:27 2016 -0500 Committer: Tom Graves Committed: Wed Jun 29 08:17:27 2016 -0500 -- docs/running-on-yarn.md | 24 + .../org/apache/spark/deploy/yarn/Client.scala | 27 .../org/apache/spark/deploy/yarn/config.scala | 16 3 files changed, 67 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/272a2f78/docs/running-on-yarn.md -- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index dbd46cc..4e92042 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -472,6 +472,30 @@ To use a custom metrics.properties for the application master and executors, upd Currently supported services are: hive, hbase + + spark.yarn.rolledLog.includePattern + (none) + + Java Regex to filter the log files which match the defined include pattern + and those log files will be aggregated in a rolling fashion. + This will be used with YARN's rolling log aggregation, to enable this feature in YARN side + yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds should be + configured in yarn-site.xml. + This feature can only be used with Hadoop 2.6.1+. The Spark log4j appender needs be changed to use + FileAppender or another appender that can handle the files being removed while its running. Based + on the file name configured in the log4j configuration (like spark.log), the user should set the + regex (spark*) to include all the log files that need to be aggregated. + + + + spark.yarn.rolledLog.excludePattern + (none) + + Java Regex to filter the log files which match the defined exclude pattern + and those log files will not be aggregated in a rolling fashion. If the log file + name matches both the include and the exclude pattern, this file will be excluded eventually.
+ + # Important notes http://git-wip-us.apache.org/repos/asf/spark/blob/272a2f78/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9bb3695..d63579f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -271,6 +271,33 @@ private[spark] class Client( appContext.setResource(capability) } +sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern => + try { +val logAggregationContext = Records.newRecord( + Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext")) + .asInstanceOf[Object] + +val setRolledLogsIncludePatternMethod = + logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String]) +setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern) + +sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern => + val setRolledLogsExcludePatternMethod = + logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String]) + setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern) +} + +val setLo
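Usage-wise, the two properties documented above are ordinary Spark configuration keys, so they can be supplied via `--conf` on spark-submit or set programmatically. A minimal sketch with illustrative pattern values, assuming a file-based log4j appender that writes files named like `spark.log`:

```scala
import org.apache.spark.SparkConf

// Illustrative values only. The patterns are Java regexes matched against log file
// names, and rolling aggregation additionally requires
// yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds in yarn-site.xml.
object RolledLogConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.yarn.rolledLog.includePattern", "spark*")        // the example pattern from the docs above
      .set("spark.yarn.rolledLog.excludePattern", "spark.*\\.tmp") // hypothetical: skip temporary files
    conf.getAll.foreach { case (key, value) => println(s"$key=$value") }
  }
}
```

The same values can equally be passed as `--conf spark.yarn.rolledLog.includePattern=...` flags on the spark-submit command line.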
spark git commit: [SPARK-15858][ML] Fix calculating error by tree stack overflow prob…
Repository: spark Updated Branches: refs/heads/master 21385d02a -> 393db655c [SPARK-15858][ML] Fix calculating error by tree stack overflow prob… ## What changes were proposed in this pull request? Improve the evaluateEachIteration function in mllib, as it fails when trying to calculate the error by tree for a model that has more than 500 trees. ## How was this patch tested? The patch was tested on a production data set (2K rows x 2K features) by training a gradient boosted model without validation with a 1000 maxIteration setting and then producing the error by tree. The new patch was able to perform the calculation within 30 seconds, whereas previously it would take hours and then fail. **PS**: It would be better if this PR could be cherry-picked into release branches 1.6.1 and 2.0 Author: Mahmoud Rawas Author: Mahmoud Rawas Closes #13624 from mhmoudr/SPARK-15858.master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/393db655 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/393db655 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/393db655 Branch: refs/heads/master Commit: 393db655c3c43155305fbba1b2f8c48a95f18d93 Parents: 21385d0 Author: Mahmoud Rawas Authored: Wed Jun 29 13:12:17 2016 +0100 Committer: Sean Owen Committed: Wed Jun 29 13:12:17 2016 +0100 -- .../ml/tree/impl/GradientBoostedTrees.scala | 40 ++-- .../mllib/tree/model/treeEnsembleModels.scala | 37 -- 2 files changed, 34 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/393db655/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala index a0faff2..7bef899 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala @@ -205,31 +205,29 @@ private[spark] object GradientBoostedTrees extends Logging { case _ => data } -val numIterations = trees.length -val evaluationArray = Array.fill(numIterations)(0.0) -val localTreeWeights = treeWeights - -var predictionAndError = computeInitialPredictionAndError( - remappedData, localTreeWeights(0), trees(0), loss) - -evaluationArray(0) = predictionAndError.values.mean() - val broadcastTrees = sc.broadcast(trees) -(1 until numIterations).foreach { nTree => - predictionAndError = remappedData.zip(predictionAndError).mapPartitions { iter => -val currentTree = broadcastTrees.value(nTree) -val currentTreeWeight = localTreeWeights(nTree) -iter.map { case (point, (pred, error)) => - val newPred = updatePrediction(point.features, pred, currentTree, currentTreeWeight) - val newError = loss.computeError(newPred, point.label) - (newPred, newError) -} +val localTreeWeights = treeWeights +val treesIndices = trees.indices + +val dataCount = remappedData.count() +val evaluation = remappedData.map { point => + treesIndices.map { idx => +val prediction = broadcastTrees.value(idx) + .rootNode + .predictImpl(point.features) + .prediction +prediction * localTreeWeights(idx) } - evaluationArray(nTree) = predictionAndError.values.mean() + .scanLeft(0.0)(_ + _).drop(1) + .map(prediction => loss.computeError(prediction, point.label)) } +.aggregate(treesIndices.map(_ => 0.0))( + (aggregated, row) => treesIndices.map(idx => aggregated(idx) + row(idx)), + (a, b) =>
treesIndices.map(idx => a(idx) + b(idx))) +.map(_ / dataCount) -broadcastTrees.unpersist() -evaluationArray +broadcastTrees.destroy() +evaluation.toArray } /** http://git-wip-us.apache.org/repos/asf/spark/blob/393db655/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index f7d9b22..657ed0a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -151,31 +151,24 @@ class GradientBoostedTreesModel @Since("1.2.0") ( case _ => data } -val numIterations = trees.length -val evaluationArray =
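The heart of the fix is the `scanLeft` visible above: rather than re-scanning the data once per iteration (the iterative approach that produced the stack overflow for models with many trees), each point's per-tree weighted predictions are turned into running sums in a single pass, and each running sum is scored against the label. A standalone sketch of that per-point computation (the numbers and the squared-error loss are illustrative):

```scala
// Standalone illustration of the single-pass evaluation. Squared error is used here
// as an illustrative loss; the real code delegates to the model's Loss.computeError.
object CumulativeErrorSketch {
  def computeError(prediction: Double, label: Double): Double = {
    val diff = prediction - label
    diff * diff
  }

  // weightedTreePredictions(i) is tree i's prediction already multiplied by its weight.
  def errorsByIteration(weightedTreePredictions: Seq[Double], label: Double): Seq[Double] =
    weightedTreePredictions
      .scanLeft(0.0)(_ + _)
      .drop(1)                                // running ensemble prediction after 1..n trees
      .map(pred => computeError(pred, label)) // error at each iteration for this point

  def main(args: Array[String]): Unit = {
    // One hypothetical data point with label 0.8 and three weighted tree predictions.
    println(errorsByIteration(Seq(0.5, 0.3, -0.1), label = 0.8))
  }
}
```

Averaging these per-point error vectors across the dataset, which is what the `aggregate` call in the diff does, yields one error value per boosting iteration.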
spark git commit: [SPARK-16261][EXAMPLES][ML] Fixed incorrect appNames in ML Examples
Repository: spark Updated Branches: refs/heads/branch-2.0 1b4d63f6f -> ba71cf451 [SPARK-16261][EXAMPLES][ML] Fixed incorrect appNames in ML Examples ## What changes were proposed in this pull request? Some appNames in ML examples are incorrect, mostly in PySpark but one in Scala. This corrects the names. ## How was this patch tested? Style, local tests Author: Bryan Cutler Closes #13949 from BryanCutler/pyspark-example-appNames-fix-SPARK-16261. (cherry picked from commit 21385d02a987bcee1198103e447c019f7a769d68) Signed-off-by: Nick Pentreath Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba71cf45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba71cf45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba71cf45 Branch: refs/heads/branch-2.0 Commit: ba71cf451efceaa6b454baa51c7a6b7e184d3cb7 Parents: 1b4d63f Author: Bryan Cutler Authored: Wed Jun 29 14:06:38 2016 +0200 Committer: Nick Pentreath Committed: Wed Jun 29 14:06:52 2016 +0200 -- examples/src/main/python/ml/decision_tree_regression_example.py| 2 +- examples/src/main/python/ml/lda_example.py | 2 +- examples/src/main/python/ml/simple_params_example.py | 2 +- .../org/apache/spark/examples/ml/CountVectorizerExample.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ba71cf45/examples/src/main/python/ml/decision_tree_regression_example.py -- diff --git a/examples/src/main/python/ml/decision_tree_regression_example.py b/examples/src/main/python/ml/decision_tree_regression_example.py index b734d49..58d7ad9 100644 --- a/examples/src/main/python/ml/decision_tree_regression_example.py +++ b/examples/src/main/python/ml/decision_tree_regression_example.py @@ -31,7 +31,7 @@ from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ -.appName("decision_tree_classification_example")\ +.appName("DecisionTreeRegressionExample")\ .getOrCreate() # $example on$ http://git-wip-us.apache.org/repos/asf/spark/blob/ba71cf45/examples/src/main/python/ml/lda_example.py -- diff --git a/examples/src/main/python/ml/lda_example.py b/examples/src/main/python/ml/lda_example.py index 6ca56ad..5ce810f 100644 --- a/examples/src/main/python/ml/lda_example.py +++ b/examples/src/main/python/ml/lda_example.py @@ -35,7 +35,7 @@ if __name__ == "__main__": # Creates a SparkSession spark = SparkSession \ .builder \ -.appName("PythonKMeansExample") \ +.appName("LDAExample") \ .getOrCreate() # $example on$ http://git-wip-us.apache.org/repos/asf/spark/blob/ba71cf45/examples/src/main/python/ml/simple_params_example.py -- diff --git a/examples/src/main/python/ml/simple_params_example.py b/examples/src/main/python/ml/simple_params_example.py index 54fbc2c..2f1eaa6 100644 --- a/examples/src/main/python/ml/simple_params_example.py +++ b/examples/src/main/python/ml/simple_params_example.py @@ -33,7 +33,7 @@ Run with: if __name__ == "__main__": spark = SparkSession \ .builder \ -.appName("SimpleTextClassificationPipeline") \ +.appName("SimpleParamsExample") \ .getOrCreate() # prepare training data. 
http://git-wip-us.apache.org/repos/asf/spark/blob/ba71cf45/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala index 51aa517..988d894 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala @@ -27,7 +27,7 @@ object CountVectorizerExample { def main(args: Array[String]) { val spark = SparkSession .builder - .appName("CounterVectorizerExample") + .appName("CountVectorizerExample") .getOrCreate() // $example on$ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16261][EXAMPLES][ML] Fixed incorrect appNames in ML Examples
Repository: spark Updated Branches: refs/heads/master 7ee9e39cb -> 21385d02a [SPARK-16261][EXAMPLES][ML] Fixed incorrect appNames in ML Examples ## What changes were proposed in this pull request? Some appNames in ML examples are incorrect, mostly in PySpark but one in Scala. This corrects the names. ## How was this patch tested? Style, local tests Author: Bryan Cutler Closes #13949 from BryanCutler/pyspark-example-appNames-fix-SPARK-16261. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21385d02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21385d02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21385d02 Branch: refs/heads/master Commit: 21385d02a987bcee1198103e447c019f7a769d68 Parents: 7ee9e39 Author: Bryan Cutler Authored: Wed Jun 29 14:06:38 2016 +0200 Committer: Nick Pentreath Committed: Wed Jun 29 14:06:38 2016 +0200 -- examples/src/main/python/ml/decision_tree_regression_example.py| 2 +- examples/src/main/python/ml/lda_example.py | 2 +- examples/src/main/python/ml/simple_params_example.py | 2 +- .../org/apache/spark/examples/ml/CountVectorizerExample.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21385d02/examples/src/main/python/ml/decision_tree_regression_example.py -- diff --git a/examples/src/main/python/ml/decision_tree_regression_example.py b/examples/src/main/python/ml/decision_tree_regression_example.py index b734d49..58d7ad9 100644 --- a/examples/src/main/python/ml/decision_tree_regression_example.py +++ b/examples/src/main/python/ml/decision_tree_regression_example.py @@ -31,7 +31,7 @@ from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ -.appName("decision_tree_classification_example")\ +.appName("DecisionTreeRegressionExample")\ .getOrCreate() # $example on$ http://git-wip-us.apache.org/repos/asf/spark/blob/21385d02/examples/src/main/python/ml/lda_example.py -- diff --git a/examples/src/main/python/ml/lda_example.py b/examples/src/main/python/ml/lda_example.py index 6ca56ad..5ce810f 100644 --- a/examples/src/main/python/ml/lda_example.py +++ b/examples/src/main/python/ml/lda_example.py @@ -35,7 +35,7 @@ if __name__ == "__main__": # Creates a SparkSession spark = SparkSession \ .builder \ -.appName("PythonKMeansExample") \ +.appName("LDAExample") \ .getOrCreate() # $example on$ http://git-wip-us.apache.org/repos/asf/spark/blob/21385d02/examples/src/main/python/ml/simple_params_example.py -- diff --git a/examples/src/main/python/ml/simple_params_example.py b/examples/src/main/python/ml/simple_params_example.py index 54fbc2c..2f1eaa6 100644 --- a/examples/src/main/python/ml/simple_params_example.py +++ b/examples/src/main/python/ml/simple_params_example.py @@ -33,7 +33,7 @@ Run with: if __name__ == "__main__": spark = SparkSession \ .builder \ -.appName("SimpleTextClassificationPipeline") \ +.appName("SimpleParamsExample") \ .getOrCreate() # prepare training data. 
http://git-wip-us.apache.org/repos/asf/spark/blob/21385d02/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala index 51aa517..988d894 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala @@ -27,7 +27,7 @@ object CountVectorizerExample { def main(args: Array[String]) { val spark = SparkSession .builder - .appName("CounterVectorizerExample") + .appName("CountVectorizerExample") .getOrCreate() // $example on$ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16157][SQL] Add New Methods for comments in StructField and StructType
Repository: spark Updated Branches: refs/heads/master d1e810885 -> 7ee9e39cb [SPARK-16157][SQL] Add New Methods for comments in StructField and StructType What changes were proposed in this pull request? Based on the previous discussion with cloud-fan hvanhovell in another related PR https://github.com/apache/spark/pull/13764#discussion_r67994276, it looks reasonable to add convenience methods for users to add `comment` when defining `StructField`. Currently, the column-related `comment` attribute is stored in `Metadata` of `StructField`. For example, users can add the `comment` attribute using the following way: ```Scala StructType( StructField( "cl1", IntegerType, nullable = false, new MetadataBuilder().putString("comment", "test").build()) :: Nil) ``` This PR is to add more user friendly methods for the `comment` attribute when defining a `StructField`. After the changes, users are provided three different ways to do it: ```Scala val struct = (new StructType) .add("a", "int", true, "test1") val struct = (new StructType) .add("c", StringType, true, "test3") val struct = (new StructType) .add(StructField("d", StringType).withComment("test4")) ``` How was this patch tested? Added test cases: - `DataTypeSuite` is for testing three types of API changes, - `DataFrameReaderWriterSuite` is for parquet, json and csv formats - using in-memory catalog - `OrcQuerySuite.scala` is for orc format using Hive-metastore Author: gatorsmile Closes #13860 from gatorsmile/newMethodForComment. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ee9e39c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ee9e39c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ee9e39c Branch: refs/heads/master Commit: 7ee9e39cb43c43d69dfe8035106f7556886e60b1 Parents: d1e8108 Author: gatorsmile Authored: Wed Jun 29 19:36:21 2016 +0800 Committer: Wenchen Fan Committed: Wed Jun 29 19:36:21 2016 +0800 -- .../spark/sql/catalyst/parser/AstBuilder.scala | 10 ++ .../apache/spark/sql/types/StructField.scala| 18 ++ .../org/apache/spark/sql/types/StructType.scala | 35 .../apache/spark/sql/types/DataTypeSuite.scala | 17 ++ .../spark/sql/execution/command/tables.scala| 3 +- .../sql/execution/command/DDLCommandSuite.scala | 3 +- .../apache/spark/sql/sources/DDLTestSuite.scala | 3 +- .../spark/sql/sources/TableScanSuite.scala | 8 ++--- .../sql/test/DataFrameReaderWriterSuite.scala | 27 ++- .../spark/sql/hive/orc/OrcQuerySuite.scala | 22 10 files changed, 125 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c7420a1..f2cc8d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1430,13 +1430,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { */ override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) { import ctx._ - -// Add the comment to the metadata. 
-val builder = new MetadataBuilder -if (STRING != null) { - builder.putString("comment", string(STRING)) -} - -StructField(identifier.getText, typedVisit(dataType), nullable = true, builder.build()) +val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true) +if (STRING == null) structField else structField.withComment(string(STRING)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala index 83570a5..cb8bf61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala @@ -51,4 +51,22 @@ case class StructField( ("nullable" -> nullable) ~ ("metadata" -> metadata.jsonValue) } + + /** + * Updates the StructField with a new comment value. + */ + def withComment(comment: String): StructField = { +val newMetadata = new MetadataBuilder() + .withMetadata(metadata) +
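Putting the three call shapes from the description side by side, and reading the comment back out of the field metadata (the column names are placeholders; per the parser change above, the comment is stored under the `comment` key of the field's `Metadata`):

```scala
import org.apache.spark.sql.types._

// Placeholder column names; shows the three equivalent ways from the description
// plus how the comment can be read back out of the field metadata.
object StructFieldCommentSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType()
      .add("a", "int", true, "test1")                          // DDL-style type string
      .add("c", StringType, true, "test3")                     // DataType instance
      .add(StructField("d", StringType).withComment("test4"))  // explicit StructField

    schema.fields.foreach { field =>
      val comment =
        if (field.metadata.contains("comment")) field.metadata.getString("comment") else "<none>"
      println(s"${field.name}: $comment")
    }
  }
}
```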
spark git commit: [SPARK-16291][SQL] CheckAnalysis should capture nested aggregate functions that reference no input attributes
Repository: spark Updated Branches: refs/heads/branch-2.0 904122335 -> 1b4d63f6f [SPARK-16291][SQL] CheckAnalysis should capture nested aggregate functions that reference no input attributes ## What changes were proposed in this pull request? `MAX(COUNT(*))` is invalid since aggregate expression can't be nested within another aggregate expression. This case should be captured at analysis phase, but somehow sneaks off to runtime. The reason is that when checking aggregate expressions in `CheckAnalysis`, a checking branch treats all expressions that reference no input attributes as valid ones. However, `MAX(COUNT(*))` is translated into `MAX(COUNT(1))` at analysis phase and also references no input attribute. This PR fixes this issue by removing the aforementioned branch. ## How was this patch tested? New test case added in `AnalysisErrorSuite`. Author: Cheng Lian Closes #13968 from liancheng/spark-16291-nested-agg-functions. (cherry picked from commit d1e8108854deba3de8e2d87eb4389d11fb17ee57) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b4d63f6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b4d63f6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b4d63f6 Branch: refs/heads/branch-2.0 Commit: 1b4d63f6f1e9f5aa819a149e1f5e45bba7d865bb Parents: 9041223 Author: Cheng Lian Authored: Wed Jun 29 19:08:36 2016 +0800 Committer: Wenchen Fan Committed: Wed Jun 29 19:09:00 2016 +0800 -- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 1 - .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 12 +++- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 +--- 3 files changed, 12 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1b4d63f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index ac9693e..7b30fcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -206,7 +206,6 @@ trait CheckAnalysis extends PredicateHelper { "Add to group by or wrap in first() (or first_value) if you don't care " + "which value you get.") case e if groupingExprs.exists(_.semanticEquals(e)) => // OK - case e if e.references.isEmpty => // OK case e => e.children.foreach(checkValidAggregateExpression) } http://git-wip-us.apache.org/repos/asf/spark/blob/1b4d63f6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index a41383f..a9cde1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count} +import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max} import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} @@ -163,6 +163,16 @@ class AnalysisErrorSuite extends AnalysisTest { "Distinct window functions are not supported" :: Nil) errorTest( +"nested aggregate functions", +testRelation.groupBy('a)( + AggregateExpression( +Max(AggregateExpression(Count(Literal(1)), Complete, isDistinct = false)), +Complete, +isDistinct = false)), +"not allowed to use an aggregate function in the argument of another aggregate function." :: Nil + ) + + errorTest( "offset window function", testRelation2.select( WindowExpression( http://git-wip-us.apache.org/repos/asf/spark/blob/1b4d63f6/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite
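To see the behavior end to end, a hedged sketch against a throwaway temp view (the table and column names are hypothetical): the nested form is now rejected during analysis, and the usual rewrite is to aggregate in two steps.

```scala
import org.apache.spark.sql.SparkSession

object NestedAggSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("NestedAggSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    Seq(("x", 1), ("x", 2), ("y", 3)).toDF("a", "b").createOrReplaceTempView("t")

    // Invalid: an aggregate nested inside another aggregate. With this fix it fails
    // in CheckAnalysis (AnalysisException) rather than sneaking through to runtime.
    // spark.sql("SELECT max(count(*)) FROM t GROUP BY a").show()

    // The two-step formulation that expresses the intended query.
    spark.sql("SELECT max(cnt) FROM (SELECT a, count(*) AS cnt FROM t GROUP BY a) grouped").show()

    spark.stop()
  }
}
```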
spark git commit: [SPARK-16291][SQL] CheckAnalysis should capture nested aggregate functions that reference no input attributes
Repository: spark Updated Branches: refs/heads/master 757dc2c09 -> d1e810885 [SPARK-16291][SQL] CheckAnalysis should capture nested aggregate functions that reference no input attributes ## What changes were proposed in this pull request? `MAX(COUNT(*))` is invalid since aggregate expression can't be nested within another aggregate expression. This case should be captured at analysis phase, but somehow sneaks off to runtime. The reason is that when checking aggregate expressions in `CheckAnalysis`, a checking branch treats all expressions that reference no input attributes as valid ones. However, `MAX(COUNT(*))` is translated into `MAX(COUNT(1))` at analysis phase and also references no input attribute. This PR fixes this issue by removing the aforementioned branch. ## How was this patch tested? New test case added in `AnalysisErrorSuite`. Author: Cheng Lian Closes #13968 from liancheng/spark-16291-nested-agg-functions. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1e81088 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1e81088 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1e81088 Branch: refs/heads/master Commit: d1e8108854deba3de8e2d87eb4389d11fb17ee57 Parents: 757dc2c Author: Cheng Lian Authored: Wed Jun 29 19:08:36 2016 +0800 Committer: Wenchen Fan Committed: Wed Jun 29 19:08:36 2016 +0800 -- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 1 - .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 12 +++- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 +--- 3 files changed, 12 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d1e81088/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index ac9693e..7b30fcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -206,7 +206,6 @@ trait CheckAnalysis extends PredicateHelper { "Add to group by or wrap in first() (or first_value) if you don't care " + "which value you get.") case e if groupingExprs.exists(_.semanticEquals(e)) => // OK - case e if e.references.isEmpty => // OK case e => e.children.foreach(checkValidAggregateExpression) } http://git-wip-us.apache.org/repos/asf/spark/blob/d1e81088/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index a41383f..a9cde1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max} import 
org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} @@ -163,6 +163,16 @@ class AnalysisErrorSuite extends AnalysisTest { "Distinct window functions are not supported" :: Nil) errorTest( +"nested aggregate functions", +testRelation.groupBy('a)( + AggregateExpression( +Max(AggregateExpression(Count(Literal(1)), Complete, isDistinct = false)), +Complete, +isDistinct = false)), +"not allowed to use an aggregate function in the argument of another aggregate function." :: Nil + ) + + errorTest( "offset window function", testRelation2.select( WindowExpression( http://git-wip-us.apache.org/repos/asf/spark/blob/d1e81088/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/te
spark git commit: [TRIVIAL][DOCS][STREAMING][SQL] The return type mentioned in the Javadoc is incorrect for toJavaRDD, …
Repository: spark Updated Branches: refs/heads/master f454a7f9f -> 757dc2c09 [TRIVIAL][DOCS][STREAMING][SQL] The return type mentioned in the Javadoc is incorrect for toJavaRDD, … ## What changes were proposed in this pull request? Change the return type mentioned in the JavaDoc for `toJavaRDD` / `javaRDD` to match the actual return type & be consistent with the scala rdd return type. ## How was this patch tested? Docs only change. Author: Holden Karau Closes #13954 from holdenk/trivial-streaming-tojavardd-doc-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/757dc2c0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/757dc2c0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/757dc2c0 Branch: refs/heads/master Commit: 757dc2c09d23400dacac22e51f52062bbe471136 Parents: f454a7f Author: Holden Karau Authored: Wed Jun 29 01:52:20 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 01:52:20 2016 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/757dc2c0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index df9f188..a6581eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2383,14 +2383,14 @@ class Dataset[T] private[sql]( } /** - * Returns the content of the Dataset as a [[JavaRDD]] of [[Row]]s. + * Returns the content of the Dataset as a [[JavaRDD]] of [[T]]s. * @group basic * @since 1.6.0 */ def toJavaRDD: JavaRDD[T] = rdd.toJavaRDD() /** - * Returns the content of the Dataset as a [[JavaRDD]] of [[Row]]s. + * Returns the content of the Dataset as a [[JavaRDD]] of [[T]]s. * @group basic * @since 1.6.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
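In code, the corrected wording reads as follows: for a `Dataset[T]`, `toJavaRDD` produces a `JavaRDD` of `T`, not of `Row`. A small sketch:

```scala
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.{Dataset, SparkSession}

object ToJavaRddSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ToJavaRddSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val ds: Dataset[String] = Seq("a", "b", "c").toDS()
    // The element type follows the Dataset's type parameter, matching the fixed Javadoc.
    val javaRdd: JavaRDD[String] = ds.toJavaRDD
    println(javaRdd.count())

    spark.stop()
  }
}
```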
spark git commit: [TRIVIAL][DOCS][STREAMING][SQL] The return type mentioned in the Javadoc is incorrect for toJavaRDD, …
Repository: spark Updated Branches: refs/heads/branch-2.0 6650c0533 -> 904122335 [TRIVIAL][DOCS][STREAMING][SQL] The return type mentioned in the Javadoc is incorrect for toJavaRDD, … ## What changes were proposed in this pull request? Change the return type mentioned in the JavaDoc for `toJavaRDD` / `javaRDD` to match the actual return type & be consistent with the scala rdd return type. ## How was this patch tested? Docs only change. Author: Holden Karau Closes #13954 from holdenk/trivial-streaming-tojavardd-doc-fix. (cherry picked from commit 757dc2c09d23400dacac22e51f52062bbe471136) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90412233 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90412233 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90412233 Branch: refs/heads/branch-2.0 Commit: 904122335d94681be2afbaf4f41a50d468e707b9 Parents: 6650c05 Author: Holden Karau Authored: Wed Jun 29 01:52:20 2016 -0700 Committer: Tathagata Das Committed: Wed Jun 29 01:52:33 2016 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90412233/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 153af74..067cbec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2350,14 +2350,14 @@ class Dataset[T] private[sql]( } /** - * Returns the content of the Dataset as a [[JavaRDD]] of [[Row]]s. + * Returns the content of the Dataset as a [[JavaRDD]] of [[T]]s. * @group basic * @since 1.6.0 */ def toJavaRDD: JavaRDD[T] = rdd.toJavaRDD() /** - * Returns the content of the Dataset as a [[JavaRDD]] of [[Row]]s. + * Returns the content of the Dataset as a [[JavaRDD]] of [[T]]s. * @group basic * @since 1.6.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org