[GitHub] spark issue #22790: [SPARK-25793][ML]call SaveLoadV2_0.load for classNameV2_...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22790 This shouldn't block the 2.4.0 release. Based on the code, it doesn't introduce a regression to existing features (it just uses the V1 format and ignores trainingCost and distanceMeasure). The correctness issue occurs only when someone uses a non-default distanceMeasure and then saves/loads. Could someone help confirm? If the current vote passes, we can list it as a known issue in the release notes and fix it in 2.4.1. If other blockers show up, we fix it before RC5. Btw, this PR needs a regression test in order to merge. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
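A sketch of the kind of regression test asked for above, assuming the spark.ml `BisectingKMeans` API; `dataset` and `tempDir` are hypothetical test fixtures, not names from the PR:

```scala
// Hedged sketch, not the merged test: train with a non-default
// distanceMeasure, save/load the model, and check the setting survives.
import org.apache.spark.ml.clustering.{BisectingKMeans, BisectingKMeansModel}

val bkm = new BisectingKMeans()
  .setK(2)
  .setDistanceMeasure("cosine") // non-default measure hits the reported bug
val model = bkm.fit(dataset)    // `dataset` is a hypothetical training DataFrame
val path = tempDir + "/bkm-cosine"
model.write.overwrite().save(path)
val loaded = BisectingKMeansModel.load(path)
// Before the fix, the V1 load path ignored distanceMeasure, so the loaded
// model would silently fall back to the default "euclidean".
assert(loaded.getDistanceMeasure === model.getDistanceMeasure)
```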
[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22756 We have to revert this PR in branch-2.4. It is not a blocker and we shouldn't merge it to branch-2.4 this late in this already delayed release.
[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22675#discussion_r223566032 --- Diff: docs/ml-datasource.md --- @@ -0,0 +1,51 @@ +--- +layout: global +title: Data sources +displayTitle: Data sources +--- + +In this section, we introduce how to use data sources in ML to load data. +Besides some general data sources such as "parquet", "csv", "json", and "jdbc", we also provide some specific data sources for ML. + +**Table of Contents** + +* This will become a table of contents (this text will be scraped). +{:toc} + +## Image data source + +This image data source is used to load image files from a directory. + + + +[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource) +implements the Spark SQL data source API for loading image data as a DataFrame. +The loaded DataFrame has one StructType column: "image", containing image data stored as the image schema. + +{% highlight scala %} +scala> spark.read.format("image").load("data/mllib/images/origin") +res1: org.apache.spark.sql.DataFrame = [image: struct] +{% endhighlight %} + + + +[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html) --- End diff -- Usually it depends on how important the use case is. For example, CSV was created as an external data source and later merged into Spark. See https://issues.apache.org/jira/browse/SPARK-21866?focusedCommentId=16148268&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16148268.
[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22492 @cloud-fan said above that the next version is very likely to be 2.5.0 instead of 3.0, but the next version number has not been fully discussed yet. For that reason, I think we should revert the changes in master as well. After that, we should check whether the feature itself can be added without introducing breaking changes.
[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22492 @WeichenXu123 @cloud-fan I made https://github.com/apache/spark/pull/22618 to revert the change in master.
[GitHub] spark pull request #22618: [SPARK-25321][ML] Revert SPARK-14681 to avoid API...
GitHub user mengxr opened a pull request: https://github.com/apache/spark/pull/22618 [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaking change ## What changes were proposed in this pull request? This is the same as #22492 but for master branch. Revert SPARK-14681 to avoid API breaking changes. cc: @WeichenXu123 ## How was this patch tested? Existing unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mengxr/spark SPARK-25321.master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22618.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22618 commit 90eb1d7f5895e442a86506e3e7dae382e138b3b0 Author: WeichenXu Date: 2018-09-21T20:05:24Z [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaking change ## What changes were proposed in this pull request? Revert SPARK-14681 to avoid API breaking change. PR [SPARK-14681] will break mleap. ## How was this patch tested? N/A Closes #22492 from WeichenXu123/revert_tree_change. Authored-by: WeichenXu Signed-off-by: Xiangrui Meng
[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22492 @WeichenXu123 Please close this PR manually. Thanks!
[GitHub] spark issue #22510: [SPARK-25321][ML] Fix local LDA model constructor
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22510 LGTM. Merged into master and branch-2.4. Thanks for checking compatibility with MLeap.
[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22492 LGTM. Merged into branch-2.4. @WeichenXu123 Next time please create dedicated JIRAs for each QA task PR. Thanks!
[GitHub] spark issue #22492: [SPARK-25321][ML] Revert SPARK-14681 to avoid API breaki...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22492 We can keep it in master if the next release is 3.0.
[GitHub] spark issue #22449: [SPARK-22666][ML][FOLLOW-UP] Improve testcase to tolerat...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22449 LGTM. Merged into master and branch-2.4. Thanks!
[GitHub] spark issue #22449: [SPARK-22666][ML][FOLLOW-UP] Return a correctly formatte...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22449 @WeichenXu123 I think we should fix the test instead of removing "//" from the URI when the authority is empty, because both "scheme:/" and "scheme:///" are valid. ~~~scala scala> val u1 = new URI("file:///a/b/c") u1: java.net.URI = file:///a/b/c scala> val u2 = new URI("file:/a/b/c") u2: java.net.URI = file:/a/b/c scala> u1 == u2 res1: Boolean = true ~~~ Shall we update the test? Instead of comparing the row record, we could compare its fields one by one and convert `origin` to a `URI` before comparison.
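A minimal sketch of that suggested test change; `expected` and `actual` are hypothetical `Row`s with the image schema, not names from the PR:

```scala
// Compare fields one by one; normalizing `origin` through java.net.URI makes
// "file:/a/b/c" and "file:///a/b/c" compare equal.
import java.net.URI
import org.apache.spark.sql.Row

val actualImage = actual.getAs[Row]("image")
val expectedImage = expected.getAs[Row]("image")
assert(new URI(actualImage.getAs[String]("origin")) ===
  new URI(expectedImage.getAs[String]("origin")))
assert(actualImage.getAs[Int]("height") === expectedImage.getAs[Int]("height"))
// ... likewise for width, nChannels, mode, and data.
```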
[GitHub] spark pull request #22449: [SPARK-22666][ML][FOLLOW-UP] Return a correctly f...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22449#discussion_r218498363 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala --- @@ -85,7 +85,9 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister val filteredResult = if (imageSourceOptions.dropInvalid) { resultOpt.toIterator } else { - Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin))) + val basePath = Path.getPathWithoutSchemeAndAuthority(path).toString() --- End diff -- It seems the authority got dropped here. ~~~ scala> Path.getPathWithoutSchemeAndAuthority(new Path("s3://dbc/test/ajdj/dfdfd")) res10: org.apache.hadoop.fs.Path = /test/ajdj/dfdfd ~~~
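One possible way to keep the scheme and authority intact, sketched here as an illustration rather than the merged fix:

```scala
// Hadoop's Path.toUri preserves the full URI, authority included.
import org.apache.hadoop.fs.Path

val p = new Path("s3://dbc/test/ajdj/dfdfd")
val origin = p.toUri.toString // keeps "s3://dbc/..." rather than "/test/..."
```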
[GitHub] spark issue #22349: [SPARK-25345][ML] Deprecate public APIs from ImageSchema
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22349 LGTM. Merged into master and branch-2.4. Thanks!
[GitHub] spark issue #22349: [SPARK-25345][ML] Deprecate public APIs from ImageSchema
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22349 @WeichenXu123 Could you address the comments?
[GitHub] spark pull request #22349: [SPARK-25345][ML] Deprecate public APIs from Imag...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22349#discussion_r215840879 --- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala --- @@ -35,6 +35,8 @@ import org.apache.spark.sql.types._ */ @Experimental @Since("2.3.0") +@deprecated("use `spark.read.format(\"image\").load(path)` and this `ImageSchema` will be " + --- End diff -- There are other methods defined under `ImageSchema` that are not covered by the image data source, so we should only deprecate `readImages` and leave the other public methods as experimental. The same applies to Python.
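A sketch of the narrower deprecation suggested here; the signature is simplified (the real `readImages` takes more parameters) and `???` stands in for the existing implementation:

```scala
// Deprecate only readImages, leaving the rest of ImageSchema experimental.
@deprecated("use `spark.read.format(\"image\").load(path)` instead.", "2.4.0")
@Since("2.3.0")
def readImages(path: String): DataFrame = ???
```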
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328 Merged into master. Thanks @WeichenXu123 for the implementation and everyone for the review! I created the following JIRAs as follow-ups:
* deprecate ImageSchema: https://issues.apache.org/jira/browse/SPARK-25345
* list built-in data sources in doc site: https://issues.apache.org/jira/browse/SPARK-25346
* doc for image data source: https://issues.apache.org/jira/browse/SPARK-25347
* data source for binary files: https://issues.apache.org/jira/browse/SPARK-25348
* support sample pushdown: https://issues.apache.org/jira/browse/SPARK-25349
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328 The image data source tests passed but the JVM crashed at the end. Triggered another test.
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328 test this please
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328 LGTM pending tests.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22165 I didn't make a full pass over the tests. @jiangxb1987 let me know if you need me to take a pass.
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215326727 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeoutException + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.apache.spark._ +import org.apache.spark.rpc.RpcTimeout + +class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext { + + /** + * Get the current barrierEpoch from barrierCoordinator.states by ContextBarrierId + */ + def getCurrentBarrierEpoch( + stageId: Int, stageAttemptId: Int, barrierCoordinator: BarrierCoordinator): Int = { +val barrierId = ContextBarrierId(stageId, stageAttemptId) +barrierCoordinator.states.get(barrierId).barrierEpoch + } + + test("normal test for single task") { +sc = new SparkContext("local", "test") +val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, sc.env.rpcEnv) +val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", barrierCoordinator) +val stageId = 0 +val stageAttemptNumber = 0 +rpcEndpointRef.askSync[Unit]( + message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, taskAttemptId = 0, +barrierEpoch = 0), + timeout = new RpcTimeout(5 seconds, "rpcTimeOut")) +// sleep for waiting barrierEpoch value change +Thread.sleep(500) +assert(getCurrentBarrierEpoch(stageId, stageAttemptNumber, barrierCoordinator) == 1) + } + + test("normal test for multi tasks") { +sc = new SparkContext("local", "test") +val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, sc.env.rpcEnv) +val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", barrierCoordinator) +val numTasks = 3 +val stageId = 0 +val stageAttemptNumber = 0 +val rpcTimeOut = new RpcTimeout(5 seconds, "rpcTimeOut") +// sync request from 3 tasks +(0 until numTasks).foreach { taskId => + new Thread(s"task-$taskId-thread") { +setDaemon(true) +override def run(): Unit = { + rpcEndpointRef.askSync[Unit]( +message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId = taskId, + barrierEpoch = 0), +timeout = rpcTimeOut) +} + 
}.start() +} +// sleep for waiting barrierEpoch value change +Thread.sleep(500) --- End diff -- ditto
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215326394 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeoutException + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.apache.spark._ +import org.apache.spark.rpc.RpcTimeout + +class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext { + + /** + * Get the current barrierEpoch from barrierCoordinator.states by ContextBarrierId + */ + def getCurrentBarrierEpoch( + stageId: Int, stageAttemptId: Int, barrierCoordinator: BarrierCoordinator): Int = { +val barrierId = ContextBarrierId(stageId, stageAttemptId) +barrierCoordinator.states.get(barrierId).barrierEpoch + } + + test("normal test for single task") { +sc = new SparkContext("local", "test") +val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, sc.env.rpcEnv) +val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", barrierCoordinator) +val stageId = 0 +val stageAttemptNumber = 0 +rpcEndpointRef.askSync[Unit]( + message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, taskAttemptId = 0, +barrierEpoch = 0), + timeout = new RpcTimeout(5 seconds, "rpcTimeOut")) +// sleep for waiting barrierEpoch value change +Thread.sleep(500) --- End diff -- Do not use an explicit sleep. It basically adds 0.5 seconds to the total test time and introduces flakiness. Use a conditional wait, for example: https://github.com/apache/spark/commit/bfb74394a5513134ea1da9fcf4a1783b77dd64e4#diff-a90010f459c27926238d7a4ce5a0aca1R107
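A sketch of the suggested conditional wait using ScalaTest's `eventually`, in the spirit of the linked commit; the timeout and polling interval are illustrative values, not taken from the PR:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Poll until the barrier epoch advances instead of sleeping a fixed 500 ms;
// fails with a TestFailedDueToTimeoutException if it never becomes true.
eventually(timeout(10.seconds), interval(100.milliseconds)) {
  assert(getCurrentBarrierEpoch(stageId, stageAttemptNumber, barrierCoordinator) == 1)
}
```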
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215324595 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -65,7 +65,7 @@ private[spark] class BarrierCoordinator( // Record all active stage attempts that make barrier() call(s), and the corresponding internal // state. - private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + private[spark] val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] --- End diff -- Could you turn the `// ...` comment into ScalaDoc `/** ... */` and mention `Visible for unit testing.` in the doc?
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215322021 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +/** + * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. + * The loaded `DataFrame` has one `StructType` column: `image`. + * The schema of the `image` column is: + * - origin: String (represents the origin of the image. 
+ *If loaded from files, then it is the file path) + * - height: Int (height of the image) + * - width: Int (width of the image) + * - nChannels: Int (number of the image channels) + * - mode: Int (OpenCV-compatible type) + * - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases) + * + * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and + * optionally specify the data source options, for example: + * {{{ + * // Scala + * val df = spark.read.format("image") + * .option("dropImageFailures", true) + * .load("data/mllib/images/partitioned") + * + * // Java + * Dataset df = spark.read().format("image") + * .option("dropImageFailures", true) + * .load("data/mllib/images/partitioned"); + * }}} + * + * IMAGE data source supports the following options: + * - "dropImageFailures": Whether to drop the files that are not valid images from the result. --- End diff -- How about changing `dropImageFailures` to `dropInvalid`?
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215322673 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageOptions.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap + +private[image] class ImageOptions( +@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable { + + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + + val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean --- End diff -- Should add ScalaDoc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
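The requested ScalaDoc could look like this (a sketch; the wording is not taken from the merged change):

```scala
/** Whether to drop files that are not valid images from the result. Default: false. */
val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean
```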
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215320762 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +/** + * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. --- End diff -- "IMAGE" doesn't need to be all uppercase. Just say "loading images". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215321353 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +/** + * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. + * The loaded `DataFrame` has one `StructType` column: `image`. + * The schema of the `image` column is: + * - origin: String (represents the origin of the image. + *If loaded from files, then it is the file path) + * - height: Int (height of the image) + * - width: Int (width of the image) + * - nChannels: Int (number of the image channels) + * - mode: Int (OpenCV-compatible type) + * - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases) + * + * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and --- End diff -- ditto on "IMAGE" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215320923 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +/** + * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. + * The loaded `DataFrame` has one `StructType` column: `image`. + * The schema of the `image` column is: + * - origin: String (represents the origin of the image. + *If loaded from files, then it is the file path) --- End diff -- does it always load from files? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215323149 --- Diff: mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.ml.source.image
+
+import java.nio.file.Paths
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.image.ImageSchema._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.{col, substring_index}
+
+class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  // Single column of images named "image"
+  private lazy val imagePath = "../data/mllib/images/imagesWithPartitions"
+
+  test("image datasource count test") {
+    val df1 = spark.read.format("image").load(imagePath)
+    assert(df1.count === 9)
+
+    val df2 = spark.read.format("image").option("dropImageFailures", "true").load(imagePath)
+    assert(df2.count === 8)
+  }
+
+  test("image datasource test: read jpg image") {
+    val df = spark.read.format("image").load(imagePath + "/cls=kittens/date=2018-02/DP153539.jpg")
+    assert(df.count() === 1)
+  }
+
+  test("image datasource test: read png image") {
+    val df = spark.read.format("image").load(imagePath + "/cls=multichannel/date=2018-01/BGRA.png")
+    assert(df.count() === 1)
+  }
+
+  test("image datasource test: read non image") {
+    val filePath = imagePath + "/cls=kittens/date=2018-01/not-image.txt"
+    val df = spark.read.format("image").option("dropImageFailures", "true")
+      .load(filePath)
+    assert(df.count() === 0)
+
+    val df2 = spark.read.format("image").option("dropImageFailures", "false")
+      .load(filePath)
+    assert(df2.count() === 1)
+    val result = df2.head()
+    assert(result === invalidImageRow(
+      Paths.get(filePath).toAbsolutePath().normalize().toUri().toString))
+  }
+
+  test("image datasource partition test") {
+    val result = spark.read.format("image")
+      .option("dropImageFailures", "true").load(imagePath)
+      .select(substring_index(col("image.origin"), "/", -1).as("origin"), col("cls"), col("date"))
+      .collect()
+
+    assert(Set(result: _*) === Set(
+      Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
+      Row("54893.jpg", "kittens", "2018-02"),
+      Row("DP153539.jpg", "kittens", "2018-02"),
+      Row("DP802813.jpg", "kittens", "2018-02"),
+      Row("BGRA.png", "multichannel", "2018-01"),
+      Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
+      Row("chr30.4.184.jpg", "multichannel", "2018-02"),
+      Row("grayscale.jpg", "multichannel", "2018-02")
+    ))
+  }
+
+  // Images with the different number of channels
+  test("readImages pixel values test") {
+
+    val images = spark.read.format("image").option("dropImageFailures", "true")
+      .load(imagePath + "/cls=multichannel/").collect()
+
+    val firstBytes20Map = images.map { rrow =>
+      val row = rrow.getAs[Row]("image")
+      val filename = Paths.get(getOrigin(row)).getFileName().toString()
+      val mode = getMode(row)
+      val bytes20 = getData(row).slice(0, 20).toList
+      filename -> Tuple2(mode, bytes20)

--- End diff --

nit: It is useful to leave an inline comment here :)
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215179601

--- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`.
+ * The loaded `DataFrame` has one `StructType` column: `image`.
+ * The schema of the `image` column is:
+ *   - origin: String (represent the origin of image. If loaded from file, then it is file path)
+ *   - height: Int (height of image)
+ *   - width: Int (width of image)
+ *   - nChannels: Int (number of image channels)
+ *   - mode: Int (OpenCV-compatible type)
+ *   - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
+ *
+ * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and
+ * optionally specify the datasource options, for example:
+ * {{{
+ *   // Scala
+ *   val df = spark.read.format("image")
+ *     .option("dropImageFailures", "true")
+ *     .load("data/mllib/images/imagesWithPartitions")
+ *
+ *   // Java
+ *   Dataset<Row> df = spark.read().format("image")
+ *     .option("dropImageFailures", "true")
+ *     .load("data/mllib/images/imagesWithPartitions");
+ * }}}
+ *
+ * IMAGE data source supports the following options:
+ *   - "dropImageFailures": Whether to drop the files that are not valid images from the result.
+ *
+ * @note This IMAGE data source does not support "write".
+ *
+ * @note This class is public for documentation purpose. Please don't use this class directly.
+ * Rather, use the data source API as illustrated above.
+ */
+class ImageDataSource private() {}

--- End diff --

Usually it depends on how important the use case is. For example, CSV was created as an external data source and later merged into Spark. See https://issues.apache.org/jira/browse/SPARK-21866?focusedCommentId=16148268=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16148268.

Re: @cloud-fan The Scala package doc doesn't work for Java, which requires a different format.

Re: @HyukjinKwon It would be nice to have some doc on the site, though I didn't find a list of built-in data sources on the doc site. I think it is okay to have docs in both locations, for IDE users and for people searching the web.
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328

That doesn't work for Java, if I remember the issue correctly.

> *@cloud-fan* commented on this pull request, regarding `class ImageDataSource private() {}` in
> mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala
> <https://github.com/apache/spark/pull/22328#discussion_r215140040>:
>
> Is this a convention? AFAIK in the scala world we usually put document in package object.
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328

@mhamilton723 I thought about that option too. Loading general binary files is a useful feature, but I don't feel it is necessary to pull it into the current scope. No matter whether the image data source has its own implementation or builds on top of a binary data source, I expect users to use

~~~scala
spark.read.format("image").load("...")
~~~

to read images instead of something like:

~~~scala
spark.read.format("binary").load("...").withColumn("image", decode($"binary"))
~~~

So we can definitely add a binary file data source later and swap the implementation without changing the public interface. But we don't need to block this PR from getting into 2.4, which will be cut soon. Sounds good?
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22240

Merged into master. Thanks for the review!
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r214981991

--- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala ---
@@ -0,0 +1,109 @@
[standard Apache license header omitted; identical to the one quoted above]
+
+package org.apache.spark.ml.source.image
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.ml.image.ImageSchema
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+private[image] class ImageFileFormatOptions(
+    @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean
+}
+
+private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = Some(ImageSchema.imageSchema)
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    throw new UnsupportedOperationException(
+      s"prepareWrite is not supported for image data source")
+  }
+
+  override def shortName(): String = "image"
+
+  override protected def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {

--- End diff --

It won't be addressed in this PR. The best way to support it is to allow the data source to handle the sampling operation. cc @cloud-fan
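The `ImageFileFormatOptions` class quoted in the diff above normalizes user-supplied option keys through `CaseInsensitiveMap` before reading `dropImageFailures`, so `.option("DROPIMAGEFAILURES", "true")` behaves the same as the documented spelling. A rough Python sketch of that pattern (the class and option names mirror the diff, but this is an illustration, not the Spark implementation):

```python
class CaseInsensitiveOptions:
    """Wraps user-supplied options so that key lookup ignores case."""

    def __init__(self, parameters):
        # Normalize all keys once at construction time.
        self._params = {k.lower(): v for k, v in parameters.items()}

    def get(self, key, default=None):
        return self._params.get(key.lower(), default)


def drop_image_failures(options):
    # "dropImageFailures" defaults to "false", matching the diff above.
    opts = CaseInsensitiveOptions(options)
    return opts.get("dropImageFailures", "false").lower() == "true"


assert drop_image_failures({"DROPIMAGEFAILURES": "true"}) is True
assert drop_image_failures({"dropimagefailures": "TRUE"}) is True
assert drop_image_failures({}) is False
```

The point of normalizing at construction time (rather than at each lookup) is that every later `get` call stays cheap and consistent.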
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328

Yes, the ImageSchema implementation is used by the data source, which we cannot remove :) We are only going to mark the public APIs there as deprecated. The goal is to provide users a unified approach to load data into Spark. Users usually find `ImageSchema.readImages` hard to discover.
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22328

@imatiach-msft @HyukjinKwon The plan is to mark `ImageSchema` deprecated in 2.4 and remove it in 3.0. So loading images will be the same as loading data from other sources. The gaps are sampling and partition controlling, which might require more testing after 2.4. It would be great if you could help. For sampling, I'm thinking of allowing the data source to handle sample operations. @cloud-fan is it feasible?
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r214969542

--- Diff: mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala ---
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
 class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext {
   // Single column of images named "image"
-  private lazy val imagePath = "../data/mllib/images"
+  private lazy val imagePath = "../data/mllib/images/images"

--- End diff --

"images/images" is confusing. Call it `images/origin` and `images/partitioned`.
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r214967994

--- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,51 @@
[standard Apache license header omitted; identical to the one quoted above]
+
+package org.apache.spark.ml.source.image
+
+/**
+ * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`.

--- End diff --

Should mention it doesn't support write.
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r214969782

--- Diff: mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala ---
@@ -0,0 +1,119 @@
[same ImageFileFormatSuite diff as quoted earlier in this thread; context trimmed to the commented lines]
+      filename -> Tuple2(mode, bytes20)
+    }.toMap

--- End diff --

use Set instead of Map
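The review suggestion above ("use Set instead of Map") is about making the assertion on collected rows order-independent while still failing on extra or missing entries. A minimal Python sketch of the same idea, with made-up (filename, mode) pairs standing in for the collected DataFrame rows:

```python
# Pretend these (filename, mode) pairs were collected from a DataFrame,
# in whatever order the partitions happened to return them.
collected = [("grayscale.jpg", 0), ("BGRA.png", 24), ("chr30.4.184.jpg", 16)]

expected = {
    ("BGRA.png", 24),
    ("chr30.4.184.jpg", 16),
    ("grayscale.jpg", 0),
}

# Comparing as sets ignores row order; a list comparison would be flaky here.
assert set(collected) == expected

# A whole-set comparison also catches missing rows, which key-by-key
# lookups in a Map can silently miss.
assert set(collected[:2]) != expected
```

The same reasoning applies to the Scala test: `Set(result: _*) === Set(...)` compares the full collection in one assertion.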
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r214967452

--- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala ---
@@ -0,0 +1,51 @@
[earlier revision of the ImageDataSource doc diff quoted above; context trimmed to the commented lines]
+ * @note This class is public for documentation purpose. Please don't use this class directly.
+ * Rather, use the data source API as illustrated above.

--- End diff --

I didn't see a section in the doc that lists all built-in data sources. It would be nice if we create a section and link it to this API doc. I think we can do it with a follow-up PR. I want to see if we can get this PR merged before the branch cut :)
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r214968664

--- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala ---
@@ -0,0 +1,109 @@
[same ImageFileFormat diff as quoted earlier in this thread; context trimmed to the commented lines]
+    throw new UnsupportedOperationException(
+      s"prepareWrite is not supported for image data source")

--- End diff --

The error message is user-facing and users do not know `prepareWrite`. So just say "Write is not supported".
[GitHub] spark issue #22271: [SPARK-25268][GraphX]run Parallel Personalized PageRank ...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22271

test this please
[GitHub] spark issue #22261: [SPARK-25248.1][PYSPARK] update barrier Python API
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22261

Merged into master.
[GitHub] spark issue #22261: [SPARK-25248.1][PYSPARK] update barrier Python API
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22261

There are two PRs from that JIRA, one for the Scala APIs and one for the Python APIs.
[GitHub] spark issue #22258: [SPARK-25266][CORE] Fix memory leak in Barrier Execution...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22258

Merged into master. Thanks!
[GitHub] spark issue #22247: [SPARK-25253][PYSPARK] Refactor local connection & auth ...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22247

@squito Thanks for the refactor!
[GitHub] spark pull request #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22240#discussion_r213537490

--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -68,7 +74,7 @@ class BarrierTaskContext(
    *
    * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
    * possible code branches. Otherwise, you may get the job hanging or a SparkException after
-   * timeout. Some examples of misuses listed below:
+   * timeout. Some examples of '''misuses''' listed below:

--- End diff --

just saw it, will include it if Jenkins fails :)
[GitHub] spark pull request #22261: [SPARK-25248.1][PYSPARK] update barrier Python AP...
GitHub user mengxr opened a pull request: https://github.com/apache/spark/pull/22261

[SPARK-25248.1][PYSPARK] update barrier Python API

## What changes were proposed in this pull request?

I made one pass over the Python APIs for barrier mode and updated them to match the Scala doc in #22240. Major changes:

* export the public classes
* expand the docs
* add doc for BarrierTaskInfo.address

cc: @jiangxb1987

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mengxr/spark SPARK-25248.1

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22261.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22261

commit afb50ee1150279f9cb27f92e220a332e029dbc43
Author: Xiangrui Meng
Date: 2018-08-29T03:44:54Z

    update barrier Python API
[GitHub] spark issue #22258: [SPARK-25266][CORE] Fix memory leak in Barrier Execution...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22258

LGTM pending test
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22240

test this please
[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22240#discussion_r212863571

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala ---
@@ -22,15 +22,22 @@ import scala.reflect.ClassTag
 import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{Experimental, Since}

-/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */
-class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
+/**
+ * :: Experimental ::
+ * Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together.
+ * [[RDDBarrier]] instances are created by [[RDD.barrier]].
+ */
+@Experimental
+@Since("2.4.0")
+class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {

--- End diff --

also hide the constructor here
[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22240#discussion_r212863543

--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -28,4 +28,4 @@ import org.apache.spark.annotation.{Experimental, Since}
  */
 @Experimental
 @Since("2.4.0")
-class BarrierTaskInfo(val address: String)
+class BarrierTaskInfo private[spark] (val address: String)

--- End diff --

hide the constructor since this is not to be constructed by user
[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22240#discussion_r212863507

--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -21,25 +21,31 @@ import java.util.{Properties, Timer, TimerTask}
 import scala.concurrent.duration._
 import scala.language.postfixOps
-
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}
 import org.apache.spark.util.{RpcUtils, Utils}

-/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
-class BarrierTaskContext(
+/**
+ * :: Experimental ::
+ * A [[TaskContext]] with extra contextual info and tooling for tasks in a barrier stage.
+ * Use [[BarrierTaskContext#get]] to obtain the barrier context for a running barrier task.
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskContext private[spark] (

--- End diff --

Made the constructor package private to force users to get it from `#get()`.
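The change above hides the `BarrierTaskContext` constructor so that the only way for user code to obtain an instance is the `BarrierTaskContext.get()` accessor, while Spark internals can still construct it. Python has no direct analog of Scala's `private[spark]`, but a toy sketch of the factory-only pattern looks like this (all names here are hypothetical, not PySpark's API):

```python
class _FrameworkToken:
    """Private token: only framework code passes it to the constructor."""


class BarrierContext:
    _current = None  # set by the framework, never by user code

    def __init__(self, stage_id, token=None):
        # Stand-in for private[spark]: reject construction from user code.
        if token is not _FrameworkToken:
            raise TypeError("use BarrierContext.get(); the constructor is internal")
        self.stage_id = stage_id

    @classmethod
    def _set_current(cls, stage_id):
        # Called by the (hypothetical) task runner before user code starts.
        cls._current = cls(stage_id, token=_FrameworkToken)

    @classmethod
    def get(cls):
        # The only supported way for user code to obtain the context.
        if cls._current is None:
            raise RuntimeError("no barrier task is running")
        return cls._current


BarrierContext._set_current(stage_id=7)
assert BarrierContext.get().stage_id == 7

try:
    BarrierContext(0)  # direct construction is rejected
except TypeError:
    pass
else:
    raise AssertionError("direct construction should have failed")
```

The design motivation is the same as in the diff: a context object is only meaningful inside a running task, so the accessor, not the constructor, is the public entry point.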
[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22240#discussion_r212863444

--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -68,7 +74,7 @@ class BarrierTaskContext(
    *
    * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
    * possible code branches. Otherwise, you may get the job hanging or a SparkException after
-   * timeout. Some examples of misuses listed below:
+   * timeout. Some examples of '''misuses''' listed below:

--- End diff --

use bold font to make sure users don't misread
[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22240#discussion_r212863381 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -21,25 +21,31 @@ import java.util.{Properties, Timer, TimerTask} import scala.concurrent.duration._ import scala.language.postfixOps - -import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout} import org.apache.spark.util.{RpcUtils, Utils} -/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ -class BarrierTaskContext( +/** + * :: Experimental :: + * A [[TaskContext]] with extra contextual info and tooling for tasks in a barrier stage. + * Use [[BarrierTaskContext#get]] to obtain the barrier context for a running barrier task. + */ +@Experimental +@Since("2.4.0") +class BarrierTaskContext private[spark] ( override val stageId: Int, override val stageAttemptNumber: Int, override val partitionId: Int, override val taskAttemptId: Long, override val attemptNumber: Int, -override val taskMemoryManager: TaskMemoryManager, +private[spark] override val taskMemoryManager: TaskMemoryManager, --- End diff -- This is not exposed by `TaskContext`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22240: [WIP] [SPARK-25248] [CORE] Audit barrier APIs for...
GitHub user mengxr opened a pull request: https://github.com/apache/spark/pull/22240 [WIP] [SPARK-25248] [CORE] Audit barrier APIs for 2.4 ## What changes were proposed in this pull request? I made one pass over the barrier APIs added to Spark 2.4 and updated some scopes and docs. TODOs: - [ ] scala doc - [ ] python doc You can merge this pull request into a Git repository by running: $ git pull https://github.com/mengxr/spark SPARK-25248 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22240.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22240 commit 19e380ab4f7242f2f2ef48aca81445b0adf0a87d Author: Xiangrui Meng Date: 2018-08-27T04:18:54Z update barrier Scala doc --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22225: [SPARK-25234][SPARKR] avoid integer overflow in parallel...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/5 Merged into master and branch-2.3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22225: [SPARK-25234][SPARKR] avoid integer overflow in parallel...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/5 cc @falaki --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22225: [SPARK-25234][SPARKR] avoid integer overflow in p...
GitHub user mengxr opened a pull request: https://github.com/apache/spark/pull/5 [SPARK-25234][SPARKR] avoid integer overflow in parallelize ## What changes were proposed in this pull request? `parallelize` uses integer multiplication to determine the split indices. It might cause integer overflow. ## How was this patch tested? unit test Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mengxr/spark SPARK-25234 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5 commit 20f9498384ef5587970c3673c86488404ee89a54 Author: Xiangrui Meng Date: 2018-08-24T18:01:14Z add test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
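The overflow class described in this PR can be sketched outside SparkR (the actual fix is in R code; the helper names below are hypothetical and only illustrate why computing split boundaries with 32-bit multiplication goes wrong, and how widening to 64-bit fixes it):

```java
public class SplitIndexDemo {
    // Buggy: i * numElements wraps around in 32-bit arithmetic before the division.
    static int splitStartBuggy(int i, int numElements, int numSlices) {
        return i * numElements / numSlices;
    }

    // Fixed: widen to long before multiplying, then narrow the (safe) quotient.
    static int splitStartFixed(int i, int numElements, int numSlices) {
        return (int) ((long) i * numElements / numSlices);
    }

    public static void main(String[] args) {
        int numElements = 2_000_000_000;
        int numSlices = 10;
        // 7 * 2_000_000_000 = 14_000_000_000 does not fit in an int and wraps.
        System.out.println(splitStartBuggy(7, numElements, numSlices)); // 111509811 (wrong: overflowed)
        System.out.println(splitStartFixed(7, numElements, numSlices)); // 1400000000
    }
}
```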
[GitHub] spark pull request #22171: [SPARK-25177][SQL] When dataframe decimal type co...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22171#discussion_r211867603 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala --- @@ -197,7 +197,7 @@ final class Decimal extends Ordered[Decimal] with Serializable { } } - override def toString: String = toBigDecimal.toString() + override def toString: String = toBigDecimal.bigDecimal.toPlainString() --- End diff -- I don't recall anything that is relevant:) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
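For context on the one-line change above: `java.math.BigDecimal.toString()` can render small values in scientific notation, while `toPlainString()` always expands the digits, which is what the patch switches to.

```java
import java.math.BigDecimal;

public class PlainStringDemo {
    public static void main(String[] args) {
        BigDecimal tiny = new BigDecimal("1E-10");
        // toString() keeps the scientific-notation form for small scales...
        System.out.println(tiny.toString());      // 1E-10
        // ...while toPlainString() writes the value out in full.
        System.out.println(tiny.toPlainString()); // 0.0000000001
    }
}
```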
[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22085 LGTM. I'm merging this into master. We might need a minor refactor for readability. But it shouldn't block developers testing this new feature. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22158 Merged into master. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22112 Then it doesn't meet the requirements for those operations used by MLlib: * sampling * zipWithIndex, zipWithUniqueId * we also use zip, assuming the ordering from the source RDD is preserved, e.g., https://github.com/apache/spark/blob/e50192494d1ae1bdaf845ddd388189998c1a2403/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L403 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22158 LGTM pending Jenkins. Thanks for finding those corner cases! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22112 If "always return the same result with same order when rerun." is the definition of "idempotent", then yes, MLlib RDD closures always return the same result if the input doesn't change. We use pseudo-randomness to achieve deterministic behavior. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
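The pseudo-randomness point can be illustrated in plain Java (a minimal sketch, not MLlib code): seeding the generator makes the "random" draws a pure function of the seed, so a rerun over unchanged input reproduces the result exactly.

```java
import java.util.Arrays;
import java.util.Random;

public class SeededSamplingDemo {
    // Deterministic "sampling": the same seed always yields the same sequence of
    // draws, so rerunning the closure over unchanged input is reproducible.
    static long[] draw(long seed, int n) {
        Random rng = new Random(seed);
        long[] out = new long[n];
        for (int i = 0; i < n; i++) {
            out[i] = rng.nextLong();
        }
        return out;
    }

    public static void main(String[] args) {
        long[] first = draw(42L, 5);
        long[] rerun = draw(42L, 5);
        System.out.println(Arrays.equals(first, rerun)); // true
    }
}
```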
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211360719 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,126 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): +""" +Load data from a given socket, this is a blocking method thus only return when the socket +connection has been closed. +""" +sock = None +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, canonname, sa = res +sock = socket.socket(af, socktype, proto) +try: +# Do not allow timeout for socket reading operation. +sock.settimeout(None) +sock.connect(sa) +except socket.error: +sock.close() +sock = None +continue +break +if not sock: +raise Exception("could not open socket") + +sockfile = sock.makefile("rwb", 65536) +write_with_length("run".encode("utf-8"), sockfile) +sockfile.flush() +do_server_auth(sockfile, auth_secret) + +# The socket will be automatically closed when garbage-collected. +return UTF8Deserializer().loads(sockfile) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_port = None +_secret = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: --- End diff -- Q: Does it handle python worker reuse? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211356245 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,126 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): --- End diff -- Should document how this is different from the one in `context.py`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211359959 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -381,6 +465,20 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( } } } + + def writeUTF(str: String, dataOut: DataOutputStream) { +val bytes = str.getBytes(StandardCharsets.UTF_8) --- End diff -- nit: `UTF_8` or always use `StandardCharsets` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211358615 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + sock.setSoTimeout(1) + val cmdString = readUtf8(sock) + if (cmdString.equals("run")) { --- End diff -- If we do not expect any other command from the socket, we should throw an exception --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
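A minimal sketch of the suggested behavior (the names here are hypothetical, not the actual `PythonRunner` code): if `"run"` is the only command the socket protocol supports, anything else should fail loudly rather than be silently dropped.

```java
public class CommandDispatchDemo {
    // Dispatch a command string read from the Python side.
    static String handle(String cmd) {
        if ("run".equals(cmd)) {
            return "barrier-served";
        }
        // Review suggestion: throw on anything unexpected instead of ignoring it.
        throw new IllegalArgumentException("Unexpected command from Python side: " + cmd);
    }

    public static void main(String[] args) {
        System.out.println(handle("run")); // barrier-served
        try {
            handle("stop");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```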
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211356840 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -76,6 +77,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( // TODO: support accumulator in multiple UDF protected val accumulator = funcs.head.funcs.head.accumulator + // Expose a ServerSocket to support method calls via socket from Python side. + private[spark] var serverSocket: Option[ServerSocket] = None + + // Authentication helper used when serving method calls via socket from Python side. + private lazy val authHelper = { +val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) --- End diff -- When `SparkEnv.get` returns null? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211355022 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -20,15 +20,16 @@ package org.apache.spark.api.python import java.io._ import java.net._ import java.nio.charset.StandardCharsets +import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ -import org.apache.spark._ +import org.apache.spark.{SparkException, _} --- End diff -- `_` should include `SparkException` already --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211359028 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + sock.setSoTimeout(1) + val cmdString = readUtf8(sock) + if (cmdString.equals("run")) { +sock.setSoTimeout(0) +barrierAndServe(sock) + } +} catch { + case _: SocketException => --- End diff -- Is this the timeout exception? I don't see any exception that we could silently ignore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211358743 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + sock.setSoTimeout(1) --- End diff -- Should add a comment about this timeout. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211357983 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost"))) --- End diff -- minor: useful to add `/* port */` and `/* backlog */` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
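The `/* port */` and `/* backlog */` suggestion refers to inline parameter comments, which make positional int arguments readable at the call site. A standalone Java sketch of the same `ServerSocket` call (not the PythonRunner code itself):

```java
import java.net.InetAddress;
import java.net.ServerSocket;

public class ServerSocketArgsDemo {
    public static void main(String[] args) throws Exception {
        // Inline comments label the otherwise-opaque positional arguments.
        ServerSocket server = new ServerSocket(
            0 /* port: 0 = pick any free ephemeral port */,
            1 /* backlog */,
            InetAddress.getByName("localhost"));
        System.out.println(server.getLocalPort() > 0); // true
        server.close();
    }
}
```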
[GitHub] spark issue #22137: [MINOR][DOC][SQL] use one line for annotation arg value
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22137 cc: @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22137: [MINOR][DOC][SQL] use one line for annotation arg...
GitHub user mengxr opened a pull request: https://github.com/apache/spark/pull/22137 [MINOR][DOC][SQL] use one line for annotation arg value ## What changes were proposed in this pull request? Put annotation args in one line, or API doc generation will fail. ~~~ [error] /Users/meng/src/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:1559: annotation argument needs to be a constant; found: "_FUNC_(expr) - Returns the character length of string data or number of bytes of ".+("binary data. The length of string data includes the trailing spaces. The length of binary ").+("data includes binary zeros.") [error] "binary data. The length of string data includes the trailing spaces. The length of binary " + [error] ^ [info] No documentation generated with unsuccessful compiler run [error] one error found [error] (catalyst/compile:doc) Scaladoc generation failed [error] Total time: 27 s, completed Aug 17, 2018 3:20:08 PM ~~~ ## How was this patch tested? sbt catalyst/compile:doc passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mengxr/spark minor-doc-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22137.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22137 commit e9a93762aeeb219cf9ab600da248a0d1f295d09f Author: Xiangrui Meng Date: 2018-08-17T22:47:04Z fix a minor issue to generate API docs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22085 @HyukjinKwon Thanks for the feedback! We will replace the py4j route with a special implementation that can only trigger "context.barrier()" in the JVM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001 LGTM. Merged into master. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001 @kiszk Thanks for the note! I reverted the change in DAGSchedulerSuite. Let's try Jenkins again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209846054 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: +cls._taskContext = BarrierTaskContext() +return cls._taskContext + +@classmethod +def get(cls): +""" +Return the currently active BarrierTaskContext. This can be called inside of user functions +to access contextual information about running tasks. + +.. note:: Must be called on the worker, not the driver. Returns None if not initialized. +""" +return cls._taskContext + +@classmethod +def _initialize(cls, ctx): +""" +Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called +after BarrierTaskContext is initialized. +""" +cls._barrierContext = ctx + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call barrier() before initialize " + +"BarrierTaskContext.") +else: +self._barrierContext.barrier() + +def getTaskInfos(self): +""" +.. note:: Experimental + +Returns the all task infos in this barrier stage, the task infos are ordered by +partitionId. 
+Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call getTaskInfos() before initialize " + +"BarrierTaskContext.") +else: +java_list = self._barrierContext.getTaskInfos() +return [BarrierTaskInfo(h) for h in java_list] + + +class BarrierTaskInfo(object): +""" +.. note:: Experimental + +Carries all task infos of a barrier task. + +.. versionadded:: 2.4.0 +""" + +def __init__(self, info): +self.address = info.address --- End diff -- * should be `info.address()` * better to rename `info` to `jobj` to make it clear this is from Java --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209846015 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: +cls._taskContext = BarrierTaskContext() +return cls._taskContext + +@classmethod +def get(cls): +""" +Return the currently active BarrierTaskContext. This can be called inside of user functions +to access contextual information about running tasks. + +.. note:: Must be called on the worker, not the driver. Returns None if not initialized. +""" +return cls._taskContext + +@classmethod +def _initialize(cls, ctx): +""" +Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called +after BarrierTaskContext is initialized. +""" +cls._barrierContext = ctx + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call barrier() before initialize " + +"BarrierTaskContext.") +else: +self._barrierContext.barrier() + +def getTaskInfos(self): +""" +.. note:: Experimental + +Returns the all task infos in this barrier stage, the task infos are ordered by +partitionId. 
+Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call getTaskInfos() before initialize " + +"BarrierTaskContext.") +else: +java_list = self._barrierContext.getTaskInfos() +return [BarrierTaskInfo(h) for h in java_list] + + +class BarrierTaskInfo(object): +""" +.. note:: Experimental + +Carries all task infos of a barrier task. + +.. versionadded:: 2.4.0 +""" + +def __init__(self, info): +self.address = info.address --- End diff -- * should be `info.address` * better to rename `info` to `jobj` to make it clear this is from Java --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209830941 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- @HyukjinKwon Could you elaborate your concerns? Is it because resource usage or security? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001 @shaneknapp Maybe we could scan the test history and move some super stable tests to nightly. Apparently, it is not a solution for now. I'm giving another try:) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001 @shaneknapp Is the timeout due to concurrent workload on Jenkins workers? If so, shall we reduce the concurrency (more wait in the queue but more robust test result)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209473946 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), --- End diff -- Leave a TODO here. We do not have requests from Java to Python. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209473919 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +96,33 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._javaContext is None: +raise Exception("Not supported to call barrier() inside a non-barrier task.") +else: +self._javaContext.barrier() + +def getTaskInfos(self): +""" +.. note:: Experimental + +Returns the all task infos in this barrier stage, the task infos are ordered by +partitionId. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._javaContext is None: +raise Exception("Not supported to call getTaskInfos() inside a non-barrier task.") +else: +java_list = self._javaContext.getTaskInfos() +return [h for h in java_list] --- End diff -- Create `BarrierTaskInfo` class and wrap it over Java object. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209473887

--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +96,33 @@ def getLocalProperty(self, key):
         Get a local property set upstream in the driver, or None if it is missing.
         """
         return self._localProperties.get(key, None)
+
+    def barrier(self):
--- End diff --

Create a `BarrierTaskContext` that extends `TaskContext` and then move those two methods there.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209460397

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,11 +963,38 @@ class DAGScheduler(
         // HadoopRDD whose underlying HDFS files have been deleted.
         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
       } catch {
+        case e: Exception if e.getMessage.contains(
+            DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
+          logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
+            "than the total number of slots in the cluster currently.")
+          jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
+            override def apply(key: Int, value: Int): Int = value + 1
+          })
+          val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
+          if (numCheckFailures <= maxFailureNumTasksCheck) {
+            messageScheduler.schedule(
+              new Runnable {
+                override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+                  partitions, callSite, listener, properties))
+              },
+              timeIntervalNumTasksCheck * 1000,
--- End diff --

minor: how about removing `1000` and changing the time unit to `SECONDS`?
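The suggestion above can be sketched in plain Java (illustrative names, not the Spark code): instead of multiplying the delay by 1000 and passing `MILLISECONDS`, pass it in its natural unit and let `TimeUnit` name it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleUnitsDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long delaySeconds = 1;  // stand-in for timeIntervalNumTasksCheck
        // Clearer than `delaySeconds * 1000, TimeUnit.MILLISECONDS`: the unit
        // is named once and no manual conversion can go wrong.
        scheduler.schedule(
            () -> System.out.println("resubmit check"),
            delaySeconds, TimeUnit.SECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The two forms schedule the same delay; the `SECONDS` version simply removes a conversion constant from the call site.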
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209460279

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,11 +963,38 @@ class DAGScheduler(
         // HadoopRDD whose underlying HDFS files have been deleted.
         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
       } catch {
+        case e: Exception if e.getMessage.contains(
+            DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
+          logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
+            "than the total number of slots in the cluster currently.")
+          jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
--- End diff --

minor: Should have an inline comment that mentions the implicit conversion from `null` to `0: Int` that handles new keys.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209460309

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,11 +963,38 @@ class DAGScheduler(
         // HadoopRDD whose underlying HDFS files have been deleted.
         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
       } catch {
+        case e: Exception if e.getMessage.contains(
+            DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
+          logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
+            "than the total number of slots in the cluster currently.")
+          jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
+            override def apply(key: Int, value: Int): Int = value + 1
+          })
+          val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
--- End diff --

minor: this is the return value from `compute`; we don't need `get`.
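A minimal Java sketch of the point being made (illustrative names, not the Spark code): `ConcurrentHashMap.compute` applies the remapping function atomically and already returns the new value, so the increment and the read collapse into one call. Note that in plain Java the old value is `null` for an absent key and must be handled explicitly; the Scala code under review relies on `Int` unboxing turning `null` into `0`.

```java
import java.util.concurrent.ConcurrentHashMap;

public class ComputeReturnDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> failures = new ConcurrentHashMap<>();
        int jobId = 7;  // hypothetical job id
        // compute() runs atomically and returns the updated value:
        // no separate get() and no lost updates under concurrency.
        int n1 = failures.compute(jobId, (k, v) -> (v == null ? 0 : v) + 1);
        int n2 = failures.compute(jobId, (k, v) -> (v == null ? 0 : v) + 1);
        System.out.println(n1 + " " + n2);  // 1 2
    }
}
```

A `compute` followed by a separate `get` is not atomic as a pair: another thread's update could land between the two calls, so using the return value is both shorter and correct.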
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user mengxr commented on the issue: https://github.com/apache/spark/pull/22001

test this please
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209304798

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
         // HadoopRDD whose underlying HDFS files have been deleted.
         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
       } catch {
+        case e: Exception if e.getMessage ==
+            DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+          logWarning("The job requires to run a barrier stage that requires more slots than the " +
+            "total number of slots in the cluster currently.")
+          jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+          val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+          if (numCheckFailures < DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
--- End diff --

We should make `DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES` configurable so users can specify unlimited retry if needed. The timeout, on the other hand, we might want to keep fixed, since it only affects cost.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209294774

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     super.applicationId
   }

+  override def maxNumConcurrentTasks(): Int = {
+    // TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

Link this TODO to a JIRA ticket.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209276818

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
         // HadoopRDD whose underlying HDFS files have been deleted.
         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
       } catch {
+        case e: Exception if e.getMessage ==
+            DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+          logWarning("The job requires to run a barrier stage that requires more slots than the " +
+            "total number of slots in the cluster currently.")
+          jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+          val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

+1. Use atomic updates from ConcurrentHashMap: update the counter and then check max failures.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209277357

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
         // HadoopRDD whose underlying HDFS files have been deleted.
         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
       } catch {
+        case e: Exception if e.getMessage ==
+            DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+          logWarning("The job requires to run a barrier stage that requires more slots than the " +
+            "total number of slots in the cluster currently.")
+          jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+          val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+          if (numCheckFailures < DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
+            jobIdToNumTasksCheckFailures.put(jobId, numCheckFailures)
+            messageScheduler.schedule(
+              new Runnable {
+                override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+                  partitions, callSite, listener, properties))
+              },
+              timeIntervalNumTasksCheck * 1000,
+              TimeUnit.MILLISECONDS
+            )
+            return
+          } else {
+            listener.jobFailed(e)
--- End diff --

Do you expect the same job to be submitted again? If not, we should remove the key from the hashmap.
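The cleanup being asked about can be sketched in plain Java (illustrative names, not the Spark code): once a job fails terminally, its retry counter should be dropped from the map so the map does not grow without bound and a later submission with a reused id would start from zero.

```java
import java.util.concurrent.ConcurrentHashMap;

public class CounterCleanupDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> failures = new ConcurrentHashMap<>();
        int jobId = 7;  // hypothetical job id
        // A couple of failed slot checks accumulate a counter for this job.
        failures.compute(jobId, (k, v) -> (v == null ? 0 : v) + 1);
        failures.compute(jobId, (k, v) -> (v == null ? 0 : v) + 1);
        // Terminal failure path: report the failure, then forget the counter
        // so the entry does not leak after the job is gone.
        failures.remove(jobId);
        System.out.println(failures.containsKey(jobId));  // false
    }
}
```

Without the `remove`, every job that ever hit the slot check would leave a permanent entry in the map for the lifetime of the scheduler.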
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209274833

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
         // HadoopRDD whose underlying HDFS files have been deleted.
         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
       } catch {
+        case e: Exception if e.getMessage ==
--- End diff --

`==` -> `.contains()` in case the error message is nested
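A minimal Java illustration of why `.contains()` is the safer match when an exception may be wrapped by another layer (the message strings here are made up, not Spark's actual error text):

```java
public class NestedMessageDemo {
    public static void main(String[] args) {
        // Hypothetical marker string, standing in for the DAGScheduler constant.
        String marker = "barrier stage requires more slots than currently available";
        // A wrapping layer prepends its own context to the original message.
        String nested = "Job aborted: " + marker + " (check cluster resources)";
        System.out.println(nested.equals(marker));    // false -- exact match misses it
        System.out.println(nested.contains(marker));  // true  -- substring match still fires
    }
}
```

With an exact equality check, the `catch` clause would silently stop matching as soon as any layer rethrows the error with extra context; a substring check keeps matching.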