[GitHub] spark issue #22790: [SPARK-25793][ML]call SaveLoadV2_0.load for classNameV2_...

2018-10-23 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22790
  
This shouldn't block the 2.4.0 release. Based on the code, it doesn't introduce 
a regression to existing features (it just uses the V1 format and ignores 
trainingCost and distanceMeasure). The correctness issue occurs only when someone 
uses a non-default distanceMeasure and then saves/loads the model. Could someone help confirm?

If the current vote passes, we can list it as a known issue in the release 
notes and fix it in 2.4.1. If other blockers show up, we fix it before RC5. 
Btw, this PR needs a regression test in order to be merged.
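
For reference, a regression test along these lines would cover it (a rough sketch only, using spark.ml BisectingKMeans for illustration; the exact model class and temp-dir handling depend on the PR):

~~~scala
// Train with a non-default distanceMeasure, save, load, and check the value survives the
// round trip instead of silently falling back to the default. `spark` is the test suite's
// SparkSession; the path below is a hypothetical temp directory.
import org.apache.spark.ml.clustering.{BisectingKMeans, BisectingKMeansModel}
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(1.0, 0.1)),
  Tuple1(Vectors.dense(1.0, 1.0)),
  Tuple1(Vectors.dense(9.0, 8.0))
)).toDF("features")

val model = new BisectingKMeans().setK(2).setDistanceMeasure("cosine").fit(df)
val path = "/tmp/spark-25793-distance-measure-test"
model.write.overwrite().save(path)

val loaded = BisectingKMeansModel.load(path)
assert(loaded.getDistanceMeasure == "cosine")  // should not revert to "euclidean" after load
~~~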


---




[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...

2018-10-19 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22756
  
We have to revert this PR in branch-2.4. It is not a blocker, and we 
shouldn't merge it to branch-2.4 this late in an already delayed release.


---




[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...

2018-10-08 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22675#discussion_r223566032
  
--- Diff: docs/ml-datasource.md ---
@@ -0,0 +1,51 @@
+---
+layout: global
+title: Data sources
+displayTitle: Data sources
+---
+
+In this section, we introduce how to use data sources in ML to load data.
+Besides the general data sources "parquet", "csv", "json", and "jdbc", we also 
provide some specific data sources for ML.
+
+**Table of Contents**
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+## Image data source
+
+This image data source is used to load image files from a directory.
+
+
+

+[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource)
+implements Spark SQL data source API for loading image data as DataFrame.
+The loaded DataFrame has one StructType column: "image", containing image 
data stored as the image schema.
+
+{% highlight scala %}
+scala> spark.read.format("image").load("data/mllib/images/origin")
+res1: org.apache.spark.sql.DataFrame = [image: struct]
+{% endhighlight %}
+
+
+

+[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html)
--- End diff --

Usually it depends on how important the use case is. For example, CSV was 
created as an external data source and later merged into Spark. See 
https://issues.apache.org/jira/browse/SPARK-21866?focusedCommentId=16148268&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16148268.
 


---




[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...

2018-10-02 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22492
  
@cloud-fan said above that the next version is very likely to be 2.5.0 instead 
of 3.0. Well, the next version number has not been fully discussed yet. For that 
reason, I think we should revert the changes in master as well. After that, we 
should check whether the feature itself can be added without introducing breaking 
changes.


---




[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...

2018-10-02 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22492
  
@WeichenXu123 @cloud-fan I made https://github.com/apache/spark/pull/22618 
to revert the change in master.


---




[GitHub] spark pull request #22618: [SPARK-25321][ML] Revert SPARK-14681 to avoid API...

2018-10-02 Thread mengxr
GitHub user mengxr opened a pull request:

https://github.com/apache/spark/pull/22618

[SPARK-25321][ML] Revert SPARK-14681 to avoid API breaking change

## What changes were proposed in this pull request?

This is the same as #22492 but for master branch. Revert SPARK-14681 to 
avoid API breaking changes.

cc: @WeichenXu123 

## How was this patch tested?

Existing unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mengxr/spark SPARK-25321.master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22618


commit 90eb1d7f5895e442a86506e3e7dae382e138b3b0
Author: WeichenXu 
Date:   2018-09-21T20:05:24Z

[SPARK-25321][ML] Revert SPARK-14681 to avoid API breaking change

## What changes were proposed in this pull request?

Revert SPARK-14681 to avoid API breaking change. PR [SPARK-14681] will 
break mleap.

## How was this patch tested?

N/A

Closes #22492 from WeichenXu123/revert_tree_change.

Authored-by: WeichenXu 
Signed-off-by: Xiangrui Meng 




---




[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...

2018-09-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22492
  
@WeichenXu123 Please close this PR manually. Thanks!


---




[GitHub] spark issue #22510: [SPARK-25321][ML] Fix local LDA model constructor

2018-09-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22510
  
LGTM. Merged into master and branch 2.4. Thanks for checking compatibility 
with MLeap.


---




[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...

2018-09-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22492
  
LGTM. Merged into branch-2.4. @WeichenXu123 Next time please create 
dedicated JIRAs for each QA task PR. Thanks!


---




[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...

2018-09-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22492
  
We can keep it in master if the next release is 3.0.


---




[GitHub] spark issue #22449: [SPARK-22666][ML][FOLLOW-UP] Improve testcase to tolerat...

2018-09-19 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22449
  
LGTM. Merged into master and branch-2.4. Thanks!


---




[GitHub] spark issue #22449: [SPARK-22666][ML][FOLLOW-UP] Return a correctly formatte...

2018-09-18 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22449
  
@WeichenXu123 I think we should fix the test instead of removing "//" from 
the URI when the authority is empty, because both "scheme:/" and "scheme:///" are valid.

~~~scala
scala> val u1 = new URI("file:///a/b/c")
u1: java.net.URI = file:///a/b/c

scala> val u2 = new URI("file:/a/b/c")
u2: java.net.URI = file:/a/b/c

scala> u1 == u2
res1: Boolean = true
~~~

Shall we update the test? Instead of comparing the row record, we could compare its 
fields one by one and convert `origin` to a `URI` before comparison.
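
Something like this is what I have in mind (a hypothetical helper, not code from this PR):

~~~scala
// Compare the image rows field by field, normalizing `origin` through java.net.URI so that
// "file:/..." and "file:///..." compare equal regardless of how the path was rendered.
import java.net.URI
import org.apache.spark.sql.Row

def assertSameImageRow(actual: Row, expected: Row): Unit = {
  val a = actual.getAs[Row]("image")
  val e = expected.getAs[Row]("image")
  assert(new URI(a.getAs[String]("origin")) == new URI(e.getAs[String]("origin")))
  assert(a.getAs[Int]("height") == e.getAs[Int]("height"))
  assert(a.getAs[Int]("width") == e.getAs[Int]("width"))
  assert(a.getAs[Int]("nChannels") == e.getAs[Int]("nChannels"))
  assert(a.getAs[Int]("mode") == e.getAs[Int]("mode"))
  assert(java.util.Arrays.equals(a.getAs[Array[Byte]]("data"), e.getAs[Array[Byte]]("data")))
}
~~~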


---




[GitHub] spark pull request #22449: [SPARK-22666][ML][FOLLOW-UP] Return a correctly f...

2018-09-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22449#discussion_r218498363
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala ---
@@ -85,7 +85,9 @@ private[image] class ImageFileFormat extends FileFormat 
with DataSourceRegister
 val filteredResult = if (imageSourceOptions.dropInvalid) {
   resultOpt.toIterator
 } else {
-  
Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
+  val basePath = 
Path.getPathWithoutSchemeAndAuthority(path).toString()
--- End diff --

Seems authority got dropped here.

~~~
scala> Path.getPathWithoutSchemeAndAuthority(new 
Path("s3://dbc/test/ajdj/dfdfd"))
res10: org.apache.hadoop.fs.Path = /test/ajdj/dfdfd
~~~


---




[GitHub] spark issue #22349: [SPARK-25345][ML] Deprecate public APIs from ImageSchema

2018-09-08 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22349
  
LGTM. Merged into master and branch-2.4. Thanks!


---




[GitHub] spark issue #22349: [SPARK-25345][ML] Deprecate public APIs from ImageSchema

2018-09-08 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22349
  
@WeichenXu123 Could you address the comments?


---




[GitHub] spark pull request #22349: [SPARK-25345][ML] Deprecate public APIs from Imag...

2018-09-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22349#discussion_r215840879
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -35,6 +35,8 @@ import org.apache.spark.sql.types._
  */
 @Experimental
 @Since("2.3.0")
+@deprecated("use `spark.read.format(\"image\").load(path)` and this 
`ImageSchema` will be " +
--- End diff --

There are other methods defined under `ImageSchema` that are not covered by 
the image data source, so we should only deprecate `readImages` and leave the other 
public methods as experimental. Same for Python.
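
Roughly this shape, i.e. the annotation goes on the method rather than on the object (a simplified sketch, not the real `ImageSchema` source):

~~~scala
// Deprecate only readImages; the rest of the object stays experimental.
object ImageSchemaSketch {
  @deprecated("Use `spark.read.format(\"image\").load(path)` instead.", "2.4.0")
  def readImages(path: String): Unit =
    println(s"stub standing in for the real readImages($path)")
}
~~~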


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-05 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
Merged into master. Thanks @WeichenXu123 for the implementation and 
everyone for the review! I created the following JIRAs as follow-ups:

* deprecate ImageSchema: https://issues.apache.org/jira/browse/SPARK-25345
* list built-in data sources in doc site: 
https://issues.apache.org/jira/browse/SPARK-25346
* doc for image data source: 
https://issues.apache.org/jira/browse/SPARK-25347
* data source for binary files: 
https://issues.apache.org/jira/browse/SPARK-25348
* support sample pushdown: https://issues.apache.org/jira/browse/SPARK-25349


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-05 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
The image data source tests passed, but the JVM crashed at the end. Triggered 
another test.


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-05 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
test this please


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-05 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
LGTM pending tests.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-05 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22165
  
I didn't make a full pass over the tests. @jiangxb1987 let me know if you 
need me to take a pass. 


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215326727
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcTimeout
+
+class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext 
{
+
+  /**
+   * Get the current barrierEpoch from barrierCoordinator.states by 
ContextBarrierId
+   */
+  def getCurrentBarrierEpoch(
+  stageId: Int, stageAttemptId: Int, barrierCoordinator: 
BarrierCoordinator): Int = {
+val barrierId = ContextBarrierId(stageId, stageAttemptId)
+barrierCoordinator.states.get(barrierId).barrierEpoch
+  }
+
+  test("normal test for single task") {
+sc = new SparkContext("local", "test")
+val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, 
sc.env.rpcEnv)
+val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", 
barrierCoordinator)
+val stageId = 0
+val stageAttemptNumber = 0
+rpcEndpointRef.askSync[Unit](
+  message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, 
taskAttemptId = 0,
+barrierEpoch = 0),
+  timeout = new RpcTimeout(5 seconds, "rpcTimeOut"))
+// sleep for waiting barrierEpoch value change
+Thread.sleep(500)
+assert(getCurrentBarrierEpoch(stageId, stageAttemptNumber, 
barrierCoordinator) == 1)
+  }
+
+  test("normal test for multi tasks") {
+sc = new SparkContext("local", "test")
+val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, 
sc.env.rpcEnv)
+val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", 
barrierCoordinator)
+val numTasks = 3
+val stageId = 0
+val stageAttemptNumber = 0
+val rpcTimeOut = new RpcTimeout(5 seconds, "rpcTimeOut")
+// sync request from 3 tasks
+(0 until numTasks).foreach { taskId =>
+  new Thread(s"task-$taskId-thread") {
+setDaemon(true)
+override def run(): Unit = {
+  rpcEndpointRef.askSync[Unit](
+message = RequestToSync(numTasks, stageId, stageAttemptNumber, 
taskAttemptId = taskId,
+  barrierEpoch = 0),
+timeout = rpcTimeOut)
+}
+  }.start()
+}
+// sleep for waiting barrierEpoch value change
+Thread.sleep(500)
--- End diff --

ditto


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215326394
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcTimeout
+
+class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext 
{
+
+  /**
+   * Get the current barrierEpoch from barrierCoordinator.states by 
ContextBarrierId
+   */
+  def getCurrentBarrierEpoch(
+  stageId: Int, stageAttemptId: Int, barrierCoordinator: 
BarrierCoordinator): Int = {
+val barrierId = ContextBarrierId(stageId, stageAttemptId)
+barrierCoordinator.states.get(barrierId).barrierEpoch
+  }
+
+  test("normal test for single task") {
+sc = new SparkContext("local", "test")
+val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, 
sc.env.rpcEnv)
+val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", 
barrierCoordinator)
+val stageId = 0
+val stageAttemptNumber = 0
+rpcEndpointRef.askSync[Unit](
+  message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, 
taskAttemptId = 0,
+barrierEpoch = 0),
+  timeout = new RpcTimeout(5 seconds, "rpcTimeOut"))
+// sleep for waiting barrierEpoch value change
+Thread.sleep(500)
--- End diff --

Do not use an explicit sleep. It basically adds 0.5 seconds to the total 
test time and adds flakiness. Use a conditional wait instead, for example: 
https://github.com/apache/spark/commit/bfb74394a5513134ea1da9fcf4a1783b77dd64e4#diff-a90010f459c27926238d7a4ce5a0aca1R107
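
For instance, something along these lines (a sketch using ScalaTest's `Eventually`; the linked commit may do it differently):

~~~scala
// Poll until the barrier epoch is bumped (or the timeout hits) instead of sleeping a fixed
// 500 ms. getCurrentBarrierEpoch and the other names are the helpers defined in this suite.
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

eventually(timeout(5.seconds), interval(50.milliseconds)) {
  assert(getCurrentBarrierEpoch(stageId, stageAttemptNumber, barrierCoordinator) == 1)
}
~~~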


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215324595
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -65,7 +65,7 @@ private[spark] class BarrierCoordinator(
 
   // Record all active stage attempts that make barrier() call(s), and the 
corresponding internal
   // state.
-  private val states = new ConcurrentHashMap[ContextBarrierId, 
ContextBarrierState]
+  private[spark] val states = new ConcurrentHashMap[ContextBarrierId, 
ContextBarrierState]
--- End diff --

Could you turn the `// ...` comment into ScalaDoc `/** ... */` and mention 
`Visible for unit testing.` in the doc?
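
Roughly what I mean (a sketch of the suggested edit, not the merged code):

~~~scala
/**
 * Record all active stage attempts that make barrier() call(s), and the corresponding internal
 * state. Visible for unit testing.
 */
private[spark] val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]
~~~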


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r215322021
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE 
data as `DataFrame`.
+ * The loaded `DataFrame` has one `StructType` column: `image`.
+ * The schema of the `image` column is:
+ *  - origin: String (represents the origin of the image.
+ *If loaded from files, then it is the file path)
+ *  - height: Int (height of the image)
+ *  - width: Int (width of the image)
+ *  - nChannels: Int (number of the image channels)
+ *  - mode: Int (OpenCV-compatible type)
+ *  - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise 
BGR in most cases)
+ *
+ * To use IMAGE data source, you need to set "image" as the format in 
`DataFrameReader` and
+ * optionally specify the data source options, for example:
+ * {{{
+ *   // Scala
+ *   val df = spark.read.format("image")
+ * .option("dropImageFailures", true)
+ * .load("data/mllib/images/partitioned")
+ *
+ *   // Java
+ *   Dataset df = spark.read().format("image")
+ * .option("dropImageFailures", true)
+ * .load("data/mllib/images/partitioned");
+ * }}}
+ *
+ * IMAGE data source supports the following options:
+ *  - "dropImageFailures": Whether to drop the files that are not valid 
images from the result.
--- End diff --

How about changing `dropImageFailures` to `dropInvalid`?
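
With the rename, the example read from the class doc above would look like this (hypothetical at this point in the review):

~~~scala
// Same call as in the class doc, with the proposed option name.
val df = spark.read.format("image")
  .option("dropInvalid", true)
  .load("data/mllib/images/partitioned")
~~~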


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r215322673
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageOptions.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+private[image] class ImageOptions(
+@transient private val parameters: CaseInsensitiveMap[String]) extends 
Serializable {
+
+  def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
+
+  val dropImageFailures = parameters.getOrElse("dropImageFailures", 
"false").toBoolean
--- End diff --

Should add ScalaDoc.
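
Something like this (a sketch that reuses the wording from the class doc above):

~~~scala
/** Whether to drop the files that are not valid images from the result. Defaults to false. */
val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean
~~~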


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r215320762
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE 
data as `DataFrame`.
--- End diff --

"IMAGE" doesn't need to be all uppercase. Just say "loading images".


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r215321353
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE 
data as `DataFrame`.
+ * The loaded `DataFrame` has one `StructType` column: `image`.
+ * The schema of the `image` column is:
+ *  - origin: String (represents the origin of the image.
+ *If loaded from files, then it is the file path)
+ *  - height: Int (height of the image)
+ *  - width: Int (width of the image)
+ *  - nChannels: Int (number of the image channels)
+ *  - mode: Int (OpenCV-compatible type)
+ *  - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise 
BGR in most cases)
+ *
+ * To use IMAGE data source, you need to set "image" as the format in 
`DataFrameReader` and
--- End diff --

ditto on "IMAGE"


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r215320923
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE 
data as `DataFrame`.
+ * The loaded `DataFrame` has one `StructType` column: `image`.
+ * The schema of the `image` column is:
+ *  - origin: String (represents the origin of the image.
+ *If loaded from files, then it is the file path)
--- End diff --

Does it always load from files?


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r215323149
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+import java.nio.file.Paths
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.image.ImageSchema._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.{col, substring_index}
+
+class ImageFileFormatSuite extends SparkFunSuite with 
MLlibTestSparkContext {
+
+  // Single column of images named "image"
+  private lazy val imagePath = "../data/mllib/images/imagesWithPartitions"
+
+  test("image datasource count test") {
+val df1 = spark.read.format("image").load(imagePath)
+assert(df1.count === 9)
+
+val df2 = spark.read.format("image").option("dropImageFailures", 
"true").load(imagePath)
+assert(df2.count === 8)
+  }
+
+  test("image datasource test: read jpg image") {
+val df = spark.read.format("image").load(imagePath + 
"/cls=kittens/date=2018-02/DP153539.jpg")
+assert(df.count() === 1)
+  }
+
+  test("image datasource test: read png image") {
+val df = spark.read.format("image").load(imagePath + 
"/cls=multichannel/date=2018-01/BGRA.png")
+assert(df.count() === 1)
+  }
+
+  test("image datasource test: read non image") {
+val filePath = imagePath + "/cls=kittens/date=2018-01/not-image.txt"
+val df = spark.read.format("image").option("dropImageFailures", "true")
+  .load(filePath)
+assert(df.count() === 0)
+
+val df2 = spark.read.format("image").option("dropImageFailures", 
"false")
+  .load(filePath)
+assert(df2.count() === 1)
+val result = df2.head()
+assert(result === invalidImageRow(
+  Paths.get(filePath).toAbsolutePath().normalize().toUri().toString))
+  }
+
+  test("image datasource partition test") {
+val result = spark.read.format("image")
+  .option("dropImageFailures", "true").load(imagePath)
+  .select(substring_index(col("image.origin"), "/", -1).as("origin"), 
col("cls"), col("date"))
+  .collect()
+
+assert(Set(result: _*) === Set(
+  Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
+  Row("54893.jpg", "kittens", "2018-02"),
+  Row("DP153539.jpg", "kittens", "2018-02"),
+  Row("DP802813.jpg", "kittens", "2018-02"),
+  Row("BGRA.png", "multichannel", "2018-01"),
+  Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
+  Row("chr30.4.184.jpg", "multichannel", "2018-02"),
+  Row("grayscale.jpg", "multichannel", "2018-02")
+))
+  }
+
+  // Images with the different number of channels
+  test("readImages pixel values test") {
+
+val images = spark.read.format("image").option("dropImageFailures", 
"true")
+  .load(imagePath + "/cls=multichannel/").collect()
+
+val firstBytes20Map = images.map { rrow =>
+  val row = rrow.getAs[Row]("image")
+  val filename = Paths.get(getOrigin(row)).getFileName().toString()
+  val mode = getMode(row)
+  val bytes20 = getData(row).slice(0, 20).toList
+  filename -> Tuple2(mode, bytes20)
--- End diff --

nit: It would be useful to leave an inline comment here. :)


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-05 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r215179601
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE 
data as `DataFrame`.
+ * The loaded `DataFrame` has one `StructType` column: `image`.
+ * The schema of the `image` column is:
+ *  - origin: String (represent the origin of image. If loaded from file, 
then it is file path)
+ *  - height: Int (height of image)
+ *  - width: Int (width of image)
+ *  - nChannels: Int (number of image channels)
+ *  - mode: Int (OpenCV-compatible type)
+ *  - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise 
BGR in most cases)
+ *
+ * To use IMAGE data source, you need to set "image" as the format in 
`DataFrameReader` and
+ * optionally specify the datasource options, for example:
+ * {{{
+ *   // Scala
+ *   val df = spark.read.format("image")
+ * .option("dropImageFailures", "true")
+ * .load("data/mllib/images/imagesWithPartitions")
+ *
+ *   // Java
+ *   Dataset df = spark.read().format("image")
+ * .option("dropImageFailures", "true")
+ * .load("data/mllib/images/imagesWithPartitions");
+ * }}}
+ *
+ * IMAGE data source supports the following options:
+ *  - "dropImageFailures": Whether to drop the files that are not valid 
images from the result.
+ *
+ * @note This IMAGE data source does not support "write".
+ *
+ * @note This class is public for documentation purpose. Please don't use 
this class directly.
+ * Rather, use the data source API as illustrated above.
+ */
+class ImageDataSource private() {}
--- End diff --

Re: @cloud-fan The Scala package doc doesn't work for Java, which requires 
a different format.

Re: @HyukjinKwon It would be nice to have some doc on the site, though I 
didn't find a list of built-in data sources in the doc site. I think it is 
okay to have docs in both locations, for IDE users and for people who search on the 
web.


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-04 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
That doesn't work for Java, if I remember the issue correctly.

On Tue, Sep 4, 2018, 10:31 PM Wenchen Fan  wrote:

> *@cloud-fan* commented on this pull request.
> --
>
> In
> 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala
> <https://github.com/apache/spark/pull/22328#discussion_r215140040>:
>
> > + *
> + *   // Java
> + *   Dataset df = spark.read().format("image")
> + * .option("dropImageFailures", "true")
> + * .load("data/mllib/images/imagesWithPartitions");
> + * }}}
> + *
> + * IMAGE data source supports the following options:
> + *  - "dropImageFailures": Whether to drop the files that are not valid 
images from the result.
> + *
> + * @note This IMAGE data source does not support "write".
> + *
> + * @note This class is public for documentation purpose. Please don't 
use this class directly.
> + * Rather, use the data source API as illustrated above.
> + */
> +class ImageDataSource private() {}
>
> Is this a convention? AFAIK in the scala world we usually put document in
> package object.
>
-- 

Xiangrui Meng

Software Engineer

Databricks Inc. <http://databricks.com/>



---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-04 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
@mhamilton723 I thought about that option too. Loading general binary files 
is a useful feature, but I don't feel it is necessary to pull it into the 
current scope. No matter whether the image data source has its own 
implementation or builds on top of a binary data source, I expect users to use

~~~scala
spark.read.format("image").load("...")
~~~

to read images instead of something like:

~~~scala
spark.read.format("binary").load("...").withColumn("image", 
decode($"binary"))
~~~

So we can definitely add a binary file data source later and swap the 
implementation without changing the public interface. But we don't need to 
block this PR from getting into 2.4, which will be cut soon.

Sounds good?


---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-09-04 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22240
  
Merged into master. Thanks for review!


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r214981991
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.ml.image.ImageSchema
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, 
OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+
+private[image] class ImageFileFormatOptions(
+@transient private val parameters: CaseInsensitiveMap[String]) extends 
Serializable {
+
+  def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
+
+  val dropImageFailures = parameters.getOrElse("dropImageFailures", 
"false").toBoolean
+}
+
+private[image] class ImageFileFormat extends FileFormat with 
DataSourceRegister {
+
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = 
Some(ImageSchema.imageSchema)
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job, options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+throw new UnsupportedOperationException(
+  s"prepareWrite is not supported for image data source")
+  }
+
+  override def shortName(): String = "image"
+
+  override protected def buildReader(
+  sparkSession: SparkSession,
+  dataSchema: StructType,
+  partitionSchema: StructType,
+  requiredSchema: StructType,
+  filters: Seq[Filter],
+  options: Map[String, String],
+  hadoopConf: Configuration): (PartitionedFile) => 
Iterator[InternalRow] = {
--- End diff --

It won't be addressed in this PR. The best way to support it is to allow 
the data source to handle the sampling operation. cc @cloud-fan 


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-04 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
Yes, the ImageSchema implementation is used by the data source, so we 
cannot remove it. :) We are only going to mark the public APIs there as deprecated. 
The goal is to provide users with a unified approach to loading data into Spark. Users 
usually find `ImageSchema.readImages` hard to discover.


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-04 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22328
  
@imatiach-msft @HyukjinKwon The plan is to mark `ImageSchema` deprecated in 
2.4 and remove it in 3.0, so loading images will be the same as loading data 
from other sources.

The gaps are sampling and partition control, which might require more 
testing after 2.4. It would be great if you could help. For sampling, I'm 
thinking of allowing the data source to handle sample operations. @cloud-fan is that 
feasible?
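
For context, this is what users have to do today (a sketch; the sample is applied only after every image has been read and decoded), and the idea is to let the source handle the fraction itself so that only a subset of the files gets opened:

~~~scala
// Today: full scan of the directory, then a post-hoc sample.
val imagesDir = "data/mllib/images/partitioned"  // any image directory
val sampled = spark.read.format("image").load(imagesDir)
  .sample(withReplacement = false, fraction = 0.1)
~~~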


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r214969542
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala ---
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
 
 class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext {
   // Single column of images named "image"
-  private lazy val imagePath = "../data/mllib/images"
+  private lazy val imagePath = "../data/mllib/images/images"
--- End diff --

"images/images" is confusing. Call it `images/origin` and 
`images/partitioned`


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r214967994
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE 
data as `DataFrame`.
--- End diff --

Should mention it doesn't support write.


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r214969782
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+import java.nio.file.Paths
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.image.ImageSchema._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.{col, substring_index}
+
+class ImageFileFormatSuite extends SparkFunSuite with 
MLlibTestSparkContext {
+
+  // Single column of images named "image"
+  private lazy val imagePath = "../data/mllib/images/imagesWithPartitions"
+
+  test("image datasource count test") {
+val df1 = spark.read.format("image").load(imagePath)
+assert(df1.count === 9)
+
+val df2 = spark.read.format("image").option("dropImageFailures", 
"true").load(imagePath)
+assert(df2.count === 8)
+  }
+
+  test("image datasource test: read jpg image") {
+val df = spark.read.format("image").load(imagePath + 
"/cls=kittens/date=2018-02/DP153539.jpg")
+assert(df.count() === 1)
+  }
+
+  test("image datasource test: read png image") {
+val df = spark.read.format("image").load(imagePath + 
"/cls=multichannel/date=2018-01/BGRA.png")
+assert(df.count() === 1)
+  }
+
+  test("image datasource test: read non image") {
+val filePath = imagePath + "/cls=kittens/date=2018-01/not-image.txt"
+val df = spark.read.format("image").option("dropImageFailures", "true")
+  .load(filePath)
+assert(df.count() === 0)
+
+val df2 = spark.read.format("image").option("dropImageFailures", 
"false")
+  .load(filePath)
+assert(df2.count() === 1)
+val result = df2.head()
+assert(result === invalidImageRow(
+  Paths.get(filePath).toAbsolutePath().normalize().toUri().toString))
+  }
+
+  test("image datasource partition test") {
+val result = spark.read.format("image")
+  .option("dropImageFailures", "true").load(imagePath)
+  .select(substring_index(col("image.origin"), "/", -1).as("origin"), 
col("cls"), col("date"))
+  .collect()
+
+assert(Set(result: _*) === Set(
+  Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
+  Row("54893.jpg", "kittens", "2018-02"),
+  Row("DP153539.jpg", "kittens", "2018-02"),
+  Row("DP802813.jpg", "kittens", "2018-02"),
+  Row("BGRA.png", "multichannel", "2018-01"),
+  Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
+  Row("chr30.4.184.jpg", "multichannel", "2018-02"),
+  Row("grayscale.jpg", "multichannel", "2018-02")
+))
+  }
+
+  // Images with the different number of channels
+  test("readImages pixel values test") {
+
+val images = spark.read.format("image").option("dropImageFailures", 
"true")
+  .load(imagePath + "/cls=multichannel/").collect()
+
+val firstBytes20Map = images.map { rrow =>
+  val row = rrow.getAs[Row]("image")
+  val filename = Paths.get(getOrigin(row)).getFileName().toString()
+  val mode = getMode(row)
+  val bytes20 = getData(row).slice(0, 20).toList
+  filename -> Tuple2(mode, bytes20)
+}.toMap
--- End diff --

Use a Set instead of a Map.
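
i.e. something like this (a rough sketch, reusing the helpers already imported from `ImageSchema` in this suite):

~~~scala
// Collect (filename, mode, first-20-bytes) tuples into a Set so the assertion compares
// unordered contents directly.
val firstBytes20Set = images.map { rrow =>
  val row = rrow.getAs[Row]("image")
  val filename = Paths.get(getOrigin(row)).getFileName.toString
  (filename, getMode(row), getData(row).slice(0, 20).toList)
}.toSet
~~~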


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r214967452
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE 
data as `DataFrame`.
+ * The loaded `DataFrame` has one `StructType` column: `image`.
+ * The schema of the `image` column is:
+ *  - origin: String (represent the origin of image. If loaded from file, 
then it is file path)
+ *  - height: Int (height of image)
+ *  - width: Int (width of image)
+ *  - nChannels: Int (number of image channels)
+ *  - mode: Int (OpenCV-compatible type)
+ *  - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise 
BGR in most cases)
+ *
+ * To use IMAGE data source, you need to set "image" as the format in 
`DataFrameReader` and
+ * optionally specify options, for example:
+ * {{{
+ *   // Scala
+ *   val df = spark.read.format("image")
+ * .option("dropImageFailures", "true")
+ * .load("data/mllib/images/imagesWithPartitions")
+ *
+ *   // Java
+ *   Dataset df = spark.read().format("image")
+ * .option("dropImageFailures", "true")
+ * .load("data/mllib/images/imagesWithPartitions");
+ * }}}
+ *
+ * IMAGE data source supports the following options:
+ *  - "dropImageFailures": Whether to drop the files that are not valid 
images from the result.
+ *
+ * @note This class is public for documentation purpose. Please don't use 
this class directly.
+ * Rather, use the data source API as illustrated above.
--- End diff --

I didn't see a section in the doc that lists all built-in data sources. It 
would be nice if we create a section and link it to this API doc. I think we 
can do it with a follow-up PR. I want to see if we can get this PR merged 
before branch cut:)


---




[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...

2018-09-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22328#discussion_r214968664
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.ml.image.ImageSchema
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, 
OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+
+private[image] class ImageFileFormatOptions(
+@transient private val parameters: CaseInsensitiveMap[String]) extends 
Serializable {
+
+  def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
+
+  val dropImageFailures = parameters.getOrElse("dropImageFailures", 
"false").toBoolean
+}
+
+private[image] class ImageFileFormat extends FileFormat with 
DataSourceRegister {
+
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = 
Some(ImageSchema.imageSchema)
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job, options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+throw new UnsupportedOperationException(
+  s"prepareWrite is not supported for image data source")
--- End diff --

The error message is user-facing and users do not know `prepareWrite`. So 
just say "Write is not supported"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22271: [SPARK-25268][GraphX]run Parallel Personalized PageRank ...

2018-08-31 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22271
  
test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22261: [SPARK-25248.1][PYSPARK] update barrier Python API

2018-08-29 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22261
  
Merged into master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22261: [SPARK-25248.1][PYSPARK] update barrier Python API

2018-08-29 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22261
  
There are two PRs from that JIRA, one for Scala APIs and one for Python APIs


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22258: [SPARK-25266][CORE] Fix memory leak in Barrier Execution...

2018-08-29 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22258
  
Merged into master. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22247: [SPARK-25253][PYSPARK] Refactor local connection & auth ...

2018-08-28 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22247
  
@squito Thanks for the refactor!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for...

2018-08-28 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22240#discussion_r213537490
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -68,7 +74,7 @@ class BarrierTaskContext(
*
* CAUTION! In a barrier stage, each task must have the same number of 
barrier() calls, in all
* possible code branches. Otherwise, you may get the job hanging or a 
SparkException after
-   * timeout. Some examples of misuses listed below:
+   * timeout. Some examples of '''misuses''' listed below:
--- End diff --

just saw it, will include it if Jenkins fails:)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22261: [SPARK-25248.1][PYSPARK] update barrier Python AP...

2018-08-28 Thread mengxr
GitHub user mengxr opened a pull request:

https://github.com/apache/spark/pull/22261

[SPARK-25248.1][PYSPARK] update barrier Python API

## What changes were proposed in this pull request?

I made one pass over the Python APIs for barrier mode and updated them to 
match the Scala doc in #22240 . Major changes:

* export the public classes
* expand the docs
* add doc for BarrierTaskInfo.address

cc: @jiangxb1987 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mengxr/spark SPARK-25248.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22261


commit afb50ee1150279f9cb27f92e220a332e029dbc43
Author: Xiangrui Meng 
Date:   2018-08-29T03:44:54Z

update barrier Python API




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22258: [SPARK-25266][CORE] Fix memory leak in Barrier Execution...

2018-08-28 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22258
  
LGTM pending test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-08-28 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22240
  
test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...

2018-08-26 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22240#discussion_r212863571
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala ---
@@ -22,15 +22,22 @@ import scala.reflect.ClassTag
 import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{Experimental, Since}
 
-/** Represents an RDD barrier, which forces Spark to launch tasks of this 
stage together. */
-class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
+/**
+ * :: Experimental ::
+ * Wraps an RDD in a barrier stage, which forces Spark to launch tasks of 
this stage together.
+ * [[RDDBarrier]] instances are created by [[RDD.barrier]].
+ */
+@Experimental
+@Since("2.4.0")
+class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
--- End diff --

also hide the constructor here


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...

2018-08-26 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22240#discussion_r212863543
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -28,4 +28,4 @@ import org.apache.spark.annotation.{Experimental, Since}
  */
 @Experimental
 @Since("2.4.0")
-class BarrierTaskInfo(val address: String)
+class BarrierTaskInfo private[spark] (val address: String)
--- End diff --

hide the constructor since this is not meant to be constructed by users


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...

2018-08-26 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22240#discussion_r212863507
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -21,25 +21,31 @@ import java.util.{Properties, Timer, TimerTask}
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
-
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}
 import org.apache.spark.util.{RpcUtils, Utils}
 
-/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
-class BarrierTaskContext(
+/**
+ * :: Experimental ::
+ * A [[TaskContext]] with extra contextual info and tooling for tasks in a 
barrier stage.
+ * Use [[BarrierTaskContext#get]] to obtain the barrier context for a 
running barrier task.
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskContext private[spark] (
--- End diff --

Made the constructor package private to force users to get it from `#get()`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...

2018-08-26 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22240#discussion_r212863444
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -68,7 +74,7 @@ class BarrierTaskContext(
*
* CAUTION! In a barrier stage, each task must have the same number of 
barrier() calls, in all
* possible code branches. Otherwise, you may get the job hanging or a 
SparkException after
-   * timeout. Some examples of misuses listed below:
+   * timeout. Some examples of '''misuses''' listed below:
--- End diff --

use bold font to make sure users don't misread


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...

2018-08-26 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22240#discussion_r212863381
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -21,25 +21,31 @@ import java.util.{Properties, Timer, TimerTask}
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
-
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}
 import org.apache.spark.util.{RpcUtils, Utils}
 
-/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
-class BarrierTaskContext(
+/**
+ * :: Experimental ::
+ * A [[TaskContext]] with extra contextual info and tooling for tasks in a 
barrier stage.
+ * Use [[BarrierTaskContext#get]] to obtain the barrier context for a 
running barrier task.
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskContext private[spark] (
 override val stageId: Int,
 override val stageAttemptNumber: Int,
 override val partitionId: Int,
 override val taskAttemptId: Long,
 override val attemptNumber: Int,
-override val taskMemoryManager: TaskMemoryManager,
+private[spark] override val taskMemoryManager: TaskMemoryManager,
--- End diff --

This is not exposed by `TaskContext`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...

2018-08-26 Thread mengxr
GitHub user mengxr opened a pull request:

https://github.com/apache/spark/pull/22240

[WIP] [SPARK-25248] [CORE] Audit barrier APIs for 2.4

## What changes were proposed in this pull request?

I made one pass over the barrier APIs added to Spark 2.4 and updated some 
scopes and docs.

TODOs:
- [ ] scala doc
- [ ] python doc

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mengxr/spark SPARK-25248

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22240


commit 19e380ab4f7242f2f2ef48aca81445b0adf0a87d
Author: Xiangrui Meng 
Date:   2018-08-27T04:18:54Z

update barrier Scala doc




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22225: [SPARK-25234][SPARKR] avoid integer overflow in parallel...

2018-08-24 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/5
  
Merged into master and branch-2.3.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22225: [SPARK-25234][SPARKR] avoid integer overflow in parallel...

2018-08-24 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/5
  
cc @falaki 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22225: [SPARK-25234][SPARKR] avoid integer overflow in p...

2018-08-24 Thread mengxr
GitHub user mengxr opened a pull request:

https://github.com/apache/spark/pull/5

[SPARK-25234][SPARKR] avoid integer overflow in parallelize

## What changes were proposed in this pull request?

`parallelize` uses integer multiplication to determine the split indices. 
It might cause integer overflow.
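
A minimal Scala sketch of the idea (the actual patch is in SparkR's `parallelize`; the helper below is illustrative): promoting the multiplication to `Long` keeps the split-boundary arithmetic from overflowing `Int`.

~~~
// Sketch: compute split boundaries without Int overflow by doing the
// multiplication in Long before dividing.
def positions(numElements: Long, numSlices: Int): Seq[(Long, Long)] = {
  (0 until numSlices).map { i =>
    val start = i * numElements / numSlices        // Int * Long promotes to Long
    val end = (i + 1) * numElements / numSlices
    (start, end)
  }
}
~~~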

## How was this patch tested?

unit test

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mengxr/spark SPARK-25234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5


commit 20f9498384ef5587970c3673c86488404ee89a54
Author: Xiangrui Meng 
Date:   2018-08-24T18:01:14Z

add test




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22171: [SPARK-25177][SQL] When dataframe decimal type co...

2018-08-22 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22171#discussion_r211867603
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala ---
@@ -197,7 +197,7 @@ final class Decimal extends Ordered[Decimal] with 
Serializable {
 }
   }
 
-  override def toString: String = toBigDecimal.toString()
+  override def toString: String = toBigDecimal.bigDecimal.toPlainString()
--- End diff --

I don't recall anything that is relevant:)
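
For context, a small sketch of the difference the diff above relies on: `java.math.BigDecimal.toString` may fall back to scientific notation, while `toPlainString` never does.

~~~
import java.math.BigDecimal

val d = new BigDecimal("0.0000000001")
println(d.toString)        // 1E-10 (scientific notation)
println(d.toPlainString)   // 0.0000000001
~~~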


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

2018-08-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22085
  
LGTM. I'm merging this into master. We might need a minor refactor for 
readability. But it shouldn't block developers testing this new feature. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...

2018-08-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22158
  
Merged into master. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22112
  
Then it doesn't meet the requirements for those operations used by MLlib:
* sampling
* zipWithIndex, zipWithUniqueId
* we also use zip, assuming the ordering from the source RDD is preserved, 
e.g., 
https://github.com/apache/spark/blob/e50192494d1ae1bdaf845ddd388189998c1a2403/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L403


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...

2018-08-21 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22158
  
LGTM pending Jenkins. Thanks for finding those corner cases!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-20 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22112
  
If "always return the same result with same order when rerun." is the 
definition of "idempotent", then yes, MLlib RDD closures always returns the 
same result if the input doesn't change. We use pseudo-randomness to achieve 
deterministic behavior.
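
A rough sketch of that pattern (illustrative, not the actual MLlib code): seed each partition's random number generator from a fixed seed plus the partition index, so rerunning the closure on the same input yields the same output.

~~~
import scala.util.Random
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
val rdd = spark.sparkContext.parallelize(1 to 1000, 4)
val sampled = rdd.mapPartitionsWithIndex { (idx, iter) =>
  val rng = new Random(42L + idx)          // deterministic per partition
  iter.filter(_ => rng.nextDouble() < 0.1)
}
println(sampled.count())                   // same value on every rerun
spark.stop()
~~~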


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211360719
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,126 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+"""
+sock = None
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, canonname, sa = res
+sock = socket.socket(af, socktype, proto)
+try:
+# Do not allow timeout for socket reading operation.
+sock.settimeout(None)
+sock.connect(sa)
+except socket.error:
+sock.close()
+sock = None
+continue
+break
+if not sock:
+raise Exception("could not open socket")
+
+sockfile = sock.makefile("rwb", 65536)
+write_with_length("run".encode("utf-8"), sockfile)
+sockfile.flush()
+do_server_auth(sockfile, auth_secret)
+
+# The socket will be automatically closed when garbage-collected.
+return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_port = None
+_secret = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
--- End diff --

Q: Does it handle python worker reuse?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211356245
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,126 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
--- End diff --

Should document how this is different from the one in `context.py`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211359959
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -381,6 +465,20 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
   }
 }
   }
+
+  def writeUTF(str: String, dataOut: DataOutputStream) {
+val bytes = str.getBytes(StandardCharsets.UTF_8)
--- End diff --

nit: `UTF_8` or always use `StandardCharsets`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211358615
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  sock.setSoTimeout(1)
+  val cmdString = readUtf8(sock)
+  if (cmdString.equals("run")) {
--- End diff --

If we do not expect any other command from the socket, we should throw an 
exception


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211356840
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -76,6 +77,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   // TODO: support accumulator in multiple UDF
   protected val accumulator = funcs.head.funcs.head.accumulator
 
+  // Expose a ServerSocket to support method calls via socket from Python 
side.
+  private[spark] var serverSocket: Option[ServerSocket] = None
+
+  // Authentication helper used when serving method calls via socket from 
Python side.
+  private lazy val authHelper = {
+val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
--- End diff --

When would `SparkEnv.get` return null?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211355022
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -20,15 +20,16 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
 import java.nio.charset.StandardCharsets
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark._
+import org.apache.spark.{SparkException, _}
--- End diff --

The `_` wildcard should already include `SparkException`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211359028
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  sock.setSoTimeout(1)
+  val cmdString = readUtf8(sock)
+  if (cmdString.equals("run")) {
+sock.setSoTimeout(0)
+barrierAndServe(sock)
+  }
+} catch {
+  case _: SocketException =>
--- End diff --

Is this the timeout exception? I don't see any exception that we could 
silently ignore.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211358743
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  sock.setSoTimeout(1)
--- End diff --

Should add a comment about this timeout.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211357983
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
--- End diff --

minor: useful to add `/* port */` and `/* backlog */` 
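
Spelled out, the suggestion would look roughly like this (a sketch, not the exact patch):

~~~
import java.net.{InetAddress, ServerSocket}

val serverSocket = new ServerSocket(
  0 /* port: 0 lets the OS pick a free port */,
  1 /* backlog */,
  InetAddress.getByName("localhost"))
~~~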


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22137: [MINOR][DOC][SQL] use one line for annotation arg value

2018-08-17 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22137
  
cc: @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22137: [MINOR][DOC][SQL] use one line for annotation arg...

2018-08-17 Thread mengxr
GitHub user mengxr opened a pull request:

https://github.com/apache/spark/pull/22137

[MINOR][DOC][SQL] use one line for annotation arg value

## What changes were proposed in this pull request?

Put annotation args in one line, or API doc generation will fail.

~~~
[error] 
/Users/meng/src/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:1559:
 annotation argument needs to be a constant; found: "_FUNC_(expr) - Returns the 
character length of string data or number of bytes of ".+("binary data. The 
length of string data includes the trailing spaces. The length of binary 
").+("data includes binary zeros.")
[error] "binary data. The length of string data includes the trailing 
spaces. The length of binary " +
[error] 
 ^
[info] No documentation generated with unsuccessful compiler run
[error] one error found
[error] (catalyst/compile:doc) Scaladoc generation failed
[error] Total time: 27 s, completed Aug 17, 2018 3:20:08 PM
~~~

## How was this patch tested?

sbt catalyst/compile:doc passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mengxr/spark minor-doc-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22137


commit e9a93762aeeb219cf9ab600da248a0d1f295d09f
Author: Xiangrui Meng 
Date:   2018-08-17T22:47:04Z

fix a minor issue to generate API docs




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

2018-08-15 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22085
  
@HyukjinKwon Thanks for the feedback! We will replace the py4j route with a 
special implementation that can only trigger "context.barrier()" in the JVM.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-15 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
LGTM. Merged into master. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-14 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
@kiszk Thanks for the note! I reverted the change in DAGSchedulerSuite. 
Let's try Jenkins again.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209846054
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
+cls._taskContext = BarrierTaskContext()
+return cls._taskContext
+
+@classmethod
+def get(cls):
+"""
+Return the currently active BarrierTaskContext. This can be called 
inside of user functions
+to access contextual information about running tasks.
+
+.. note:: Must be called on the worker, not the driver. Returns 
None if not initialized.
+"""
+return cls._taskContext
+
+@classmethod
+def _initialize(cls, ctx):
+"""
+Initialize BarrierTaskContext, other methods within 
BarrierTaskContext can only be called
+after BarrierTaskContext is initialized.
+"""
+cls._barrierContext = ctx
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call barrier() before 
initialize " +
+"BarrierTaskContext.")
+else:
+self._barrierContext.barrier()
+
+def getTaskInfos(self):
+"""
+.. note:: Experimental
+
+Returns the all task infos in this barrier stage, the task infos 
are ordered by
+partitionId.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call getTaskInfos() before 
initialize " +
+"BarrierTaskContext.")
+else:
+java_list = self._barrierContext.getTaskInfos()
+return [BarrierTaskInfo(h) for h in java_list]
+
+
+class BarrierTaskInfo(object):
+"""
+.. note:: Experimental
+
+Carries all task infos of a barrier task.
+
+.. versionadded:: 2.4.0
+"""
+
+def __init__(self, info):
+self.address = info.address
--- End diff --

* should be `info.address()`
* better to rename `info` to `jobj` to make it clear this is from Java


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209846015
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
+cls._taskContext = BarrierTaskContext()
+return cls._taskContext
+
+@classmethod
+def get(cls):
+"""
+Return the currently active BarrierTaskContext. This can be called 
inside of user functions
+to access contextual information about running tasks.
+
+.. note:: Must be called on the worker, not the driver. Returns 
None if not initialized.
+"""
+return cls._taskContext
+
+@classmethod
+def _initialize(cls, ctx):
+"""
+Initialize BarrierTaskContext, other methods within 
BarrierTaskContext can only be called
+after BarrierTaskContext is initialized.
+"""
+cls._barrierContext = ctx
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call barrier() before 
initialize " +
+"BarrierTaskContext.")
+else:
+self._barrierContext.barrier()
+
+def getTaskInfos(self):
+"""
+.. note:: Experimental
+
+Returns the all task infos in this barrier stage, the task infos 
are ordered by
+partitionId.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call getTaskInfos() before 
initialize " +
+"BarrierTaskContext.")
+else:
+java_list = self._barrierContext.getTaskInfos()
+return [BarrierTaskInfo(h) for h in java_list]
+
+
+class BarrierTaskInfo(object):
+"""
+.. note:: Experimental
+
+Carries all task infos of a barrier task.
+
+.. versionadded:: 2.4.0
+"""
+
+def __init__(self, info):
+self.address = info.address
--- End diff --

* should be `info.address`
* better to rename `info` to `jobj` to make it clear this is from Java


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-13 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209830941
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

@HyukjinKwon Could you elaborate on your concerns? Is it because of resource 
usage or security?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-13 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-13 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
@shaneknapp Maybe we could scan the test history and move some super stable 
tests to nightly runs. Apparently that is not a solution for now. I'm giving it 
another try :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-13 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-13 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
@shaneknapp Is the timeout due to concurrent workload on Jenkins workers? 
If so, shall we reduce the concurrency (more waiting in the queue but more robust 
test results)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-13 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209473946
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
--- End diff --

Leave a TODO here. We do not have requests from Java to Python.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209473919
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +96,33 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._javaContext is None:
+raise Exception("Not supported to call barrier() inside a 
non-barrier task.")
+else:
+self._javaContext.barrier()
+
+def getTaskInfos(self):
+"""
+.. note:: Experimental
+
+Returns the all task infos in this barrier stage, the task infos 
are ordered by
+partitionId.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._javaContext is None:
+raise Exception("Not supported to call getTaskInfos() inside a 
non-barrier task.")
+else:
+java_list = self._javaContext.getTaskInfos()
+return [h for h in java_list]
--- End diff --

Create a `BarrierTaskInfo` class and wrap it around the Java object.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209473887
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +96,33 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+def barrier(self):
--- End diff --

Create a `BarrierTaskContext` class that extends `TaskContext` and then move those 
two methods there.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460397
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,11 +963,38 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage.contains(
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
 =>
+logWarning(s"The job $jobId requires to run a barrier stage that 
requires more slots " +
+  "than the total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, 
Int, Int] {
+  override def apply(key: Int, value: Int): Int = value + 1
+})
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
+if (numCheckFailures <= maxFailureNumTasksCheck) {
+  messageScheduler.schedule(
+new Runnable {
+  override def run(): Unit = 
eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+partitions, callSite, listener, properties))
+},
+timeIntervalNumTasksCheck * 1000,
--- End diff --

minor: how about removing `1000` and changing the time unit to `SECONDS`?
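
A minimal sketch of the suggestion (using a plain `ScheduledExecutorService` here for illustration rather than the scheduler in the patch):

~~~
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val timeIntervalNumTasksCheck = 15L  // seconds; illustrative value
scheduler.schedule(new Runnable {
  override def run(): Unit = println("re-submitting the job")
}, timeIntervalNumTasksCheck, TimeUnit.SECONDS)
~~~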


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460279
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,11 +963,38 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage.contains(
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
 =>
+logWarning(s"The job $jobId requires to run a barrier stage that 
requires more slots " +
+  "than the total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, 
Int, Int] {
--- End diff --

minor: Should have an inline comment that mentions the implicit 
conversion from `null` to `0: Int` to handle new keys.
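
A tiny sketch of what that inline comment would explain (plain Scala, not the DAGScheduler code): for an absent key, `compute` passes `null` as the current value, and unboxing `null` to a Scala `Int` yields `0`, so `value + 1` starts the counter at 1.

~~~
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

val counters = new ConcurrentHashMap[Int, Int]()
val inc = new BiFunction[Int, Int, Int] {
  override def apply(key: Int, value: Int): Int = value + 1  // null unboxes to 0 for new keys
}
println(counters.compute(1, inc))  // 1 (the key was absent)
println(counters.compute(1, inc))  // 2
~~~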


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460309
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,11 +963,38 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage.contains(
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
 =>
+logWarning(s"The job $jobId requires to run a barrier stage that 
requires more slots " +
+  "than the total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, 
Int, Int] {
+  override def apply(key: Int, value: Int): Int = value + 1
+})
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
--- End diff --

minor: this is the return value from `compute`; we don't need `get`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-12 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/22001
  
test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209304798
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+if (numCheckFailures < 
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
--- End diff --

Should make `DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES` configurable 
so users can allow unlimited retries if needed. The timeout, on the other hand, 
we might keep fixed since it only affects cost.
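
A minimal sketch of making the limit configurable (the config key and default below are hypothetical, shown only to illustrate the idea):

~~~
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Hypothetical key; a value <= 0 could mean "retry indefinitely".
val maxFailures =
  conf.getInt("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", 40)
~~~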


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209294774
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   super.applicationId
 }
 
+  override def maxNumConcurrentTasks(): Int = {
+// TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

link to a JIRA


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209276818
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

+1. Use atomic updates from ConcurrentHashMap. Update the counter and then 
check max failures.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209277357
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+if (numCheckFailures < 
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
+  jobIdToNumTasksCheckFailures.put(jobId, numCheckFailures)
+  messageScheduler.schedule(
+new Runnable {
+  override def run(): Unit = 
eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+partitions, callSite, listener, properties))
+},
+timeIntervalNumTasksCheck * 1000,
+TimeUnit.MILLISECONDS
+  )
+  return
+} else {
+  listener.jobFailed(e)
--- End diff --

Do you expect the same job to be submitted again? If not, we should remove the 
key from the hashmap.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209274833
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
--- End diff --

`==` -> `.contains()` in case the error message is nested


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


