[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21121 Merged build finished. Test FAILed.
[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21121 **[Test build #89686 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89686/testReport)** for PR 21121 at commit [`06348c3`](https://github.com/apache/spark/commit/06348c3cf53d3631fd620b9e4d46b408a30b341e). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21121 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89686/ Test FAILed.
[GitHub] spark pull request #20959: [SPARK-23846][SQL] The samplingRatio option for C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20959#discussion_r183226807 --- Diff: python/pyspark/sql/readwriter.py --- @@ -882,6 +882,9 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No the quote character. If None is set, the default value is escape character when escape and quote characters are different, ``\0`` otherwise.. +:param samplingRatio: defines fraction of rows (when ``multiLine`` is ``false``) or fraction + of files (when ``multiLine`` is ``true``) used for schema inferring. --- End diff -- @MaxGekk, I think in CSV's case we shouldn't sample based on files: in JSON's case it's a record per file, whereas CSV allows multiple records in a file.
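For context, `samplingRatio` controls how much of the input is scanned during schema inference. A minimal sketch of the call site, assuming the option lands as documented in the diff above (the path is hypothetical):

```scala
// Hedged sketch: infer the CSV schema from roughly 10% of the input rows.
val df = spark.read
  .option("inferSchema", "true")
  .option("samplingRatio", "0.1")
  .csv("/path/to/data.csv")
df.printSchema()
```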
[GitHub] spark issue #21052: [SPARK-23799][SQL] FilterEstimation.evaluateInSet produc...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21052 Thanks! Merged to master/2.3
[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21031 @kiszk Could you also update the PR description? LGTM
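For reference, `cardinality` (SPARK-23923) returns the number of elements in an array or map, mirroring Presto's function of the same name. A small hedged example of the intended behavior:

```scala
// Hedged sketch of the function under review; expected to return 4.
spark.sql("SELECT cardinality(array('b', 'd', 'c', 'a'))").show()
```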
[GitHub] spark pull request #21052: [SPARK-23799][SQL] FilterEstimation.evaluateInSet...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21052
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20937 I think we are almost there - sorry for the late response.
[GitHub] spark issue #21107: [DO-NOT-MERGE][WIP] Explicitly print out skipped tests f...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21107 @BryanCutler, I checked this, but it looks like it still prints out duplicated logs; however, I think in this way I can deal with https://github.com/apache/spark/pull/21107#issuecomment-382914524.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2564/ Test PASSed.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Merged build finished. Test PASSed.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21107 Will remove WIP after a few more checks of my local and Jenkins output.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Merged build finished. Test PASSed.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89687/ Test PASSed.
[GitHub] spark issue #21107: [SPARK-24044][PYTHON] Explicitly print out skipped tests...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Merged build finished. Test PASSed.
[GitHub] spark issue #21107: [SPARK-24044][PYTHON] Explicitly print out skipped tests...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21107 **[Test build #89688 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89688/testReport)** for PR 21107 at commit [`3dd74a0`](https://github.com/apache/spark/commit/3dd74a00c293030df57b126163de1cda47a2b7fe). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21107: [SPARK-24044][PYTHON] Explicitly print out skipped tests...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89688/ Test PASSed.
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183227276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,11 +187,15 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + encoding: Option[String]): JsonParser = { val path = new Path(record.getPath()) -CreateJacksonParser.inputStream( - jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) +val is = CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path) + +encoding.map(enc => CreateJacksonParser.inputStream(enc, jsonFactory, is)) + .getOrElse(CreateJacksonParser.inputStream(jsonFactory, is)) --- End diff -- Hm, @MaxGekk, wouldn't this also do a record per operation?
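From the caller's side, the encoding support this diff threads down to the Jackson parser would be exercised roughly like this (a sketch assuming the option ships as in this PR):

```scala
// Hedged sketch: read multiLine JSON stored in UTF-16LE. Without the option,
// this PR's read path would attempt to detect the encoding automatically.
val df = spark.read
  .option("multiLine", "true")
  .option("encoding", "UTF-16LE")
  .json("/path/to/utf16le-json")
```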
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183227330 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.json + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{LongType, StringType, StructType} +import org.apache.spark.util.{Benchmark, Utils} + +/** + * Benchmark to measure JSON read/write performance. + * To run this: + * spark-submit --class --jars + */ +object JSONBenchmarks { + val conf = new SparkConf() + + val spark = SparkSession.builder +.master("local[1]") +.appName("benchmark-json-datasource") +.config(conf) +.getOrCreate() + import spark.implicits._ + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + + def schemaInferring(rowsNum: Int): Unit = { +val benchmark = new Benchmark("JSON schema inferring", rowsNum) + +withTempPath { path => + // scalastyle:off + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on + + spark.sparkContext.range(0, rowsNum, 1) +.map(_ => "a") +.toDF("fieldA") +.write +.option("encoding", "UTF-8") +.json(path.getAbsolutePath) + + benchmark.addCase("No encoding", 3) { _ => +spark.read.json(path.getAbsolutePath) + } + + benchmark.addCase("UTF-8 is set", 3) { _ => +spark.read + .option("encoding", "UTF-8") + .json(path.getAbsolutePath) + } + + /* + Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz + + JSON schema inferring: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + + No encoding 38902 / 39282 2.6 389.0 1.0X + UTF-8 is set 56959 / 57261 1.8 569.6 0.7X + */ + benchmark.run() +} + } + + def perlineParsing(rowsNum: Int): Unit = { +val benchmark = new Benchmark("JSON per-line parsing", rowsNum) + +withTempPath { path => + // scalastyle:off + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on + + spark.sparkContext.range(0, rowsNum, 1) +.map(_ => "a") +.toDF("fieldA") +.write.json(path.getAbsolutePath) + val schema = new StructType().add("fieldA", StringType) + + benchmark.addCase("No encoding", 3) { _ => +spark.read + .schema(schema) + .json(path.getAbsolutePath) + .count() + } + + benchmark.addCase("UTF-8 is set", 3) { _ => +spark.read + .option("encoding", "UTF-8") + .schema(schema) + .json(path.getAbsolutePath) + .count() + } + + /* + Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz + + JSON per-line parsing: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + + No encoding 25947 / 26188 3.9 259.5 1.0X + UTF-8 is set 46319 / 46417 2.2 463.2 0.6X + */ + benchmark.run() +} + } + + def
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183227312 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala --- @@ -41,19 +41,25 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean - private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep => -require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") -sep + val encoding: Option[String] = parameters.get(ENCODING) + + val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { lineSep => +require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") + +lineSep } + // Note that the option 'lineSep' uses a different default value in read and write. - val lineSeparatorInRead: Option[Array[Byte]] = -lineSeparator.map(_.getBytes(StandardCharsets.UTF_8)) + val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => +lineSep.getBytes(encoding.getOrElse("UTF-8")) + } val lineSeparatorInWrite: Array[Byte] = -lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8)) +lineSeparatorInRead.getOrElse("\n".getBytes("UTF-8")) --- End diff -- not a big deal at all, but I was just wondering if there was a reason to choose `"UTF-8"` over `StandardCharsets.UTF_8`, because I was thinking `StandardCharsets.UTF_8` is slightly better.
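The trade-off in question: `getBytes(String)` resolves the charset by name on every call and can fail at runtime on a bad name, while `getBytes(Charset)` with the `StandardCharsets.UTF_8` constant skips the lookup and cannot fail. A minimal illustration:

```scala
import java.nio.charset.StandardCharsets

val viaName = "\n".getBytes("UTF-8")                    // charset looked up by name at call time
val viaConstant = "\n".getBytes(StandardCharsets.UTF_8) // constant reference, no lookup, cannot throw
assert(viaName.sameElements(viaConstant))
```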
[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21121 **[Test build #89686 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89686/testReport)** for PR 21121 at commit [`06348c3`](https://github.com/apache/spark/commit/06348c3cf53d3631fd620b9e4d46b408a30b341e).
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21107 **[Test build #89687 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89687/testReport)** for PR 21107 at commit [`56b9001`](https://github.com/apache/spark/commit/56b9001362ce18908689d679a8fdb4e554460cd0).
[GitHub] spark pull request #21107: [SPARK-24044][WIP] Explicitly print out skipped t...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21107#discussion_r183230073 --- Diff: python/pyspark/ml/tests.py --- @@ -2136,17 +2136,23 @@ class ImageReaderTest2(PySparkTestCase): @classmethod def setUpClass(cls): super(ImageReaderTest2, cls).setUpClass() +cls.hive_available = True # Note that here we enable Hive's support. cls.spark = None try: cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf() except py4j.protocol.Py4JError: cls.tearDownClass() -raise unittest.SkipTest("Hive is not available") +cls.hive_available = False except TypeError: cls.tearDownClass() -raise unittest.SkipTest("Hive is not available") -cls.spark = HiveContext._createForTesting(cls.sc) +cls.hive_available = False +if cls.hive_available: +cls.spark = HiveContext._createForTesting(cls.sc) + +def setUp(self): +if not self.hive_available: +self.skipTest("Hive is not available.") --- End diff -- ``` Finished test(python3): pyspark.sql.tests (51s) ... 93 tests were skipped ... Skipped tests in pyspark.sql.tests with python3: test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.' ... test_collect_functions (pyspark.sql.tests.HiveContextSQLTests) ... skipped 'Hive is not available.' test_datetime_functions (pyspark.sql.tests.HiveContextSQLTests) ... skipped 'Hive is not available.' ... test_query_execution_listener_on_collect (pyspark.sql.tests.QueryExecutionListenerTests) ... skipped "'org.apache.spark.sql.TestQueryExecutionListener' is not available. Will skip the related tests." ... ``` @viirya, @bersprockets and @BryanCutler, this is the output from partial testing on my local machine.
[GitHub] spark pull request #21107: [SPARK-24044][WIP] Explicitly print out skipped t...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21107#discussion_r183230184 --- Diff: python/run-tests.py --- @@ -109,8 +111,34 @@ def run_individual_python_test(test_name, pyspark_python): # this code is invoked from a thread other than the main thread. os._exit(-1) else: -per_test_output.close() -LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration) +skipped_counts = 0 +try: +per_test_output.seek(0) +# Here expects skipped test output from unittest when verbosity level is +# 2 (or --verbose option is enabled). +decoded_lines = map(lambda line: line.decode(), iter(per_test_output)) +skipped_tests = list(filter( +lambda line: re.search('test_.* \(pyspark\..*\) ... skipped ', line), +decoded_lines)) +skipped_counts = len(skipped_tests) +if skipped_counts > 0: +key = (pyspark_python, test_name) +SKIPPED_TESTS[key] = skipped_tests +per_test_output.close() +except: +import traceback +print_red("\nGot an exception while trying to store " + "skipped test output:\n%s" % traceback.format_exc()) +# Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if +# this code is invoked from a thread other than the main thread. +os._exit(-1) +if skipped_counts != 0: +LOGGER.info( +"Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python, test_name, --- End diff -- Not sure if there's a better format. Let me know.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21107 **[Test build #89688 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89688/testReport)** for PR 21107 at commit [`3dd74a0`](https://github.com/apache/spark/commit/3dd74a00c293030df57b126163de1cda47a2b7fe).
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2565/ Test PASSed.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21107 Merged build finished. Test PASSed.
[GitHub] spark issue #21107: [SPARK-24044][WIP] Explicitly print out skipped tests fr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21107 **[Test build #89687 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89687/testReport)** for PR 21107 at commit [`56b9001`](https://github.com/apache/spark/commit/56b9001362ce18908689d679a8fdb4e554460cd0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21056: [SPARK-23849][SQL] Tests for samplingRatio of jso...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21056#discussion_r183227100 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2128,38 +2128,77 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("SPARK-23849: schema inferring touches less data if samplingRation < 1.0") { -val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, + val sampledTestData = (value: java.lang.Long) => { +val predefinedSample = Set[Long](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, 57, 62, 68, 72) -withTempPath { path => - val writer = Files.newBufferedWriter(Paths.get(path.getAbsolutePath), -StandardCharsets.UTF_8, StandardOpenOption.CREATE_NEW) - for (i <- 0 until 100) { -if (predefinedSample.contains(i)) { - writer.write(s"""{"f1":${i.toString}}""" + "\n") -} else { - writer.write(s"""{"f1":${(i.toDouble + 0.1).toString}}""" + "\n") -} - } - writer.close() - - val ds = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath) - assert(ds.schema == new StructType().add("f1", LongType)) +if (predefinedSample.contains(value)) { + s"""{"f1":${value.toString}}""" +} else { + s"""{"f1":${(value.toDouble + 0.1).toString}}""" } } - test("SPARK-23849: usage of samplingRation while parsing of dataset of strings") { -val dstr = spark.sparkContext.parallelize(0 until 100, 1).map { i => - val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, -57, 62, 68, 72) - if (predefinedSample.contains(i)) { -s"""{"f1":${i.toString}}""" + "\n" - } else { -s"""{"f1":${(i.toDouble + 0.1).toString}}""" + "\n" - } -}.toDS() -val ds = spark.read.option("samplingRatio", 0.1).json(dstr) + test("SPARK-23849: schema inferring touches less data if samplingRatio < 1.0") { +// Set default values for the DataSource parameters to make sure +// that whole test file is mapped to only one partition. This will guarantee +// reliable sampling of the input file. +withSQLConf( + "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString, + "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString +)(withTempPath { path => + val ds = spark.range(0, 100, 1, 1).map(sampledTestData) + ds.write.text(path.getAbsolutePath) + val readback = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath) + + assert(readback.schema == new StructType().add("f1", LongType)) +}) + } + + test("SPARK-23849: usage of samplingRatio while parsing a dataset of strings") { +val ds = spark.range(0, 100, 1, 1).map(sampledTestData) +val readback = spark.read.option("samplingRatio", 0.1).json(ds) + +assert(readback.schema == new StructType().add("f1", LongType)) + } + + test("SPARK-23849: samplingRatio is out of the range (0, 1.0]") { +val ds = spark.range(0, 100, 1, 1).map(_.toString) -assert(ds.schema == new StructType().add("f1", LongType)) +val errorMsg0 = intercept[IllegalArgumentException] { + spark.read.option("samplingRatio", -1).json(ds) +}.getMessage +assert(errorMsg0.contains("samplingRatio (-1.0) should be greater than 0")) + +val errorMsg1 = intercept[IllegalArgumentException] { + spark.read.option("samplingRatio", 0).json(ds) +}.getMessage +assert(errorMsg1.contains("samplingRatio (0.0) should be greater than 0")) + +val sampled = spark.read.option("samplingRatio", 10).json(ds) +assert(sampled.count() == ds.count()) + } + + test("SPARK-23849: sampling files for schema inferring in the multiLine mode") { +withTempDir { dir => + Files.write(Paths.get(dir.getAbsolutePath, "0.json"), """{"a":"a"}""".getBytes, --- End diff -- maybe `getBytes` with explicit encoding UTF-8.
[GitHub] spark pull request #21056: [SPARK-23849][SQL] Tests for samplingRatio of jso...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21056#discussion_r183227071 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2128,38 +2128,77 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("SPARK-23849: schema inferring touches less data if samplingRation < 1.0") { -val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, + val sampledTestData = (value: java.lang.Long) => { --- End diff -- @MaxGekk, can we have the data in `TestJsonData`, for example, ```scala def sampledTestData: Dataset[String] = spark.createDataset(spark.sparkContext.parallelize( ... )(Encoders.STRING) ``` and use it, for example, `sampledTestData.coalesce(1)` in `JsonSuite`?
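A sketch of the suggested helper, combining the generator from the diff with the proposed `TestJsonData` placement (the placement and the explicit encoder are assumptions taken from the comment above):

```scala
// Hedged sketch: a reusable dataset in TestJsonData, coalesced by callers
// (e.g. sampledTestData.coalesce(1)) to keep the sampling deterministic.
def sampledTestData: Dataset[String] = {
  spark.range(0, 100, 1).map { index =>
    val predefinedSample = Set[Long](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, 57, 62, 68, 72)
    if (predefinedSample.contains(index)) {
      s"""{"f1":${index.toString}}"""
    } else {
      s"""{"f1":${(index.toDouble + 0.1).toString}}"""
    }
  }(Encoders.STRING)
}
```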
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20940 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89685/ Test FAILed.
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20940 **[Test build #89685 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89685/testReport)** for PR 20940 at commit [`ae8a388`](https://github.com/apache/spark/commit/ae8a388405d8d3402b5b6a45a7c7855d90538edb). * This patch **fails due to an unknown error code, -9**. * This patch **does not merge cleanly**. * This patch adds no public classes.
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20940 Build finished. Test FAILed.
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20933 Merged build finished. Test PASSed.
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user gengliangwang commented on the issue: https://github.com/apache/spark/pull/20933 retest this please.
[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21031 Sure, done.
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20933 **[Test build #89690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89690/testReport)** for PR 20933 at commit [`359f846`](https://github.com/apache/spark/commit/359f846112ba8c7ee9023b7754da4a907068b39b).
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20933 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2567/ Test PASSed.
[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21123 Merged build finished. Test PASSed.
[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21123 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89689/ Test PASSed.
[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21123 **[Test build #89689 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89689/testReport)** for PR 21123 at commit [`95628e5`](https://github.com/apache/spark/commit/95628e5a027d029be7dcc4e8e979555bc5e0e4a3). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan] ` * `trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister `
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21056 **[Test build #89692 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89692/testReport)** for PR 21056 at commit [`1b86df3`](https://github.com/apache/spark/commit/1b86df3293612ef1db80220c8d8e71a4b917a5c7).
[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183253723 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", + examples = """ +Examples: + > SELECT _FUNC_(array("d", "a", null, "b")); + [("d",0),("a",1),(null,2),("b",3)] + > SELECT _FUNC_(array("d", "a", null, "b"), true); + [(0,"d"),(1,"a"),(2,null),(3,"b")] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ZipWithIndex(child: Expression, indexFirst: Expression) + extends UnaryExpression with ExpectsInputTypes { + + def this(e: Expression) = this(e, Literal.FalseLiteral) + + private val idxFirst: Boolean = indexFirst match { +case Literal(v: Boolean, BooleanType) => v +case _ => throw new AnalysisException("The second argument has to be a boolean constant.") + } + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType] + + override def dataType: DataType = { +val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull) +val indexField = StructField("index", IntegerType, false) + +val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField) + +ArrayType(StructType(fields), false) + } + + override protected def nullSafeEval(input: Any): Any = { +val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType) + +val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i) +val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, i)} + +new GenericArrayData(resultData) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, c => { + if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) { +genCodeForPrimitiveElements(ctx, c, ev.value) + } else { +genCodeForNonPrimitiveElements(ctx, c, ev.value) + } +}) + } + + private def genCodeForPrimitiveElements( + ctx: CodegenContext, + childVariableName: String, + arrayData: String): String = { +val numElements = ctx.freshName("numElements") +val byteArraySize = ctx.freshName("byteArraySize") +val data = ctx.freshName("byteArray") +val unsafeRow = ctx.freshName("unsafeRow") +val structSize = ctx.freshName("structSize") +val unsafeArrayData = ctx.freshName("unsafeArrayData") +val structsOffset = ctx.freshName("structsOffset") +val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray" +val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes" + +val baseOffset = Platform.BYTE_ARRAY_OFFSET +val longSize = LongType.defaultSize +val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType) +val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1") + +s""" + |final int $numElements = $childVariableName.numElements(); + |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2}; + |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize); + |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize; + |if ($byteArraySize > $MAX_ARRAY_LENGTH) { --- End diff -- I like your suggestion. So instead of throwing the exception, the function will execute a similar piece of code as in `genCodeForNonPrimitiveElements`...
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20937 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89691/ Test PASSed.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20937 Merged build finished. Test PASSed.
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20933 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89690/ Test FAILed.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20937 **[Test build #89691 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89691/testReport)** for PR 20937 at commit [`482b799`](https://github.com/apache/spark/commit/482b79969b9e0cc475e63b415051b32423facef4). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21082 **[Test build #89693 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89693/testReport)** for PR 21082 at commit [`27158d9`](https://github.com/apache/spark/commit/27158d9873d54de9312db9e2db5c88d430589ade).
[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183253226 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression { override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})" } + +/** + * Returns the maximum value in the array. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.", --- End diff -- That's a really good question! The newly added functions `element_at` and `array_position` are 1-based, but on the other hand, `getItem` from the `Column` class is 0-based. What about adding one extra parameter and letting users decide whether the array will be indexed from 0 or 1?
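The inconsistency being discussed, concretely (`df` below is a hypothetical DataFrame with an array column `letters`):

```scala
// element_at and array_position are 1-based:
spark.sql("SELECT element_at(array('a', 'b', 'c'), 1)").show()  // -> a
// Column.getItem is 0-based:
df.select(df("letters").getItem(0))                             // -> first element
```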
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21082 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2568/ Test PASSed.
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21082 Merged build finished. Test PASSed.
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/20940 Could a committer please request a retest? It looks like the tests passed (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89685/testReport/), and the failure occurs after posting to github: Attempting to post to Github... [error] running /home/jenkins/workspace/SparkPullRequestBuilder/build/sbt -Phadoop-2.6 -Pkubernetes -Pflume -Phive-thriftserver -Pyarn -Pkafka-0-8 -Phive -Pkinesis-asl -Pmesos test ; process was terminated by signal 9 > Post successful. Build step 'Execute shell' marked build as failure Archiving artifacts Recording test results Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89685/ Test FAILed. Finished: FAILURE
[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user rxin commented on the issue: https://github.com/apache/spark/pull/21071 @devaraj-kavali can you close this PR first? Looks like there isn't any reason to really use htrace anymore ...
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20937 **[Test build #89691 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89691/testReport)** for PR 20937 at commit [`482b799`](https://github.com/apache/spark/commit/482b79969b9e0cc475e63b415051b32423facef4).
[GitHub] spark pull request #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user devaraj-kavali closed the pull request at: https://github.com/apache/spark/pull/21071
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21056 **[Test build #89692 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89692/testReport)** for PR 21056 at commit [`1b86df3`](https://github.com/apache/spark/commit/1b86df3293612ef1db80220c8d8e71a4b917a5c7). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21056 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89692/ Test PASSed.
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21056 Merged build finished. Test PASSed.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183258353 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) + } + filteredDataset = filteredDataset.where(filterer(dataset(inputColName))) } +filteredDataset + } -val metadata = NominalAttribute.defaultAttr - .withName($(outputCol)).withValues(filteredLabels).toMetadata() -// If we are skipping invalid records, filter them out. -val (filteredDataset, keepInvalid) = $(handleInvalid) match { - case StringIndexer.SKIP_INVALID => -val filterer = udf { label: String => - labelToIndex.contains(label) -} - (dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol, false) - case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID) -} + private def getIndexer(labels: Seq[String], labelToIndex: OpenHashMap[String, Double]) = { +val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID) -val indexer = udf { label: String => +udf { label: String => --- End diff -- This requires calling many UDFs for different input columns. Should we combine them in one UDF? The `filteredDataset` logic could be folded in as well to avoid multiple lookups.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183253488 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { --- End diff -- If both `inputCol` and `inputCols` are set, throw an exception.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r18325 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) --- End diff -- Isn't this very expensive? We basically look up `labelToIndex` twice. It would be cool if we supported `lit(Map())` so we could do those lookups natively in SQL, along with `na.drop`, in whole-stage codegen.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183257799 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) + } + filteredDataset = filteredDataset.where(filterer(dataset(inputColName))) --- End diff -- Is it possible to not use `var filteredDataset`?
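For orientation, the multi-column API under review would be driven like this (a sketch assuming the `setInputCols`/`setOutputCols` setters from the diff; `df` is a hypothetical input DataFrame):

```scala
import org.apache.spark.ml.feature.StringIndexer

// Hedged sketch: index two string columns with a single model.
val indexer = new StringIndexer()
  .setInputCols(Array("colA", "colB"))
  .setOutputCols(Array("colA_idx", "colB_idx"))
  .setHandleInvalid("skip")
val indexed = indexer.fit(df).transform(df)
```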
[GitHub] spark issue #21124: SPARK-23004
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2569/ Test PASSed.
[GitHub] spark issue #21124: SPARK-23004
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Merged build finished. Test PASSed.
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21082 **[Test build #89693 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89693/testReport)** for PR 21082 at commit [`27158d9`](https://github.com/apache/spark/commit/27158d9873d54de9312db9e2db5c88d430589ade). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21125 Can one of the admins verify this patch?
[GitHub] spark pull request #21111: [SPARK-23877][SQL][followup] use PhysicalOperatio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21111#discussion_r183266741 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic relation match { case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) -val partitionData = fsRelation.location.listFiles(relFilters, Nil) -// partition data may be a stream, which can cause serialization to hit stack level too -// deep exceptions because it is a recursive structure in memory. converting to array -// avoids the problem. --- End diff -- > Would it be reasonable for a future commit to remove the @transient modifier and re-introduce the problem? That's very unlikely. SPARK-21884 guarantees Spark won't serialize the rows and we have regression tests to protect us. BTW it would be a lot of work to make sure all the places that create `LocalRelation` do not use recursive structure. I'll add some comments to `LocalRelation` to emphasize it.
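The "recursive structure" the removed comment referred to: a Scala `Stream` is a chain of cons cells, so default Java serialization can recurse once per element and overflow the stack on long inputs, while an `Array` serializes iteratively. A minimal illustration of the old workaround's idea (hedged):

```scala
// Materialize a potentially huge lazy Stream before it gets serialized.
val partitionData: Stream[Int] = Stream.range(0, 1000000)
val safe: Array[Int] = partitionData.toArray
```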
[GitHub] spark issue #21117: [followup][SPARK-10399][SPARK-23879][Core] Free unused o...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21117 thanks, merging to master!
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183270654 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE. + If None is set, the encoding of input JSON will be detected automatically + when the multiLine option is set to ``true``. --- End diff -- Does it mean users have to set the encoding if `multiLine` is false?
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183270773 --- Diff: python/pyspark/sql/readwriter.py --- @@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``-MM-dd'T'HH:mm:ss.SSSXXX``. +:param encoding: specifies encoding (charset) of saved json files. If None is set, +the default UTF-8 charset will be used. --- End diff -- shall we mention that, if `encoding` is set, `lineSep` also needs to be set when `multiLine` is false?
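What the reviewer is asking to document, as a usage sketch (hedged; it assumes the write-side `encoding` from this PR, with `lineSep` set explicitly so the record delimiter is unambiguous for a non-UTF-8 charset):

```scala
// Hedged sketch: write JSON in UTF-16LE with an explicit record separator.
df.write
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/path/to/output")
```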
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21124 **[Test build #89694 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89694/testReport)** for PR 21124 at commit [`304498e`](https://github.com/apache/spark/commit/304498eaf3ea0bd8a52a150257dc8b38a11c4108).
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user tedyu commented on the issue: https://github.com/apache/spark/pull/21124 +1
[GitHub] spark pull request #21117: [followup][SPARK-10399][SPARK-23879][Core] Free u...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21117
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183268807 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,38 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { +if (_cachedColumnBuffers == null) { + _cachedColumnBuffers = buildBuffers() +} + } } +_cachedColumnBuffers } - // If the cached column buffers were not passed in, we calculate them in the constructor. - // As in Spark, the actual work of caching is lazy. - if (_cachedColumnBuffers == null) { -buildBuffers() + def clearCache(blocking: Boolean = true): Unit = { +if (_cachedColumnBuffers != null) { + synchronized { +if (_cachedColumnBuffers != null) { + _cachedColumnBuffers.unpersist(blocking) --- End diff -- shall we also do `_cachedColumnBuffers = null` so that `unpersist` won't be called twice?
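For clarity, here is a minimal, self-contained sketch of what the suggestion amounts to: nulling the field inside the synchronized block makes `clearCache` idempotent, so `unpersist` can never run twice. `Buffers` is a hypothetical stand-in for `RDD[CachedBatch]`; this is not the merged code.

```scala
class CacheHolder {
  // Hypothetical stand-in for RDD[CachedBatch], just for this sketch.
  trait Buffers { def unpersist(blocking: Boolean): Unit }

  @volatile private var _cachedColumnBuffers: Buffers = null

  def clearCache(blocking: Boolean = true): Unit = synchronized {
    if (_cachedColumnBuffers != null) {
      _cachedColumnBuffers.unpersist(blocking)
      _cachedColumnBuffers = null // a second call is now a no-op
    }
  }
}
```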
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183269227 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def isMaterialized(df: DataFrame): Boolean = { +val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c } +assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed) +nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null) + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- how does this test prove we don't trigger jobs?
[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r183269033 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus * * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. */ -abstract class ExternalCatalog - extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] { +trait ExternalCatalog { --- End diff -- Based on your JIRA comment, can we put `@DeveloperApi` or `@InterfaceStability.Unstable` in this PR?
[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r183270323 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus * * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. */ -abstract class ExternalCatalog - extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] { +trait ExternalCatalog { import CatalogTypes.TablePartitionSpec + // Returns the underlying catalog class (e.g., HiveExternalCatalog). + def unwrapped: ExternalCatalog = this --- End diff -- Maybe we can move it to `ExternalCatalogWithListener` and mark `SharedState.externalCatalog` as `ExternalCatalogWithListener`
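A hedged sketch of the shape this suggestion implies: the listener wiring lives in a wrapper that delegates every call, and only the wrapper exposes the underlying implementation. The trait below is a trimmed stand-in, not Spark's actual `ExternalCatalog`, and the wrapper omits the event posting the real class would do.

```scala
// Trimmed stand-in for the real interface; Spark's trait has many more methods.
trait ExternalCatalog {
  def databaseExists(db: String): Boolean
}

// Hypothetical wrapper in the spirit of the review comment: it forwards each
// operation to the delegate (in Spark it would also post catalog events) and
// is the one place that reveals the wrapped implementation.
class ExternalCatalogWithListener(delegate: ExternalCatalog) extends ExternalCatalog {
  def unwrapped: ExternalCatalog = delegate
  override def databaseExists(db: String): Boolean = delegate.databaseExists(db)
}
```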
[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20980 **[Test build #89698 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89698/testReport)** for PR 20980 at commit [`eaef6b3`](https://github.com/apache/spark/commit/eaef6b374f86835bb08b9abf6d09d28aec1da9a8).
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21125 ok to test
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21125 Jenkins, please test this.
[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14083#discussion_r183265525 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala --- @@ -138,6 +140,88 @@ package object expressions { def indexOf(exprId: ExprId): Int = { Option(exprIdToOrdinal.get(exprId)).getOrElse(-1) } + +private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { + m.mapValues(_.distinct).map(identity) +} + +/** Map to use for direct case insensitive attribute lookups. */ +@transient private lazy val direct: Map[String, Seq[Attribute]] = { + unique(attrs.groupBy(_.name.toLowerCase)) +} + +/** Map to use for qualified case insensitive attribute lookups. */ +@transient private val qualified: Map[(String, String), Seq[Attribute]] = { + val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a => +(a.qualifier.get.toLowerCase, a.name.toLowerCase) + } + unique(grouped) +} + +/** Perform attribute resolution given a name and a resolver. */ +def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = { + // Collect matching attributes given a name and a lookup. + def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = { +candidates.toSeq.flatMap(_.collect { + case a if resolver(a.name, name) => a.withName(name) +}) + } + + // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name, + // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of + // matched attributes and a list of parts that are to be resolved. + // + // For example, consider an example where "a" is the table name, "b" is the column name, + // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b", + // and the second element will be List("c"). + val matches = nameParts match { +case qualifier +: name +: nestedFields => + val key = (qualifier.toLowerCase, name.toLowerCase) + val attributes = collectMatches(name, qualified.get(key)).filter { a => +resolver(qualifier, a.qualifier.get) + } + (attributes, nestedFields) +case all => + (Nil, all) + } + + // If none of attributes match `table.column` pattern, we try to resolve it as a column. + val (candidates, nestedFields) = matches match { +case (Seq(), _) => + val name = nameParts.head + val attributes = collectMatches(name, direct.get(name.toLowerCase)) + (attributes, nameParts.tail) +case _ => matches + } + + def name = UnresolvedAttribute(nameParts).name + candidates match { +case Seq(a) if nestedFields.nonEmpty => + // One match, but we also need to extract the requested nested field. + // The foldLeft adds ExtractValues for every remaining parts of the identifier, + // and aliased it with the last part of the name. + // For example, consider "a.b.c", where "a" is resolved to an existing attribute. + // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final + // expression as "c". + val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) => +ExtractValue(e, Literal(name), resolver) + } + Some(Alias(fieldExprs, nestedFields.last)()) + +case Seq(a) => + // One match, no nested fields, use it. + Some(a) + +case Seq() => + // No matches. + None + +case ambiguousReferences => + // More than one match. 
+ val referenceNames = ambiguousReferences.mkString(", ") --- End diff -- to pass the test, we should follow the previous code: `ambiguousReferences.map(_._1.qualifiedName).mkString(", ")`
[GitHub] spark issue #14083: [SPARK-16406][SQL] Improve performance of LogicalPlan.re...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/14083 LGTM
[GitHub] spark issue #21100: [SPARK-24012][SQL] Union of map and other compatible col...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21100 **[Test build #89696 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89696/testReport)** for PR 21100 at commit [`19b5c6a`](https://github.com/apache/spark/commit/19b5c6a84b38b4ce275093f79eee0ff594e50f90).
[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...
Github user liutang123 commented on a diff in the pull request: https://github.com/apache/spark/pull/21100#discussion_r183269702 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -171,6 +171,15 @@ object TypeCoercion { .orElse((t1, t2) match { case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) +case (MapType(keyType1, valueType1, n1), MapType(keyType2, valueType2, n2)) --- End diff -- Hi, I implemented this logic in `findTightestCommonType`; looking forward to further review.
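For readers following along, a hedged sketch of the recursion under discussion, mirroring the `ArrayType` case directly above it in the diff; the merged rule may differ, for example in how strictly key types must match.

```scala
import org.apache.spark.sql.types.{DataType, MapType}

object MapTypeWidening {
  // Widen two map types component-wise: the result exists only if both the
  // key types and the value types have a wider common type. `widen` stands
  // in for a function like TypeCoercion.findWiderTypeForTwo.
  def widerMapType(
      m1: MapType,
      m2: MapType,
      widen: (DataType, DataType) => Option[DataType]): Option[MapType] = {
    for {
      kt <- widen(m1.keyType, m2.keyType)
      vt <- widen(m1.valueType, m2.valueType)
    } yield MapType(kt, vt, m1.valueContainsNull || m2.valueContainsNull)
  }
}
```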
[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21072#discussion_r183272597 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -736,12 +736,22 @@ object EliminateSorts extends Rule[LogicalPlan] { } /** - * Removes Sort operation if the child is already sorted + * Removes redundant Sort operation. This can happen: + * 1) if the child is already sorted + * 2) if there is another Sort operator separated by 0...n Project/Filter operators */ object RemoveRedundantSorts extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => child +case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child)) + } + + def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match { +case Project(fields, child) => Project(fields, recursiveRemoveSort(child)) +case Filter(condition, child) => Filter(condition, recursiveRemoveSort(child)) --- End diff -- we should at least add `ResolvedHint`. To easily expand the white list in the future, I'd like to change the code style to

```
def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
  case s: Sort => recursiveRemoveSort(s.child)
  case other if canEliminateSort(other) =>
    other.withNewChildren(other.children.map(recursiveRemoveSort))
  case _ => plan
}

def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
  case p: Project => p.projectList.forall(_.deterministic)
  case f: Filter => f.condition.deterministic
  case _: ResolvedHint => true
  ...
  case _ => false
}
```
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274704 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.DataSourceV2 +import org.apache.spark.sql.sources.v2.ReadSupport +import org.apache.spark.sql.sources.v2.WriteSupport + +/** + * The base class for file data source v2. Implementations must have a public, 0-arg constructor. + * + * Note that this is an empty interface. Data source implementations should mix-in at least one of + * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just + * a dummy data source which is un-readable/writable. --- End diff -- We don't need to copy the javadoc from the parent class. Just say `A base interface for data source v2 implementations of the built-in file-based data sources.`
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.DataSourceV2 +import org.apache.spark.sql.sources.v2.ReadSupport +import org.apache.spark.sql.sources.v2.WriteSupport + +/** + * The base class for file data source v2. Implementations must have a public, 0-arg constructor. + * + * Note that this is an empty interface. Data source implementations should mix-in at least one of + * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just + * a dummy data source which is un-readable/writable. + */ +trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister { + /** + * Returns an optional V1 [[FileFormat]] class of the same file data source. + * This is a solution for the following cases: + * 1. File datasource V2 might be implemented partially during migration. + *E.g. if [[ReadSupport]] is implemented while [[WriteSupport]] is not, + *write path should fall back to V1 implementation. + * 2. File datasource V2 implementations cause regression. + * 3. Catalog support is required, which is still under development for data source V2. + */ + def fallBackFileFormat: Option[Class[_]] = None --- End diff -- why is it optional?
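To ground the question, a hedged sketch of how an implementation might declare its fallback under the interface being reviewed; the class below is hypothetical (the PR itself defines the real migrated sources), and it assumes the v1 `OrcFileFormat` as the delegate.

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2

// Hypothetical v2 source that still delegates unsupported paths (e.g. writes)
// to its existing v1 FileFormat via the fallback hook under discussion.
class OrcDataSourceV2Sketch extends FileDataSourceV2 {
  override def shortName(): String = "orc"
  override def fallBackFileFormat: Option[Class[_]] = Some(classOf[OrcFileFormat])
}
```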
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89694/ Test PASSed.
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Merged build finished. Test PASSed.
[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20980 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2571/ Test PASSed.
[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20980 Merged build finished. Test PASSed.
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21125 **[Test build #89699 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89699/testReport)** for PR 21125 at commit [`3c6a4da`](https://github.com/apache/spark/commit/3c6a4dab973851e385b6c9a2c77e5684ad6171a4).
[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21083 **[Test build #89701 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89701/testReport)** for PR 21083 at commit [`787cddf`](https://github.com/apache/spark/commit/787cddffeba0f21cd40312bcbf84d1bb75126044).
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21124 @brkyvz PTAL.
[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21111 **[Test build #89695 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89695/testReport)** for PR 21111 at commit [`d624955`](https://github.com/apache/spark/commit/d624955aa7fd07acde698a50d05ed5679ee91533).
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183269323 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def isMaterialized(df: DataFrame): Boolean = { +val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c } +assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed) +nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null) + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- I feel it's more clear to create a listener and explicitly show we don't trigger any jobs after calling `Dataset.cache`
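A hedged sketch of the test shape this suggests: count job-start events around `Dataset.cache` and assert that none fired. Spark's own suites drain the listener bus with a package-private helper before asserting; a plain sleep stands in for that here, and the session setup is illustrative.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

object LazyCacheCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("lazy-cache").getOrCreate()
    val jobsStarted = new AtomicInteger(0)

    // Count every job the scheduler starts.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        jobsStarted.incrementAndGet()
      }
    })

    spark.range(10).cache() // with the fix, this alone should schedule no job
    Thread.sleep(1000)      // crude stand-in for draining the listener bus
    assert(jobsStarted.get() == 0, s"cache() triggered ${jobsStarted.get()} job(s)")
    spark.stop()
  }
}
```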
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20937 LGTM except a few minor comments
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala --- @@ -213,6 +215,25 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast } } +/** + * Replaces [[FileDataSourceV2]] with [[DataSource]] if parent node is [[InsertIntoTable]]. + */ +class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan] { --- End diff -- This needs a little more comment about when this can happen.
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274501 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -89,8 +91,13 @@ case class DataSource( case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) - lazy val providingClass: Class[_] = -DataSource.lookupDataSource(className, sparkSession.sessionState.conf) + lazy val providingClass: Class[_] = { +val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf) +cls.newInstance() match { + case f: FileDataSourceV2 => f.fallBackFileFormat.getOrElse(cls) --- End diff -- why do we need this?