[GitHub] spark issue #21651: [SPARK-18258] Sink need access to offset representation
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21651 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21651: [SPARK-18258] Sink need access to offset represen...
GitHub user ConcurrencyPractitioner opened a pull request: https://github.com/apache/spark/pull/21651

[SPARK-18258] Sink need access to offset representation

## What changes were proposed in this pull request?

Currently, sinks only have access to the batchId and the data, not the actual offset representation. The goal of this PR is to expose this representation to sinks via ```addBatch```.

## How was this patch tested?

Existing unit tests (these need to be changed to also test for offsetSeqs).

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ConcurrencyPractitioner/spark master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21651.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21651

commit f27be2d323fb287876032b59dd078fc65e9b180d
Author: Richard Yu
Date: 2018-06-28T00:44:34Z

    [SPARK-18258] Init Commit

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
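For context, the streaming `Sink` trait today receives only the batch id and the data. A minimal sketch of the proposed change, assuming the existing `org.apache.spark.sql.execution.streaming.Sink` trait (the `offsetSeq` parameter name is an illustration, not the final API):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.OffsetSeq

trait Sink {
  // Current contract: addBatch(batchId, data). The proposal threads the
  // offset representation through as well, so a sink can record exactly
  // which source offsets each batch covers.
  def addBatch(batchId: Long, data: DataFrame, offsetSeq: OffsetSeq): Unit
}
```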
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21495 > The only change with this PR is that the welcome message will be printed first, and then the Spark URL will be shown later. It's a minor difference. I think we should create a JIRA to track this change, and also evaluate whether this issue needs to be fixed. If this behavior change will affect some users, maybe we should move the target version to 3.0.0? @dbtsai @felixcheung --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21557: [SPARK-24439][ML][PYTHON]Add distanceMeasure to B...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21557#discussion_r198675445 --- Diff: python/pyspark/ml/clustering.py --- @@ -622,10 +621,10 @@ def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=2 @keyword_only @since("2.0.0") def setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, - seed=None, k=4, minDivisibleClusterSize=1.0): + seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean"): """ setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, \ - seed=None, k=4, minDivisibleClusterSize=1.0) + seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean") Sets params for BisectingKMeans. --- End diff -- I know we already have `setDistanceMeasure` and `getDistanceMeasure` methods from the shared param, but can you also add them here so we can use the `since` decorator? (same as KMeans) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21644: [SPARK-24660][SHS] Show correct error pages when downloa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21644 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92392/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21644: [SPARK-24660][SHS] Show correct error pages when downloa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21644 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21644: [SPARK-24660][SHS] Show correct error pages when downloa...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21644 **[Test build #92392 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92392/testReport)** for PR 21644 at commit [`0c6bbbc`](https://github.com/apache/spark/commit/0c6bbbc2d20efe4ac2a954183f91d0eb95f7b757). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198668278 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +private val chunkedByteBuffer: ChunkedByteBuffer, +private val ioChunkSize: Int) extends AbstractFileRegion { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val size = chunks.foldLeft(0) { _ + _.remaining()} --- End diff -- space before `}` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198668704 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkedByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkedByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkedByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize +
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198668294 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, --- End diff -- 3 spaces --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21601#discussion_r198667795 --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala --- @@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong + +// For small files we need to ensure the min split size per node & rack <= maxSplitSize +val config = context.getConfiguration +val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L) +val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L) + +if (maxSplitSize < minSplitSizePerNode) { + super.setMinSplitSizeNode(maxSplitSize) --- End diff -- Is there a point in even checking the configuration? Why not just set these to `0L` unconditionally? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
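A minimal sketch of the suggestion above, assuming the protected setters inherited from Hadoop's `CombineFileInputFormat` (the surrounding method body is elided):

```scala
// Instead of reading SPLIT_MINSIZE_PERNODE / SPLIT_MINSIZE_PERRACK and
// comparing, pin both minimums to 0L unconditionally; then maxSplitSize
// can never fall below either of them.
super.setMinSplitSizeNode(0L)
super.setMinSplitSizeRack(0L)
super.setMaxSplitSize(maxSplitSize)
```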
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21650 nit: Also, can you put `[SQL][PYTHON]` in the title? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21650 @icexelloss Can you also show the query plan of the examples in the PR description? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21611: [SPARK-24569][SQL] Aggregator with output type Option sh...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21611 **[Test build #92403 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92403/testReport)** for PR 21611 at commit [`f04efa4`](https://github.com/apache/spark/commit/f04efa484e7b5dfbe709f65845bea58e53611604). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21611: [SPARK-24569][SQL] Aggregator with output type Option sh...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21611 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/528/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21611: [SPARK-24569][SQL] Aggregator with output type Option sh...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21611 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21611: [SPARK-24569][SQL] Aggregator with output type Op...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21611#discussion_r198664388 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala --- @@ -148,6 +148,79 @@ object VeryComplexResultAgg extends Aggregator[Row, String, ComplexAggData] { } +case class OptionBooleanData(name: String, isGood: Option[Boolean]) +case class OptionBooleanIntData(name: String, isGood: Option[(Boolean, Int)]) + +case class OptionBooleanAggregator(colName: String) +extends Aggregator[Row, Option[Boolean], Option[Boolean]] { + + override def zero: Option[Boolean] = None + + override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = { +val index = row.fieldIndex(colName) +val value = if (row.isNullAt(index)) { + Option.empty[Boolean] +} else { + Some(row.getBoolean(index)) +} +merge(buffer, value) + } + + override def merge(b1: Option[Boolean], b2: Option[Boolean]): Option[Boolean] = { +if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) { + Some(true) +} else if (b1.isDefined) { + b1 +} else { + b2 +} + } + + override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction + + override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder + override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder + + def OptionalBoolEncoder: Encoder[Option[Boolean]] = ExpressionEncoder() +} + +case class OptionBooleanIntAggregator(colName: String) +extends Aggregator[Row, Option[(Boolean, Int)], Option[(Boolean, Int)]] { + + override def zero: Option[(Boolean, Int)] = None + + override def reduce(buffer: Option[(Boolean, Int)], row: Row): Option[(Boolean, Int)] = { +val index = row.fieldIndex(colName) +val value = if (row.isNullAt(index)) { + Option.empty[(Boolean, Int)] +} else { + val nestedRow = row.getStruct(index) + Some((nestedRow.getBoolean(0), nestedRow.getInt(1))) +} +merge(buffer, value) + } + + override def merge( + b1: Option[(Boolean, Int)], + b2: Option[(Boolean, Int)]): Option[(Boolean, Int)] = { +if ((b1.isDefined && b1.get._1) || (b2.isDefined && b2.get._1)) { + val newInt = b1.map(_._2).getOrElse(0) + b2.map(_._2).getOrElse(0) + Some((true, newInt)) +} else if (b1.isDefined) { + b1 +} else { + b2 +} + } + + override def finish(reduction: Option[(Boolean, Int)]): Option[(Boolean, Int)] = reduction + + override def bufferEncoder: Encoder[Option[(Boolean, Int)]] = OptionalBoolIntEncoder + override def outputEncoder: Encoder[Option[(Boolean, Int)]] = OptionalBoolIntEncoder + + def OptionalBoolIntEncoder: Encoder[Option[(Boolean, Int)]] = ExpressionEncoder(topLevel = false) --- End diff -- If we want to create the encoders in `Encoders`, maybe we should do it in another PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624] Support mixture of Python UDF and S...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r198664314 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { -e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = { +if (e.evalType != evalType) { + false +} else { + e.children match { +// single PythonUDF child could be chained and evaluated in Python +case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType) +// Python UDF can't be evaluated directly in JVM +case children => !children.exists(hasScalarPythonUDF) + } } } - private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match { -case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf) -case e => e.children.flatMap(collectEvaluatableUDF) + private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match { +case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) => + Seq(udf) +case e => e.children.flatMap(collectEvaluableUDF(_, evalType)) + } + + /** + * Collect evaluable UDFs from the current node. + * + * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node, + * and returns a list of UDFs of the same eval type. + * + * If expressions contain both UDFs eval types, this function will only return Python UDFs. + * + * The caller should call this function multiple times until all evaluable UDFs are collected. + */ + private def collectEvaluableUDFs(plan: SparkPlan): Seq[PythonUDF] = { +val pythonUDFs = + plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_BATCHED_UDF)) + +if (pythonUDFs.isEmpty) { + plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF)) +} else { + pythonUDFs +} } def apply(plan: SparkPlan): SparkPlan = plan transformUp { -// AggregateInPandasExec and FlatMapGroupsInPandas can be evaluated directly in python worker -// Therefore we don't need to extract the UDFs -case plan: FlatMapGroupsInPandasExec => plan --- End diff -- This is no longer needed because this rule will only extract Python UDF and Scalar Pandas UDF and ignore other types of UDFs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
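The "call this function multiple times" contract amounts to a small fixed-point loop. A generic sketch, with `collect` and `rewrite` as stand-ins for the rule's real helpers (not Spark's actual code):

```scala
// Repeatedly pull out one homogeneous group of evaluable UDFs (plain
// Python UDFs take priority over Scalar Pandas UDFs when both eval types
// are present) and rewrite the plan, until nothing evaluable remains.
def extractAll[P](plan: P)(collect: P => Seq[String], rewrite: (P, Seq[String]) => P): P = {
  var current = plan
  var group = collect(current)
  while (group.nonEmpty) {
    current = rewrite(current, group)
    group = collect(current)
  }
  current
}
```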
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21589 **[Test build #92402 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92402/testReport)** for PR 21589 at commit [`1405daf`](https://github.com/apache/spark/commit/1405daf18f9ae907f36c64e426bf65a3a9e567e4). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21611: [SPARK-24569][SQL] Aggregator with output type Op...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21611#discussion_r198664098 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala --- @@ -333,4 +406,28 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { df.groupBy($"i").agg(VeryComplexResultAgg.toColumn), Row(1, Row(Row(1, "a"), Row(1, "a"))) :: Row(2, Row(Row(2, "bc"), Row(2, "bc"))) :: Nil) } + + test("SPARK-24569: Aggregator with output type Option[Boolean] creates column of type Row") { +val df = Seq( + OptionBooleanData("bob", Some(true)), + OptionBooleanData("bob", Some(false)), + OptionBooleanData("bob", None)).toDF() +val group = df + .groupBy("name") --- End diff -- A test is added to demonstrate it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21650 **[Test build #92401 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92401/testReport)** for PR 21650 at commit [`be3b99c`](https://github.com/apache/spark/commit/be3b99c951c3df77eace0a6a124f8f9a94ac804c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/527/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13274: Glrm
Github user Tagar commented on the issue: https://github.com/apache/spark/pull/13274 @rezazadeh Is there any plan to incorporate GLRM into core Spark? It seems https://github.com/rezazadeh/spark/tree/glrm/examples/src/main/scala/org/apache/spark/examples/glrm hasn't had updates for several years. Is GLRM for Spark maintained somewhere else? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/21650 This PR took me a while to get to because I am not very familiar with Catalyst rules. I think the change is relatively simple in the end, but I would appreciate a more careful review from people who are familiar with Catalyst. cc @BryanCutler @gatorsmile @HyukjinKwon @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21650 **[Test build #92400 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92400/testReport)** for PR 21650 at commit [`6b47b69`](https://github.com/apache/spark/commit/6b47b69305257e9ee9f5135968913a4f92731ef5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13274: Glrm
Github user Tagar commented on a diff in the pull request: https://github.com/apache/spark/pull/13274#discussion_r198663163 --- Diff: examples/src/main/scala/org/apache/spark/examples/glrm/SparkGLRM.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regPenarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import breeze.linalg.{DenseVector => BDV} +import org.apache.spark.SparkContext._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.mllib.linalg.distributed.MatrixEntry +import org.apache.spark.rdd.RDD +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.BitSet + +/** + * Generalized Low Rank Models for Spark + * + * Run these commands from the spark root directory. + * + * Compile with: + * sbt/sbt assembly + * + * Run with: + * ./bin/spark-submit --class org.apache.spark.examples.SparkGLRM \ + * ./examples/target/scala-2.10/spark-examples-1.1.0-SNAPSHOT-hadoop1.0.4.jar \ + * --executor-memory 1G \ + * --driver-memory 1G + */ + +object SparkGLRM { + /* + * GLRM: Bank of loss functions + */ + def lossL2squaredGrad(i: Int, j: Int, prediction: Double, actual: Double): Double = { +prediction - actual + } + + def lossL1Grad(i: Int, j: Int, prediction: Double, actual: Double): Double = { +// a subgradient of L1 +math.signum(prediction - actual) + } + + def mixedLossGrad(i: Int, j: Int, prediction: Double, actual: Double): Double = { +// weird loss function subgradient for demonstration +if (i + j % 2 == 0) lossL1Grad(i, j, prediction, actual) else lossL2squaredGrad(i, j, prediction, actual) + } + + /*** + * GLRM: Bank of prox functions + **/ + // L2 prox + def proxL2(v:BDV[Double], stepSize:Double, regPen:Double): BDV[Double] = { +val arr = v.toArray.map(x => x / (1.0 + stepSize * regPen)) +new BDV[Double](arr) + } + + // L1 prox + def proxL1(v:BDV[Double], stepSize:Double, regPen:Double): BDV[Double] = { +val sr = regPen * stepSize +val arr = v.toArray.map(x => + if (math.abs(x) < sr) 0 + else if (x < -sr) x + sr + else x - sr +) +new BDV[Double](arr) + } + + // Non-negative prox + def proxNonneg(v:BDV[Double], stepSize:Double, regPen:Double): BDV[Double] = { +val arr = v.toArray.map(x => math.max(x, 0)) +new BDV[Double](arr) + } + + /* End of GLRM libarry */ + + + // Helper functions for updating + def computeLossGrads(ms: Broadcast[Array[BDV[Double]]], us: Broadcast[Array[BDV[Double]]], + R: RDD[(Int, Int, Double)], + lossGrad: (Int, Int, Double, Double) => Double) : RDD[(Int, Int, Double)] = { +R.map { case (i, j, rij) => (i, j, lossGrad(i, j, ms.value(i).dot(us.value(j)), rij))} + } + + // Update factors + def update(us: Broadcast[Array[BDV[Double]]], ms: Broadcast[Array[BDV[Double]]], + lossGrads: RDD[(Int, Int, Double)], stepSize: Double, + nnz: Array[Double], + prox: (BDV[Double], 
Double, Double) => BDV[Double], regPen: Double) + : Array[BDV[Double]] = { +val rank = ms.value(0).length +val ret = Array.fill(ms.value.size)(BDV.zeros[Double](rank)) + +val retu = lossGrads.map { case (i, j, lossij) => (i, us.value(j) * lossij) } // vector/scalar multiply +.reduceByKey(_ + _).collect() // vector addition through breeze + +for (entry <- retu) { + val idx = entry._1 + val g = entry._2 + val alpha = (stepSize / (nnz(idx) + 1)) + + ret(idx) = prox(ms.value(idx) - g * alpha, alpha, regPen) +} + +ret + } + + def fitGLRM(R:
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/526/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21650 Build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21589 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21589 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92393/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21600: [Spark-24553][WEB-UI] http 302 fixes for href red...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21600 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21589 **[Test build #92393 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92393/testReport)** for PR 21589 at commit [`c280b6c`](https://github.com/apache/spark/commit/c280b6c6471f2699fa971a48bab958a2e0b40f5a). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21650: [SPARK-24624] Support mixture of Python UDF and S...
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/21650

[SPARK-24624] Support mixture of Python UDF and Scalar Pandas UDF

## What changes were proposed in this pull request?

This PR adds support for using a mixture of Python UDFs and Scalar Pandas UDFs, in the following two cases:

(1)
```
f1 = udf(lambda x: x + 1, 'int')
f2 = pandas_udf(lambda x: x + 2, 'int')

df = ...
df = df.withColumn('foo', f1(df['v']))
df = df.withColumn('bar', f2(df['v']))
```

(2)
```
f1 = udf(lambda x: x + 1, 'int')
f2 = pandas_udf(lambda x: x + 2, 'int')

df = ...
df = df.withColumn('foo', f2(f1(df['v'])))
```

## How was this patch tested?

New tests are added to BatchEvalPythonExecSuite and ScalarPandasUDFTests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark SPARK-24624-mix-udf

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21650.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21650

commit 48ae822bcdf6df40b181f86379d275d602c580c9
Author: Li Jin
Date: 2018-06-22T18:35:34Z

    wip

commit 68e665ec981c1a7cae46398bc2ea8a4880e95331
Author: Li Jin
Date: 2018-06-27T22:31:25Z

    Test passes

commit 6b47b69305257e9ee9f5135968913a4f92731ef5
Author: Li Jin
Date: 2018-06-27T22:34:28Z

    Remove white spaces

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21613: [SPARK-24629][SQL]thrift server memory leaks when Beelin...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21613 cc @liufengdb --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21600: [Spark-24553][WEB-UI] http 302 fixes for href redirect
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21600 Thanks! Merged to master --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/Pa...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21389 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r198660818 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -183,34 +182,111 @@ private[sql] object ArrowConverters { } /** - * Convert a byte array to an ArrowRecordBatch. + * Load a serialized ArrowRecordBatch. */ - private[arrow] def byteArrayToBatch( + private[arrow] def loadBatch( batchBytes: Array[Byte], allocator: BufferAllocator): ArrowRecordBatch = { -val in = new ByteArrayReadableSeekableByteChannel(batchBytes) -val reader = new ArrowFileReader(in, allocator) - -// Read a batch from a byte stream, ensure the reader is closed -Utils.tryWithSafeFinally { - val root = reader.getVectorSchemaRoot // throws IOException - val unloader = new VectorUnloader(root) - reader.loadNextBatch() // throws IOException - unloader.getRecordBatch -} { - reader.close() -} +val in = new ByteArrayInputStream(batchBytes) +MessageSerializer.deserializeMessageBatch(new ReadChannel(Channels.newChannel(in)), allocator) + .asInstanceOf[ArrowRecordBatch] // throws IOException } + /** + * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches. + */ private[sql] def toDataFrame( - payloadRDD: JavaRDD[Array[Byte]], + arrowBatchRDD: JavaRDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = { -val rdd = payloadRDD.rdd.mapPartitions { iter => +val schema = DataType.fromJson(schemaString).asInstanceOf[StructType] +val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone +val rdd = arrowBatchRDD.rdd.mapPartitions { iter => val context = TaskContext.get() - ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context) + ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context) } -val schema = DataType.fromJson(schemaString).asInstanceOf[StructType] sqlContext.internalCreateDataFrame(rdd, schema) } + + /** + * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches. + */ + private[sql] def readArrowStreamFromFile( + sqlContext: SQLContext, + filename: String): JavaRDD[Array[Byte]] = { +val fileStream = new FileInputStream(filename) +try { + // Create array so that we can safely close the file + val batches = getBatchesFromStream(fileStream.getChannel).toArray + // Parallelize the record batches to create an RDD + JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length)) +} finally { + fileStream.close() +} + } + + /** + * Read an Arrow stream input and return an iterator of serialized ArrowRecordBatches. 
+ */ + private[sql] def getBatchesFromStream(in: SeekableByteChannel): Iterator[Array[Byte]] = { + +// TODO: this could be moved to Arrow +def readMessageLength(in: ReadChannel): Int = { + val buffer = ByteBuffer.allocate(4) + if (in.readFully(buffer) != 4) { +return 0 + } + MessageSerializer.bytesToInt(buffer.array()) +} + +// TODO: this could be moved to Arrow +def loadMessage(in: ReadChannel, messageLength: Int, buffer: ByteBuffer): Message = { + if (in.readFully(buffer) != messageLength) { +throw new java.io.IOException( + "Unexpected end of stream trying to read message.") + } + buffer.rewind() + Message.getRootAsMessage(buffer) +} + + +// Create an iterator to get each serialized ArrowRecordBatch from a stream +new Iterator[Array[Byte]] { + val inputChannel = new ReadChannel(in) + var batch: Array[Byte] = readNextBatch() + + override def hasNext: Boolean = batch != null + + override def next(): Array[Byte] = { +val prevBatch = batch +batch = readNextBatch() +prevBatch + } + + def readNextBatch(): Array[Byte] = { +val messageLength = readMessageLength(inputChannel) +if (messageLength == 0) { + return null +} + +val buffer = ByteBuffer.allocate(messageLength) +val msg = loadMessage(inputChannel, messageLength, buffer) --- End diff -- The only issue with this is that it is aware that a message is preceded by the message length and that a length of zero indicates no more messages. Ideally, this logic would be abstracted to Arrow...
[GitHub] spark issue #21542: [SPARK-24529][Build][test-maven] Add spotbugs into maven...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21542 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92391/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21542: [SPARK-24529][Build][test-maven] Add spotbugs into maven...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21542 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21542: [SPARK-24529][Build][test-maven] Add spotbugs into maven...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21542 **[Test build #92391 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92391/testReport)** for PR 21542 at commit [`1330fa6`](https://github.com/apache/spark/commit/1330fa6e3ad5ffd38e2f2c10c1561951a6ef221f). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21389 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21511#discussion_r198659423 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -104,6 +104,20 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_EXECUTOR_LIMIT_GPUS = --- End diff -- If this needs to be shared it should go in a separate feature step that is applied in both the driver builder and the executor builder. There's probably a better way to share the resource requests / limits code here, probably by extracting the shared parts into a separate step that is parameterized by different values from the Spark configuration. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
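A hypothetical sketch of the parameterization suggested above; the `ResourceLimits` helper and the config-key pattern are illustrative assumptions, not the actual Spark Kubernetes API:

```scala
// One shared helper driven by the pod role ("driver" or "executor"),
// rather than duplicated limit-building code in both builders.
case class ResourceLimits(gpus: Option[Int], memory: Option[String])

def resourceLimitsFor(role: String, conf: Map[String, String]): ResourceLimits =
  ResourceLimits(
    gpus = conf.get(s"spark.kubernetes.$role.limit.gpus").map(_.toInt),
    memory = conf.get(s"spark.kubernetes.$role.limit.memory"))
```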
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r198652991 --- Diff: core/src/main/scala/org/apache/spark/network/BlockDataManager.scala --- @@ -43,6 +44,17 @@ trait BlockDataManager { level: StorageLevel, classTag: ClassTag[_]): Boolean + /** + * Put the given block that will be received as a stream. + * + * When this method is called, the data itself is not available -- it needs to be handled within + * the callbacks of streamData. --- End diff -- Need to update comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r198657121 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -1341,12 +1390,16 @@ private[spark] class BlockManager( try { val onePeerStartTime = System.nanoTime logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") +// This thread keeps a lock on the block, so we do not want the netty thread to unlock +// block when it finishes sending the message. +val mb = new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false, --- End diff -- s/mb/buffer Confusing in a place that deals with sizes all over. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r198652931 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.protocol; + +import java.util.Arrays; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.protocol.Encoders; + +// Needed by ScalaDoc. See SPARK-7726 +import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; + +/** + * A request to Upload a block, which the destintation should receive as a stream. + * + * The actual block data is not contained here. It is in the streamData in the RpcHandler.receive() --- End diff -- Need to update to match API. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r198656665 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +701,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can cause the process to use too much memory and get killed +// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though --- End diff -- it's --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r198653427 --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala --- @@ -73,10 +73,34 @@ class NettyBlockRpcServer( } val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData)) val blockId = BlockId(uploadBlock.blockId) +logInfo(s"Receiving replicated block $blockId with level ${level} " + + s"from ${client.getSocketAddress}") blockManager.putBlockData(blockId, data, level, classTag) responseContext.onSuccess(ByteBuffer.allocate(0)) } } + override def receiveStream( + client: TransportClient, + messageHeader: ByteBuffer, + responseContext: RpcResponseCallback): StreamCallbackWithID = { +val message = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader) +message match { + case uploadBlockStream: UploadBlockStream => + val (level: StorageLevel, classTag: ClassTag[_]) = { --- End diff -- Indentation is off here. Using `.asInstanceOf[UploadBlockStream]` would achieve the same goal here with less indentation, just with a different exception... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r198656910 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -723,7 +770,9 @@ private[spark] class BlockManager( } if (data != null) { -return Some(new ChunkedByteBuffer(data)) +val chunkSize = + conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt --- End diff -- Want to turn this into a config constant? I'm seeing it in a bunch of places. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
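A sketch of the constant being requested, assuming Spark's internal `ConfigBuilder` DSL (the `StorageConfig` wrapper object is only there so the snippet stands alone):

```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

object StorageConfig {
  // Test-only knob for the memory-mapping chunk size; defaults to the
  // current hard-coded Int.MaxValue so production behavior is unchanged.
  val MEMORY_MAP_LIMIT_FOR_TESTS =
    ConfigBuilder("spark.storage.memoryMapLimitForTests")
      .internal()
      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Int.MaxValue)
}
```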
[GitHub] spark issue #21440: [SPARK-24307][CORE] Support reading remote cached partit...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21440 **[Test build #92399 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92399/testReport)** for PR 21440 at commit [`6c57e4d`](https://github.com/apache/spark/commit/6c57e4d35d76d5f2b618a24bd56d83899eea567e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21440: [SPARK-24307][CORE] Support reading remote cached partit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21440 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21440: [SPARK-24307][CORE] Support reading remote cached partit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21440 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/525/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198655159 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + + protected def deallocate: Unit = {} + + override def count(): Long = size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + override def transfered(): Long = _transferred + + override def touch(): ChunkedByteBufferFileRegion = this + + override def touch(hint: Object): ChunkedByteBufferFileRegion = this + + override def retain(): FileRegion = { +super.retain() +this + } + + override def retain(increment: Int): FileRegion = { +super.retain(increment) +this + } + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +while (keepGoing) { + while (currentChunk.hasRemaining && keepGoing) { +val ioSize = Math.min(currentChunk.remaining(), ioChunkSize) +val originalPos = currentChunk.position() --- End diff -- sorry bunch of leftover bits from earlier debugging. all cleaned up now --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21389 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21389 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92390/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21546

### Memory Improvements

**toPandas()**

The most significant improvement is the reduction of the upper-bound space complexity in the JVM driver. Before, the entire dataset was collected in the JVM before being sent to Python. With this change, as soon as a partition is collected, the result handler immediately sends it to Python, so the upper bound is the size of the largest partition.

Also, using the Arrow stream format is more efficient because the schema is written once per stream, followed by record batches. The schema is now sent only once from the driver JVM to Python. Before, multiple Arrow file formats were used, each containing the schema; this duplicated schema was created in the executors, sent to the driver JVM, and then to Python, where all but the first one received were discarded.

I verified the upper-bound limit by running a test that collects more data than the driver JVM memory available, using these settings on a standalone cluster:

```
spark.driver.memory                           1g
spark.executor.memory                         5g
spark.sql.execution.arrow.enabled             true
spark.sql.execution.arrow.fallback.enabled    false
spark.sql.execution.arrow.maxRecordsPerBatch  0
spark.driver.maxResultSize                    2g
```

Test code:

```python
from pyspark.sql.functions import rand
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand())
df.toPandas()
```

This makes a total data size of 33554432 × 8 × 4 = 1073741824 bytes (1 GiB). With the current master, the test fails with OOM but passes using this PR.

**createDataFrame()**

No significant change in memory, except that using the stream format instead of separate file formats avoids duplicating the schema, similar to toPandas() above. The process of reading the stream and parallelizing the batches does cause the record-batch message metadata to be copied, but its size is insignificant.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
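The per-partition streaming described above relies on `SparkContext.runJob` invoking its result handler on the driver as each partition completes. A simplified, generic sketch of that pattern (the `forward` callback stands in for the PR's actual socket writer, not its real code):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Forward each partition's serialized Arrow batches as soon as that
// partition finishes, instead of accumulating the whole dataset in the
// driver JVM; the driver-side high-water mark is then roughly the size
// of the largest partition.
def collectAndForward(sc: SparkContext, rdd: RDD[Array[Byte]])
    (forward: Array[Byte] => Unit): Unit = {
  sc.runJob(
    rdd,
    (iter: Iterator[Array[Byte]]) => iter.toArray,
    rdd.partitions.indices,
    (_: Int, batches: Array[Array[Byte]]) => batches.foreach(forward))
}
```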
[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21389 **[Test build #92390 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92390/testReport)** for PR 21389 at commit [`1cfc7b0`](https://github.com/apache/spark/commit/1cfc7b02089401eca2f17db55b113e6620f398be). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21589 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21589 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92389/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21589 **[Test build #92389 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92389/testReport)** for PR 21589 at commit [`2e6dce4`](https://github.com/apache/spark/commit/2e6dce489ea2e2cae36732d6a834302b2076bcb2). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21511#discussion_r198653002 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -104,6 +104,20 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_EXECUTOR_LIMIT_GPUS = --- End diff -- Sometimes you need it. For example, to reduce data across multiple executors, you would ideally use ring all-reduce among them, but you cannot really do that today because executors are scheduled independently. For now, the best you can do is gather all of your data to the driver and do the reduction there. You can learn more in the SPIP for Project Hydrogen / barrier execution. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
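For readers unfamiliar with Config.scala, entries there follow the ConfigBuilder pattern visible in the diff context above. A hypothetical sketch of such a declaration (the key name, doc string, and type are illustrative assumptions, not the PR's actual code):
```scala
import org.apache.spark.internal.config.ConfigBuilder

// Illustrative only: declares an optional string config entry the way
// Config.scala does, using the builder chain shown in the diff context.
val KUBERNETES_EXECUTOR_LIMIT_GPUS =
  ConfigBuilder("spark.kubernetes.executor.limit.gpus")
    .doc("Hypothetical: hard limit on the number of GPUs per executor pod.")
    .stringConf
    .createOptional
```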
[GitHub] spark issue #21075: [SPARK-23988][MESOS] Improve handling of appResource in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21075 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198651582 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,38 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = { --- End diff -- this version isn't used until the other PR; I can pull it out there. The other version of `map` is used in this PR, from `BlockManager.getRemoteBytes() -> ChunkedByteBuffer.fromManagedBuffer() -> ChunkedByteBuffer.map`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
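As background for the `map` discussion, a hypothetical sketch of what a chunked file-mapping helper of this shape has to do (names and signature here are illustrative; one mapped buffer cannot exceed 2 GB, hence the slicing):
```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import scala.collection.mutable.ArrayBuffer

// Memory-maps [offset, offset + length) of `file` in slices of at most
// maxChunkSize bytes each, since a MappedByteBuffer is limited to Int.MaxValue bytes.
def mapChunks(file: File, maxChunkSize: Int, offset: Long, length: Long): Array[ByteBuffer] = {
  val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
  try {
    val chunks = new ArrayBuffer[ByteBuffer]()
    var pos = offset
    var remaining = length
    while (remaining > 0) {
      val chunkSize = math.min(remaining, maxChunkSize.toLong)
      chunks += channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
      pos += chunkSize
      remaining -= chunkSize
    }
    chunks.toArray
  } finally {
    channel.close()
  }
}
```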
[GitHub] spark pull request #21636: [SPARK-24533] Typesafe rebranded to lightbend. Ch...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21636 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21635 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92397/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21635 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21635 **[Test build #92397 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92397/testReport)** for PR 21635 at commit [`b0ee4ec`](https://github.com/apache/spark/commit/b0ee4ec158e96deba98c0e1e3b9df5039f616ddb). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20944: [SPARK-23831][SQL] Add org.apache.derby to IsolatedClien...
Github user holdenk commented on the issue: https://github.com/apache/spark/pull/20944 So I had this come up while I was testing Spark 2.1.3 RC2 on a machine with an existing YARN cluster, using `spark-testing-base`. I haven't had the chance to dig into it fully. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21636: [SPARK-24533] Typesafe rebranded to lightbend. Changing ...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/21636 Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21644: [SPARK-24660][SHS] Show correct error pages when ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21644 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21546 ### Performance Tests - createDataFrame Tests were run on a 4-node standalone cluster with 32 cores total, Ubuntu 14.04.1 and OpenJDK 8, measuring the wall-clock time to execute `createDataFrame()` and get the first record. Took the average best time of 5 runs/5 loops each. Test code:
```python
import gc
import time

import numpy as np
import pandas as pd

def run():
    pdf = pd.DataFrame(np.random.rand(1000, 10))
    spark.createDataFrame(pdf).first()

for i in range(6):
    start = time.time()
    run()
    elapsed = time.time() - start
    gc.collect()
    print("Run %d: %f" % (i, elapsed))
```

| Current Master | This PR |
|----------------|---------|
| 6.234608 | 5.665641 |
| 6.32144 | 5.3475 |
| 6.527859 | 5.370803 |
| 6.95089 | 5.479151 |
| 6.235046 | 5.529167 |

| Avg Master | Avg This PR |
|------------|-------------|
| 6.4539686 | 5.4784524 |

Speedup of 1.178064192 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21644: [SPARK-24660][SHS] Show correct error pages when downloa...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/21644 Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21451 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21451 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/524/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21546 ### Performance Tests - toPandas Tests were run on a 4-node standalone cluster with 32 cores total, Ubuntu 14.04.1 and OpenJDK 8, measuring the wall-clock time to execute `toPandas()`, taking the average best time of 5 runs/5 loops each. Test code:
```python
import time

from pyspark.sql.functions import rand

df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())

for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```

| Current Master | This PR |
|----------------|---------|
| 5.803557 | 4.342533 |
| 5.409119 | 4.399408 |
| 5.493509 | 4.468471 |
| 5.433107 | 4.36524 |
| 5.488757 | 4.373791 |

| Avg Master | Avg This PR |
|------------|-------------|
| 5.5256098 | 4.3898886 |

Speedup of 1.258712989 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198630017 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,38 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = { --- End diff -- Is this used anywhere? Couldn't find a reference. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198641717 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + + protected def deallocate: Unit = {} + + override def count(): Long = size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + override def transfered(): Long = _transferred + + override def touch(): ChunkedByteBufferFileRegion = this + + override def touch(hint: Object): ChunkedByteBufferFileRegion = this + + override def retain(): FileRegion = { +super.retain() +this + } + + override def retain(increment: Int): FileRegion = { +super.retain(increment) +this + } + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +while (keepGoing) { + while (currentChunk.hasRemaining && keepGoing) { +val ioSize = Math.min(currentChunk.remaining(), ioChunkSize) +val originalPos = currentChunk.position() --- End diff -- Unused. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198632416 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { --- End diff -- Extend `AbstractFileRegion`? Do the fields need to be public? You don't seem to need `Logging`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198636259 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} --- End diff -- Use `foldLeft(0) { blah }` + avoid the intermediate `val`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
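Spelled out, the suggested alternative might look like this (a sketch; note the seed must be `0L` so the sum accumulates as a `Long`):
```scala
// Sums the remaining bytes across all chunks in one pass, replacing
// chunks.scanLeft(0L)(_ + _.remaining()).last with a direct fold.
private val size: Long = chunks.foldLeft(0L) { (sum, buf) => sum + buf.remaining() }
```
The `scanLeft` version also materializes the intermediate cumulative-length array, which the fold avoids.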
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21635 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21635 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92396/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21635 **[Test build #92396 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92396/testReport)** for PR 21635 at commit [`a8f3146`](https://github.com/apache/spark/commit/a8f314694834933c1e47cdf523ab87c57f3a821b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21635 **[Test build #92397 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92397/testReport)** for PR 21635 at commit [`b0ee4ec`](https://github.com/apache/spark/commit/b0ee4ec158e96deba98c0e1e3b9df5039f616ddb). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21451 **[Test build #92398 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92398/testReport)** for PR 21451 at commit [`1cc0f3f`](https://github.com/apache/spark/commit/1cc0f3ffa2b563c54771a38c4dd9f2598b29f0db). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21644: [SPARK-24660][SHS] Show correct error pages when downloa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21644 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92386/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21644: [SPARK-24660][SHS] Show correct error pages when downloa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21644 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21644: [SPARK-24660][SHS] Show correct error pages when downloa...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21644 **[Test build #92386 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92386/testReport)** for PR 21644 at commit [`a9baa7c`](https://github.com/apache/spark/commit/a9baa7cc3bd894cf06f98a5b58a4c2754b11aef1). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/21635#discussion_r198641094 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class YarnClusterSchedulerSource(yarnAllocator: YarnAllocator) extends Source { + + override val sourceName: String = "yarn" + override val metricRegistry: MetricRegistry = new MetricRegistry() + + metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] { +override def getValue: Int = yarnAllocator.getNumExecutorsFailed + }) + + metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] { +override def getValue: Int = yarnAllocator.getNumExecutorsRunning + }) + + metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new Gauge[Int] { +override def getValue: Int = yarnAllocator.getNumReleasedContainers + }) + + metricRegistry.register(MetricRegistry.name("numPendingLossReasonRequests"), new Gauge[Int] { +override def getValue: Int = yarnAllocator.getNumPendingLossReasonRequests + }) + + metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] { +override def getValue: Int = yarnAllocator.numLocalityAwareTasks + }) + +} --- End diff -- The getPendingAllocate method seems quite cheap to me, as it just uses local maps (and tables) to calculate a list of ContainerRequests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
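If a pending-allocation gauge were added, it would presumably follow the same registration pattern as the gauges above; a hypothetical sketch (the exact YarnAllocator accessor and the metric name are assumptions):
```scala
// Illustrative only: exposes the number of pending container allocation
// requests, mirroring the Gauge[Int] registrations in the diff above.
metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.getPendingAllocate.size
})
```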
[GitHub] spark issue #21560: [SPARK-24386][SS] coalesce(1) aggregates in continuous p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21560 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92387/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21560: [SPARK-24386][SS] coalesce(1) aggregates in continuous p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21560 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21560: [SPARK-24386][SS] coalesce(1) aggregates in continuous p...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21560 **[Test build #92387 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92387/testReport)** for PR 21560 at commit [`f77b12b`](https://github.com/apache/spark/commit/f77b12ba92a868274ecdfea331786addb2d9ca83). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21635 **[Test build #92396 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92396/testReport)** for PR 21635 at commit [`a8f3146`](https://github.com/apache/spark/commit/a8f314694834933c1e47cdf523ab87c57f3a821b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21546 _UPDATE_ I changed `toPandas` to write out-of-order partitions to Python as they come in, followed by a list of indices to represent the correct batch order. In Python, the batches are then put back in the correct order to make the Arrow Table / Pandas DataFrame. This is slightly more complicated than before because we are sending extra info to Python, but it significantly reduces the upper-bound space complexity in the driver JVM from all of the data to the size of the largest partition. It also seems to be a little faster, so I re-ran the performance tests, which I'll post now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
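A minimal sketch of the reordering step described above (hypothetical names, not the PR's actual implementation):
```python
# Batches arrive from the JVM in completion order; `batch_order` is the list of
# indices sent afterwards, where batch_order[i] is the arrival position of the
# batch that belongs at logical position i.
def reorder_record_batches(batches, batch_order):
    return [batches[i] for i in batch_order]
```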
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21546 **[Test build #92395 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92395/testReport)** for PR 21546 at commit [`fe3319b`](https://github.com/apache/spark/commit/fe3319bd7ab290e30f6075a81acd0b17818ad546). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/523/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21649: [SPARK-23648][R][SQL]Adds more types for hint in SparkR
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21649 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92394/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21649: [SPARK-23648][R][SQL]Adds more types for hint in SparkR
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21649 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21649: [SPARK-23648][R][SQL]Adds more types for hint in SparkR
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21649 **[Test build #92394 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92394/testReport)** for PR 21649 at commit [`009f82c`](https://github.com/apache/spark/commit/009f82c984c559b68169bb1944d2921b0c218e9e). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21635: [SPARK-24594][YARN] Introducing metrics for YARN
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/21635#discussion_r198630307 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala --- @@ -309,6 +312,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, "Uncaught exception: " + StringUtils.stringifyException(e)) +} finally { + metricsSystem.report() --- End diff -- Yes, it is better and more elegant to store metricsSystem in an Option. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
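Spelled out, the Option-based variant being discussed might look like this (a sketch under assumed names; `MetricsSystem.report()` and `stop()` are the existing Spark APIs):
```scala
import org.apache.spark.metrics.MetricsSystem

// The metrics system only exists once it has been successfully created, so the
// shutdown path becomes a no-op on failure paths where it was never initialized.
private var metricsSystem: Option[MetricsSystem] = None

private def shutdownMetrics(): Unit = {
  metricsSystem.foreach { ms =>
    ms.report()  // flush pending metrics before stopping
    ms.stop()
  }
}
```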