[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19475
  
**[Test build #82659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82659/testReport)** for PR 19475 at commit [`ce55412`](https://github.com/apache/spark/commit/ce5541260dac65ce09df374d240910f12099b3af).


---




[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19475
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19475
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82655/
Test PASSed.


---




[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19475
  
**[Test build #82655 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82655/testReport)** for PR 19475 at commit [`f97fb98`](https://github.com/apache/spark/commit/f97fb9808fdeb2a9d46cd70105c7d05b876ad3fa).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19474#discussion_r144198505
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 ---
@@ -30,6 +31,15 @@ import org.apache.spark.util.SerializableConfiguration
  */
 trait DataWritingCommand extends RunnableCommand {
 
+  def query: LogicalPlan
--- End diff --

Add one line description for `query`?
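
For illustration, a one-line Scaladoc along these lines would address it (wording is hypothetical, not the final text):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

trait DataWritingCommand extends RunnableCommand {
  /** The logical plan that produces the rows this command writes out. */
  def query: LogicalPlan
}
```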


---




[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19464
  
**[Test build #82658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82658/testReport)** for PR 19464 at commit [`cf0c350`](https://github.com/apache/spark/commit/cf0c350daf12ce80fc781fd17fd15506d83c6d02).


---




[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19474#discussion_r144198270
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -117,7 +117,7 @@ object FileFormatWriter extends Logging {
 job.setOutputValueClass(classOf[InternalRow])
 FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-val allColumns = plan.output
+val allColumns = queryExecution.logical.output
--- End diff --

Explicitly using `analyzed`'s schema is better here.
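
A minimal sketch of the suggested change, assuming `queryExecution` is the incoming `QueryExecution` as in the diff above:

```scala
// Take the output attributes from the analyzed plan rather than the raw logical
// plan, so the writer sees the user-facing schema before any optimizer rewrites.
val allColumns = queryExecution.analyzed.output
```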


---




[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...

2017-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19464
  
ok to test


---




[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19474#discussion_r144197934
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 ---
@@ -30,6 +31,15 @@ import org.apache.spark.util.SerializableConfiguration
  */
 trait DataWritingCommand extends RunnableCommand {
 
+  def query: LogicalPlan
+
+  // We make the input `query` an inner child instead of a child in order to hide it from the
+  // optimizer. This is because optimizer may change the output schema names, and we have to keep
--- End diff --

You will scare others. :) 

-> `may not preserve the output schema names' case`
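
For context, the mechanism being documented can be sketched roughly like this (illustrative only; it mirrors the intent of the quoted diff rather than the PR's exact code):

```scala
trait DataWritingCommand extends RunnableCommand {
  def query: LogicalPlan

  // Expose `query` as an inner child rather than a child so the optimizer does
  // not rewrite it, since the optimizer may not preserve the output schema names.
  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
}
```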



---




[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...

2017-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19464
  
I think the optimisation by the `spark.sql.files.maxPartitionBytes` SQL-specific conf includes this concept in `FileScanRDD`, and it looks like it is already partially doing this when combining input splits. I'd suggest avoiding putting this conf in `FileScanRDD` for now, unless I missed something.
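
For reference, setting the SQL-side knobs mentioned in this thread looks like this, assuming a `SparkSession` named `spark` (the values shown are just the usual defaults):

```scala
// Controls how file splits are packed into FileScanRDD partitions.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024) // 128 MB
spark.conf.set("spark.sql.files.openCostInBytes", 4 * 1024 * 1024)     // 4 MB
```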


---




[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19477
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19477
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82654/
Test PASSed.


---




[GitHub] spark issue #19389: [SPARK-22165][SQL] Resolve type conflicts between decima...

2017-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19389
  
Thank you so much @gatorsmile.


---




[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19477
  
**[Test build #82654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82654/testReport)** for PR 19477 at commit [`b545f28`](https://github.com/apache/spark/commit/b545f281b19120cc2c9e4197cae4b1315969247d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19474#discussion_r144197368
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
 ---
@@ -30,4 +31,12 @@ class FileFormatWriterSuite extends QueryTest with 
SharedSQLContext {
   assert(partFiles.length === 2)
 }
   }
+
+  test("FileFormatWriter should respect the input query schema") {
+withTable("t1", "t2") {
+  spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
--- End diff --

Also add another case here?
```
spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
```
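
A possible shape for that extra case (a hypothetical test body; it assumes the suite's `testImplicits` and `Row` are in scope, and the expected row only illustrates the check):

```scala
test("FileFormatWriter should respect the input query schema - extra column") {
  withTable("t3") {
    spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
    // The written table should keep the query's column names and order.
    checkAnswer(spark.table("t3").select("id", "col1", "col2"), Row(0L, 0L, 0L))
  }
}
```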


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144197159
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,10 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext 
initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
--- End diff --

I'd use `spark.files` prefix, taken after `spark.files.ignoreCorruptFiles`, 
`spark.files.maxPartitionBytes` and `spark.files.openCostInBytes`.
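
A hedged sketch of what that could look like in `HadoopRDD.getPartitions` (the conf key below is illustrative, not a settled name):

```scala
val ignoreEmptySplits =
  sparkContext.getConf.getBoolean("spark.files.ignoreEmptySplits", false)
val allSplits = inputFormat.getSplits(jobConf, minPartitions)
// Drop zero-length splits only when the flag is enabled.
val inputSplits = if (ignoreEmptySplits) allSplits.filter(_.getLength > 0) else allSplits
```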


---




[GitHub] spark issue #19389: [SPARK-22165][SQL] Resolve type conflicts between decima...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19389
  
Will review it this weekend.


---




[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

2017-10-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19459#discussion_r143906469
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3095,16 +3095,32 @@ def setUpClass(cls):
 StructField("3_long_t", LongType(), True),
 StructField("4_float_t", FloatType(), True),
 StructField("5_double_t", DoubleType(), True)])
-cls.data = [("a", 1, 10, 0.2, 2.0),
-("b", 2, 20, 0.4, 4.0),
-("c", 3, 30, 0.8, 6.0)]
+cls.data = [(u"a", 1, 10, 0.2, 2.0),
+(u"b", 2, 20, 0.4, 4.0),
+(u"c", 3, 30, 0.8, 6.0)]
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
 
 def assertFramesEqual(self, df_with_arrow, df_without):
 msg = ("DataFrame from Arrow is not equal" +
("\n\nWith Arrow:\n%s\n%s" % (df_with_arrow, 
df_with_arrow.dtypes)) +
("\n\nWithout:\n%s\n%s" % (df_without, df_without.dtypes)))
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
+def createPandasDataFrameFromeData(self):
--- End diff --

nit: typo `createPandasDataFrameFromeData` -> 
`createPandasDataFrameFromData`


---




[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

2017-10-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19459#discussion_r144194374
  
--- Diff: python/pyspark/sql/session.py ---
@@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, 
samplingRatio=None, verifySchema=Tr
 except Exception:
 has_pandas = False
 if has_pandas and isinstance(data, pandas.DataFrame):
-if schema is None:
-schema = [str(x) for x in data.columns]
-data = [r.tolist() for r in data.to_records(index=False)]
+if self.conf.get("spark.sql.execution.arrow.enabled", 
"false").lower() == "true" \
+and len(data) > 0:
+from pyspark.serializers import ArrowSerializer
--- End diff --

Maybe we should split this block into a method like `_createFromPandasDataFrame`, the same as the other create methods?


---




[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

2017-10-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19459#discussion_r144194084
  
--- Diff: python/pyspark/sql/session.py ---
@@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, 
samplingRatio=None, verifySchema=Tr
 except Exception:
 has_pandas = False
 if has_pandas and isinstance(data, pandas.DataFrame):
-if schema is None:
-schema = [str(x) for x in data.columns]
-data = [r.tolist() for r in data.to_records(index=False)]
+if self.conf.get("spark.sql.execution.arrow.enabled", 
"false").lower() == "true" \
+and len(data) > 0:
+from pyspark.serializers import ArrowSerializer
+from pyspark.sql.types import from_arrow_schema
+import pyarrow as pa
+
+# Slice the DataFrame into batches
+split = -(-len(data) // 
self.sparkContext.defaultParallelism)  # round int up
+slices = (data[i:i + split] for i in xrange(0, len(data), 
split))
+batches = [pa.RecordBatch.from_pandas(sliced_df, 
preserve_index=False)
+   for sliced_df in slices]
+
+# write batches to temp file, read by JVM (borrowed from 
context.parallelize)
+import os
+from tempfile import NamedTemporaryFile
+tempFile = NamedTemporaryFile(delete=False, 
dir=self._sc._temp_dir)
+try:
+serializer = ArrowSerializer()
+serializer.dump_stream(batches, tempFile)
+tempFile.close()
+readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
+jrdd = readRDDFromFile(self._jsc, tempFile.name, 
len(batches))
+finally:
+# readRDDFromFile eagerily reads the file so we can 
delete right after.
+os.unlink(tempFile.name)
+
+# Create the Spark DataFrame, there will be at least 1 
batch
+schema = from_arrow_schema(batches[0].schema)
--- End diff --

What if a user specifies the schema?


---




[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19475
  
The idea sounds good to me. Please add unit test cases for all the 
ExpressionSet APIs. Thanks!
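
As a rough illustration of the coverage being asked for (a sketch only; the concrete expected results belong to the PR's tests):

```scala
import org.apache.spark.sql.catalyst.expressions.{ExpressionSet, Literal, Rand}

// Exercise the main ExpressionSet operations with a non-deterministic
// expression in the mix, which is what this PR changes the handling of.
val nonDeterministic = Rand(Literal(1L))
val deterministic = Literal(42)
val set = ExpressionSet(Seq(deterministic, nonDeterministic))
val added = set + nonDeterministic
val removed = set - nonDeterministic
val onlyDeterministic = set.filter(_.deterministic)
```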


---




[GitHub] spark pull request #19429: [SPARK-20055] [Docs] Added documentation for load...

2017-10-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19429


---




[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-11 Thread mpjlu
Github user mpjlu commented on the issue:

https://github.com/apache/spark/pull/19337
  
For the comments about changing the name of epsilon and adding a setter in LocalLDAModel, we have agreed not to change it for now after some offline discussion.
Epsilon doesn't control model convergence directly, and some other LDA implementations, such as Vowpal Wabbit, also use this name.
There are many parameters in LDA and epsilon is just one of them; currently there is no setter for any of them. If we need to add setters for them, we may add them all together in another PR.
Thanks. @hhbyyh @jkbradley


---




[GitHub] spark issue #19429: [SPARK-20055] [Docs] Added documentation for loading csv...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19429
  
@jomach is a new contributor to Apache Spark. It might be hard for him to address the above comments. Please submit a separate PR to address them; I will review it. Thanks!


---




[GitHub] spark issue #19429: [SPARK-20055] [Docs] Added documentation for loading csv...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19429
  
Thanks! Merged to master.


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144195079
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

In Spark SQL, we do issue `AnalysisException` in many similar cases. I am also fine with using `SparkException`.

In this specific case, the users are able to control the conf to make it work. Thus, we also need to improve the message to let users know how to resolve it by changing the conf.
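
A sketch of the kind of actionable message meant here, assuming `conf` is the job's Hadoop `Configuration` as in the diff (the condition and wording are placeholders, not the PR's final text):

```scala
import org.apache.parquet.hadoop.ParquetOutputFormat

val summaryRequested = conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
require(!summaryRequested,
  s"Writing Parquet summary files is not supported here. Set " +
  s"${ParquetOutputFormat.ENABLE_JOB_SUMMARY} (parquet.enable.summary-metadata) " +
  "to false to resolve this.")
```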


---




[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19337
  
**[Test build #82657 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82657/testReport)** for PR 19337 at commit [`b329051`](https://github.com/apache/spark/commit/b32905138d598b084903c3a5fabc2bf02e47b37a).


---




[GitHub] spark issue #19439: [SPARK-21866][ML][PySpark] Adding spark image reader

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19439
  
**[Test build #82656 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82656/testReport)** for PR 19439 at commit [`d42636a`](https://github.com/apache/spark/commit/d42636a99dd045fa1003b4907b35796829c6efd5).


---




[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18979
  
Could you also include the [test cases](https://github.com/dongjoon-hyun/spark/blob/b545f281b19120cc2c9e4197cae4b1315969247d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala#L2054-L2060) in [InsertSuite.scala](https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala)?


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144194076
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  val 

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144193549
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144193350
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
--- End diff --

I believe getOrigin/getHeight/getWidth/getNChannels/getMode/getData are all convenience functions that users should be able to use, but I can change them to private if you prefer. I've added better documentation to the methods. isImage is also a convenience function they can use to tell whether a column is an image column.
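
A small usage sketch of those accessors, based on the API shown in the quoted diff (purely illustrative):

```scala
import org.apache.spark.sql.Row

// Summarize one image row using the ImageSchema accessors discussed above.
def describeImage(imageRow: Row): String = {
  val origin = ImageSchema.getOrigin(imageRow)
  val width = ImageSchema.getWidth(imageRow)
  val height = ImageSchema.getHeight(imageRow)
  val channels = ImageSchema.getNChannels(imageRow)
  val mode = ImageSchema.getMode(imageRow)
  s"$origin: ${width}x$height, $channels channel(s), mode=$mode"
}
```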


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144193840
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144193584
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144193367
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
--- End diff --

done


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144191899
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144191439
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /**
+   * Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): 
Option[String] = {
+val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.get(flagName))
+
+value match {
+  case Some(v) => hadoopConf.set(flagName, v)
+  case None => hadoopConf.unset(flagName)
+}
+
+old
+  }
+}
+
+/**
+ * Filter that allows loading a fraction of HDFS files.
+ */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+val rd = new Random()
+rd.setSeed(0)
+rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+if (conf != null) {
+  sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+}
+  }
+
+  override def accept(path: Path): Boolean = {
+// Note: checking fileSystem.isDirectory is very slow here, so we use 
basic rules instead
+!SamplePathFilter.isFile(path) ||
+  random.nextDouble() < sampleRatio
+  }
+}
+
+private object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = 
FilenameUtils.getExtension(path.toString) != ""
+
+  /**
+   * Sets hdfs PathFilter
+   *
+   * @param value   Filter class that is passed to HDFS
+   * @param sampleRatio Fraction of the files that the filter picks
+   * @param spark   Existing Spark session
+   * @returnReturns the previous hdfs path filter
+   */
+  def setPathFilter(value: Option[Class[_]], sampleRatio: Double,
+spark: SparkSession) : Option[Class[_]] = {
+val flagName = FileInputFormat.PATHFILTER_CLASS
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.getClass(flagName, null))
+hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
+
+value match {
+  case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
+  case None => hadoopConf.unset(flagName)
+}
+old
+  }
+
+  /**
+   * Unsets hdfs PathFilter
--- End diff --

done


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144191203
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144191154
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /**
+   * Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /**
+   * Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144191057
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /**
+   * Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): 
Option[String] = {
+val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.get(flagName))
+
+value match {
+  case Some(v) => hadoopConf.set(flagName, v)
+  case None => hadoopConf.unset(flagName)
+}
+
+old
+  }
+}
+
+/**
+ * Filter that allows loading a fraction of HDFS files.
+ */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+val rd = new Random()
+rd.setSeed(0)
--- End diff --

Removed the seed - it might be good to expose this as a parameter in the future, so that readImages loads the same images (and not just the same ratio) on every invocation.
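
A hypothetical sketch of how that could look later, with the seed surfaced through the Hadoop `Configuration` (the key names and wiring below are assumptions, not part of this PR):

```scala
import scala.util.Random

import org.apache.hadoop.conf.{Configuration, Configured}
import org.apache.hadoop.fs.{Path, PathFilter}

// Sketch only: a path filter whose sampling ratio and seed both come from the
// Hadoop configuration, so repeated readImages calls can sample the same files.
private class SeededSamplePathFilter extends Configured with PathFilter {
  private var sampleRatio: Double = 1.0
  private var random: Random = new Random()

  override def setConf(conf: Configuration): Unit = {
    super.setConf(conf)
    if (conf != null) {
      sampleRatio = conf.getDouble("sampleRatio", 1.0)                    // assumed key
      random = new Random(conf.getLong("sampleSeed", Random.nextLong()))  // assumed key
    }
  }

  override def accept(path: Path): Boolean = random.nextDouble() < sampleRatio
}
```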


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144190588
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /**
+   * Sets a value of spark recursive flag
+   *
+   * @param value value to set
--- End diff --

done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144190450
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
+
+Returns:
+object: 2 dimensional image
+
+.. versionadded:: 2.3.0
+"""
+length = np.prod(array.shape)
+
+data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)]
+  .reshape(length))
+height = array.shape[0]
+width = array.shape[1]
+nChannels = array.shape[2]
+# Creating new Row with _create_row(), because Row(name = value, ... )
+# orders fields by name, which conflicts with expected ImageSchema 
order
+# when the new DataFrame is created by UDF
+return _create_row(ImageFields,
+   [origin, height, width, nChannels, mode, data])
+
+
+def readImages(path,
+   recursive=False,
+   numPartitions=0,
+   dropImageFailures=False,
+   sampleRatio=1.0):
+"""
+Reads the directory of images from the local or remote (WASB) source.
+Args:
--- End diff --

good catch - removed spark session


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144190368
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
+
+Returns:
+object: 2 dimensional image
+
+.. versionadded:: 2.3.0
+"""
+length = np.prod(array.shape)
+
+data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)]
+  .reshape(length))
+height = array.shape[0]
+width = array.shape[1]
+nChannels = array.shape[2]
+# Creating new Row with _create_row(), because Row(name = value, ... )
--- End diff --

@holdenk is @MrBago 's resolution reasonable?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144190224
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
--- End diff --

done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144189855
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
+
+Returns:
+object: 2 dimensional image
+
+.. versionadded:: 2.3.0
+"""
+length = np.prod(array.shape)
+
+data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)]
+  .reshape(length))
--- End diff --

done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-11 Thread imatiach-msft
Github user imatiach-msft commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r144189582
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Experimental
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /**
+   * Schema for the image column: Row(String, Int, Int, Int, Array[Byte])
+   */
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
--- End diff --

It is strange; my IntelliJ seems to prefer this default, but I can't tell why. I've changed it to two spaces for all of them.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19472
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82651/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19472
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19472
  
**[Test build #82651 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82651/testReport)**
 for PR 19472 at commit 
[`a814eb3`](https://github.com/apache/spark/commit/a814eb3f08085b09a16f336b36fba8da24e4f34a).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19477#discussion_r144188789
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 
---
@@ -2050,4 +2050,12 @@ class SQLQuerySuite extends QueryTest with 
SQLTestUtils with TestHiveSingleton {
   }
 }
   }
+
+  Seq("orc", "parquet", "csv", "json", "text").foreach { format =>
--- End diff --

+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/18979
  
+1. This solves the regression on writing an empty dataset with the ORC format, too!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19477
  
Wow, there is a PR for that. Thank you for pointing it out, @viirya! Then it's good.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun closed the pull request at:

https://github.com/apache/spark/pull/19477


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19458: [SPARK-22227][CORE] DiskBlockManager.getAllBlocks now to...

2017-10-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19458
  
Instead of filtering out temp blocks, why not add parsing rules for `TempLocalBlockId` and `TempShuffleBlockId`? That could also solve the problem. Since `DiskBlockManager#getAllFiles` doesn't filter out temp shuffle/local files, wouldn't it be better to keep the same behavior for `DiskBlockManager#getAllBlocks`?
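
A rough sketch of that alternative, as extra cases in `BlockId.apply` inside `org.apache.spark.storage` (the regex names and error handling below are assumptions):

```scala
// Sketch only: parsing rules BlockId.apply could gain so that temp block files
// found on disk map back to TempLocalBlockId / TempShuffleBlockId instead of
// having to be filtered out by getAllBlocks.
val TEMP_LOCAL = "temp_local_(.*)".r
val TEMP_SHUFFLE = "temp_shuffle_(.*)".r

def apply(name: String): BlockId = name match {
  // ... existing cases for rdd_, shuffle_, broadcast_, taskresult_, stream_, test_ ...
  case TEMP_LOCAL(uuid) =>
    TempLocalBlockId(java.util.UUID.fromString(uuid))
  case TEMP_SHUFFLE(uuid) =>
    TempShuffleBlockId(java.util.UUID.fromString(uuid))
  case _ =>
    throw new IllegalStateException("Unrecognized BlockId: " + name)
}
```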


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144187454
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format 
match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format 
match {
+case "parquet" => "spark.sql.parquet.compression.codec"
+case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+format.toLowerCase match {
+  case "parquet" => "parquet.compression"
+  case "orc" => "orc.compress"
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
String = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+
+assert(codecs.distinct.length == 1)
+codecs.head
+  }
+
+  private def writeDataToTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]) {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getTableCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p int)" else 
""
+sql(
+  s"""
+ |CREATE TABLE $tableName(a int)
+ |$partitionCreate
+ |STORED AS $format
+ |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+ |$tblProperties
+   """.stripMargin)
+
+val partitionInsert = if (isPartitioned) s"partition (p=1)" else ""
+sql(
+  s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |$partitionInsert
+ |SELECT * from table_source
--- End diff --

nit. `from` -> `FROM`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144187309
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format 
match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format 
match {
+case "parquet" => "spark.sql.parquet.compression.codec"
+case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+format.toLowerCase match {
+  case "parquet" => "parquet.compression"
+  case "orc" => "orc.compress"
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
String = {
--- End diff --

The logic actually reads the compression codec from the data files, so the prefix `getTable` looks misleading to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144187101
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format 
match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format 
match {
+case "parquet" => "spark.sql.parquet.compression.codec"
+case "orc" => "spark.sql.orc.compression.codec"
--- End diff --

Here, too.
- `SQLConf.PARQUET_COMPRESSION.key` instead of 
"spark.sql.parquet.compression.codec"
- `SQLConf.ORC_COMPRESSION.key` insead of "spark.sql.orc.compression.codec"



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144186944
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format 
match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
--- End diff --

Could you use keys?
- `HiveUtils.CONVERT_METASTORE_PARQUET.key` instead of 
"spark.sql.hive.convertMetastoreParquet"
- `HiveUtils.CONVERT_METASTORE_ORC.key` instead of 
"spark.sql.hive.convertMetastoreOrc"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...

2017-10-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19477#discussion_r144186840
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 
---
@@ -2050,4 +2050,12 @@ class SQLQuerySuite extends QueryTest with 
SQLTestUtils with TestHiveSingleton {
   }
 }
   }
+
+  Seq("orc", "parquet", "csv", "json", "text").foreach { format =>
--- End diff --

Maybe this test case is worth merging in. cc @steveloughran Shall we include this test in #18979?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144186680
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -74,9 +81,13 @@ class ExpressionSet protected(
   }
 
   override def -(elem: Expression): ExpressionSet = {
-val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
-val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
-new ExpressionSet(newBaseSet, newOriginals)
+if (elem.deterministic) {
+  val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
+  val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
+  new ExpressionSet(newBaseSet, newOriginals)
+} else {
+  new ExpressionSet(baseSet.clone(), originals.clone())
--- End diff --

I am trying to be consistent with the behavior of the original implementation. I think I had better override `--` for efficiency.
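
A hypothetical sketch of such an override (the final change may differ); it filters out all deterministic elements in a single pass instead of rebuilding the set once per removed element:

```scala
import scala.collection.GenTraversableOnce

override def --(elems: GenTraversableOnce[Expression]): ExpressionSet = {
  // Only deterministic expressions can match by canonical form; non-deterministic
  // ones are never treated as contained, so they are left untouched.
  val removed = elems.toList.filter(_.deterministic).map(_.canonicalized)
  val newBaseSet = baseSet.clone().filterNot(e => removed.contains(e))
  val newOriginals = originals.clone().filterNot(e => removed.contains(e.canonicalized))
  new ExpressionSet(newBaseSet, newOriginals)
}
```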


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...

2017-10-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19477
  
@dongjoon-hyun This is kind of a duplicate of #18979, although the viewpoint of the issue is different.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...

2017-10-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19419#discussion_r144186220
  
--- Diff: conf/spark-defaults.conf.template ---
@@ -25,3 +25,10 @@
 # spark.serializer 
org.apache.spark.serializer.KryoSerializer
 # spark.driver.memory  5g
 # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+
+# spark.ui.allowFramingFrom https://www.example.com/
+# spark.ui.xXssProtection   1; mode=block
+# spark.ui.xContentType.options nosniff
+
+# Enable below only when Spark is running on HTTPS
+# spark.ui.strictTransportSecurity  max-age=31536000
--- End diff --

What's the meaning of this specific number "31536000"?
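
For what it's worth, the value appears to be one year expressed in seconds, which is a commonly recommended `max-age` for Strict-Transport-Security:

```scala
// HSTS max-age is given in seconds; 31536000 = 60 * 60 * 24 * 365, i.e. one year.
val oneYearInSeconds: Long = 60L * 60 * 24 * 365  // 31536000
```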


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19419: [SPARK-22188] [CORE] Adding security headers for prevent...

2017-10-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19419
  
@vanzin @tgravescs @ajbozarth  what is your opinion on this PR? Is it a 
necessary fix for Spark? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144185928
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -74,9 +81,13 @@ class ExpressionSet protected(
   }
 
   override def -(elem: Expression): ExpressionSet = {
-val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
-val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
-new ExpressionSet(newBaseSet, newOriginals)
+if (elem.deterministic) {
+  val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
+  val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
+  new ExpressionSet(newBaseSet, newOriginals)
+} else {
+  new ExpressionSet(baseSet.clone(), originals.clone())
--- End diff --

If so, why do you clone here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-11 Thread fjh100456
Github user fjh100456 commented on the issue:

https://github.com/apache/spark/pull/19218
  
cc @gatorsmile @dongjoon-hyun 
Is it ok now?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19475
  
**[Test build #82655 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82655/testReport)**
 for PR 19475 at commit 
[`f97fb98`](https://github.com/apache/spark/commit/f97fb9808fdeb2a9d46cd70105c7d05b876ad3fa).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19475: [SPARK-22257][SQL]Reserve all non-deterministic expressi...

2017-10-11 Thread gengliangwang
Github user gengliangwang commented on the issue:

https://github.com/apache/spark/pull/19475
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144184258
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -74,9 +81,13 @@ class ExpressionSet protected(
   }
 
   override def -(elem: Expression): ExpressionSet = {
-val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
-val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
-new ExpressionSet(newBaseSet, newOriginals)
+if (elem.deterministic) {
+  val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
+  val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
+  new ExpressionSet(newBaseSet, newOriginals)
+} else {
+  new ExpressionSet(baseSet.clone(), originals.clone())
--- End diff --

There is no need to drop it, since the non-deterministic `elem` is not in `originals`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19477
  
Hi, @gatorsmile and @cloud-fan. This is a regression from SPARK-21669 (Internal API for collecting metrics/stats during FileFormatWriter jobs) in Spark 2.3.0. Could you review this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144184155
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -74,9 +81,13 @@ class ExpressionSet protected(
   }
 
   override def -(elem: Expression): ExpressionSet = {
-val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
-val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
-new ExpressionSet(newBaseSet, newOriginals)
+if (elem.deterministic) {
+  val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
+  val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
+  new ExpressionSet(newBaseSet, newOriginals)
+} else {
+  new ExpressionSet(baseSet.clone(), originals.clone())
--- End diff --

Sorry, by `Set` I meant the whole `ExpressionSet`. Please check the logic in `CombineFilters` and my new test case; they should make the point clear immediately.
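
For readers without the test at hand, a rough sketch of the intended semantics (constructed for illustration; the actual test case may differ):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val set = ExpressionSet(Seq(
  GreaterThan(a, Literal(1)),
  MonotonicallyIncreasingID(),    // non-deterministic
  MonotonicallyIncreasingID()))   // canonically equal to the previous one

set.size                                    // 3: both non-deterministic copies are kept
set.contains(MonotonicallyIncreasingID())   // false: never treated as contained
set - MonotonicallyIncreasingID()           // removing a non-deterministic elem is a no-op
```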


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144183735
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -74,9 +81,13 @@ class ExpressionSet protected(
   }
 
   override def -(elem: Expression): ExpressionSet = {
-val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
-val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
-new ExpressionSet(newBaseSet, newOriginals)
+if (elem.deterministic) {
+  val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
+  val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
+  new ExpressionSet(newBaseSet, newOriginals)
+} else {
+  new ExpressionSet(baseSet.clone(), originals.clone())
--- End diff --

`Set`? It seems `originals` is an `ArrayBuffer`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19477: [SPARK-22258][SQL] Writing empty dataset fails with ORC ...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19477
  
**[Test build #82654 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82654/testReport)**
 for PR 19477 at commit 
[`b545f28`](https://github.com/apache/spark/commit/b545f281b19120cc2c9e4197cae4b1315969247d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144183653
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -46,14 +47,20 @@ object ExpressionSet {
  *   set.contains(1 + a) => true
  *   set.contains(a + 2) => false
  * }}}
+ *
+ * For non-deterministic expressions, they are always considered as not 
contained in the [[Set]].
+ * On adding a non-deterministic expression, simply append it to the 
original expressions.
+ * This is consistent with how we define `semanticEquals` between two 
expressions.
  */
 class ExpressionSet protected(
 protected val baseSet: mutable.Set[Expression] = new mutable.HashSet,
 protected val originals: mutable.Buffer[Expression] = new ArrayBuffer)
   extends Set[Expression] {
 
   protected def add(e: Expression): Unit = {
-if (!baseSet.contains(e.canonicalized)) {
+if (!e.deterministic) {
+  originals += e
+} else if (!baseSet.contains(e.canonicalized) ) {
--- End diff --

SGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19477: [SPARK-22258][SQL] Writing empty dataset fails wi...

2017-10-11 Thread dongjoon-hyun
GitHub user dongjoon-hyun opened a pull request:

https://github.com/apache/spark/pull/19477

[SPARK-22258][SQL] Writing empty dataset fails with ORC format

## What changes were proposed in this pull request?

Since [SPARK-8501](https://issues.apache.org/jira/browse/SPARK-8501), Spark doesn't create an ORC file for empty data sets. However, [SPARK-21669](https://issues.apache.org/jira/browse/SPARK-21669) tries to get the length of the written file at the end of the write task and fails with `FileNotFoundException`. This is a regression in 2.3.0 only. We had better fix this and add a test case to prevent future regressions.

```scala
scala> Seq("str").toDS.limit(0).write.format("orc").save("/tmp/a")
17/10/11 19:28:59 ERROR Utils: Aborting task
java.io.FileNotFoundException: File file:/tmp/a/_temporary/0/_temporary/attempt_20171011192859__m_00_0/part-0-aa56c3cf-ec35-48f1-bb73-23ad1480e917-c000.snappy.orc does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
    at org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker.getFileSize(BasicWriteStatsTracker.scala:60)
```

## How was this patch tested?

Pass the newly added test cases.
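
The added tests are roughly of this shape (sketch; the exact suite, test names, and format list are assumptions):

```scala
// Sketch only: writing an empty Dataset must succeed for every built-in format;
// before this fix the ORC case threw FileNotFoundException as shown above.
Seq("orc", "parquet", "csv", "json", "text").foreach { format =>
  test(s"SPARK-22258: writing an empty dataset should not fail - $format") {
    withTempPath { dir =>
      Seq("str").toDS.limit(0).write.format(format).save(dir.getCanonicalPath)
    }
  }
}
```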

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dongjoon-hyun/spark SPARK-22258

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19477.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19477


commit b545f281b19120cc2c9e4197cae4b1315969247d
Author: Dongjoon Hyun 
Date:   2017-10-12T02:38:51Z

[SPARK-22258][SQL] Writing empty dataset fails with ORC format




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144182958
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -46,14 +47,20 @@ object ExpressionSet {
  *   set.contains(1 + a) => true
  *   set.contains(a + 2) => false
  * }}}
+ *
+ * For non-deterministic expressions, they are always considered as not 
contained in the [[Set]].
+ * On adding a non-deterministic expression, simply append it to the 
original expressions.
+ * This is consistent with how we define `semanticEquals` between two 
expressions.
  */
 class ExpressionSet protected(
 protected val baseSet: mutable.Set[Expression] = new mutable.HashSet,
 protected val originals: mutable.Buffer[Expression] = new ArrayBuffer)
   extends Set[Expression] {
 
   protected def add(e: Expression): Unit = {
-if (!baseSet.contains(e.canonicalized)) {
+if (!e.deterministic) {
+  originals += e
+} else if (!baseSet.contains(e.canonicalized) ) {
--- End diff --

Yeah, I have thought about this, but then the time complexity of adding an expression is O(n). It is a trade-off; I prefer my current implementation, where adding is O(1).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19318: [WIP][SPARK-22096][ML] use aggregateByKeyLocally in feat...

2017-10-11 Thread VinceShieh
Github user VinceShieh commented on the issue:

https://github.com/apache/spark/pull/19318
  
thanks :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19433
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82652/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r144182701
  
--- Diff: pom.xml ---
@@ -2649,6 +2649,13 @@
 
 
 
+  kubernetes
--- End diff --

We should also change the sbt file to make it work using sbt.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19433
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19433
  
**[Test build #82652 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82652/testReport)**
 for PR 19433 at commit 
[`5c29d3d`](https://github.com/apache/spark/commit/5c29d3d1e899c8d311633c4d763b57e42a26c660).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19475: [SPARK-22257][SQL]Reserve all non-deterministic e...

2017-10-11 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19475#discussion_r144182675
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 ---
@@ -74,9 +81,13 @@ class ExpressionSet protected(
   }
 
   override def -(elem: Expression): ExpressionSet = {
-val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
-val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
-new ExpressionSet(newBaseSet, newOriginals)
+if (elem.deterministic) {
+  val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
+  val newOriginals = originals.clone().filterNot(_.canonicalized == 
elem.canonicalized)
+  new ExpressionSet(newBaseSet, newOriginals)
+} else {
+  new ExpressionSet(baseSet.clone(), originals.clone())
--- End diff --

No, it is not dropping it. A non-deterministic `elem` is never considered to be contained in the set.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...

2017-10-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19464
  
IIUC this issue also exists in `NewHadoopRDD` and possibly `FileScanRDD`; we'd better fix them as well.
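
For `NewHadoopRDD`, the analogous change in `getPartitions` would look roughly like this (sketch; `ignoreEmptySplits` mirrors the flag under discussion and is an assumption, as are the surrounding local names):

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.mapreduce.task.JobContextImpl

// Inside NewHadoopRDD.getPartitions: drop splits that contain no data before
// turning them into Spark partitions.
val allRawSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
val rawSplits = if (ignoreEmptySplits) allRawSplits.filter(_.getLength > 0) else allRawSplits
```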


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144181321
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,10 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext 
initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+if 
(sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
--- End diff --

I would suggest not using a name that starts with "spark.hadoop": configurations with this prefix are treated as Hadoop configurations and copied into the Hadoop `Configuration`, so it would be better to choose a different name.
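For context, a small hedged illustration of why the prefix matters; the alternative key name `spark.files.ignoreEmptySplits` below is hypothetical, not necessarily what this PR ends up using:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PrefixDemo extends App {
  val conf = new SparkConf()
    .setMaster("local[1]").setAppName("prefix-demo")
    .set("spark.hadoop.filterOutEmptySplit", "true")   // "spark.hadoop.*" keys are copied into the Hadoop Configuration
    .set("spark.files.ignoreEmptySplits", "true")      // stays a plain Spark flag (hypothetical name)

  val sc = new SparkContext(conf)
  // The "spark.hadoop." prefix is stripped and the key lands in hadoopConfiguration,
  // which is why such a name no longer reads as an ordinary Spark configuration.
  println(sc.hadoopConfiguration.get("filterOutEmptySplit"))              // true
  println(sc.getConf.getBoolean("spark.files.ignoreEmptySplits", false))  // true
  sc.stop()
}
```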


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19474: [SPARK-22252][SQL] FileFormatWriter should respect the i...

2017-10-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19474
  
Minor comments. LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...

2017-10-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19474#discussion_r144180788
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -117,7 +117,7 @@ object FileFormatWriter extends Logging {
 job.setOutputValueClass(classOf[InternalRow])
 FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-val allColumns = plan.output
+val allColumns = queryExecution.logical.output
--- End diff --

Btw, shall we use `queryExecution.analyzed.output`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19474: [SPARK-22252][SQL] FileFormatWriter should respec...

2017-10-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19474#discussion_r144180666
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -117,7 +117,7 @@ object FileFormatWriter extends Logging {
 job.setOutputValueClass(classOf[InternalRow])
 FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-val allColumns = plan.output
+val allColumns = queryExecution.logical.output
--- End diff --

I think it'd be good to leave a comment explaining that we should not use the optimized plan's output here, in case it gets changed in the future.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19476
  
**[Test build #82653 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82653/testReport)**
 for PR 19476 at commit 
[`f50a7b7`](https://github.com/apache/spark/commit/f50a7b75c303bd2cf261dfb1b4fe74fa5498ca4b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19474: [SPARK-22252][SQL] FileFormatWriter should respect the i...

2017-10-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19474
  
I like this change because the relationship between `ExecutedCommandExec` and `RunnableCommand` is a little entangled.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

2017-10-11 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/19476

[SPARK-22062][CORE] Spill large block to disk in BlockManager's remote 
fetch to avoid OOM

## What changes were proposed in this pull request?

Currently `BlockManager#getRemoteBytes` calls `BlockTransferService#fetchBlockSync` to fetch a remote block, and `fetchBlockSync` allocates a temporary `ByteBuffer` to hold the whole fetched block. This can lead to OOM if the block is too large or if several blocks are fetched simultaneously on the same executor.

This change borrows the idea from shuffle fetch and spills large blocks to local disk before they are consumed by upstream code. The behavior is controlled by a newly added configuration: if the block size is smaller than the threshold, the block is kept in memory; otherwise it is first spilled to disk and then read back from the disk file.
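A minimal sketch of the threshold decision described above; the names (`maxInMemoryBytes`, `fetchToMemory`, `fetchToTempFile`) are illustrative placeholders, not the actual API introduced by this PR:

```scala
import java.io.File
import java.nio.ByteBuffer

object RemoteFetchSketch {
  // Small blocks stay in memory; large blocks are spilled to a temp file and served from disk.
  def fetchRemoteBlock(
      blockSize: Long,
      maxInMemoryBytes: Long,
      fetchToMemory: () => ByteBuffer,
      fetchToTempFile: () => File): Either[ByteBuffer, File] = {
    if (blockSize < maxInMemoryBytes) {
      Left(fetchToMemory())      // as before: buffer the whole payload in memory
    } else {
      Right(fetchToTempFile())   // new path: stream to disk so the executor never holds it all in heap
    }
  }
}
```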

To achieve this, the change:

1. Renames `TempShuffleFileManager` to `TempFileManager`, since it is no longer used only for shuffle.
2. Adds a new `TempFileManager` to manage the files of fetched remote blocks; the files are tracked via weak references and deleted once they are no longer referenced.

## How was this patch tested?

This was tested with new unit tests, plus manual verification in a local test that triggers GC to confirm the files are cleaned up.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-22062

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19476


commit f50a7b75c303bd2cf261dfb1b4fe74fa5498ca4b
Author: jerryshao 
Date:   2017-10-12T01:47:35Z

Spill large blocks to disk during remote fetches in BlockManager




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19474: [SPARK-22252][SQL] FileFormatWriter should respect the i...

2017-10-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19474
  
> The scan node is no longer visible above the insert node, I'll fix this 
later. The writer bug is more important and we should fix it ASAP.

Totally agreed. LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18664
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82650/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18664
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18664
  
**[Test build #82650 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82650/testReport)**
 for PR 18664 at commit 
[`efe3e27`](https://github.com/apache/spark/commit/efe3e27a1f374e4482cffe2ce3877aceffc5eaad).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18460
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82649/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...

2017-10-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18460
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18460
  
**[Test build #82649 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82649/testReport)**
 for PR 18460 at commit 
[`52d19d3`](https://github.com/apache/spark/commit/52d19d36bb7f704ca79aa398add03393860d69c2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144154295
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: 
SparkPlan) extends UnaryExecN
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-child.execute().coalesce(numPartitions, shuffle = false)
+if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
--- End diff --

Add a test in DatasetSuite that covers this empty-RDD case, maybe in the same test as the existing coalesce test; a possible shape is sketched below.
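Assuming DatasetSuite's usual `spark` session and `testImplicits` setup, the test could look something like this (a sketch only, not the PR's final test):

```scala
test("coalesce(1) on a Dataset backed by an empty RDD") {
  import testImplicits._                        // assumed: DatasetSuite's standard implicits
  val empty = spark.emptyDataset[Int]           // its underlying RDD has zero partitions
  val coalesced = empty.coalesce(1)
  // With the fix, coalescing to 1 partition should yield exactly one (empty) partition.
  assert(coalesced.rdd.getNumPartitions == 1)
  assert(coalesced.collect().isEmpty)
}
```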


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...

2017-10-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144152923
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -131,17 +132,17 @@ class IncrementalExecution(
   }
 
   override def preparations: Seq[Rule[SparkPlan]] =
-Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
+Seq(state, 
EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ 
super.preparations
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
 
-object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+case class EnsureStatefulOpPartitioning(conf: SQLConf) extends 
Rule[SparkPlan] {
   // Needs to be transformUp to avoid extra shuffles
   override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
 case so: StatefulOperator =>
-  val numPartitions = 
plan.sqlContext.sessionState.conf.numShufflePartitions
+  val numPartitions = conf.numShufflePartitions
--- End diff --

Why this change? Doesn't the plan have the same context and conf?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19433
  
**[Test build #82652 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82652/testReport)**
 for PR 19433 at commit 
[`5c29d3d`](https://github.com/apache/spark/commit/5c29d3d1e899c8d311633c4d763b57e42a26c660).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19472: [WIP][SPARK-22246][SQL] Improve performance of UnsafeRow...

2017-10-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19472
  
**[Test build #82651 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82651/testReport)**
 for PR 19472 at commit 
[`a814eb3`](https://github.com/apache/spark/commit/a814eb3f08085b09a16f336b36fba8da24e4f34a).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


