[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19475 **[Test build #82659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82659/testReport)** for PR 19475 at commit [`ce55412`](https://github.com/apache/spark/commit/ce5541260dac65ce09df374d240910f12099b3af).
[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19475 Merged build finished. Test PASSed.
[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19475 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82655/
[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19475 **[Test build #82655 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82655/testReport)** for PR 19475 at commit [`f97fb98`](https://github.com/apache/spark/commit/f97fb9808fdeb2a9d46cd70105c7d05b876ad3fa).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19474#discussion_r144198505

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---
@@ -30,6 +31,15 @@ import org.apache.spark.util.SerializableConfiguration
  */
 trait DataWritingCommand extends RunnableCommand {
 
+  def query: LogicalPlan
--- End diff --

Add one line description for `query`?
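If adopted, the requested doc comment could be as small as the following sketch; the wording here is only an illustration, not the PR author's final text:

```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// RunnableCommand lives in the same package as DataWritingCommand in the real file.
trait DataWritingCommand extends RunnableCommand {
  /** The input query plan that produces the data to be written out. */
  def query: LogicalPlan
}
```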
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19464 **[Test build #82658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82658/testReport)** for PR 19464 at commit [`cf0c350`](https://github.com/apache/spark/commit/cf0c350daf12ce80fc781fd17fd15506d83c6d02).
[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19474#discussion_r144198270

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -117,7 +117,7 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-    val allColumns = plan.output
+    val allColumns = queryExecution.logical.output
--- End diff --

Explicitly using `analyzed`'s schema is better here.
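Presumably the suggested change is a one-line swap of which plan's output is captured, along the lines of:

```
// Take the write schema from the analyzed plan instead of the raw logical plan.
val allColumns = queryExecution.analyzed.output
```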
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19464 ok to test
[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19474#discussion_r144197934

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---
@@ -30,6 +31,15 @@ import org.apache.spark.util.SerializableConfiguration
  */
 trait DataWritingCommand extends RunnableCommand {
 
+  def query: LogicalPlan
+
+  // We make the input `query` an inner child instead of a child in order to hide it from the
+  // optimizer. This is because optimizer may change the output schema names, and we have to keep
--- End diff --

You will scare others. :) -> `may not preserve the output schema names' case`
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19464 I think the optimisation driven by the SQL-specific conf `spark.sql.files.maxPartitionBytes` already includes this concept in `FileScanRDD`, which partially does this when combining input splits. I'd suggest not adding this conf to `FileScanRDD` for now, unless I'm missing something.
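For context, that SQL-side conf drives a bin-packing of file splits into read partitions, so zero-length splits are absorbed for free. A minimal standalone sketch of the idea, not Spark's actual implementation (`FileSplit` here is a stand-in type):

```
case class FileSplit(path: String, length: Long)

// Greedily pack splits into partitions of at most maxPartitionBytes each;
// empty splits add nothing to a partition's size.
def packSplits(splits: Seq[FileSplit], maxPartitionBytes: Long): Seq[Seq[FileSplit]] = {
  val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[FileSplit]]
  val current = scala.collection.mutable.ArrayBuffer.empty[FileSplit]
  var currentSize = 0L
  splits.sortBy(-_.length).foreach { split =>
    if (current.nonEmpty && currentSize + split.length > maxPartitionBytes) {
      partitions += current.toSeq
      current.clear()
      currentSize = 0L
    }
    current += split
    currentSize += split.length
  }
  if (current.nonEmpty) partitions += current.toSeq
  partitions.toSeq
}
```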
[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19477 Merged build finished. Test PASSed.
[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19477 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82654/
[GitHub] spark issue #19389: [SPARK-22165][SQL] Resolve type conflicts between decima...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19389 Thank you so much @gatorsmile.
[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19477 **[Test build #82654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82654/testReport)** for PR 19477 at commit [`b545f28`](https://github.com/apache/spark/commit/b545f281b19120cc2c9e4197cae4b1315969247d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19474#discussion_r144197368

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala ---
@@ -30,4 +31,12 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
       assert(partFiles.length === 2)
     }
   }
+
+  test("FileFormatWriter should respect the input query schema") {
+    withTable("t1", "t2") {
+      spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
--- End diff --

Also add another case here?
```
spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
```
[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19464#discussion_r144197159

--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,10 @@ class HadoopRDD[K, V](
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = getInputFormat(jobConf)
-    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+    var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+    if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
--- End diff --

I'd use `spark.files` prefix, taken after `spark.files.ignoreCorruptFiles`, `spark.files.maxPartitionBytes` and `spark.files.openCostInBytes`.
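Under that naming suggestion, the `HadoopRDD` change might read as follows; the key `spark.files.ignoreEmptySplits` is hypothetical here, since the final name was still being discussed in the PR:

```
// Sketch only: drop zero-length splits behind an opt-in spark.files.* flag.
val inputSplits = {
  val all = inputFormat.getSplits(jobConf, minPartitions)
  if (sparkContext.getConf.getBoolean("spark.files.ignoreEmptySplits", false)) {
    all.filter(_.getLength > 0)
  } else {
    all
  }
}
```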
[GitHub] spark issue #19389: [SPARK-22165][SQL] Resolve type conflicts between decima...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19389 Will review it this weekend.
[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r143906469

--- Diff: python/pyspark/sql/tests.py ---
@@ -3095,16 +3095,32 @@ def setUpClass(cls):
             StructField("3_long_t", LongType(), True),
             StructField("4_float_t", FloatType(), True),
             StructField("5_double_t", DoubleType(), True)])
-        cls.data = [("a", 1, 10, 0.2, 2.0),
-                    ("b", 2, 20, 0.4, 4.0),
-                    ("c", 3, 30, 0.8, 6.0)]
+        cls.data = [(u"a", 1, 10, 0.2, 2.0),
+                    (u"b", 2, 20, 0.4, 4.0),
+                    (u"c", 3, 30, 0.8, 6.0)]
+
+    @classmethod
+    def tearDownClass(cls):
+        ReusedPySparkTestCase.tearDownClass()
+        cls.spark.stop()
 
     def assertFramesEqual(self, df_with_arrow, df_without):
         msg = ("DataFrame from Arrow is not equal" +
                ("\n\nWith Arrow:\n%s\n%s" % (df_with_arrow, df_with_arrow.dtypes)) +
                ("\n\nWithout:\n%s\n%s" % (df_without, df_without.dtypes)))
         self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
+    def createPandasDataFrameFromeData(self):
--- End diff --

nit: typo `createPandasDataFrameFromeData` -> `createPandasDataFrameFromData`
[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r144194374

--- Diff: python/pyspark/sql/session.py ---
@@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
+            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
+                    and len(data) > 0:
+                from pyspark.serializers import ArrowSerializer
--- End diff --

Maybe we should split this block into a method like `_createFromPandasDataFrame`, the same as the other create methods?
[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r144194084

--- Diff: python/pyspark/sql/session.py ---
@@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
+            if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
+                    and len(data) > 0:
+                from pyspark.serializers import ArrowSerializer
+                from pyspark.sql.types import from_arrow_schema
+                import pyarrow as pa
+
+                # Slice the DataFrame into batches
+                split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
+                slices = (data[i:i + split] for i in xrange(0, len(data), split))
+                batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False)
+                           for sliced_df in slices]
+
+                # write batches to temp file, read by JVM (borrowed from context.parallelize)
+                import os
+                from tempfile import NamedTemporaryFile
+                tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+                try:
+                    serializer = ArrowSerializer()
+                    serializer.dump_stream(batches, tempFile)
+                    tempFile.close()
+                    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
+                    jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
+                finally:
+                    # readRDDFromFile eagerily reads the file so we can delete right after.
+                    os.unlink(tempFile.name)
+
+                # Create the Spark DataFrame, there will be at least 1 batch
+                schema = from_arrow_schema(batches[0].schema)
--- End diff --

What if a user specifies the schema?
[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19475 The idea sounds good to me. Please add unit test cases for all the ExpressionSet APIs. Thanks!
[GitHub] spark pull request #19429: [SPARK-20055] [Docs] Added documentation for load...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19429
[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA
Github user mpjlu commented on the issue: https://github.com/apache/spark/pull/19337 Regarding the comments about renaming epsilon and adding a setter in LocalLDAModel: after some offline discussion, we have agreed not to change it for now. Epsilon doesn't control model convergence directly, and some other LDA implementations, such as Vowpal Wabbit, also use this name. There are many parameters in LDA and epsilon is just one of them; currently none of them has a setter. If we need setters, we can add them together in another PR. Thanks. @hhbyyh @jkbradley
[GitHub] spark issue #19429: [SPARK-20055] [Docs] Added documentation for loading csv...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19429 @jomach is a new contributor to Apache Spark, so it might be hard for him to address the above comments. Please submit a separate PR to address them; I will review it. Thanks!
[GitHub] spark issue #19429: [SPARK-20055] [Docs] Added documentation for loading csv...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19429 Thanks! Merged to master.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144195079

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

In Spark SQL, we do issue the `AnalysisException` in many similar cases. I am also fine with using `SparkException`. In this specific case, the users are able to control the conf to make it work. Thus, we also need to improve the message to let users know how to resolve it by changing the conf.
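A sketch of a message that tells users which knob to flip; `committerClass` is assumed from the surrounding PR context, and the exact wording is the author's call:

```
require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
    || classOf[ParquetOutputCommitter].isAssignableFrom(committerClass),
  s"Committer $committerClass cannot write Parquet job summaries; either set " +
    s"${ParquetOutputFormat.ENABLE_JOB_SUMMARY} to false or use a committer " +
    "that extends ParquetOutputCommitter.")
```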
[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19337 **[Test build #82657 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82657/testReport)** for PR 19337 at commit [`b329051`](https://github.com/apache/spark/commit/b32905138d598b084903c3a5fabc2bf02e47b37a).
[GitHub] spark issue #19439: [SPARK-21866][ML][PySpark] Adding spark image reader
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19439 **[Test build #82656 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82656/testReport)** for PR 19439 at commit [`d42636a`](https://github.com/apache/spark/commit/d42636a99dd045fa1003b4907b35796829c6efd5).
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18979 Could you also include the [test cases](https://github.com/dongjoon-hyun/spark/blob/b545f281b19120cc2c9e4197cae4b1315969247d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala#L2054-L2060) in [InsertSuite.scala](https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala)?
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144194076

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+    undefinedImageType -> -1,
+    "CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, "CV_8UC4" -> 24,
+    "CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, "CV_8SC4" -> 25,
+    "CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, "CV_16UC4" -> 26,
+    "CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, "CV_16SC4" -> 27,
+    "CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, "CV_32SC4" -> 28,
+    "CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, "CV_32FC4" -> 29,
+    "CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, "CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+    StructField("origin", StringType, true) ::
+      StructField("height", IntegerType, false) ::
+      StructField("width", IntegerType, false) ::
+      StructField("nChannels", IntegerType, false) ::
+      // OpenCV-compatible type: CV_8UC3 in most cases
+      StructField("mode", StringType, false) ::
+      // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+      StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df Dataframe
+   * @param column Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+    df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, -1, -1, undefinedImageType,
+    Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin Arbitrary string that identifies the image
+   * @param bytes Image bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
+
+    val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+    if (img == null) {
+      None
+    } else {
+      val
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144193549

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144193350

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
+object ImageSchema {
--- End diff --

I believe getOrigin/getHeight/getWidth/getNChannels/getMode/getData are all convenience functions that users should be able to use - but I can change them to private if you prefer. I've added better documentation to the methods. isImage is also a convenience function for them to use to tell if the column is an image column.
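A hypothetical usage example of those accessors, assuming a DataFrame `images` whose single `image` column follows `columnSchema` above (names are illustrative only):

```
import org.apache.spark.sql.Row

// Pull each image struct out of the "image" column and use the typed getters.
images.select("image").collect().foreach { case Row(image: Row) =>
  println(s"${ImageSchema.getOrigin(image)}: " +
    s"${ImageSchema.getWidth(image)}x${ImageSchema.getHeight(image)}, " +
    s"mode=${ImageSchema.getMode(image)}")
}
```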
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144193840

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144193584

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144193367

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
--- End diff --

done
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144191899

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144191439

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /**
+   * Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): Option[String] = {
+    val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val old = Option(hadoopConf.get(flagName))
+
+    value match {
+      case Some(v) => hadoopConf.set(flagName, v)
+      case None => hadoopConf.unset(flagName)
+    }
+
+    old
+  }
+}
+
+/**
+ * Filter that allows loading a fraction of HDFS files.
+ */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+    val rd = new Random()
+    rd.setSeed(0)
+    rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+    if (conf != null) {
+      sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+    }
+  }
+
+  override def accept(path: Path): Boolean = {
+    // Note: checking fileSystem.isDirectory is very slow here, so we use basic rules instead
+    !SamplePathFilter.isFile(path) ||
+      random.nextDouble() < sampleRatio
+  }
+}
+
+private object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = FilenameUtils.getExtension(path.toString) != ""
+
+  /**
+   * Sets hdfs PathFilter
+   *
+   * @param value Filter class that is passed to HDFS
+   * @param sampleRatio Fraction of the files that the filter picks
+   * @param spark Existing Spark session
+   * @return Returns the previous hdfs path filter
+   */
+  def setPathFilter(value: Option[Class[_]], sampleRatio: Double,
+    spark: SparkSession) : Option[Class[_]] = {
+    val flagName = FileInputFormat.PATHFILTER_CLASS
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val old = Option(hadoopConf.getClass(flagName, null))
+    hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
+
+    value match {
+      case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
+      case None => hadoopConf.unset(flagName)
+    }
+    old
+  }
+
+  /**
+   * Unsets hdfs PathFilter
--- End diff --

done
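For orientation, these helpers suggest a set-then-restore pattern around the actual file load; a sketch under that assumption (the load step itself is elided, and this is not quoted from the PR):

```
// Install the sampling filter and the recursive flag, read, then restore
// the previous Hadoop conf values.
val oldRecursive = RecursiveFlag.setRecursiveFlag(Some("true"), spark)
val oldFilter = SamplePathFilter.setPathFilter(
  value = Some(classOf[SamplePathFilter]), sampleRatio = 0.1, spark = spark)
try {
  // ... read the directory tree here ...
} finally {
  SamplePathFilter.setPathFilter(value = oldFilter, sampleRatio = 1.0, spark = spark)
  RecursiveFlag.setRecursiveFlag(oldRecursive, spark)
}
```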
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144191203

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144191154 --- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.image + +import java.awt.Color +import java.awt.color.ColorSpace +import java.io.ByteArrayInputStream +import javax.imageio.ImageIO + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.types._ + +@Experimental +@Since("2.3.0") +object ImageSchema { + + val undefinedImageType = "Undefined" + + val ocvTypes = Map( +undefinedImageType -> -1, +"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, "CV_8UC4" -> 24, +"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, "CV_8SC4" -> 25, +"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, "CV_16UC4" -> 26, +"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, "CV_16SC4" -> 27, +"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, "CV_32SC4" -> 28, +"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, "CV_32FC4" -> 29, +"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, "CV_64FC4" -> 30 + ) + + /** + * Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) + */ + val columnSchema = StructType( +StructField("origin", StringType, true) :: + StructField("height", IntegerType, false) :: + StructField("width", IntegerType, false) :: + StructField("nChannels", IntegerType, false) :: + // OpenCV-compatible type: CV_8UC3 in most cases + StructField("mode", StringType, false) :: + // Bytes in OpenCV-compatible order: row-wise BGR in most cases + StructField("data", BinaryType, false) :: Nil) + + // Dataframe with a single column of images named "image" (nullable) + private val imageDFSchema = StructType(StructField("image", columnSchema, true) :: Nil) + + @Since("2.3.0") + def getOrigin(row: Row): String = row.getString(0) + + @Since("2.3.0") + def getHeight(row: Row): Int = row.getInt(1) + + @Since("2.3.0") + def getWidth(row: Row): Int = row.getInt(2) + + @Since("2.3.0") + def getNChannels(row: Row): Int = row.getInt(3) + + @Since("2.3.0") + def getMode(row: Row): String = row.getString(4) + + @Since("2.3.0") + def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5) + + /** + * Check if the dataframe column contains images (i.e. 
has ImageSchema) + * + * @param df Dataframe + * @param column Column name + * @return True if the given column matches the image schema + */ + @Since("2.3.0") + def isImageColumn(df: DataFrame, column: String): Boolean = +df.schema(column).dataType == columnSchema + + /** + * Default values for the invalid image + * + * @param origin Origin of the invalid image + * @return Row with the default values + */ + private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, -1, -1, undefinedImageType, +Array.ofDim[Byte](0))) + + /** + * Convert the compressed image (jpeg, png, etc.) into OpenCV + * representation and store it in dataframe Row + * + * @param origin Arbitrary string that identifies the image + * @param bytesImage bytes (for example, jpeg) + * @return Dataframe Row or None (if the decompression fails) + */ + private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = { + +val img = ImageIO.read(new ByteArrayInputStream(bytes)) + +if (img == null) { + None +} else { +
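For orientation, here is a small usage sketch of the accessors quoted above; the DataFrame `df` and the column name "image" are assumptions for illustration, not part of the patch:

```scala
import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper: `df` and the "image" column name are assumed here.
def describeFirstImage(df: DataFrame): Unit = {
  if (ImageSchema.isImageColumn(df, "image")) {
    // Each image cell is a struct Row; the getters index it in schema order.
    val image: Row = df.select("image").head.getStruct(0)
    println(s"origin=${ImageSchema.getOrigin(image)} " +
      s"size=${ImageSchema.getWidth(image)}x${ImageSchema.getHeight(image)} " +
      s"nChannels=${ImageSchema.getNChannels(image)} mode=${ImageSchema.getMode(image)}")
  }
}
```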
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144191057 --- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.image + +import scala.language.existentials +import scala.util.Random + +import org.apache.commons.io.FilenameUtils +import org.apache.hadoop.conf.{Configuration, Configured} +import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.sql.SparkSession + +private object RecursiveFlag { + + /** + * Sets a value of spark recursive flag + * + * @param value value to set + * @param spark existing spark session + * @return previous value of this flag + */ + def setRecursiveFlag(value: Option[String], spark: SparkSession): Option[String] = { +val flagName = FileInputFormat.INPUT_DIR_RECURSIVE +val hadoopConf = spark.sparkContext.hadoopConfiguration +val old = Option(hadoopConf.get(flagName)) + +value match { + case Some(v) => hadoopConf.set(flagName, v) + case None => hadoopConf.unset(flagName) +} + +old + } +} + +/** + * Filter that allows loading a fraction of HDFS files. + */ +private class SamplePathFilter extends Configured with PathFilter { + val random = { +val rd = new Random() +rd.setSeed(0) --- End diff -- removed seed - it might be good to expose this as a parameter in the future so that readImages loads the same images (and not just the same ratio) on every invocation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
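The follow-up idea in that comment, a seed parameter so that `readImages` samples the same files on every invocation, could look roughly like the sketch below; the `seed` configuration key is a hypothetical knob, not something in the patch:

```scala
import scala.util.Random

import org.apache.hadoop.conf.{Configuration, Configured}
import org.apache.hadoop.fs.{Path, PathFilter}

private class SeededSamplePathFilter extends Configured with PathFilter {
  private val random = new Random()
  private var sampleRatio: Double = 1.0

  override def setConf(conf: Configuration): Unit = {
    super.setConf(conf)
    if (conf != null) {
      sampleRatio = conf.getDouble("sampleRatio", 1.0)
      random.setSeed(conf.getLong("seed", 0L)) // hypothetical knob for reproducible sampling
    }
  }

  // Keep each file with probability sampleRatio; a fixed seed plus a stable
  // listing order would yield the same sampled file set on every invocation.
  override def accept(path: Path): Boolean = random.nextDouble() < sampleRatio
}
```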
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144190588 --- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.image + +import scala.language.existentials +import scala.util.Random + +import org.apache.commons.io.FilenameUtils +import org.apache.hadoop.conf.{Configuration, Configured} +import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.sql.SparkSession + +private object RecursiveFlag { + + /** + * Sets a value of spark recursive flag + * + * @param value value to set --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144190450 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): + +Returns: +object: 2 dimensional image + +.. versionadded:: 2.3.0 +""" +length = np.prod(array.shape) + +data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)] + .reshape(length)) +height = array.shape[0] +width = array.shape[1] +nChannels = array.shape[2] +# Creating new Row with _create_row(), because Row(name = value, ... 
) +# orders fields by name, which conflicts with expected ImageSchema order +# when the new DataFrame is created by UDF +return _create_row(ImageFields, + [origin, height, width, nChannels, mode, data]) + + +def readImages(path, + recursive=False, + numPartitions=0, + dropImageFailures=False, + sampleRatio=1.0): +""" +Reads the directory of images from the local or remote (WASB) source. +Args: --- End diff -- good catch - removed spark session --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144190368 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): + +Returns: +object: 2 dimensional image + +.. versionadded:: 2.3.0 +""" +length = np.prod(array.shape) + +data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)] + .reshape(length)) +height = array.shape[0] +width = array.shape[1] +nChannels = array.shape[2] +# Creating new Row with _create_row(), because Row(name = value, ... ) --- End diff -- @holdenk is @MrBago 's resolution reasonable? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144190224 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144189855 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): + +Returns: +object: 2 dimensional image + +.. versionadded:: 2.3.0 +""" +length = np.prod(array.shape) + +data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)] + .reshape(length)) --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user imatiach-msft commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r144189582 --- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.image + +import java.awt.Color +import java.awt.color.ColorSpace +import java.io.ByteArrayInputStream +import javax.imageio.ImageIO + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.types._ + +@Experimental +@Since("2.3.0") +object ImageSchema { + + val undefinedImageType = "Undefined" + + val ocvTypes = Map( +undefinedImageType -> -1, +"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, "CV_8UC4" -> 24, +"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, "CV_8SC4" -> 25, +"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, "CV_16UC4" -> 26, +"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, "CV_16SC4" -> 27, +"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, "CV_32SC4" -> 28, +"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, "CV_32FC4" -> 29, +"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, "CV_64FC4" -> 30 + ) + + /** + * Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) + */ + val columnSchema = StructType( +StructField("origin", StringType, true) :: + StructField("height", IntegerType, false) :: --- End diff -- It is strange, it seems my intellij prefers this default but I can't tell why. I've changed it to two spaces for all of them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19472 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82651/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19472 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19472 **[Test build #82651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82651/testReport)** for PR 19472 at commit [`a814eb3`](https://github.com/apache/spark/commit/a814eb3f08085b09a16f336b36fba8da24e4f34a). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19477#discussion_r144188789 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -2050,4 +2050,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + Seq("orc", "parquet", "csv", "json", "text").foreach { format => --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/18979 +1. This solves the regression on writing an empty dataset with ORC format, too! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19477 Wow, there is already a PR for that. Thank you for letting me know, @viirya! Then, it's good. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...
Github user dongjoon-hyun closed the pull request at: https://github.com/apache/spark/pull/19477 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19458: [SPARK-22227][CORE] DiskBlockManager.getAllBlocks now to...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19458 Instead of filtering out temp blocks, why not add parsing rules for `TempLocalBlockId` and `TempShuffleBlockId`? That would also solve the problem. Since `DiskBlockManager#getAllFiles` doesn't filter out temp shuffle/local files, wouldn't it be better to keep the same behavior for `DiskBlockManager#getAllBlocks`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
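The alternative jerryshao suggests would extend `BlockId`'s string parser with rules for the temp IDs. A rough sketch in the style of the existing regex-based rules inside `BlockId`'s companion object; the patterns here are assumptions for illustration, not a committed change:

```scala
// Hypothetical extra parsing rules mirroring BlockId.apply's regex style.
val TEMP_LOCAL = "temp_local_(.+)".r
val TEMP_SHUFFLE = "temp_shuffle_(.+)".r

def apply(name: String): BlockId = name match {
  // ... existing rules for rdd_, shuffle_, broadcast_, etc. ...
  case TEMP_LOCAL(uuid) => TempLocalBlockId(java.util.UUID.fromString(uuid))
  case TEMP_SHUFFLE(uuid) => TempShuffleBlockId(java.util.UUID.fromString(uuid))
  case _ => throw new UnsupportedOperationException(s"Failed to parse $name into a block ID")
}
```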
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144187454 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala --- @@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter assert(e.contains("mismatched input 'ROW'")) } } + + private def getConvertMetastoreConfName(format: String): String = format match { +case "parquet" => "spark.sql.hive.convertMetastoreParquet" +case "orc" => "spark.sql.hive.convertMetastoreOrc" + } + + private def getSparkCompressionConfName(format: String): String = format match { +case "parquet" => "spark.sql.parquet.compression.codec" +case "orc" => "spark.sql.orc.compression.codec" + } + + private def getTableCompressPropName(format: String): String = { +format.toLowerCase match { + case "parquet" => "parquet.compression" + case "orc" => "orc.compress" +} + } + + private def getTableCompressionCodec(path: String, format: String): String = { +val hadoopConf = spark.sessionState.newHadoopConf() +val codecs = format match { + case "parquet" => for { +footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf) +block <- footer.getParquetMetadata.getBlocks.asScala +column <- block.getColumns.asScala + } yield column.getCodec.name() + case "orc" => new File(path).listFiles().filter{ file => +file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS" + }.map { orcFile => + OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString + }.toSeq +} + +assert(codecs.distinct.length == 1) +codecs.head + } + + private def writeDataToTable( + rootDir: File, + tableName: String, + isPartitioned: Boolean, + format: String, + compressionCodec: Option[String]) { +val tblProperties = compressionCodec match { + case Some(prop) => s"TBLPROPERTIES('${getTableCompressPropName(format)}'='$prop')" + case _ => "" +} +val partitionCreate = if (isPartitioned) "PARTITIONED BY (p int)" else "" +sql( + s""" + |CREATE TABLE $tableName(a int) + |$partitionCreate + |STORED AS $format + |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName' + |$tblProperties + """.stripMargin) + +val partitionInsert = if (isPartitioned) s"partition (p=1)" else "" +sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |$partitionInsert + |SELECT * from table_source --- End diff -- nit. `from` -> `FROM` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144187309 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala --- @@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter assert(e.contains("mismatched input 'ROW'")) } } + + private def getConvertMetastoreConfName(format: String): String = format match { +case "parquet" => "spark.sql.hive.convertMetastoreParquet" +case "orc" => "spark.sql.hive.convertMetastoreOrc" + } + + private def getSparkCompressionConfName(format: String): String = format match { +case "parquet" => "spark.sql.parquet.compression.codec" +case "orc" => "spark.sql.orc.compression.codec" + } + + private def getTableCompressPropName(format: String): String = { +format.toLowerCase match { + case "parquet" => "parquet.compression" + case "orc" => "orc.compress" +} + } + + private def getTableCompressionCodec(path: String, format: String): String = { --- End diff -- The logic reads the compression codec from the files, so the prefix `getTable` looks misleading to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144187101 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala --- @@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter assert(e.contains("mismatched input 'ROW'")) } } + + private def getConvertMetastoreConfName(format: String): String = format match { +case "parquet" => "spark.sql.hive.convertMetastoreParquet" +case "orc" => "spark.sql.hive.convertMetastoreOrc" + } + + private def getSparkCompressionConfName(format: String): String = format match { +case "parquet" => "spark.sql.parquet.compression.codec" +case "orc" => "spark.sql.orc.compression.codec" --- End diff -- Here, too. - `SQLConf.PARQUET_COMPRESSION.key` instead of "spark.sql.parquet.compression.codec" - `SQLConf.ORC_COMPRESSION.key` insead of "spark.sql.orc.compression.codec" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144186944 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala --- @@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter assert(e.contains("mismatched input 'ROW'")) } } + + private def getConvertMetastoreConfName(format: String): String = format match { +case "parquet" => "spark.sql.hive.convertMetastoreParquet" +case "orc" => "spark.sql.hive.convertMetastoreOrc" --- End diff -- Could you use keys? - `HiveUtils.CONVERT_METASTORE_PARQUET.key` instead of "spark.sql.hive.convertMetastoreParquet" - `HiveUtils.CONVERT_METASTORE_ORC.key` instead of "spark.sql.hive.convertMetastoreOrc" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
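Taken together, the two review suggestions above amount to replacing the string literals with the existing constants the reviewer names; roughly:

```scala
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.SQLConf

private def getConvertMetastoreConfName(format: String): String = format match {
  case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
  case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
}

private def getSparkCompressionConfName(format: String): String = format match {
  case "parquet" => SQLConf.PARQUET_COMPRESSION.key
  case "orc" => SQLConf.ORC_COMPRESSION.key
}
```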
[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19477#discussion_r144186840 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -2050,4 +2050,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + Seq("orc", "parquet", "csv", "json", "text").foreach { format => --- End diff -- Maybe this test case is worth merging into that PR. cc @steveloughran Shall we include this test in #18979? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144186680 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -74,9 +81,13 @@ class ExpressionSet protected( } override def -(elem: Expression): ExpressionSet = { -val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) -val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) -new ExpressionSet(newBaseSet, newOriginals) +if (elem.deterministic) { + val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) + val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) + new ExpressionSet(newBaseSet, newOriginals) +} else { + new ExpressionSet(baseSet.clone(), originals.clone()) --- End diff -- I am trying to be consistent with the behavior of the original implementation. I think I'd better override `--` for efficiency. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
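For concreteness, one way the `--` override mentioned here could look inside `ExpressionSet`, keeping the thread's semantics (only deterministic expressions are ever removed, and the collections are cloned once); a sketch, not the committed code:

```scala
import scala.collection.GenTraversableOnce

override def --(elems: GenTraversableOnce[Expression]): ExpressionSet = {
  val newBaseSet = baseSet.clone()
  val newOriginals = originals.clone()
  elems.foreach { e =>
    if (e.deterministic) {
      newBaseSet -= e.canonicalized
      newOriginals --= newOriginals.filter(_.canonicalized == e.canonicalized)
    }
    // Non-deterministic expressions are never considered contained,
    // so there is nothing to remove for them.
  }
  new ExpressionSet(newBaseSet, newOriginals)
}
```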
[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19477 @dongjoon-hyun This is kind of a duplicate of #18979, although the viewpoint of the issue is different. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19419#discussion_r144186220 --- Diff: conf/spark-defaults.conf.template --- @@ -25,3 +25,10 @@ # spark.serializer org.apache.spark.serializer.KryoSerializer # spark.driver.memory 5g # spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three" + +# spark.ui.allowFramingFrom https://www.example.com/ +# spark.ui.xXssProtection 1; mode=block +# spark.ui.xContentType.options nosniff + +# Enable below only when Spark is running on HTTPS +# spark.ui.strictTransportSecurity max-age=31536000 --- End diff -- What's the meaning of this specific number "31536000"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
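For reference, 31536000 seconds is one year (365 x 24 x 3600), the customary `max-age` value for a `Strict-Transport-Security` header.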
[GitHub] spark issue #19419: [SPARK-22188] [CORE] Adding security headers for prevent...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19419 @vanzin @tgravescs @ajbozarth what is your opinion on this PR? Is it a necessary fix for Spark? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144185928 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -74,9 +81,13 @@ class ExpressionSet protected( } override def -(elem: Expression): ExpressionSet = { -val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) -val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) -new ExpressionSet(newBaseSet, newOriginals) +if (elem.deterministic) { + val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) + val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) + new ExpressionSet(newBaseSet, newOriginals) +} else { + new ExpressionSet(baseSet.clone(), originals.clone()) --- End diff -- If so, why do you clone here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...
Github user fjh100456 commented on the issue: https://github.com/apache/spark/pull/19218 cc @gatorsmile @dongjoon-hyun Is it ok now? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19475 **[Test build #82655 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82655/testReport)** for PR 19475 at commit [`f97fb98`](https://github.com/apache/spark/commit/f97fb9808fdeb2a9d46cd70105c7d05b876ad3fa). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...
Github user gengliangwang commented on the issue: https://github.com/apache/spark/pull/19475 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144184258 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -74,9 +81,13 @@ class ExpressionSet protected( } override def -(elem: Expression): ExpressionSet = { -val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) -val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) -new ExpressionSet(newBaseSet, newOriginals) +if (elem.deterministic) { + val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) + val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) + new ExpressionSet(newBaseSet, newOriginals) +} else { + new ExpressionSet(baseSet.clone(), originals.clone()) --- End diff -- There is no need to drop it, since the non-deterministic `elem` is not in `originals`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19477 Hi, @gatorsmile and @cloud-fan. This is a regression introduced by SPARK-21669 (Internal API for collecting metrics/stats during FileFormatWriter jobs) in Spark 2.3.0. Could you review this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144184155 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -74,9 +81,13 @@ class ExpressionSet protected( } override def -(elem: Expression): ExpressionSet = { -val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) -val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) -new ExpressionSet(newBaseSet, newOriginals) +if (elem.deterministic) { + val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) + val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) + new ExpressionSet(newBaseSet, newOriginals) +} else { + new ExpressionSet(baseSet.clone(), originals.clone()) --- End diff -- Sorry, by `Set` I meant the whole `ExpressionSet`. Please check the logic in `CombineFilters` and my new test case; you will understand immediately. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144183735 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -74,9 +81,13 @@ class ExpressionSet protected( } override def -(elem: Expression): ExpressionSet = { -val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) -val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) -new ExpressionSet(newBaseSet, newOriginals) +if (elem.deterministic) { + val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) + val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) + new ExpressionSet(newBaseSet, newOriginals) +} else { + new ExpressionSet(baseSet.clone(), originals.clone()) --- End diff -- `Set`? It seems `originals` is `ArrayBuffer`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19477 **[Test build #82654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82654/testReport)** for PR 19477 at commit [`b545f28`](https://github.com/apache/spark/commit/b545f281b19120cc2c9e4197cae4b1315969247d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144183653 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -46,14 +47,20 @@ object ExpressionSet { * set.contains(1 + a) => true * set.contains(a + 2) => false * }}} + * + * For non-deterministic expressions, they are always considered as not contained in the [[Set]]. + * On adding a non-deterministic expression, simply append it to the original expressions. + * This is consistent with how we define `semanticEquals` between two expressions. */ class ExpressionSet protected( protected val baseSet: mutable.Set[Expression] = new mutable.HashSet, protected val originals: mutable.Buffer[Expression] = new ArrayBuffer) extends Set[Expression] { protected def add(e: Expression): Unit = { -if (!baseSet.contains(e.canonicalized)) { +if (!e.deterministic) { + originals += e +} else if (!baseSet.contains(e.canonicalized) ) { --- End diff -- SGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...
GitHub user dongjoon-hyun opened a pull request: https://github.com/apache/spark/pull/19477 [SPARK-22258][SQL] Writing empty dataset fails with ORC format ## What changes were proposed in this pull request? Since [SPARK-8501](https://issues.apache.org/jira/browse/SPARK-8501), Spark doesn't create an ORC file for empty data sets. However, [SPARK-21669](https://issues.apache.org/jira/browse/SPARK-21669) tries to get the length of the written file at the end of writing tasks and fails with `FileNotFoundException`. This is a regression in 2.3.0 only. We had better fix this and add a test case to prevent future regressions. ```scala scala> Seq("str").toDS.limit(0).write.format("orc").save("/tmp/a") 17/10/11 19:28:59 ERROR Utils: Aborting task java.io.FileNotFoundException: File file:/tmp/a/_temporary/0/_temporary/attempt_20171011192859__m_00_0/part-0-aa56c3cf-ec35-48f1-bb73-23ad1480e917-c000.snappy.orc does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421) at org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker.getFileSize(BasicWriteStatsTracker.scala:60) ``` ## How was this patch tested? Pass the newly added test cases. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dongjoon-hyun/spark SPARK-22258 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19477.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19477 commit b545f281b19120cc2c9e4197cae4b1315969247d Author: Dongjoon Hyun, Date: 2017-10-12T02:38:51Z [SPARK-22258][SQL] Writing empty dataset fails with ORC format --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
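The trace above points at `BasicWriteTaskStatsTracker.getFileSize` stat-ing a file that was never created for an empty partition. A minimal sketch of the kind of guard that avoids the crash (an illustration of the idea only, not the patch ultimately merged via #18979):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Return None instead of throwing when the task wrote no file at all.
private def getFileSize(hadoopConf: Configuration, filePath: String): Option[Long] = {
  val path = new Path(filePath)
  val fs = path.getFileSystem(hadoopConf)
  if (fs.exists(path)) Some(fs.getFileStatus(path).getLen) else None
}
```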
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144182958 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -46,14 +47,20 @@ object ExpressionSet { * set.contains(1 + a) => true * set.contains(a + 2) => false * }}} + * + * For non-deterministic expressions, they are always considered as not contained in the [[Set]]. + * On adding a non-deterministic expression, simply append it to the original expressions. + * This is consistent with how we define `semanticEquals` between two expressions. */ class ExpressionSet protected( protected val baseSet: mutable.Set[Expression] = new mutable.HashSet, protected val originals: mutable.Buffer[Expression] = new ArrayBuffer) extends Set[Expression] { protected def add(e: Expression): Unit = { -if (!baseSet.contains(e.canonicalized)) { +if (!e.deterministic) { + originals += e +} else if (!baseSet.contains(e.canonicalized) ) { --- End diff -- Yeah, I have thought about this. But then the time complexity of adding an expression would be O(n). It is a trade-off; I prefer my current implementation, where the time complexity is O(1). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19318: [WIP][SPARK-22096][ML] use aggregateByKeyLocally in feat...
Github user VinceShieh commented on the issue: https://github.com/apache/spark/pull/19318 thanks :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19433 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82652/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r144182701 --- Diff: pom.xml --- @@ -2649,6 +2649,13 @@ + kubernetes --- End diff -- We should also change the sbt file to make it work using sbt. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19433 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19433 **[Test build #82652 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82652/testReport)** for PR 19433 at commit [`5c29d3d`](https://github.com/apache/spark/commit/5c29d3d1e899c8d311633c4d763b57e42a26c660). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19475#discussion_r144182675 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala --- @@ -74,9 +81,13 @@ class ExpressionSet protected( } override def -(elem: Expression): ExpressionSet = { -val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) -val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) -new ExpressionSet(newBaseSet, newOriginals) +if (elem.deterministic) { + val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) + val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) + new ExpressionSet(newBaseSet, newOriginals) +} else { + new ExpressionSet(baseSet.clone(), originals.clone()) --- End diff -- No, it is not dropping it. A non-deterministic `elem` is never considered to be contained in the set. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19464 IIUC this issue also exists in `NewHadoopRDD` and possibly `FileScanRDD`; we'd better fix them as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19464#discussion_r144181321 --- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala --- @@ -196,7 +196,10 @@ class HadoopRDD[K, V]( // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) -val inputSplits = inputFormat.getSplits(jobConf, minPartitions) +var inputSplits = inputFormat.getSplits(jobConf, minPartitions) +if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) { --- End diff -- I would suggest not using a name that starts with "spark.hadoop"; configurations with this prefix are treated as Hadoop configurations and set into the Hadoop `Configuration`, so it would be better to choose another name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
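With that naming concern addressed, the guarded filtering in `HadoopRDD.getPartitions` could look like the sketch below; the config key is illustrative only, and `jobConf`, `minPartitions`, and `inputFormat` are the surrounding values quoted in the diff:

```scala
// Filter zero-length splits behind a flag outside the reserved
// "spark.hadoop." namespace (the key name here is an assumption).
val allSplits = inputFormat.getSplits(jobConf, minPartitions)
val inputSplits =
  if (sparkContext.getConf.getBoolean("spark.hadoopRDD.ignoreEmptySplits", false)) {
    allSplits.filter(_.getLength > 0)
  } else {
    allSplits
  }
```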
[GitHub] spark issue #19474: [SPARK-22252][SQL] FileFormatWriter should respect the i...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19474 Minor comments. LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19474#discussion_r144180788 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala --- @@ -117,7 +117,7 @@ object FileFormatWriter extends Logging { job.setOutputValueClass(classOf[InternalRow]) FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) -val allColumns = plan.output +val allColumns = queryExecution.logical.output --- End diff -- Btw, shall we use `queryExecution.analyzed.output`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
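viirya's suggestion, spelled out: pin the writer's columns to the analyzed plan, whose attribute names and ordering are fixed before the optimizer runs.

```scala
// Schema comes from the analyzed plan; optimizer rewrites can no longer
// change which columns the writer sees.
val allColumns = queryExecution.analyzed.output
```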
[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19474#discussion_r144180666 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala --- @@ -117,7 +117,7 @@ object FileFormatWriter extends Logging { job.setOutputValueClass(classOf[InternalRow]) FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) -val allColumns = plan.output +val allColumns = queryExecution.logical.output --- End diff -- I think it'd be good to leave a comment that we should not use optimized output here in case it will be changed in the future. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19476 **[Test build #82653 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82653/testReport)** for PR 19476 at commit [`f50a7b7`](https://github.com/apache/spark/commit/f50a7b75c303bd2cf261dfb1b4fe74fa5498ca4b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19474: [SPARK-22252][SQL] FileFormatWriter should respect the i...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19474 I like this change because the relationship between `ExecutedCommandExec` and `RunnableCommand` is a little entangled. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...
GitHub user jerryshao opened a pull request: https://github.com/apache/spark/pull/19476 [SPARK-22062][CORE] Spill large block to disk in BlockManager's remote fetch to avoid OOM ## What changes were proposed in this pull request? In the current BlockManager's `getRemoteBytes`, it calls `BlockTransferService#fetchBlockSync` to get a remote block. In `fetchBlockSync`, Spark allocates a temporary `ByteBuffer` to store the whole fetched block, which can lead to OOM if the block size is too big or several blocks are fetched simultaneously in one executor. So here, leveraging the idea of shuffle fetch, we spill large blocks to local disk before they are consumed by upstream code. The behavior is controlled by a newly added configuration: if the block size is smaller than the threshold, the block is kept in memory; otherwise it is first spilled to disk and then read back from the disk file. To achieve this feature, what I did is: 1. Rename `TempShuffleFileManager` to `TempFileManager`, since it is no longer used only by shuffle. 2. Add a new `TempFileManager` to manage the files of fetched remote blocks; the files are tracked by weak references and deleted when no longer in use. ## How was this patch tested? This was tested by adding unit tests, plus manual verification in a local test, triggering GC to clean up the files. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerryshao/apache-spark SPARK-22062 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19476.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19476 commit f50a7b75c303bd2cf261dfb1b4fe74fa5498ca4b Author: jerryshao, Date: 2017-10-12T01:47:35Z Spill large blocks to disk during remote fetches in BlockManager --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
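The threshold behavior described in the PR body boils down to a size check on the fetch path. A rough sketch, where the config key and the two helpers are stand-ins for illustration (`conf` is the executor's SparkConf; `fetchInMemory` and `fetchToTempFile` are hypothetical names for the existing in-memory path and the new TempFileManager-backed path):

```scala
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.BlockId

def getRemoteManagedBuffer(blockId: BlockId, blockSize: Long): ManagedBuffer = {
  val threshold =
    conf.getSizeAsBytes("spark.maxRemoteBlockSizeFetchToMem", Long.MaxValue) // assumed key
  if (blockSize < threshold) {
    fetchInMemory(blockId)   // small block: buffer the whole block in memory, as before
  } else {
    fetchToTempFile(blockId) // large block: stream to a local temp file, then read from disk
  }
}
```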
[GitHub] spark issue #19474: [SPARK-22252][SQL] FileFormatWriter should respect the i...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19474
> The scan node is no longer visible above the insert node, I'll fix this later. The writer bug is more important and we should fix it ASAP.
Totally agreed. LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18664 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82650/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18664 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18664 **[Test build #82650 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82650/testReport)** for PR 18664 at commit [`efe3e27`](https://github.com/apache/spark/commit/efe3e27a1f374e4482cffe2ce3877aceffc5eaad). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18460 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82649/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18460 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18460 **[Test build #82649 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82649/testReport)** for PR 18460 at commit [`52d19d3`](https://github.com/apache/spark/commit/52d19d36bb7f704ca79aa398add03393860d69c2). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19467#discussion_r144154295 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN } protected override def doExecute(): RDD[InternalRow] = { -child.execute().coalesce(numPartitions, shuffle = false) +if (numPartitions == 1 && child.execute().getNumPartitions < 1) { --- End diff -- Add a test in DatasetSuite that covers this empty-RDD case, maybe in the same test as the existing coalesce test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
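[Editor's note] For context, a hedged sketch of what such a DatasetSuite addition could look like; the test name, dataset construction, and assertions are illustrative assumptions, not the committed test. It assumes a `spark` SparkSession and a ScalaTest-style `test` harness in scope, as in Spark's shared test infrastructure.

```scala
// Illustrative ScalaTest-style sketch, not the actual DatasetSuite test.
test("coalesce(1) on a dataset backed by a zero-partition RDD") {
  import spark.implicits._
  // An RDD with zero partitions is the case the fix above targets.
  val ds = spark.createDataset(spark.sparkContext.emptyRDD[Int])
  // A zero-partition child should still coalesce to exactly one partition,
  // and the resulting plan should execute without errors.
  assert(ds.coalesce(1).rdd.getNumPartitions == 1)
  assert(ds.coalesce(1).count() == 0)
}
```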
[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19467#discussion_r144152923 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -131,17 +132,17 @@ class IncrementalExecution( } override def preparations: Seq[Rule[SparkPlan]] = -Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations +Seq(state, EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ super.preparations /** No need assert supported, as this check has already been done */ override def assertSupported(): Unit = { } } -object EnsureStatefulOpPartitioning extends Rule[SparkPlan] { +case class EnsureStatefulOpPartitioning(conf: SQLConf) extends Rule[SparkPlan] { // Needs to be transformUp to avoid extra shuffles override def apply(plan: SparkPlan): SparkPlan = plan transformUp { case so: StatefulOperator => - val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions + val numPartitions = conf.numShufflePartitions --- End diff -- Why this change? Doesn't the plan have the same context and conf? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
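[Editor's note] The question above contrasts two ways a physical-plan rule can obtain its configuration. Below is a minimal sketch of the two shapes with stand-in types; all names here are hypothetical, not Spark's actual classes.

```scala
// Stand-ins for SQLConf and SparkPlan; illustrative only.
case class Conf(numShufflePartitions: Int)
trait Plan { def conf: Conf }

// Style 1: the rule is an object and reads the conf off the plan when applied.
object RuleReadingPlanConf {
  def apply(plan: Plan): Plan = {
    val numPartitions = plan.conf.numShufflePartitions
    require(numPartitions > 0)
    plan // actual transformation elided
  }
}

// Style 2 (the PR's shape): the conf is injected when the rule is constructed,
// so applying the rule never has to reach through the plan for a live
// session/context at execution-preparation time.
case class RuleWithInjectedConf(conf: Conf) {
  def apply(plan: Plan): Plan = {
    val numPartitions = conf.numShufflePartitions
    require(numPartitions > 0)
    plan // actual transformation elided
  }
}
```

A side effect of the injected-conf shape is that the rule becomes trivially unit-testable: a test can construct it with any `Conf` without standing up a session.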
[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19433 **[Test build #82652 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82652/testReport)** for PR 19433 at commit [`5c29d3d`](https://github.com/apache/spark/commit/5c29d3d1e899c8d311633c4d763b57e42a26c660). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19472 **[Test build #82651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82651/testReport)** for PR 19472 at commit [`a814eb3`](https://github.com/apache/spark/commit/a814eb3f08085b09a16f336b36fba8da24e4f34a). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org