[GitHub] spark pull request #20830: [SPARK-23691][PYTHON] Use sql_conf util in PySpar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/20830#discussion_r174857693

--- Diff: python/pyspark/sql/tests.py ---
@@ -3551,12 +3544,12 @@ def test_null_conversion(self):
         self.assertTrue(all([c == 1 for c in null_counts]))

     def _toPandas_arrow_toggle(self, df):
-        self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
-        try:
+        with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
             pdf = df.toPandas()
-        finally:
-            self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
-        pdf_arrow = df.toPandas()
+
+        with self.sql_conf({"spark.sql.execution.arrow.enabled": True}):
--- End diff --

We can omit this when we use the default value or set the value in the setup method, but I'm okay if we want to show the value explicitly.
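For readers unfamiliar with the helper being discussed: `sql_conf` is a context manager in the PySpark test suite that sets the given SQL confs, runs the block, and then puts each key back the way it was. The same pattern exists on the Scala side of Spark's test utilities; here is a minimal sketch of it (the `SQLConf` accessor names are quoted from memory, so treat this as illustrative rather than the exact Spark implementation):

```scala
import org.apache.spark.sql.internal.SQLConf

// Set `pairs`, run `f`, then put every touched key back the way it was:
// restore the old value if there was one, unset the key otherwise.
def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
  val conf = SQLConf.get
  val (keys, values) = pairs.unzip
  val previous = keys.map { key =>
    if (conf.contains(key)) Some(conf.getConfString(key)) else None
  }
  keys.zip(values).foreach { case (k, v) => conf.setConfString(k, v) }
  try {
    f
  } finally {
    keys.zip(previous).foreach {
      case (key, Some(old)) => conf.setConfString(key, old) // had a value: restore it
      case (key, None)      => conf.unsetConf(key)          // was unset: unset again
    }
  }
}
```

This restore-or-unset behavior is also what the question further down about `spark.sql.crossJoin.enabled` turns on: after the `with` block exits, a key that had no explicit value before reverts to unset, not to the last value the test assigned.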
[GitHub] spark issue #20828: [SPARK-23687][SS] Add a memory source for continuous pro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20828 **[Test build #88271 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88271/testReport)** for PR 20828 at commit [`7c8d30e`](https://github.com/apache/spark/commit/7c8d30e00ca7497cd4ed621fc0f5dacc807efc04).
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/20835 LGTM
[GitHub] spark pull request #20834: [SPARK-23695][PYTHON] Fix the error message for K...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20834
[GitHub] spark pull request #20824: [SPARK-23683][SQL][FOLLOW-UP] FileCommitProtocol....
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/20824#discussion_r174829859

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -145,15 +146,23 @@ object FileCommitProtocol {
       jobId: String,
       outputPath: String,
       dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
+
+    logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
+      s" dynamic=$dynamicPartitionOverwrite")
     val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
     // First try the constructor with arguments (jobId: String, outputPath: String,
     // dynamicPartitionOverwrite: Boolean).
     // If that doesn't exist, try the one with (jobId: string, outputPath: String).
     try {
       val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+      logDebug("Using (String, String, Boolean) constructor")
       ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
     } catch {
       case _: NoSuchMethodException =>
+        logDebug("Falling back to (String, String) constructor")
+        require(!dynamicPartitionOverwrite,
+          "Dynamic Partition Overwrite is enabled but" +
+            s" the committer ${className} does not have the appropriate constructor")
--- End diff --

Problem is that the dynamic partition logic in `InsertIntoHadoopFsRelationCommand` assumes that rename() is a fast, reliable operation you can do with any implementation of the FileCommitProtocol: it sets itself up for it when enabled, then instantiates the inner committer and carries on with the dynamic partitioning, irrespective of whether or not rename() actually works like that; when it doesn't, the rest of the algorithm breaks. If the committer doesn't have that 3-arg constructor, you can't be confident that you can do that. To silently log and continue is to run the risk that the underlying committer's commit algorithm isn't compatible with the dynamic-partition algorithm. A fail-fast ensures that when the outcome is going to be unknown, you aren't left trying to work out what's happened.

Regarding your example, yes, it's in trouble. Problem is: how to differentiate that from subclasses which don't know anything at all about the new feature. You can't even look for an interface on the newly created object if the base class implements it; you are left with some dynamic probe of the instance.
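The "dynamic probe of the instance" the comment ends on would look something like the following. This is a hypothetical alternative, not anything in Spark: a marker trait the committer opts into, checked after instantiation instead of inferring support from which constructor exists.

```scala
import org.apache.spark.internal.io.FileCommitProtocol

// Hypothetical marker trait: a committer declares explicitly whether its
// commit algorithm is compatible with dynamic partition overwrite.
trait DynamicPartitionAware {
  def supportsDynamicPartitionOverwrite: Boolean
}

def supportsDynamicOverwrite(committer: FileCommitProtocol): Boolean =
  committer match {
    case d: DynamicPartitionAware => d.supportsDynamicPartitionOverwrite
    case _ => false // unknown subclass: assume it predates the feature
  }
```

As the comment notes, this still cannot distinguish an old subclass that merely inherits the trait from a base class from one that genuinely supports the feature, which is why the patch falls back to the fail-fast `require` instead.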
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20659 Merged build finished. Test PASSed.
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174871585

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -328,6 +328,32 @@ trait Nondeterministic extends Expression {
   protected def evalInternal(input: InternalRow): Any
 }

+/**
+ * An expression that contains mutable state. A stateful expression is always non-deterministic
+ * because the results it produces during evaluation are not only dependent on the given input
+ * but also on its internal state.
+ *
+ * The state of the expressions is generally not exposed in the parameter list and this makes
+ * comparing stateful expressions problematic because similar stateful expressions (with the same
+ * parameter list) but with different internal state will be considered equal. This is especially
+ * problematic during tree transformations. In order to counter this the `fastEquals` method for
+ * stateful expressions only returns `true` for the same reference.
+ *
+ * A stateful expression should never be evaluated multiple times for a single row. This should
+ * only be a problem for interpreted execution. This can be prevented by creating fresh copies
+ * of the stateful expression before execution, these can be made using the `freshCopy` function.
+ */
+trait Stateful extends Nondeterministic {
+  /**
+   * Return a fresh uninitialized copy of the stateful expression.
+   */
+  def freshCopy(): Stateful = this
--- End diff --

I think it's better to not provide this default implementation, to avoid mistakes in the future.
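A contrived example (not Spark code) of the mistake the default implementation invites: if a subclass forgets to override `freshCopy()` and inherits `= this`, two "copies" silently share one piece of mutable state.

```scala
object SharedStateDemo extends App {
  // A toy stateful expression: a counter with the dangerous default freshCopy.
  class Counter {
    private var n = 0L
    def eval(): Long = { n += 1; n }
    def freshCopy(): Counter = this // the inherited default: no copy at all
  }

  val original = new Counter
  val copy = original.freshCopy() // intended to be independent state
  original.eval()                 // advances the shared counter to 1
  assert(copy.eval() == 2L)       // the "copy" observes 2, not 1: state was shared
}
```

Making `freshCopy()` abstract forces every `Stateful` implementation to make a conscious choice, which is exactly the mistake-proofing being asked for here.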
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174873582 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala --- @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types.{UserDefinedType, _} +import org.apache.spark.unsafe.Platform + +/** + * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer + * should copy the row if it is being buffered. This class is not thread safe. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { + import InterpretedUnsafeProjection._ + + /** Number of (top level) fields in the resulting row. */ + private[this] val numFields = expressions.length + + /** Array that expression results. */ + private[this] val values = new Array[Any](numFields) + + /** The row representing the expression results. */ + private[this] val intermediate = new GenericInternalRow(values) + + /** The row returned by the projection. */ + private[this] val result = new UnsafeRow(numFields) + + /** The buffer which holds the resulting row's backing data. */ + private[this] val holder = new BufferHolder(result, numFields * 32) + + /** The writer that writes the intermediate result to the result row. */ + private[this] val writer: InternalRow => Unit = { +val rowWriter = new UnsafeRowWriter(holder, numFields) +val baseWriter = generateStructWriter( + holder, + rowWriter, + expressions.map(e => StructField("", e.dataType, e.nullable))) +if (!expressions.exists(_.nullable)) { + // No nullable fields. The top-level null bit mask will always be zeroed out. + baseWriter +} else { + // Zero out the null bit mask before we write the row. + row => { +rowWriter.zeroOutNullBytes() +baseWriter(row) + } +} + } + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + override def apply(row: InternalRow): UnsafeRow = { +// Put the expression results in the intermediate row. +var i = 0 +while (i < numFields) { + values(i) = expressions(i).eval(row) + i += 1 +} + +// Write the intermediate row to an unsafe row. 
+holder.reset() +writer(intermediate) +result.setTotalSize(holder.totalSize()) +result + } +} + +/** + * Helper functions for creating an [[InterpretedUnsafeProjection]]. + */ +object InterpretedUnsafeProjection extends UnsafeProjectionCreator { + + /** + * Returns an [[UnsafeProjection]] for given sequence of bound Expressions. + */ + override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +// We need to make sure that we do not reuse stateful expressions. +val cleanedExpressions = exprs.map(_.transform { + case s: Stateful => s.freshCopy() +}) +new InterpretedUnsafeProjection(cleanedExpressions.toArray) + } + + /** + * Generate a struct writer function. The generated function writes an [[InternalRow]] to the + * given buffer using the given
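Stripped of the UnsafeRow plumbing, the hot path of the projection in the diff above is: evaluate each bound expression into one reused buffer, then serialize that buffer once. A self-contained sketch of just that shape (simplified types, not the actual class):

```scala
// Evaluate expressions into a single reused buffer per projection instance.
// Like the UnsafeRow in the diff, the result is reused, so callers that
// buffer rows must copy them first.
class SimpleProjection[Row](expressions: Array[Row => Any]) {
  private[this] val values = new Array[Any](expressions.length)

  def apply(row: Row): Array[Any] = {
    var i = 0
    while (i < expressions.length) { // manual while loop: no per-row allocation
      values(i) = expressions(i)(row)
      i += 1
    }
    values
  }
}
```

This also shows why the `Stateful`/`freshCopy` discussion above matters: because one projection instance owns its buffers and its expressions, two projections must never end up sharing a stateful expression.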
[GitHub] spark issue #20834: [SPARK-23695][PYTHON] Fix the error message for Kinesis ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/20834 Thanks! merging to master/2.3.
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r174831270

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }

+  private def recoverLatestRDDBlock(
+      execId: String,
+      excludeExecutors: Seq[String],
+      context: RpcCallContext): Unit = {
+    logDebug(s"Replicating first cached block on $execId")
+    val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+    val response: Option[Future[Boolean]] = for {
+      blockManagerId <- blockManagerIdByExecutor.get(execId)
+      info <- blockManagerInfo.get(blockManagerId)
+      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+      // we assume blocks from the latest rdd are most relevant
--- End diff --

Hey @squito thanks again for the comments, sorry it took me a minute to get back to you. So, what you describe is basically what I'm doing. I start replicating the latest block and work back to the earliest blocks. If there are any problems I give up. I use the rdd block id as a proxy for the order in which blocks were created.
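Concretely, the heuristic described here treats the numeric `rddId` embedded in each `RDDBlockId` as a proxy for creation order, replicates newest-first, and stops at the first failure. A sketch of that ordering (a simplification, with `RDDBlockId` reduced to its two fields):

```scala
// Spark's RDDBlockId is essentially (rddId, splitIndex); a higher rddId means
// a later-created RDD, which this heuristic treats as more relevant.
case class RDDBlockId(rddId: Int, splitIndex: Int)

// Returns the prefix of blocks that replicated successfully; stops trying
// as soon as one replication fails ("if there are any problems I give up").
def replicateNewestFirst(
    cached: Seq[RDDBlockId])(replicate: RDDBlockId => Boolean): Seq[RDDBlockId] = {
  val newestFirst = cached.sortBy(b => (-b.rddId, b.splitIndex))
  newestFirst.takeWhile(replicate)
}
```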
[GitHub] spark issue #20836: SPARK-23685 : Fix for the Spark Structured Streaming Kaf...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20836 Can one of the admins verify this patch?
[GitHub] spark pull request #19381: [SPARK-10884][ML] Support prediction on single in...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19381#discussion_r174862550

--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala ---
@@ -109,4 +110,14 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>
     testTransformerOnDF(dataframe, transformer, firstResultCol, otherResultCols: _*)(globalCheckFunction)
   }
+
+  def testPredictorModelSinglePrediction(model: PredictionModel[Vector, _],
--- End diff --

Thanks for the updates! Just this comment: Can we please call this method testPredictionModelSinglePrediction to match the name of the class it is testing?
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r174868215

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -385,8 +385,12 @@ case class LoadDataCommand(
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val srcPath = new Path(hdfsUri)
     val fs = srcPath.getFileSystem(hadoopConf)
-    if (!fs.exists(srcPath)) {
-      throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
+    // Validation logic has been added for non-local files. An error will be thrown
+    // if the HDFS path does not exist or if no files match the wildcard defined
+    // in the load path
--- End diff --

Yeah, I updated the comment; the above comment is more precise.
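The usual way to support wildcards here is Hadoop's glob API: `FileSystem.globStatus` expands the pattern, returning `null` when the path has no glob and does not exist, and an empty array when the glob matches nothing. That gives exactly the fail-fast check the diff describes. A sketch of that shape (assuming the same `srcPath`, `fs`, and `path` values as above; not the exact patch):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.AnalysisException

// Validate a possibly-wildcarded LOAD DATA path: reject both a missing path
// and a wildcard that matches no files.
def validateLoadPath(fs: FileSystem, srcPath: Path, path: String): Unit = {
  val matches = fs.globStatus(srcPath)
  if (matches == null || matches.isEmpty) {
    throw new AnalysisException(
      s"LOAD DATA input path does not exist or matches no files: $path")
  }
}
```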
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20611 **[Test build #88272 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88272/testReport)** for PR 20611 at commit [`8341465`](https://github.com/apache/spark/commit/8341465594f9316f9f52678fd70faa7f910edfcc).
[GitHub] spark pull request #20830: [SPARK-23691][PYTHON] Use sql_conf util in PySpar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/20830#discussion_r174868549

--- Diff: python/pyspark/sql/tests.py ---
@@ -2461,17 +2461,13 @@ def test_join_without_on(self):
         df1 = self.spark.range(1).toDF("a")
         df2 = self.spark.range(1).toDF("b")
-        try:
-            self.spark.conf.set("spark.sql.crossJoin.enabled", "false")
+        with self.sql_conf({"spark.sql.crossJoin.enabled": False}):
             self.assertRaises(AnalysisException, lambda: df1.join(df2, how="inner").collect())
-            self.spark.conf.set("spark.sql.crossJoin.enabled", "true")
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
--- End diff --

So the `sql_conf` context will change this back to be unset right?
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/20835 cc @kiszk @hvanhovell
[GitHub] spark issue #20750: [SPARK-23581][SQL] Add interpreted unsafe projection
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20750 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1536/ Test PASSed.
[GitHub] spark issue #20750: [SPARK-23581][SQL] Add interpreted unsafe projection
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20750 **[Test build #88270 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88270/testReport)** for PR 20750 at commit [`7e15f30`](https://github.com/apache/spark/commit/7e15f3061b5fbd48344ce36e773766a83b2bf410).
[GitHub] spark issue #20750: [SPARK-23581][SQL] Add interpreted unsafe projection
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20750 Merged build finished. Test PASSed.
[GitHub] spark pull request #20836: SPARK-23685 : Fix for the Spark Structured Stream...
GitHub user sirishaSindri opened a pull request: https://github.com/apache/spark/pull/20836

SPARK-23685: Fix for the Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets

## What changes were proposed in this pull request?
In `fetchData`, instead of throwing an exception on `failOnDataLoss`, return the record if its offset falls in the user-requested offset range.

## How was this patch tested?
Manually tested, added a unit test, and ran in a real deployment.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sirishaSindri/spark SPARK-23685

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20836.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20836

commit 5ccfed840f9cf9cd1c28a309b934e1285332d04d
Author: z001k5c
Date: 2018-03-15T15:53:14Z

    SPARK-23685: Fix for the Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets

commit 7e08cd9062683c062b7b0408ffe40ff726249909
Author: z001k5c
Date: 2018-03-15T17:06:06Z

    SPARK-23685: Fix for the Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
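The acceptance rule the description sketches, namely tolerating offset gaps (as produced by compacted topics or transaction markers) by keeping any polled record whose offset still lies inside the requested range, boils down to a check like this (a simplification, not the actual consumer code in the patch):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

// Keep a polled record when its offset lies in [requestedOffset, untilOffset),
// even if it is not exactly the next consecutive offset that was asked for.
def inRequestedRange(
    record: ConsumerRecord[Array[Byte], Array[Byte]],
    requestedOffset: Long,
    untilOffset: Long): Boolean =
  record.offset >= requestedOffset && record.offset < untilOffset
```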
[GitHub] spark issue #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange when cac...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20831 Merged build finished. Test PASSed.
[GitHub] spark issue #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange when cac...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20831 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88266/ Test PASSed.
[GitHub] spark pull request #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/20777#discussion_r174864748

--- Diff: python/pyspark/ml/feature.py ---
@@ -465,26 +473,26 @@ class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable,
                    " Default False", typeConverter=TypeConverters.toBoolean)

     @keyword_only
-    def __init__(self, minTF=1.0, minDF=1.0, vocabSize=1 << 18, binary=False, inputCol=None,
-                 outputCol=None):
+    def __init__(self, minTF=1.0, minDF=1.0, maxDF=sys.maxsize, vocabSize=1 << 18, binary=False,
--- End diff --

I think it's best just to hardcode the value like you did before; `sys.maxsize` can be 32-bit on some systems: https://docs.python.org/3/library/sys.html#sys.maxsize
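The point here is that Python's `sys.maxsize` varies with the platform word size, while the JVM-side default it must mirror is a fixed 64-bit value. A tiny illustration of the mismatch being avoided (values are illustrative; the actual Scala default for `maxDF` is whatever the Scala `CountVectorizer` declares):

```scala
// On the JVM the width is fixed: Long.MaxValue is always 2^63 - 1.
val fixedDefault: Long = Long.MaxValue    // 9223372036854775807
// Python's sys.maxsize on a 32-bit build would instead be 2^31 - 1, so using
// it as the Python-side default can disagree with the JVM-side default.
val maxsizeOn32Bit: Long = (1L << 31) - 1 // 2147483647
assert(fixedDefault != maxsizeOn32Bit)    // the mismatch hardcoding avoids
```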
[GitHub] spark issue #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange when cac...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20831 **[Test build #88266 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88266/testReport)** for PR 20831 at commit [`fb0f949`](https://github.com/apache/spark/commit/fb0f949ec5e144ad38ddc780ae1bb228c59bce55).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/20777#discussion_r174863155

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -70,19 +70,21 @@ private[feature] trait CountVectorizerParams extends Params with HasInputCol wit
   def getMinDF: Double = $(minDF)

   /**
-   * Specifies the maximum number of different documents a term must appear in to be included
-   * in the vocabulary.
-   * If this is an integer greater than or equal to 1, this specifies the number of documents
-   * the term must appear in; if this is a double in [0,1), then this specifies the fraction of
-   * documents.
+   * Specifies the maximum number of different documents a term could appear in to be included
+   * in the vocabulary. A term that appears more than the threshold will be ignored. If this is an
+   * integer greater than or equal to 1, this specifies the maximum number of documents the term
+   * could appear in; if this is a double in [0,1), then this specifies the maximum fraction of
+   * documents the term could appear in.
--- End diff --

@srowen do these doc changes look ok to you? It was a little confusing before saying that the term "must appear" when it's a max value.
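Read as code, the documented semantics resolve `maxDF` against the corpus size before filtering, mirroring how `minDF` is handled. A sketch of the assumed behavior (not the actual implementation):

```scala
// maxDF >= 1.0 is an absolute document count; maxDF in [0, 1) is a fraction
// of the corpus. Terms whose document frequency exceeds the resolved
// threshold are dropped from the vocabulary.
def keepTerm(docFreq: Long, maxDF: Double, numDocs: Long): Boolean = {
  val threshold = if (maxDF >= 1.0) maxDF else maxDF * numDocs
  docFreq <= threshold
}
```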
[GitHub] spark issue #20834: [SPARK-23695][PYTHON] Fix the error message for Kinesis ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/20834 LGTM.
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20659 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1535/ Test PASSed.
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20659 **[Test build #88269 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88269/testReport)** for PR 20659 at commit [`e041704`](https://github.com/apache/spark/commit/e041704c8898234299b8af1d85b09c1acc2532ab).
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/20835 @kiszk do we have a more reasonable way to test this?
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174890611 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala --- @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types.{UserDefinedType, _} +import org.apache.spark.unsafe.Platform + +/** + * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer + * should copy the row if it is being buffered. This class is not thread safe. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { + import InterpretedUnsafeProjection._ + + /** Number of (top level) fields in the resulting row. */ + private[this] val numFields = expressions.length + + /** Array that expression results. */ + private[this] val values = new Array[Any](numFields) + + /** The row representing the expression results. */ + private[this] val intermediate = new GenericInternalRow(values) + + /** The row returned by the projection. */ + private[this] val result = new UnsafeRow(numFields) + + /** The buffer which holds the resulting row's backing data. */ + private[this] val holder = new BufferHolder(result, numFields * 32) + + /** The writer that writes the intermediate result to the result row. */ + private[this] val writer: InternalRow => Unit = { +val rowWriter = new UnsafeRowWriter(holder, numFields) +val baseWriter = generateStructWriter( + holder, + rowWriter, + expressions.map(e => StructField("", e.dataType, e.nullable))) +if (!expressions.exists(_.nullable)) { + // No nullable fields. The top-level null bit mask will always be zeroed out. + baseWriter +} else { + // Zero out the null bit mask before we write the row. + row => { +rowWriter.zeroOutNullBytes() +baseWriter(row) + } +} + } + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + override def apply(row: InternalRow): UnsafeRow = { +// Put the expression results in the intermediate row. +var i = 0 +while (i < numFields) { + values(i) = expressions(i).eval(row) + i += 1 +} + +// Write the intermediate row to an unsafe row. 
+holder.reset() +writer(intermediate) +result.setTotalSize(holder.totalSize()) +result + } +} + +/** + * Helper functions for creating an [[InterpretedUnsafeProjection]]. + */ +object InterpretedUnsafeProjection extends UnsafeProjectionCreator { + + /** + * Returns an [[UnsafeProjection]] for given sequence of bound Expressions. + */ + override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +// We need to make sure that we do not reuse stateful expressions. +val cleanedExpressions = exprs.map(_.transform { + case s: Stateful => s.freshCopy() +}) +new InterpretedUnsafeProjection(cleanedExpressions.toArray) + } + + /** + * Generate a struct writer function. The generated function writes an [[InternalRow]] to the + * given buffer using the given
[GitHub] spark issue #20824: [SPARK-23683][SQL][FOLLOW-UP] FileCommitProtocol.instant...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20824 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88267/ Test PASSed.
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20835 **[Test build #88268 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88268/testReport)** for PR 20835 at commit [`32df7d6`](https://github.com/apache/spark/commit/32df7d6d7b1c1d17460fc6cdb8b17adee8c765fd).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/20835 Merging to master. Thanks!
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20835 @hvanhovell @wangyum Thank you very much. LGTM for now; I had the same immediate fix in mind. Is it better to create a standalone test case like [BufferHolderSparkSubmitSuite](https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala)? Or is it OK to leave this test case as `ignore`?
[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20837 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88274/ Test PASSed.
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r174921341

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
    * Create a new Node from the old Node format, recursively creating child nodes as needed.
    */
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+      isClassification: Boolean): Node = {
     if (oldNode.isLeaf) {
       // TODO: Once the implementation has been moved to this API, then include sufficient
       // statistics here.
-      new LeafNode(prediction = oldNode.predict.predict,
-        impurity = oldNode.impurity, impurityStats = null)
+      if (isClassification) {
+        new ClassificationLeafNode(prediction = oldNode.predict.predict,
+          impurity = oldNode.impurity, impurityStats = null)
+      } else {
+        new RegressionLeafNode(prediction = oldNode.predict.predict,
+          impurity = oldNode.impurity, impurityStats = null)
+      }
     } else {
       val gain = if (oldNode.stats.nonEmpty) {
         oldNode.stats.get.gain
       } else {
         0.0
       }
-      new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity,
-        gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures),
-        rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-        split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null)
+      if (isClassification) {
+        new ClassificationInternalNode(prediction = oldNode.predict.predict,
+          impurity = oldNode.impurity, gain = gain,
+          leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true)
+            .asInstanceOf[ClassificationNode],
+          rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true)
+            .asInstanceOf[ClassificationNode],
+          split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null)
+      } else {
+        new RegressionInternalNode(prediction = oldNode.predict.predict,
+          impurity = oldNode.impurity, gain = gain,
+          leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false)
+            .asInstanceOf[RegressionNode],
+          rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false)
+            .asInstanceOf[RegressionNode],
+          split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null)
+      }
     }
   }

-/**
- * Decision tree leaf node.
- * @param prediction Prediction this node makes
- * @param impurity Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
--- End diff --

While it would be nice to have this be a trait instead of a class, I am worried about breaking public APIs. However, one could argue that this isn't a public API since the constructor is private (though the class is public). I'll CC people on https://issues.apache.org/jira/browse/SPARK-7131 where these classes were made public to get feedback. Let's give a couple of days for feedback before proceeding.
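For context on the trait-versus-class worry: `LeafNode` is currently a public concrete class, and a Scala trait compiles to a JVM interface, so swapping one for the other breaks binary compatibility for any user code compiled against the class. A condensed sketch of the two shapes under discussion (hypothetical trait name, access modifiers omitted):

```scala
// Today: one public concrete class for all leaf nodes.
class LeafNode(val prediction: Double, val impurity: Double)

// Proposed: a shared abstraction with per-task subclasses. Because a trait
// becomes a JVM interface, code linked against the old class would no longer
// load without recompiling; that is the compatibility risk being weighed.
trait LeafNodeApi { def prediction: Double; def impurity: Double }
final class ClassificationLeafNode(
    val prediction: Double, val impurity: Double) extends LeafNodeApi
final class RegressionLeafNode(
    val prediction: Double, val impurity: Double) extends LeafNodeApi
```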
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20579 **[Test build #88273 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88273/testReport)** for PR 20579 at commit [`ad15411`](https://github.com/apache/spark/commit/ad154115e520b82eec0b252fa19e66abdc1da832).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20611 **[Test build #88272 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88272/testReport)** for PR 20611 at commit [`8341465`](https://github.com/apache/spark/commit/8341465594f9316f9f52678fd70faa7f910edfcc).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20579 Merged build finished. Test PASSed.
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20579 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1537/ Test PASSed.
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20659 **[Test build #88269 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88269/testReport)** for PR 20659 at commit [`e041704`](https://github.com/apache/spark/commit/e041704c8898234299b8af1d85b09c1acc2532ab).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20659 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88269/ Test FAILed.
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20659 Merged build finished. Test FAILed.
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20579 **[Test build #88273 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88273/testReport)** for PR 20579 at commit [`ad15411`](https://github.com/apache/spark/commit/ad154115e520b82eec0b252fa19e66abdc1da832).
[GitHub] spark pull request #20837: [SPARK-23686][ML][WIP] Better instrumentation
GitHub user MrBago opened a pull request: https://github.com/apache/spark/pull/20837

[SPARK-23686][ML][WIP] Better instrumentation

## What changes were proposed in this pull request?
This PR is meant to show how we could better utilize the Instrumentation class in spark.ml.

## How was this patch tested?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MrBago/spark better-instrumentation

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20837.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20837
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20835 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88268/ Test PASSed.
[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20835 Merged build finished. Test PASSed.
[GitHub] spark pull request #20669: [SPARK-22839][K8S] Remove the use of init-contain...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/20669#discussion_r174902563

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ---
@@ -110,30 +109,29 @@ private[spark] class Client(
     for (nextStep <- submissionSteps) {
       currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
     }
-
-    val resolvedDriverJavaOpts = currentDriverSpec
-      .driverSparkConf
-      // Remove this as the options are instead extracted and set individually below using
-      // environment variables with prefix SPARK_JAVA_OPT_.
-      .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
-      .getAll
-      .map {
-        case (confKey, confValue) => s"-D$confKey=$confValue"
-      } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
-      case (option, index) =>
-        new EnvVarBuilder()
-          .withName(s"$ENV_JAVA_OPT_PREFIX$index")
-          .withValue(option)
-          .build()
-    }
-
+    val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
+    val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
+    // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
+    // Spark command builder to pickup on the Java Options present in the ConfigMap
    val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
-      .addAllToEnv(driverJavaOptsEnvs.asJava)
+      .addNewEnv()
+        .withName(SPARK_CONF_DIR_ENV)
+        .withValue(SPARK_CONF_PATH)
--- End diff --

Do the executors require a SPARK_CONF_DIR directory to be defined as well?
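For readers following along, the `buildConfigMap` call above presumably serializes the driver's SparkConf into a properties payload stored under a single ConfigMap key, which the pod spec then mounts at the path `SPARK_CONF_DIR` points to. A sketch of that idea with fabric8's builder API (the key name and serialization format here are assumptions, not the actual patch):

```scala
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder}
import org.apache.spark.SparkConf

// Dump every conf entry in java.util.Properties style under one key; the pod
// spec then mounts this ConfigMap at the directory named by SPARK_CONF_DIR.
def buildConfigMap(name: String, conf: SparkConf): ConfigMap = {
  val properties = conf.getAll
    .map { case (key, value) => s"$key=$value" }
    .mkString("\n")
  new ConfigMapBuilder()
    .withNewMetadata()
      .withName(name)
    .endMetadata()
    .addToData("spark.properties", properties)
    .build()
}
```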
[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20837 Merged build finished. Test PASSed.
[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20837 **[Test build #88274 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88274/testReport)** for PR 20837 at commit [`172bf27`](https://github.com/apache/spark/commit/172bf27e3eaee4e080902d3e8e94a8df1cf5c694).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` instr.logWarning(s\"All labels belong to a single class and fitIntercept=false. It's a \" +`
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20611 Merged build finished. Test PASSed.
[GitHub] spark issue #20828: [SPARK-23687][SS] Add a memory source for continuous pro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20828 **[Test build #88271 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88271/testReport)** for PR 20828 at commit [`7c8d30e`](https://github.com/apache/spark/commit/7c8d30e00ca7497cd4ed621fc0f5dacc807efc04).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20611 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88272/ Test PASSed.
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174892448 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala --- @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types.{UserDefinedType, _} +import org.apache.spark.unsafe.Platform + +/** + * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer + * should copy the row if it is being buffered. This class is not thread safe. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { + import InterpretedUnsafeProjection._ + + /** Number of (top level) fields in the resulting row. */ + private[this] val numFields = expressions.length + + /** Array that expression results. */ + private[this] val values = new Array[Any](numFields) + + /** The row representing the expression results. */ + private[this] val intermediate = new GenericInternalRow(values) + + /** The row returned by the projection. */ + private[this] val result = new UnsafeRow(numFields) + + /** The buffer which holds the resulting row's backing data. */ + private[this] val holder = new BufferHolder(result, numFields * 32) + + /** The writer that writes the intermediate result to the result row. */ + private[this] val writer: InternalRow => Unit = { +val rowWriter = new UnsafeRowWriter(holder, numFields) +val baseWriter = generateStructWriter( + holder, + rowWriter, + expressions.map(e => StructField("", e.dataType, e.nullable))) +if (!expressions.exists(_.nullable)) { + // No nullable fields. The top-level null bit mask will always be zeroed out. + baseWriter +} else { + // Zero out the null bit mask before we write the row. + row => { +rowWriter.zeroOutNullBytes() +baseWriter(row) + } +} + } + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + override def apply(row: InternalRow): UnsafeRow = { +// Put the expression results in the intermediate row. +var i = 0 +while (i < numFields) { + values(i) = expressions(i).eval(row) + i += 1 +} + +// Write the intermediate row to an unsafe row. 
+holder.reset() +writer(intermediate) +result.setTotalSize(holder.totalSize()) +result + } +} + +/** + * Helper functions for creating an [[InterpretedUnsafeProjection]]. + */ +object InterpretedUnsafeProjection extends UnsafeProjectionCreator { + + /** + * Returns an [[UnsafeProjection]] for given sequence of bound Expressions. + */ + override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +// We need to make sure that we do not reuse stateful expressions. +val cleanedExpressions = exprs.map(_.transform { + case s: Stateful => s.freshCopy() +}) +new InterpretedUnsafeProjection(cleanedExpressions.toArray) + } + + /** + * Generate a struct writer function. The generated function writes an [[InternalRow]] to the + * given buffer using the given
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/20579 @cloud-fan @rdblue Thank you for the clarification. I'm sorry, I hadn't seen your comments before I pushed the last change, which targets only Parquet. I will adjust the fix to target all formats in a future commit. Thanks again!
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174892728 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala --- @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types.{UserDefinedType, _} +import org.apache.spark.unsafe.Platform + +/** + * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer + * should copy the row if it is being buffered. This class is not thread safe. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { + import InterpretedUnsafeProjection._ + + /** Number of (top level) fields in the resulting row. */ + private[this] val numFields = expressions.length + + /** Array that expression results. */ + private[this] val values = new Array[Any](numFields) + + /** The row representing the expression results. */ + private[this] val intermediate = new GenericInternalRow(values) + + /** The row returned by the projection. */ + private[this] val result = new UnsafeRow(numFields) + + /** The buffer which holds the resulting row's backing data. */ + private[this] val holder = new BufferHolder(result, numFields * 32) + + /** The writer that writes the intermediate result to the result row. */ + private[this] val writer: InternalRow => Unit = { +val rowWriter = new UnsafeRowWriter(holder, numFields) +val baseWriter = generateStructWriter( + holder, + rowWriter, + expressions.map(e => StructField("", e.dataType, e.nullable))) +if (!expressions.exists(_.nullable)) { + // No nullable fields. The top-level null bit mask will always be zeroed out. + baseWriter +} else { + // Zero out the null bit mask before we write the row. + row => { +rowWriter.zeroOutNullBytes() +baseWriter(row) + } +} + } + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + override def apply(row: InternalRow): UnsafeRow = { +// Put the expression results in the intermediate row. +var i = 0 +while (i < numFields) { + values(i) = expressions(i).eval(row) + i += 1 +} + +// Write the intermediate row to an unsafe row. 
+holder.reset() +writer(intermediate) +result.setTotalSize(holder.totalSize()) +result + } +} + +/** + * Helper functions for creating an [[InterpretedUnsafeProjection]]. + */ +object InterpretedUnsafeProjection extends UnsafeProjectionCreator { + + /** + * Returns an [[UnsafeProjection]] for given sequence of bound Expressions. + */ + override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +// We need to make sure that we do not reuse stateful expressions. +val cleanedExpressions = exprs.map(_.transform { + case s: Stateful => s.freshCopy() +}) +new InterpretedUnsafeProjection(cleanedExpressions.toArray) + } + + /** + * Generate a struct writer function. The generated function writes an [[InternalRow]] to the + * given buffer using the given
[GitHub] spark issue #20824: [SPARK-23683][SQL][FOLLOW-UP] FileCommitProtocol.instant...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20824 Merged build finished. Test PASSed.
[GitHub] spark issue #20824: [SPARK-23683][SQL][FOLLOW-UP] FileCommitProtocol.instant...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20824 **[Test build #88267 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88267/testReport)** for PR 20824 at commit [`64602ae`](https://github.com/apache/spark/commit/64602ae97a5318c674d09238f36ff1fec073c97e).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/20659 retest this please
[GitHub] spark issue #20750: [SPARK-23581][SQL] Add interpreted unsafe projection
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20750 **[Test build #88270 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88270/testReport)** for PR 20750 at commit [`7e15f30`](https://github.com/apache/spark/commit/7e15f3061b5fbd48344ce36e773766a83b2bf410).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20837 Merged build finished. Test PASSed.
[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20837 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1538/ Test PASSed.
[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20837 **[Test build #88274 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88274/testReport)** for PR 20837 at commit [`172bf27`](https://github.com/apache/spark/commit/172bf27e3eaee4e080902d3e8e94a8df1cf5c694).
[GitHub] spark pull request #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to ...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20777#discussion_r174911085

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -70,19 +70,21 @@ private[feature] trait CountVectorizerParams extends Params with HasInputCol wit
   def getMinDF: Double = $(minDF)
 
   /**
-   * Specifies the maximum number of different documents a term must appear in to be included
-   * in the vocabulary.
-   * If this is an integer greater than or equal to 1, this specifies the number of documents
-   * the term must appear in; if this is a double in [0,1), then this specifies the fraction of
-   * documents.
+   * Specifies the maximum number of different documents a term could appear in to be included
+   * in the vocabulary. A term that appears more than the threshold will be ignored. If this is an
+   * integer greater than or equal to 1, this specifies the maximum number of documents the term
+   * could appear in; if this is a double in [0,1), then this specifies the maximum fraction of
+   * documents the term could appear in.
--- End diff --

Agree, your wording is clearer.
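A small usage sketch of the semantics described in that doc change, assuming the corresponding Scala setter `setMaxDF` that this parameter pairs with:

```scala
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("maxDF-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// "a" appears in 3 of 3 documents, "b" in 2, "c" in 1.
val docs = Seq(Seq("a", "b", "c"), Seq("a", "b"), Seq("a")).toDF("words")

// With maxDF = 2, a term appearing in more than 2 documents ("a") is ignored;
// a fractional value such as 0.5 would instead cap terms at 50% of documents.
val model = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setMaxDF(2)
  .fit(docs)

println(model.vocabulary.mkString(", "))  // expected to contain "b" and "c" but not "a"
```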
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20659 **[Test build #88275 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88275/testReport)** for PR 20659 at commit [`e041704`](https://github.com/apache/spark/commit/e041704c8898234299b8af1d85b09c1acc2532ab).
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20659 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1539/ Test PASSed.
[GitHub] spark issue #20828: [SPARK-23687][SS] Add a memory source for continuous pro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20828 Merged build finished. Test PASSed.
[GitHub] spark issue #20828: [SPARK-23687][SS] Add a memory source for continuous pro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20828 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88271/ Test PASSed.
[GitHub] spark pull request #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to ac...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20835
[GitHub] spark pull request #20669: [SPARK-22839][K8S] Remove the use of init-contain...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20669#discussion_r174902939

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ---
@@ -110,30 +109,29 @@ private[spark] class Client(
     for (nextStep <- submissionSteps) {
       currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
     }
-
-    val resolvedDriverJavaOpts = currentDriverSpec
-      .driverSparkConf
-      // Remove this as the options are instead extracted and set individually below using
-      // environment variables with prefix SPARK_JAVA_OPT_.
-      .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
-      .getAll
-      .map {
-        case (confKey, confValue) => s"-D$confKey=$confValue"
-      } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
-      case (option, index) =>
-        new EnvVarBuilder()
-          .withName(s"$ENV_JAVA_OPT_PREFIX$index")
-          .withValue(option)
-          .build()
-    }
-
+    val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
+    val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
+    // The inclusion of the env var for "SPARK_CONF_DIR" is to allow the
+    // Spark command builder to pick up the Java options present in the ConfigMap
     val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
-      .addAllToEnv(driverJavaOptsEnvs.asJava)
+      .addNewEnv()
+        .withName(SPARK_CONF_DIR_ENV)
+        .withValue(SPARK_CONF_PATH)
--- End diff --

Yes.
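The `buildConfigMap` helper referenced in this diff appears to package the driver's SparkConf into a Kubernetes ConfigMap that the container then reads under SPARK_CONF_DIR. A rough sketch of that idea with the fabric8 client (the method name and the "spark.properties" key are illustrative, not necessarily the PR's actual helper):

```scala
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder}
import org.apache.spark.SparkConf

// Sketch: serialize the driver SparkConf into one ConfigMap entry that can be
// mounted as a properties file for the Spark command builder to read.
def buildConfigMapSketch(name: String, conf: SparkConf): ConfigMap = {
  val properties = conf.getAll
    .map { case (k, v) => s"$k=$v" }
    .mkString("", "\n", "\n")
  new ConfigMapBuilder()
    .withNewMetadata()
      .withName(name)
    .endMetadata()
    .addToData("spark.properties", properties)  // file name is illustrative
    .build()
}
```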
[GitHub] spark issue #20659: [DO-NOT-MERGE] Try to update Hive to 2.3.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20659 Merged build finished. Test PASSed.
[GitHub] spark issue #20750: [SPARK-23581][SQL] Add interpreted unsafe projection
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20750 Merged build finished. Test PASSed.
[GitHub] spark issue #20750: [SPARK-23581][SQL] Add interpreted unsafe projection
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20750 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88270/ Test PASSed.
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20579 Merged build finished. Test FAILed.
[GitHub] spark issue #20579: [SPARK-23372][SQL] Writing empty struct in parquet fails...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20579 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88273/ Test FAILed.
[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20829 Merged build finished. Test FAILed.
[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20687 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88280/ Test FAILed.
[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20687 **[Test build #88280 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88280/testReport)** for PR 20687 at commit [`8adaa47`](https://github.com/apache/spark/commit/8adaa4748b7b0d7990b45e58d82c7d9f2248923f).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #20687: [SPARK-23500][SQL] Fix complex type simplification rules...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20687 Merged build finished. Test FAILed.
[GitHub] spark issue #20839: [SPARK-23699][PYTHON][SQL] Raise same type of error caug...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20839 **[Test build #88281 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88281/testReport)** for PR 20839 at commit [`5caf63c`](https://github.com/apache/spark/commit/5caf63cc32a7546823e64d774faee9fb63a6b286).
[GitHub] spark issue #20839: [SPARK-23699][PYTHON][SQL] Raise same type of error caug...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20839 Merged build finished. Test PASSed.
[GitHub] spark issue #20839: [SPARK-23699][PYTHON][SQL] Raise same type of error caug...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20839 **[Test build #88281 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88281/testReport)** for PR 20839 at commit [`5caf63c`](https://github.com/apache/spark/commit/5caf63cc32a7546823e64d774faee9fb63a6b286).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #20839: [SPARK-23699][PYTHON][SQL] Raise same type of error caug...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20839 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88281/ Test PASSed.
[GitHub] spark pull request #20813: [SPARK-23670][SQL] Fix memory leak on SparkPlanGr...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20813#discussion_r174202008

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
@@ -53,6 +53,9 @@ class SQLAppStatusStore(
   def executionsCount(): Long = {
     store.count(classOf[SQLExecutionUIData])
   }
+  def planGraphCount(): Long = {
--- End diff --

nit: add empty line before
[GitHub] spark issue #20815: [SPARK-23658][LAUNCHER] InProcessAppHandle uses the wron...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/20815 Merging to master / 2.3.
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r174968594

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     }
   }
 
-  def releaseKafkaConsumer(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): Unit = {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
     synchronized {
-      val consumer = cache.get(key)
-      if (consumer != null) {
-        consumer.inuse = false
-      } else {
-        logWarning(s"Attempting to release consumer that does not exist")
-      }
-    }
-  }
-
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
-   */
-  def removeKafkaConsumer(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): Unit = {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
-
-    synchronized {
-      val removedConsumer = cache.remove(key)
-      if (removedConsumer != null) {
-        removedConsumer.close()
+      // If it has been marked for close, then do it any way
+      if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close()
+      intConsumer.inuse = false
+
+      // Clear the consumer from the cache if this is indeed the consumer present in the cache
+      val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams)
+      val cachedIntConsumer = cache.get(key)
+      if (cachedIntConsumer != null) {
+        if (cachedIntConsumer.eq(intConsumer)) {
+          // The released consumer is indeed the cached one.
+          cache.remove(key)
+        } else {
+          // The released consumer is not the cached one. Don't do anything.
+          // This should not happen as long as we maintain the invariant mentioned above.
+          logWarning(
+            s"Cached consumer not the same one as the one being release" +
+              s"\ncached = $cachedIntConsumer [${System.identityHashCode(cachedIntConsumer)}]" +
+              s"\nreleased = $intConsumer [${System.identityHashCode(intConsumer)}]")
+        }
+      } else {
+        // The released consumer is not in the cache. Don't do anything.
+        // This should not happen as long as we maintain the invariant mentioned above.
+        logWarning(s"Attempting to release consumer that is not in the cache")
       }
     }
   }
 
   /**
    * Get a cached consumer for groupId, assigned to topic and partition.
    * If matching consumer doesn't already exist, will be created using kafkaParams.
+   * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]].
+   *
+   * Note: This method guarantees that the consumer returned is not currently in use by any one
+   * else. Within this guarantee, this will make a best effort attempt to re-use consumers by
+   * caching them and tracking when they are in use.
    */
-  def getOrCreate(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
-
-    // If this is reattempt at running the task, then invalidate cache and start with
-    // a new consumer
+  def acquire(
+      topicPartition: TopicPartition,
+      kafkaParams: ju.Map[String, Object],
+      useCache: Boolean): KafkaDataConsumer = synchronized {
+    val key = new CacheKey(topicPartition, kafkaParams)
+    val existingInternalConsumer = cache.get(key)
+
+    lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams)
+
     if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-      removeKafkaConsumer(topic, partition, kafkaParams)
-      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-      consumer.inuse = true
-      cache.put(key, consumer)
-      consumer
-    } else {
-      if
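The acquire/release contract spelled out in the doc comment above implies a bracket pattern on the caller side; roughly this shape (a sketch inside the kafka010 package, with a hypothetical `process` callback):

```scala
import java.{util => ju}
import org.apache.kafka.common.TopicPartition

// Sketch of the usage contract: every consumer returned by acquire() must be
// released exactly once, even when the read fails partway through.
def withConsumer[T](
    topicPartition: TopicPartition,
    kafkaParams: ju.Map[String, Object],
    useCache: Boolean)(process: KafkaDataConsumer => T): T = {
  val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache)
  try {
    process(consumer)
  } finally {
    consumer.release()  // hands the consumer back to the cache, or closes it
  }
}
```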
[GitHub] spark pull request #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to ...
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20777#discussion_r174935305

--- Diff: python/pyspark/ml/feature.py ---
@@ -465,26 +473,26 @@ class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable,
                     " Default False", typeConverter=TypeConverters.toBoolean)
 
     @keyword_only
-    def __init__(self, minTF=1.0, minDF=1.0, vocabSize=1 << 18, binary=False, inputCol=None,
-                 outputCol=None):
+    def __init__(self, minTF=1.0, minDF=1.0, maxDF=sys.maxsize, vocabSize=1 << 18, binary=False,
--- End diff --

Will make the change now. Thanks!
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20829#discussion_r174941546

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
     val metadata = NominalAttribute.defaultAttr
       .withName($(outputCol)).withValues(filteredLabels).toMetadata()
     // If we are skipping invalid records, filter them out.
-    val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

Thanks for picking this out! I changed this because I was matching on `$(handleInvalid)` in VectorAssembler, and that seems to be the recommended way of doing it. Should I include this change in the current PR with a note, or open a separate PR?
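For reference, the style under discussion, matching on the param value via `$(...)` rather than the generated getter, in a self-contained schematic (the trait name, method, and branch bodies are placeholders; the documented values are "skip", "error", and "keep"):

```scala
import org.apache.spark.ml.param.{Param, Params}

// Schematic sketch: dispatch on the param's current value with $(...),
// rather than going through the generated getter.
trait InvalidHandlingSketch extends Params {
  // Hypothetical param mirroring handleInvalid.
  val handleInvalid: Param[String] =
    new Param[String](this, "handleInvalid", "how to handle invalid entries")

  protected def onInvalid(): Unit = $(handleInvalid) match {
    case "skip"  => // drop rows containing invalid values
    case "error" => // throw on the first invalid value
    case "keep"  => // keep invalid values, assigning them a special index
  }
}
```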
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20777 **[Test build #88276 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88276/testReport)** for PR 20777 at commit [`d6cd73a`](https://github.com/apache/spark/commit/d6cd73aacd0683a96a6250d6ab0f9cb9b5eca6f7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20777 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88276/ Test PASSed.
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20777 Merged build finished. Test PASSed.
[GitHub] spark issue #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange when cac...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20831 cc @cloud-fan
[GitHub] spark pull request #20839: [SPARK-23699][PYTHON][SQL] Raise same type of err...
GitHub user BryanCutler opened a pull request:

    https://github.com/apache/spark/pull/20839

    [SPARK-23699][PYTHON][SQL] Raise same type of error caught with Arrow enabled

## What changes were proposed in this pull request?

When using Arrow for createDataFrame or toPandas and an error is encountered with fallback disabled, this will raise the same type of error instead of a RuntimeError.

## How was this patch tested?

Updated existing tests to verify error type.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark arrow-raise-same-error-SPARK-23699

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20839.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20839

commit 5caf63cc32a7546823e64d774faee9fb63a6b286
Author: Bryan Cutler
Date: 2018-03-15T23:07:27Z

    raise same type of error when not falling back
[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20807#discussion_r174964951

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -417,8 +417,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }
 
-  private def sparkContextInitialized(sc: SparkContext) = {
+  private def sparkContextInitialized(sc: SparkContext) = synchronized {
--- End diff --

Some other code in this class uses synchronization on `this`, so I think it would be better to synchronize on `sparkContextPromise` in this case.
[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20807#discussion_r174965130

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       // if the user app did not create a SparkContext.
       throw new IllegalStateException("User did not initialize spark context!")
     }
+    // After initialisation notify user class thread to continue
+    synchronized { notify() }
--- End diff --

Since you have to do this in two places, I'd create a method (e.g. `resumeDriver`) close to where `sparkContextInitialized` is declared, so that it's easier to find the context of why this is needed.
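Combined with the earlier suggestion to synchronize on `sparkContextPromise` rather than `this`, the shape being proposed is roughly the following (a sketch of the suggestion, not the merged code):

```scala
// Sketch: both notify sites go through one well-named method, and all
// waiting and notifying happens on sparkContextPromise, not on `this`.
private def resumeDriver(): Unit = {
  // The user-class thread pauses itself after publishing the SparkContext;
  // this wakes it up once the AM has finished its own initialization.
  sparkContextPromise.synchronized {
    sparkContextPromise.notify()
  }
}

private def sparkContextInitialized(sc: SparkContext) = {
  sparkContextPromise.synchronized {
    // Notify runDriver that the SparkContext is available.
    sparkContextPromise.success(sc)
    // Pause the user-class thread until runDriver finishes initialization.
    sparkContextPromise.wait()
  }
}
```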
[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20807#discussion_r174965182

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       // if the user app did not create a SparkContext.
       throw new IllegalStateException("User did not initialize spark context!")
     }
+    // After initialisation notify user class thread to continue
--- End diff --

nit: rest of the code uses American spelling ("initialization"), so this should be consistent.
[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20829 **[Test build #88277 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88277/testReport)** for PR 20829 at commit [`bf2f5b3`](https://github.com/apache/spark/commit/bf2f5b39b3056b2301afd1e19582fea0e3700f3a).
[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20829 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88277/ Test FAILed.