[GitHub] spark pull request: [SPARK-11476] [DOCS] Incorrect function referr...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9529#issuecomment-154564660

**[Test build #45261 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45261/consoleFull)** for PR 9529 at commit [`ee5c6e2`](https://github.com/apache/spark/commit/ee5c6e280232759b6df78add58c45eab4d0c6712).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9256#discussion_r44197538 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import scala.reflect.ClassTag + +import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.JavaPairRDD +import org.apache.spark.rdd.RDD +import org.apache.spark.{HashPartitioner, Partitioner} + + +/** + * :: Experimental :: + * Abstract class representing all the specifications of the DStream transformation + * `trackStateByKey` operation of a + * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a + * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java). + * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or + * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create instances of + * this class. + * + * Example in Scala: + * {{{ + *val spec = StateSpec(trackingFunction).numPartitions(10) + * + *val emittedRecordDStream = keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec) + * }}} + * + * Example in Java: + * {{{ + *StateStateSpec[StateType, EmittedDataType] spec = + * StateStateSpec.create[StateType, EmittedDataType](trackingFunction).numPartition(10); + * + *JavaDStream[EmittedDataType] emittedRecordDStream = + * javaPairDStream.trackStateByKey[StateType, EmittedDataType](spec); + * }}} + */ +@Experimental +sealed abstract class StateSpec[K, V, S, T] extends Serializable { --- End diff -- note to self: Update this to name KeyType, ValueType, etc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/9480#issuecomment-154570059 LGTM
[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9530#issuecomment-154574543 Merged build triggered.
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9409#discussion_r44198806

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -419,3 +419,30 @@ case class Greatest(children: Seq[Expression]) extends Expression {
     """
   }
 }
+
+/** Operator that drops a row when it contains any nulls. */
+case class DropAnyNull(child: Expression) extends UnaryExpression {
--- End diff --

We might also extend `ExpectsInputType`.
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154574551 Merged build triggered.
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9409#discussion_r44198887

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -419,3 +419,30 @@ case class Greatest(children: Seq[Expression]) extends Expression {
     """
   }
 }
+
+/** Operator that drops a row when it contains any nulls. */
+case class DropAnyNull(child: Expression) extends UnaryExpression {
+  override def nullable: Boolean = true
+  override def dataType: DataType = child.dataType
+
+  protected override def nullSafeEval(input: Any): InternalRow = {
+    val row = input.asInstanceOf[InternalRow]
--- End diff --

Could you also add a test in `ConditionalExpressionSuite`? I think that would have caught this bug.
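For context, a rough sketch of the kind of test being requested. The helper names (`checkEvaluation`, `create_row`) and the expected values are assumptions modelled on how other catalyst expression suites are commonly written, not the test that was actually added:

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StringType

class DropAnyNullSketchSuite extends SparkFunSuite with ExpressionEvalHelper {
  test("DropAnyNull") {
    // A struct containing a null field should be dropped (evaluate to null).
    val withNull = CreateStruct(Seq(Literal("a"), Literal.create(null, StringType)))
    checkEvaluation(DropAnyNull(withNull), null)

    // A struct with no null fields should pass through unchanged.
    val complete = CreateStruct(Seq(Literal("a"), Literal("b")))
    checkEvaluation(DropAnyNull(complete), create_row("a", "b"))
  }
}
```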
[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/9530#issuecomment-154575961 /cc @rxin @zsxwing
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9480#discussion_r44201615

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -151,6 +164,30 @@ abstract class Expression extends TreeNode[Expression] {
   }

   /**
+   * Returns the hash for this expression. Expressions that compute the same result, even if
+   * they differ cosmetically should return the same hash.
+   */
+  def semanticHash() : Int = {
+    def computeHash(e: Seq[Any]): Int = {
--- End diff --

Oh sorry, I get it now...
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9480#discussion_r44201576

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -151,6 +164,30 @@ abstract class Expression extends TreeNode[Expression] {
   }

   /**
+   * Returns the hash for this expression. Expressions that compute the same result, even if
+   * they differ cosmetically should return the same hash.
+   */
+  def semanticHash() : Int = {
+    def computeHash(e: Seq[Any]): Int = {
--- End diff --

What's wrong with `hashCode` on `Seq[Any]`? I think this is just going to use [this](https://github.com/scala/scala/blob/27da46343cd545534819300235bc64ab74958c92/src/library/scala/util/hashing/MurmurHash3.scala#L217), which seems pretty reasonable. Maybe I'm missing something though.
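For context, a tiny self-contained illustration of the point being made, assuming only the Scala standard library (2.10/2.11 era): structurally equal sequences produce the same `hashCode`, because the default `Seq#hashCode` is the MurmurHash3-based sequence hash linked above.

```scala
import scala.util.hashing.MurmurHash3

object SeqHashDemo extends App {
  // Two structurally equal sequences, built differently, hash identically.
  val a: Seq[Any] = Seq("Add", 1, 2)
  val b: Seq[Any] = List("Add", 1, 2)
  assert(a == b)
  assert(a.hashCode == b.hashCode)

  // Seq#hashCode delegates to the MurmurHash3 sequence hash.
  assert(a.hashCode == MurmurHash3.seqHash(a))
}
```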
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9480#discussion_r44202414

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -135,6 +147,7 @@ abstract class Expression extends TreeNode[Expression] {
   /**
    * Returns true when two expressions will always compute the same result, even if they differ
    * cosmetically (i.e. capitalization of names in attributes may be different).
+   * TODO: how should this deal with nonDeterministic
--- End diff --

Yeah, I guess we could just add `!nonDeterministic && ...` since nonDeterministic expressions can't equal anything?
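A minimal sketch of what such a guard could look like, assuming a `deterministic` flag and a `children` member like the ones `Expression` exposes; the names and the equality strategy here are assumptions for illustration, not the patch that was merged. The idea is that a nondeterministic expression is never semantically equal to anything, including itself, because two evaluations may yield different results.

```scala
// Hypothetical shape of the check being discussed; not the actual Spark code.
abstract class Expr {
  def deterministic: Boolean
  def children: Seq[Expr]

  // Only deterministic expressions can be semantically equal.
  def semanticEquals(other: Expr): Boolean =
    deterministic && other.deterministic &&
      getClass == other.getClass &&
      children.length == other.children.length &&
      children.zip(other.children).forall { case (a, b) => a semanticEquals b }
}
```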
[GitHub] spark pull request: [SPARK-8467][MLlib][PySpark] Add LDAModel.desc...
Github user yu-iskw commented on the pull request: https://github.com/apache/spark/pull/8643#issuecomment-154585403 @jkbradley @davies could you review it? I modified the type conversion using `SerDe.dumps`.
[GitHub] spark pull request: [SPARK-4557] [Streaming] Spark Streaming forea...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9488#issuecomment-154585601 Build finished. Test FAILed.
[GitHub] spark pull request: SPARK-11565 Replace deprecated DigestUtils.sha...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9532#issuecomment-154585515 Merged build triggered.
[GitHub] spark pull request: SPARK-11565 Replace deprecated DigestUtils.sha...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9532#issuecomment-154585547 Merged build started.
[GitHub] spark pull request: [SPARK-8514] LU factorization on BlockMatrix
Github user nilmeier commented on the pull request: https://github.com/apache/spark/pull/8563#issuecomment-154591819 I'll look through this, thank you. I usually follow the style guide conventions while running the scripts, but I have missed some of the other conventions. Cheers, Jerome
[GitHub] spark pull request: [HOTFIX] Fix python tests after #9527
GitHub user marmbrus opened a pull request:

https://github.com/apache/spark/pull/9533

[HOTFIX] Fix python tests after #9527

#9527 missed updating the python tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark hotfixTextValue

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9533.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #9533

commit 0b436e5b251146b5677b765cab1ac1e6a791fe6a
Author: Michael Armbrust
Date: 2015-11-07T00:59:05Z

    [HOTFIX] Fix python tests after #9527
[GitHub] spark pull request: [SPARK-11433] [SQL] Cleanup the subquery name ...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/9385#issuecomment-154594596 Thanks for your contribution, but I'm tempted to not make this change unless there is actually a bug. We are eliminating the subqueries because they will impact optimization and planning. However, keeping the qualifiers around could actually be useful if we want to give better error messages.
[GitHub] spark pull request: SPARK-11565 Replace deprecated DigestUtils.sha...
Github user gliptak commented on the pull request: https://github.com/apache/spark/pull/9532#issuecomment-154594465

The error message displayed seems to have no relation to the pull request change ... The compile does fail with:

```
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala:127: value sha1Hex is not a member of object org.apache.commons.codec.digest.DigestUtils
[error] UTF8String.fromString(DigestUtils.sha1Hex(input.asInstanceOf[Array[Byte]]))
[error] ^
[error] one error found
```

although ```1.10```
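For context, a small sketch of the API change this PR is about: `shaHex` is the older spelling and `sha1Hex` its replacement in later commons-codec releases, so the compile error above suggests an older commons-codec ended up on the build classpath. This assumes a commons-codec version that ships both methods; it is an illustration, not the PR's code.

```scala
import org.apache.commons.codec.digest.DigestUtils

object Sha1HexDemo extends App {
  val input: Array[Byte] = "spark".getBytes("UTF-8")

  // Deprecated spelling being replaced by the PR.
  val before = DigestUtils.shaHex(input)

  // Replacement spelling; compiles only when a new enough commons-codec
  // is actually resolved on the classpath.
  val after = DigestUtils.sha1Hex(input)

  assert(before == after) // both compute the SHA-1 hex digest
}
```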
[GitHub] spark pull request: [SPARK-11566][MLlib][Python] Refactoring Gauss...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9534#issuecomment-154597554 **[Test build #45276 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45276/consoleFull)** for PR 9534 at commit [`7334add`](https://github.com/apache/spark/commit/7334add72a43832e27652d5f917c543ac8f4d57f).
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9531#issuecomment-154597482

**[Test build #45269 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45269/consoleFull)** for PR 9531 at commit [`c5c694f`](https://github.com/apache/spark/commit/c5c694f68e54da6fdeb8ededb8f9976bb3c7bdf4).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `abstract class Writer extends BaseReadWrite`
  * `trait Writable`
  * `abstract class Reader[T] extends BaseReadWrite`
  * `trait Readable[T]`
  * `case class GetInternalRowField(child: Expression, ordinal: Int, dataType: DataType)`
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/9480#discussion_r44205755

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -151,6 +164,30 @@ abstract class Expression extends TreeNode[Expression] {
   }

   /**
+   * Returns the hash for this expression. Expressions that compute the same result, even if
+   * they differ cosmetically should return the same hash.
+   */
+  def semanticHash() : Int = {
--- End diff --

If you look at the `equals`, `semanticEquals`, and `hashCode` methods of `AttributeReference`, you will see that they are not consistent: `equals` takes the `name` into account, but the other two do not. That's why I think we could also reuse `hashCode` instead of adding the new method `semanticHash`. Anyway, it's not an external API, so we can change it back at any time; 1.6 is almost at code freeze and this is critical for people now.
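To make the mismatch concrete, a small sketch based on the description above; the construction details and the exact behaviour of `equals` here are assumptions about the catalyst API of that era, not verified against the final code.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

object AttributeEqualityDemo extends App {
  val a = AttributeReference("a", IntegerType)()
  val b = a.withName("b") // same exprId, different name

  // semanticEquals keys on the exprId, so renaming does not change it ...
  assert(a semanticEquals b)

  // ... but equals also compares the name, so the two are not equal,
  // which is the equals/semanticEquals/hashCode mismatch being described.
  assert(a != b)
}
```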
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9531#issuecomment-154597511 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11476] [DOCS] Incorrect function referr...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9529#issuecomment-154564850 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-11217][ML] save/load for non-meta estim...
Github user jkbradley commented on the pull request: https://github.com/apache/spark/pull/9454#issuecomment-154567630 Spark merge script not happy. Maybe a conflict was just introduced?
[GitHub] spark pull request: [SPARK-10387][ML][WIP] Add code gen for gbt
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9524#issuecomment-154567482 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11389][CORE] Add support for off-heap m...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/9344#discussion_r44196614 --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryPool.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory + +import javax.annotation.concurrent.GuardedBy + +/** + * Manages bookkeeping for an adjustable-sized region of memory. This class is internal to + * the [[MemoryManager]]. See subclasses for more details. + * + * @param lock a [[MemoryManager]] instance, used for synchronization. We purposely erase the type + * to `Object` to avoid programming errors, since this object should only be used for + * synchronization purposes. + */ +abstract class MemoryPool(lock: Object) { + + @GuardedBy("lcok") --- End diff -- typo --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11555] spark on yarn spark-class --num-...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/9523#issuecomment-154575019 LGTM. One day I'd like to see this duplication of command-line arguments vs. SparkConf entries go away... Merging to the 3 branches.
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9480#discussion_r44200916

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala ---
@@ -203,6 +203,10 @@ case class AttributeReference(
     case _ => false
   }

+  override def semanticHash(): Int = {
+ this.exprId.hashCode()
--- End diff --

Nit: I think the indent is off here.
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44202059 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44202038 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration --- End diff -- -1 on having both `alpha` and
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9480#discussion_r44202061

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -275,8 +275,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     """
   }

-  def createCode(ctx: CodeGenContext, expressions: Seq[Expression]): GeneratedExpressionCode = {
-    val exprEvals = expressions.map(e => e.gen(ctx))
+  def createCode(ctx: CodeGenContext, expressions: Seq[Expression],
--- End diff --

Style nit: generally once wrapping we wrap all function args.
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9406#discussion_r44203475

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala ---
@@ -213,3 +216,178 @@ object Utils {
     case other => None
   }
 }
+
+/**
+ * This rule rewrites an aggregate query with multiple distinct clauses into an expanded double
+ * aggregation in which the regular aggregation expressions and every distinct clause is aggregated
+ * in a separate group. The results are then combined in a second aggregate.
+ *
+ * TODO Expression cannocalization
+ * TODO Eliminate foldable expressions from distinct clauses.
+ * TODO This eliminates all distinct expressions. We could safely pass one to the aggregate
+ *      operator. Perhaps this is a good thing? It is much simpler to plan later on...
--- End diff --

These are all optimizations. The last one is perhaps a question for @yhuai: we have a choice to rewrite all distinct expressions.
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44203476 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44203542 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
[GitHub] spark pull request: [HOTFIX] Fix python tests after #9527
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/9533
[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9256#discussion_r44205619 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import java.io.{ObjectInputStream, ObjectOutputStream} + +import scala.reflect.ClassTag + +import org.apache.spark.SparkConf +import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._ +import org.apache.spark.util.collection.OpenHashMap + +/** Internal interface for defining the map that keeps track of sessions. */ +private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] extends Serializable { + + /** Get the state for a key if it exists */ + def get(key: K): Option[S] + + /** Get all the keys and states whose updated time is older than the given threshold time */ + def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] + + /** Get all the keys and states in this map. */ + def getAll(): Iterator[(K, S, Long)] + + /** Add or update state */ + def put(key: K, state: S, updatedTime: Long): Unit + + /** Remove a key */ + def remove(key: K): Unit + + /** + * Shallow copy `this` map to create a new state map. + * Updates to the new map should not mutate `this` map. 
+ */ + def copy(): StateMap[K, S] + + def toDebugString(): String = toString() +} + +/** Companion object for [[StateMap]], with utility methods */ +private[streaming] object StateMap { + def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new EmptyStateMap[K, S] + + def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = { +val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold", + DELTA_CHAIN_LENGTH_THRESHOLD) +new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold) + } +} + +/** Specific implementation of SessionStore interface representing an empty map */ +private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends StateMap[K, S] { + override def put(key: K, session: S, updateTime: Long): Unit = { +throw new NotImplementedError("put() should not be called on an EmptyStateMap") + } + override def get(key: K): Option[S] = None + override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] = Iterator.empty + override def getAll(): Iterator[(K, S, Long)] = Iterator.empty + override def copy(): StateMap[K, S] = new EmptyStateMap[K, S] + override def remove(key: K): Unit = { } + override def toDebugString(): String = "" +} + + + +/** Implementation of StateMap based on Spark's OpenHashMap */ +private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag]( +@transient @volatile var parentStateMap: StateMap[K, S], +initialCapacity: Int = 64, +deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD + ) extends StateMap[K, S] { self => + + def this(initialCapacity: Int, deltaChainThreshold: Int) = this( +new EmptyStateMap[K, S], +initialCapacity = initialCapacity, +deltaChainThreshold = deltaChainThreshold) + + def this(deltaChainThreshold: Int) = this( +initialCapacity = 64, deltaChainThreshold = deltaChainThreshold) + + def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD) + + @transient @volatile private var deltaMap = +new OpenHashMap[K, StateInfo[S]](initialCapacity) + + /** Get the session data if it exists */ + override def get(key: K): Option[S] = { +val stateInfo = deltaMap(key) +if (stateInfo != null) { + if (!stateInfo.deleted) { +Some(stateInfo.data) + } else { +None + } +} else { + parentStateMap.get(key) +} + } + + /** Get all the keys and states
[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9256#discussion_r44205625 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import java.io.{ObjectInputStream, ObjectOutputStream} + +import scala.reflect.ClassTag + +import org.apache.spark.SparkConf +import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._ +import org.apache.spark.util.collection.OpenHashMap + +/** Internal interface for defining the map that keeps track of sessions. */ +private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] extends Serializable { + + /** Get the state for a key if it exists */ + def get(key: K): Option[S] + + /** Get all the keys and states whose updated time is older than the given threshold time */ + def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] + + /** Get all the keys and states in this map. */ + def getAll(): Iterator[(K, S, Long)] + + /** Add or update state */ + def put(key: K, state: S, updatedTime: Long): Unit + + /** Remove a key */ + def remove(key: K): Unit + + /** + * Shallow copy `this` map to create a new state map. + * Updates to the new map should not mutate `this` map. 
+ */ + def copy(): StateMap[K, S] + + def toDebugString(): String = toString() +} + +/** Companion object for [[StateMap]], with utility methods */ +private[streaming] object StateMap { + def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new EmptyStateMap[K, S] + + def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = { +val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold", + DELTA_CHAIN_LENGTH_THRESHOLD) +new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold) + } +} + +/** Specific implementation of SessionStore interface representing an empty map */ +private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends StateMap[K, S] { + override def put(key: K, session: S, updateTime: Long): Unit = { +throw new NotImplementedError("put() should not be called on an EmptyStateMap") + } + override def get(key: K): Option[S] = None + override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] = Iterator.empty + override def getAll(): Iterator[(K, S, Long)] = Iterator.empty + override def copy(): StateMap[K, S] = new EmptyStateMap[K, S] + override def remove(key: K): Unit = { } + override def toDebugString(): String = "" +} + + + +/** Implementation of StateMap based on Spark's OpenHashMap */ +private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag]( +@transient @volatile var parentStateMap: StateMap[K, S], +initialCapacity: Int = 64, +deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD + ) extends StateMap[K, S] { self => + + def this(initialCapacity: Int, deltaChainThreshold: Int) = this( +new EmptyStateMap[K, S], +initialCapacity = initialCapacity, +deltaChainThreshold = deltaChainThreshold) + + def this(deltaChainThreshold: Int) = this( +initialCapacity = 64, deltaChainThreshold = deltaChainThreshold) + + def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD) + + @transient @volatile private var deltaMap = +new OpenHashMap[K, StateInfo[S]](initialCapacity) + + /** Get the session data if it exists */ + override def get(key: K): Option[S] = { +val stateInfo = deltaMap(key) +if (stateInfo != null) { + if (!stateInfo.deleted) { +Some(stateInfo.data) + } else { +None + } +} else { + parentStateMap.get(key) +} + } + + /** Get all the keys and states
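The parent/delta layering quoted in this diff can be easier to follow with a toy model. The sketch below is an illustration only, assuming nothing beyond what the diff shows: each copy() layers a new map over its parent, so writes to the copy never mutate the parent snapshot (the real OpenHashMapBasedStateMap additionally compacts the chain once it grows past deltaChainThreshold).

```scala
import scala.collection.mutable

// Toy sketch (not the PR's code) of the parent/delta idea behind StateMap.copy().
class ChainedStateMap[K, S](parent: Map[K, S] = Map.empty[K, S]) {
  // None marks a key removed in this layer; Some(s) overrides the parent's value.
  private val delta = mutable.HashMap.empty[K, Option[S]]

  def put(key: K, state: S): Unit = delta(key) = Some(state)
  def remove(key: K): Unit = delta(key) = None

  def get(key: K): Option[S] = delta.get(key) match {
    case Some(v) => v                // updated or deleted in this layer
    case None    => parent.get(key)  // fall back to the parent snapshot
  }

  // The new map sees the current view as its (immutable) parent; later writes to it
  // cannot mutate `this`, which is the guarantee documented for StateMap.copy().
  def copy(): ChainedStateMap[K, S] = {
    val removed = delta.collect { case (k, None) => k }.toSet
    val updated = delta.collect { case (k, Some(v)) => (k, v) }.toMap
    new ChainedStateMap[K, S]((parent -- removed) ++ updated)
  }
}
```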
[GitHub] spark pull request: [SPARK-10387][ML][WIP] Add code gen for gbt
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9524#issuecomment-154564354 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10387][ML][WIP] Add code gen for gbt
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9524#issuecomment-154564403 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11217][ML] save/load for non-meta estim...
Github user jkbradley commented on the pull request: https://github.com/apache/spark/pull/9454#issuecomment-154567840 Never mind, second time's the charm --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9406#discussion_r44198936 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala --- @@ -213,3 +216,178 @@ object Utils { case other => None } } + +/** + * This rule rewrites an aggregate query with multiple distinct clauses into an expanded double + * aggregation in which the regular aggregation expressions and every distinct clause is aggregated + * in a separate group. The results are then combined in a second aggregate. --- End diff -- It would be really helpful if there was an example of what this rewrite looks like here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
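Since the review asks for an example, here is one possible illustration of the rewrite (an assumption based on the description in the diff, not text from the PR): a query with several DISTINCT clauses is expanded into one group per clause, de-duplicated in a first aggregate, and combined in a second.

```scala
import org.apache.spark.sql.SQLContext

// Hypothetical walk-through; table `t` and its columns are made up and assumed registered.
def multiDistinctExample(sqlContext: SQLContext): Unit = {
  val query =
    """SELECT key, COUNT(DISTINCT a), COUNT(DISTINCT b), SUM(c)
      |FROM t
      |GROUP BY key""".stripMargin

  // Conceptual shape of the rewritten plan (illustrative only):
  //   Aggregate(key, count(a) for gid = 1 rows, count(b) for gid = 2 rows, first(sum_c))
  //     Aggregate(key, gid, a, b, sum(c) for gid = 0 rows AS sum_c)
  //       Expand(projections: (key, 0, null, null, c), (key, 1, a, null, null), (key, 2, null, b, null))
  //         Scan t
  sqlContext.sql(query).explain(true)
}
```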
[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9530#issuecomment-154574561 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154574766 This is great. Only minor comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154574581 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9480#discussion_r44201792 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import scala.collection.mutable + +/** + * This class is used to compute equality of (sub)expression trees. Expressions can be added + * to this class and they subsequently query for expression equality. Expression trees are + * considered equal if for the same input(s), the same result is produced. + */ +class EquivalentExpressions { + /** + * Wrapper around an Expression that provides semantic equality. + */ + case class Expr(e: Expression) { +val hash = e.semanticHash() +override def equals(o: Any): Boolean = o match { + case other: Expr => e.semanticEquals(other.e) + case _ => false +} +override def hashCode: Int = hash + } + + // For each expression, the set of equivalent expressions. + private val equivalenceMap: mutable.HashMap[Expr, mutable.MutableList[Expression]] = + new mutable.HashMap[Expr, mutable.MutableList[Expression]] + + /** + * Adds each expression to this data structure, grouping them with existing equivalent + * expressions. Non-recursive. + * Returns if there was already a matching expression. + */ + def addExpr(expr: Expression): Boolean = { +if (expr.deterministic) { + val e: Expr = Expr(expr) + val f = equivalenceMap.get(e) + if (f.isDefined) { +f.get.+= (expr) +true + } else { +equivalenceMap.put(e, mutable.MutableList(expr)) +false + } +} else { + false +} + } + + /** + * Adds the expression to this datastructure recursively. Stops if a matching expression + * is found. That is, if `expr` has already been added, its children are not added. + * If ignoreLeaf is true, leaf nodes are ignored. + */ + def addExprTree(root: Expression, ignoreLeaf: Boolean = true): Unit = { +val skip = root.isInstanceOf[LeafExpression] && ignoreLeaf +if (!skip && root.deterministic && !addExpr(root)) { + root.children.foreach(addExprTree(_, ignoreLeaf)) +} + } + + /** + * Returns all fo the expression trees that are equivalent to `e`. Returns + * an empty collection if there are none. + */ + def getEquivalentExprs(e: Expression): Seq[Expression] = { +equivalenceMap.get(Expr(e)).getOrElse(mutable.MutableList()) + } + + /** + * Returns all the equivalent sets of expressions. + */ + def getAllEquivalentExprs: Seq[Seq[Expression]] = { +equivalenceMap.map { case(k, v) => { --- End diff -- `equivalenceMap.values`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
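For readers skimming the diff, a small usage sketch may help; the expressions below are made up, and the only assumption is the `EquivalentExpressions` API exactly as shown in the patch.

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Made-up expressions sharing the common subexpression (a + 1).
val a = AttributeReference("a", IntegerType)()
val common = Add(a, Literal(1))
val e1 = Multiply(common, Literal(2))
val e2 = Abs(common)

val equiv = new EquivalentExpressions
Seq(e1, e2).foreach(equiv.addExprTree(_))

// Groups with more than one member are candidates for evaluating once and reusing.
equiv.getAllEquivalentExprs.filter(_.size > 1).foreach(println)
```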
[GitHub] spark pull request: [SPARK-11389][CORE] Add support for off-heap m...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9344#issuecomment-154583398 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4557] [Streaming] Spark Streaming forea...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9488#issuecomment-154583314 Build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44202262 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
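As a reading aid for the docConcentration rules quoted above, the snippet below shows how they would be exercised from the new ml API; it is a sketch against the PR as shown (setter names assumed from the params), not a statement about the final API.

```scala
import org.apache.spark.ml.clustering.LDA

// Illustrative configuration only: k = 10 topics.
val lda = new LDA()
  .setK(10)
  .setMaxIter(50)
  // A singleton [-1] means "choose the optimizer-specific default automatically".
  .setDocConcentration(Array(-1.0))

// An explicit prior for the EM optimizer should use values > 1.0 and, when more
// than one value is given, must have length k (symmetric for EM):
val ldaWithPrior = new LDA()
  .setK(10)
  .setOptimizer("em")
  .setDocConcentration(Array.fill(10)(6.0))
```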
[GitHub] spark pull request: SPARK-11565 Replace deprecated DigestUtils.sha...
GitHub user gliptak opened a pull request: https://github.com/apache/spark/pull/9532 SPARK-11565 Replace deprecated DigestUtils.shaHex call You can merge this pull request into a Git repository by running: $ git pull https://github.com/gliptak/spark SPARK-11565 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9532.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9532 commit 5e2370ce455267e8bb2584704483e46c85b75b7d Author: Gábor Lipták Date: 2015-11-07T00:22:07Z SPARK-11565 Replace deprecated DigestUtils.shaHex call --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
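For context, the change is the usual commons-codec rename: `shaHex` is deprecated in favour of the explicitly named `sha1Hex`, which produces the same hex-encoded SHA-1 digest. A minimal before/after sketch (illustrative input only):

```scala
import org.apache.commons.codec.digest.DigestUtils

val payload = "some bytes to checksum"   // illustrative input

// Before (deprecated):
// val digest = DigestUtils.shaHex(payload)

// After: same SHA-1 hex digest, non-deprecated name.
val digest = DigestUtils.sha1Hex(payload)
println(digest)
```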
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9531#discussion_r44202704 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala --- @@ -1478,18 +1478,54 @@ class DataFrame private[sql]( /** * Returns the first `n` rows in the [[DataFrame]]. + * + * Running take requires moving data into the application's driver process, and doing so on a + * very large dataset can crash the driver process with OutOfMemoryError. + * * @group action * @since 1.3.0 */ def take(n: Int): Array[Row] = head(n) /** + * Returns the first `n` rows in the [[DataFrame]] as a list. + * + * Running take requires moving data into the application's driver process, and doing so on a + * very large dataset can crash the driver process with OutOfMemoryError. + * + * @group action + * @since 1.6.0 + */ + def takeAsList(n: Int): java.util.List[Row] = java.util.Arrays.asList(take(n) : _*) + + /** * Returns an array that contains all of [[Row]]s in this [[DataFrame]]. + * + * Running collect requires moving all the data into the application's driver process, and + * doing so on a very large dataset can crash the driver process with OutOfMemoryError. + * + * For Java API, use [[collectAsList]]. + * * @group action * @since 1.3.0 */ def collect(): Array[Row] = collect(needCallback = true) + /** + * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]]. + * + * Running collect requires moving all the data into the application's driver process, and + * doing so on a very large dataset can crash the driver process with OutOfMemoryError. + * + * @group action + * @since 1.3.0 + */ + def collectAsList(): java.util.List[Row] = withCallback("collectAsList", this) { _ => --- End diff -- oh hmm, not in the PR, but we should add `withCallback` to dataset too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
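The warning being added in this diff is worth keeping in mind when calling the Java-friendly variants; below is a small usage sketch, with method names taken from the diff and everything else assumed.

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Prefer the bounded variant when inspecting data: only the first n rows move to the driver.
def preview(df: DataFrame): java.util.List[Row] = df.takeAsList(10)

// collectAsList() materializes every row on the driver; only use it when the result
// is known to be small, otherwise the driver can fail with OutOfMemoryError.
def fetchAll(df: DataFrame): java.util.List[Row] = df.collectAsList()
```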
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9480#issuecomment-154586827 **[Test build #45263 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45263/consoleFull)** for PR 9480 at commit [`3a10f7d`](https://github.com/apache/spark/commit/3a10f7d0fac41025ced67c42bd60b8203356379c). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class EquivalentExpressions` * `case class Expr(e: Expression)` * `case class SubExprEliminationState(` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9480#issuecomment-154586930 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44203245 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9409#discussion_r44203342 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -419,3 +419,30 @@ case class Greatest(children: Seq[Expression]) extends Expression { """ } } + +/** Operator that drops a row when it contains any nulls. */ +case class DropAnyNull(child: Expression) extends UnaryExpression { --- End diff -- I'll have a look. I was following the reasoning that this is internal API, and that casting (and typing :)) was not a real priority. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9409#discussion_r44203267 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -419,3 +419,30 @@ case class Greatest(children: Seq[Expression]) extends Expression { """ } } + +/** Operator that drops a row when it contains any nulls. */ +case class DropAnyNull(child: Expression) extends UnaryExpression { + override def nullable: Boolean = true + override def dataType: DataType = child.dataType + + protected override def nullSafeEval(input: Any): InternalRow = { +val row = input.asInstanceOf[InternalRow] --- End diff -- I'll fix this and add a test tomorrow morning. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
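The follow-up hvanhovell promises would presumably look something like the sketch below (an assumption, not the eventual patch): evaluate to null whenever the struct row carries a null field, and pass the row through otherwise.

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Sketch of the null-safe evaluation under discussion; `anyNull` is the existing
// InternalRow helper that reports whether any field of the row is null.
def dropAnyNullEval(input: Any): InternalRow = {
  val row = input.asInstanceOf[InternalRow]
  if (row.anyNull) null else row
}
```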
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44203695 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on the pull request: https://github.com/apache/spark/pull/9513#issuecomment-154590891 Made a pass --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11044][SQL] Parquet writer version fixe...
Github user HyukjinKwon commented on the pull request: https://github.com/apache/spark/pull/9060#issuecomment-154597634 @liancheng I assume you missed this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4557] [Streaming] Spark Streaming forea...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9488#issuecomment-15459 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4557] [Streaming] Spark Streaming forea...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9488#issuecomment-154597740 **[Test build #45272 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45272/consoleFull)** for PR 9488 at commit [`295425e`](https://github.com/apache/spark/commit/295425edeebaeb9b128f027588dbaf087d064d40). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `abstract class Writer extends BaseReadWrite` * `trait Writable` * `abstract class Reader[T] extends BaseReadWrite` * `trait Readable[T]` * `case class GetInternalRowField(child: Expression, ordinal: Int, dataType: DataType)` * `case class Expand(` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11389][CORE] Add support for off-heap m...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/9344#issuecomment-154599114 Alright, going to merge this now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9065][Streaming][PySpark] Add MessageHa...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/7410#issuecomment-154563459 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11217][ML] save/load for non-meta estim...
Github user jkbradley commented on the pull request: https://github.com/apache/spark/pull/9454#issuecomment-154567294 I'll merge this with branch-1.6 and master --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11561][SQL] Rename text data source's c...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/9527 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9480#issuecomment-154567189 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10387][ML][WIP] Add code gen for gbt
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9524#issuecomment-154567471 **[Test build #45264 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45264/consoleFull)** for PR 9524 at commit [`9bc39bf`](https://github.com/apache/spark/commit/9bc39bfaaa67656272df5723596c2fc5c42e79f3). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class $` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/9256#discussion_r44198001 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import scala.reflect.ClassTag + +import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.JavaPairRDD +import org.apache.spark.rdd.RDD +import org.apache.spark.{HashPartitioner, Partitioner} + + +/** + * :: Experimental :: + * Abstract class representing all the specifications of the DStream transformation + * `trackStateByKey` operation of a + * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a + * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java). + * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or + * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create instances of + * this class. + * + * Example in Scala: + * {{{ + *val spec = StateSpec(trackingFunction).numPartitions(10) + * + *val emittedRecordDStream = keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec) + * }}} + * + * Example in Java: + * {{{ + *StateStateSpec[StateType, EmittedDataType] spec = + * StateStateSpec.create[StateType, EmittedDataType](trackingFunction).numPartition(10); + * + *JavaDStream[EmittedDataType] emittedRecordDStream = + * javaPairDStream.trackStateByKey[StateType, EmittedDataType](spec); + * }}} + */ +@Experimental +sealed abstract class StateSpec[K, V, S, T] extends Serializable { + + /** Set the RDD containing the initial states that will be used by `trackStateByKey`*/ + def initialState(rdd: RDD[(K, S)]): this.type + + /** Set the RDD containing the initial states that will be used by `trackStateByKey`*/ + def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type + + /** + * Set the number of partitions by which the state RDDs generated by `trackStateByKey` + * will be partitioned. Hash partitioning will be used on the + */ + def numPartitions(numPartitions: Int): this.type + + /** + * Set the partitioner by which the state RDDs generated by `trackStateByKey` will be + * be partitioned. + */ + def partitioner(partitioner: Partitioner): this.type + + /** + * Set the duration after which the state of an idle key will be removed. A key and its state is + * considered idle if it has not received any data for at least the given duration. The state + * tracking function will be called one final time on the idle states that are going to be + * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] set + * to `true` in that call. 
+ */ + def timeout(idleDuration: Duration): this.type +} + + +/** + * :: Experimental :: + * Builder object for creating instances of [[org.apache.spark.streaming.StateSpec StateSpec]] + * that is used for specifying the parameters of the DStream transformation + * `trackStateByKey` operation of a + * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a + * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java). + * + * Example in Scala: + * {{{ + *val spec = StateSpec(trackingFunction).numPartitions(10) + * + *val emittedRecordDStream = keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec) + * }}} + * + * Example in Java: + * {{{ + *StateStateSpec[StateType, EmittedDataType] spec = + * StateStateSpec.create[StateType, EmittedDataType](trackingFunction).numPartition(10); + * + *JavaDStream[EmittedDataType] emittedRecordDStream = + * javaPairDStream.trackStateByKey[StateType,
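The timeout() contract described in the diff is easiest to see from the tracking function's side. The sketch below is illustrative only: the exact tracking-function signature was still being discussed on this PR, and only State's isTimingOut()/getOption()/update() calls are assumed.

```scala
import org.apache.spark.streaming.State

// Illustrative tracking function: keeps a running count per key and handles the
// final call made for an idle key once the configured timeout has elapsed.
def trackCount(newValue: Option[Int], state: State[Int]): Option[Int] = {
  if (state.isTimingOut()) {
    // Last invocation for this key: its state is about to be removed, so just emit
    // the accumulated value one final time instead of updating it.
    state.getOption()
  } else {
    val updated = state.getOption().getOrElse(0) + newValue.getOrElse(0)
    state.update(updated)
    Some(updated)
  }
}
```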
[GitHub] spark pull request: [SPARK-11389][CORE] Add support for off-heap m...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9344#issuecomment-154571410 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9531#issuecomment-154578494 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-11420 Updating Stddev support via Impera...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9380#issuecomment-154579413 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11269][SQL] Java API support & test cas...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9528#issuecomment-154579447 **[Test build #45253 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45253/consoleFull)** for PR 9528 at commit [`f9279e1`](https://github.com/apache/spark/commit/f9279e1f6632e7c9a7f6553f0df0ed741ded54fe). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class GetInternalRowField(child: Expression, ordinal: Int, dataType: DataType)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9531#issuecomment-154579300 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11410] [PYSPARK] Add python bindings fo...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/9504#issuecomment-154579437 Since the last commit only updated the doc, I will merge this into master and 1.6 branch, thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9531#issuecomment-154579345 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9256#discussion_r44200358 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import java.io.{ObjectInputStream, ObjectOutputStream} + +import scala.reflect.ClassTag + +import org.apache.spark.SparkConf +import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._ +import org.apache.spark.util.collection.OpenHashMap + +/** Internal interface for defining the map that keeps track of sessions. */ +private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] extends Serializable { + + /** Get the state for a key if it exists */ + def get(key: K): Option[S] + + /** Get all the keys and states whose updated time is older than the given threshold time */ + def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] + + /** Get all the keys and states in this map. */ + def getAll(): Iterator[(K, S, Long)] + + /** Add or update state */ + def put(key: K, state: S, updatedTime: Long): Unit + + /** Remove a key */ + def remove(key: K): Unit + + /** + * Shallow copy `this` map to create a new state map. + * Updates to the new map should not mutate `this` map. 
+ */ + def copy(): StateMap[K, S] + + def toDebugString(): String = toString() +} + +/** Companion object for [[StateMap]], with utility methods */ +private[streaming] object StateMap { + def empty[K: ClassTag, S: ClassTag]: StateMap[K, S] = new EmptyStateMap[K, S] + + def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = { +val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold", + DELTA_CHAIN_LENGTH_THRESHOLD) +new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold) + } +} + +/** Specific implementation of SessionStore interface representing an empty map */ +private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends StateMap[K, S] { + override def put(key: K, session: S, updateTime: Long): Unit = { +throw new NotImplementedError("put() should not be called on an EmptyStateMap") + } + override def get(key: K): Option[S] = None + override def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] = Iterator.empty + override def getAll(): Iterator[(K, S, Long)] = Iterator.empty + override def copy(): StateMap[K, S] = new EmptyStateMap[K, S] + override def remove(key: K): Unit = { } + override def toDebugString(): String = "" +} + + + +/** Implementation of StateMap based on Spark's OpenHashMap */ +private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag]( +@transient @volatile var parentStateMap: StateMap[K, S], +initialCapacity: Int = 64, +deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD + ) extends StateMap[K, S] { self => + + def this(initialCapacity: Int, deltaChainThreshold: Int) = this( +new EmptyStateMap[K, S], +initialCapacity = initialCapacity, +deltaChainThreshold = deltaChainThreshold) + + def this(deltaChainThreshold: Int) = this( +initialCapacity = 64, deltaChainThreshold = deltaChainThreshold) + + def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD) + + @transient @volatile private var deltaMap = +new OpenHashMap[K, StateInfo[S]](initialCapacity) + + /** Get the session data if it exists */ + override def get(key: K): Option[S] = { +val stateInfo = deltaMap(key) +if (stateInfo != null) { + if (!stateInfo.deleted) { +Some(stateInfo.data) + } else { +None + } +} else { + parentStateMap.get(key) +} + } + + /** Get all the keys and states
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154579232 add to whitelist --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11451][SQL] Support single distinct cou...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/9409#issuecomment-154579253 ok to test --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9531#issuecomment-154579148 **[Test build #45269 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45269/consoleFull)** for PR 9531 at commit [`c5c694f`](https://github.com/apache/spark/commit/c5c694f68e54da6fdeb8ededb8f9976bb3c7bdf4). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4557] [Streaming] Spark Streaming forea...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9488#issuecomment-154583982 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4557] [Streaming] Spark Streaming forea...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9488#issuecomment-154584002 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9552] Add force control for killExecuto...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/7888#discussion_r44201943 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -88,7 +88,8 @@ private[spark] class TaskSchedulerImpl( val nextTaskId = new AtomicLong(0) // Which executor IDs we have executors on - val activeExecutorIds = new HashSet[String] + // each executor will record running or launched task number + val activeExecutorIdsWithLoads = new HashMap[String, Int] --- End diff -- nit: instead of `WithLoads`, `WithTasks` or `WithTaskCount`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
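For illustration of the data structure being discussed (a hypothetical sketch, not the PR's code): a HashMap from executor ID to running-task count lets the scheduler answer "is this executor idle?" before a forced kill.

    import scala.collection.mutable.HashMap

    // Hypothetical per-executor task counter; names are illustrative only.
    val activeExecutorIdsWithTaskCount = new HashMap[String, Int]

    def taskLaunched(execId: String): Unit =
      activeExecutorIdsWithTaskCount(execId) =
        activeExecutorIdsWithTaskCount.getOrElse(execId, 0) + 1

    def taskFinished(execId: String): Unit =
      activeExecutorIdsWithTaskCount.get(execId).foreach { n =>
        activeExecutorIdsWithTaskCount(execId) = math.max(n - 1, 0)
      }

    // Only executors with no running tasks are safe to kill without force.
    def isIdle(execId: String): Boolean =
      activeExecutorIdsWithTaskCount.getOrElse(execId, 0) == 0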
[GitHub] spark pull request: [SPARK-9552] Add force control for killExecuto...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/7888#discussion_r44201957 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -88,7 +88,8 @@ private[spark] class TaskSchedulerImpl( val nextTaskId = new AtomicLong(0) // Which executor IDs we have executors on - val activeExecutorIds = new HashSet[String] + // each executor will record running or launched task number --- End diff -- nit: "task count" instead of "task number". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44202348 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
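To make the docConcentration validation in the quoted diff concrete: it accepts a singleton [-1] (automatic), a singleton alpha >= 1.0, or a length-k vector whose entries are all >= 1.0. The same check restated as a standalone function with example inputs (plain Scala, mirroring the quoted validDocConcentration, not the Spark class itself):

    def validDocConcentration(alpha: Array[Double]): Boolean =
      if (alpha.length == 1) alpha(0) == -1 || alpha(0) >= 1.0
      else if (alpha.length > 1) alpha.forall(_ >= 1.0)
      else false

    validDocConcentration(Array(-1.0))            // true: set automatically during fitting
    validDocConcentration(Array(2.0))             // true: replicated to length k during fitting
    validDocConcentration(Array(1.1, 1.5, 1.1))   // true: full-length vector, all >= 1.0
    validDocConcentration(Array(0.5))             // false: single alpha must be -1 or >= 1.0
    validDocConcentration(Array.empty[Double])    // false: empty array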
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44202796 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9531#discussion_r44202818 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala --- @@ -1478,18 +1478,54 @@ class DataFrame private[sql]( /** * Returns the first `n` rows in the [[DataFrame]]. + * + * Running take requires moving data into the application's driver process, and doing so on a + * very large dataset can crash the driver process with OutOfMemoryError. + * * @group action * @since 1.3.0 */ def take(n: Int): Array[Row] = head(n) /** + * Returns the first `n` rows in the [[DataFrame]] as a list. + * + * Running take requires moving data into the application's driver process, and doing so on a + * very large dataset can crash the driver process with OutOfMemoryError. + * + * @group action + * @since 1.6.0 + */ + def takeAsList(n: Int): java.util.List[Row] = java.util.Arrays.asList(take(n) : _*) + + /** * Returns an array that contains all of [[Row]]s in this [[DataFrame]]. + * + * Running collect requires moving all the data into the application's driver process, and + * doing so on a very large dataset can crash the driver process with OutOfMemoryError. + * + * For Java API, use [[collectAsList]]. --- End diff -- `which will return the correct type without casting`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11564][SQL] Dataset Java API audit
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/9531#discussion_r44202846 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala --- @@ -1478,18 +1478,54 @@ class DataFrame private[sql]( /** * Returns the first `n` rows in the [[DataFrame]]. + * + * Running take requires moving data into the application's driver process, and doing so on a + * very large dataset can crash the driver process with OutOfMemoryError. + * * @group action * @since 1.3.0 */ def take(n: Int): Array[Row] = head(n) /** + * Returns the first `n` rows in the [[DataFrame]] as a list. + * + * Running take requires moving data into the application's driver process, and doing so on a + * very large dataset can crash the driver process with OutOfMemoryError. + * + * @group action + * @since 1.6.0 + */ + def takeAsList(n: Int): java.util.List[Row] = java.util.Arrays.asList(take(n) : _*) + + /** * Returns an array that contains all of [[Row]]s in this [[DataFrame]]. + * + * Running collect requires moving all the data into the application's driver process, and + * doing so on a very large dataset can crash the driver process with OutOfMemoryError. + * + * For Java API, use [[collectAsList]]. --- End diff -- oh wait, why? The array variance problems don't exist for dataframe. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
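As a usage illustration of the methods discussed in the two comments above (a sketch assuming an existing SQLContext named sqlContext; the driver-memory caveat from the scaladoc applies to all four calls):

    import org.apache.spark.sql.Row

    val df = sqlContext.range(0, 1000).toDF("id")

    // Scala-friendly: arrays of Row.
    val firstTen: Array[Row] = df.take(10)
    val everything: Array[Row] = df.collect()            // only safe if the result fits on the driver

    // Java-friendly variants discussed in this PR: java.util.List[Row].
    val firstTenList: java.util.List[Row] = df.takeAsList(10)
    val everythingList: java.util.List[Row] = df.collectAsList()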
[GitHub] spark pull request: [SPARK-5565] [ML] LDA wrapper for Pipelines AP...
Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/9513#discussion_r44203600 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasSeed, HasMaxIter} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, +EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, +LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, +OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, Matrix, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame, Row} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} +import org.apache.spark.sql.types.StructType + + +private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter + with HasSeed with HasCheckpointInterval { + + /** + * Param for the number of topics (clusters). Must be > 1. Default: 10. + * @group param + */ + @Since("1.6.0") + final val k = new IntParam(this, "k", "number of clusters to create", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("1.6.0") + def getK: Int = $(k) + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If set to a singleton vector [-1], then docConcentration is set automatically. If set to + * singleton vector [alpha] where alpha != -1, then alpha is replicated to a vector of + * length k in fitting. Otherwise, the [[docConcentration]] vector must be length k. + * (default = [-1] = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+ * @group param + */ + @Since("1.6.0") + final val docConcentration = new DoubleArrayParam(this, "docConcentration", +"Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" + + " distributions over topics (\"theta\").", validDocConcentration) + + /** Check that the docConcentration is valid, independently of other Params */ + private def validDocConcentration(alpha: Array[Double]): Boolean = { +if (alpha.length == 1) { + alpha(0) == -1 || alpha(0) >= 1.0 +} else if (alpha.length > 1) { + alpha.forall(_ >= 1.0) +} else { + false +} + } + + /** @group getParam */ + @Since("1.6.0") + def getDocConcentration: Array[Double] = $(docConcentration) + + /** + * Alias for [[getDocConcentration]] + * @group getParam + */ + @Since("1.6.0") + def getAlpha: Array[Double] = getDocConcentration + + /** + * Concentration parameter (commonly named
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9406#discussion_r44203629 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala --- @@ -213,3 +216,178 @@ object Utils { case other => None } } + +/** + * This rule rewrites an aggregate query with multiple distinct clauses into an expanded double + * aggregation in which the regular aggregation expressions and every distinct clause is aggregated + * in a separate group. The results are then combined in a second aggregate. --- End diff -- I'll add an example in the follow-up PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9241][SQL] Supporting multiple DISTINCT...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9406#discussion_r44203639 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala --- @@ -213,3 +216,178 @@ object Utils { case other => None } } + +/** + * This rule rewrites an aggregate query with multiple distinct clauses into an expanded double + * aggregation in which the regular aggregation expressions and every distinct clause is aggregated + * in a separate group. The results are then combined in a second aggregate. + * + * TODO Expression cannocalization + * TODO Eliminate foldable expressions from distinct clauses. + * TODO This eliminates all distinct expressions. We could safely pass one to the aggregate + * operator. Perhaps this is a good thing? It is much simpler to plan later on... + */ +object MultipleDistinctRewriter extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case a: Aggregate => rewrite(a) +case p => p + } + + def rewrite(a: Aggregate): Aggregate = { + +// Collect all aggregate expressions. +val aggExpressions = a.aggregateExpressions.flatMap { e => + e.collect { +case ae: AggregateExpression2 => ae + } +} + +// Extract distinct aggregate expressions. +val distinctAggGroups = aggExpressions + .filter(_.isDistinct) + .groupBy(_.aggregateFunction.children.toSet) + +// Only continue to rewrite if there is more than one distinct group. +if (distinctAggGroups.size > 1) { + // Create the attributes for the grouping id and the group by clause. + val gid = new AttributeReference("gid", IntegerType, false)() + val groupByMap = a.groupingExpressions.collect { +case ne: NamedExpression => ne -> ne.toAttribute +case e => e -> new AttributeReference(e.prettyName, e.dataType, e.nullable)() + } + val groupByAttrs = groupByMap.map(_._2) + + // Functions used to modify aggregate functions and their inputs. + def evalWithinGroup(id: Literal, e: Expression) = If(EqualTo(gid, id), e, nullify(e)) + def patchAggregateFunctionChildren( + af: AggregateFunction2, + id: Literal, + attrs: Map[Expression, Expression]): AggregateFunction2 = { +af.withNewChildren(af.children.map { case afc => + evalWithinGroup(id, attrs(afc)) +}).asInstanceOf[AggregateFunction2] + } + + // Setup unique distinct aggregate children. + val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq + val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair).toMap + val distinctAggChildAttrs = distinctAggChildAttrMap.values.toSeq + + // Setup expand & aggregate operators for distinct aggregate expressions. + val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map { +case ((group, expressions), i) => + val id = Literal(i + 1) + + // Expand projection + val projection = distinctAggChildren.map { +case e if group.contains(e) => e +case e => nullify(e) + } :+ id + + // Final aggregate + val operators = expressions.map { e => +val af = e.aggregateFunction +val naf = patchAggregateFunctionChildren(af, id, distinctAggChildAttrMap) +(e, e.copy(aggregateFunction = naf, isDistinct = false)) + } + + (projection, operators) + } + + // Setup expand for the 'regular' aggregate expressions. + val regularAggExprs = aggExpressions.filter(!_.isDistinct) + val regularAggChildren = regularAggExprs.flatMap(_.aggregateFunction.children).distinct + val regularAggChildAttrMap = regularAggChildren.map(expressionAttributePair).toMap + + // Setup aggregates for 'regular' aggregate expressions. 
+ val regularGroupId = Literal(0) + val regularAggOperatorMap = regularAggExprs.map { e => --- End diff -- I'll add documentation in a follow-up PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail:
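Since the worked example is deferred to a follow-up PR, here is an illustrative sketch of the kind of query the rule handles (hypothetical table and column names; the comments paraphrase the rule's scaladoc rather than quote the planner output):

    import org.apache.spark.sql.functions.{countDistinct, sum}

    // Two different DISTINCT aggregates plus a regular aggregate in one query.
    val sales = sqlContext.table("sales")   // assumed columns: region, product, customer, amount

    sales.groupBy("region").agg(
      countDistinct("product"),    // distinct group 1
      countDistinct("customer"),   // distinct group 2
      sum("amount")                // regular aggregate
    )
    // Conceptually, the rewrite expands each input row once per distinct group (plus once for the
    // regular aggregates), tags the copies with a group id ("gid"), aggregates by (region, gid),
    // and then combines the partial results in a second aggregate keyed by region alone.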
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9480#issuecomment-154590257 **[Test build #1999 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1999/consoleFull)** for PR 9480 at commit [`3a10f7d`](https://github.com/apache/spark/commit/3a10f7d0fac41025ced67c42bd60b8203356379c). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class EquivalentExpressions ` * ` case class Expr(e: Expression) ` * ` case class SubExprEliminationState(` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-11420 Updating Stddev support via Impera...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/9380#issuecomment-154593369 Even though it's simple, I think this implementation is boxing the result, which could result in slower performance on real workloads (though that is harder to see in micro-benchmarks). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
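A minimal illustration of the boxing concern (plain Scala, not the PR's code): keeping the running value behind an Any-typed slot forces a java.lang.Double allocation on every update, while a primitive slot does not.

    // Boxed: each update unboxes, adds, and allocates a fresh java.lang.Double.
    val boxedBuffer = Array[Any](0.0)
    def updateBoxed(x: Double): Unit =
      boxedBuffer(0) = boxedBuffer(0).asInstanceOf[Double] + x

    // Unboxed: the value stays a primitive double for the whole aggregation.
    val primitiveBuffer = Array[Double](0.0)
    def updatePrimitive(x: Double): Unit =
      primitiveBuffer(0) += x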
[GitHub] spark pull request: [SPARK-11555] spark on yarn spark-class --num-...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/9523#issuecomment-154598834 @tgravescs , sorry for missing the `--num-workers` parameter. I suppose users will not directly invoke the YARN client through `spark-class`, and `--num-workers` is not parsed by `SparkSubmitArguments` either. So maybe we should deprecate this parameter; as @vanzin said, there are currently so many ways to set YARN-related configurations that they are hard to manage, easy to introduce bugs with, and confusing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
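For reference, the non-deprecated ways to size a YARN application are the spark-submit flags (--num-executors, --executor-memory, --executor-cores) or the equivalent configuration keys; a minimal Scala sketch, assuming a YARN deployment:

    import org.apache.spark.{SparkConf, SparkContext}

    // Equivalent to: spark-submit --num-executors 10 --executor-memory 4g --executor-cores 2
    val conf = new SparkConf()
      .setAppName("example")
      .set("spark.executor.instances", "10")
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "2")

    val sc = new SparkContext(conf)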
[GitHub] spark pull request: [SPARK-11515][ML] QuantileDiscretizer should t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9535#issuecomment-154602302 **[Test build #45277 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45277/consoleFull)** for PR 9535 at commit [`bfabeb2`](https://github.com/apache/spark/commit/bfabeb2c88227c90d68a7699106accc01f1bf2f9). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11389][CORE] Add support for off-heap m...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9344#issuecomment-154568105 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11389][CORE] Add support for off-heap m...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9344#issuecomment-154568133 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11389][CORE] Add support for off-heap m...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/9344#issuecomment-154568018 LGTM, pending on tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11217][ML] save/load for non-meta estim...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/9454 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9256#discussion_r44197569 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import scala.reflect.ClassTag + +import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.JavaPairRDD +import org.apache.spark.rdd.RDD +import org.apache.spark.{HashPartitioner, Partitioner} + + +/** + * :: Experimental :: + * Abstract class representing all the specifications of the DStream transformation + * `trackStateByKey` operation of a + * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a + * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java). + * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or + * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create instances of + * this class. + * + * Example in Scala: + * {{{ + *val spec = StateSpec(trackingFunction).numPartitions(10) + * + *val emittedRecordDStream = keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec) + * }}} + * + * Example in Java: + * {{{ + *StateStateSpec[StateType, EmittedDataType] spec = + * StateStateSpec.create[StateType, EmittedDataType](trackingFunction).numPartition(10); + * + *JavaDStream[EmittedDataType] emittedRecordDStream = + * javaPairDStream.trackStateByKey[StateType, EmittedDataType](spec); + * }}} + */ +@Experimental +sealed abstract class StateSpec[K, V, S, T] extends Serializable { + + /** Set the RDD containing the initial states that will be used by `trackStateByKey`*/ + def initialState(rdd: RDD[(K, S)]): this.type + + /** Set the RDD containing the initial states that will be used by `trackStateByKey`*/ + def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type + + /** + * Set the number of partitions by which the state RDDs generated by `trackStateByKey` + * will be partitioned. Hash partitioning will be used on the + */ + def numPartitions(numPartitions: Int): this.type + + /** + * Set the partitioner by which the state RDDs generated by `trackStateByKey` will be + * be partitioned. + */ + def partitioner(partitioner: Partitioner): this.type + + /** + * Set the duration after which the state of an idle key will be removed. A key and its state is + * considered idle if it has not received any data for at least the given duration. The state + * tracking function will be called one final time on the idle states that are going to be + * removed; [[org.apache.spark.streaming.State State.isTimingOut()]] set + * to `true` in that call. 
+ */ + def timeout(idleDuration: Duration): this.type +} + + +/** + * :: Experimental :: + * Builder object for creating instances of [[org.apache.spark.streaming.StateSpec StateSpec]] + * that is used for specifying the parameters of the DStream transformation + * `trackStateByKey` operation of a + * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a + * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java). + * + * Example in Scala: + * {{{ + *val spec = StateSpec(trackingFunction).numPartitions(10) + * + *val emittedRecordDStream = keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec) + * }}} + * + * Example in Java: + * {{{ + *StateStateSpec[StateType, EmittedDataType] spec = + * StateStateSpec.create[StateType, EmittedDataType](trackingFunction).numPartition(10); + * + *JavaDStream[EmittedDataType] emittedRecordDStream = + * javaPairDStream.trackStateByKey[StateType,
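Building on the scaladoc example quoted above (the API is still marked Experimental in this PR, so treat this purely as a sketch of intent): numPartitions and the timeout described in the quoted comment can be chained on the same StateSpec.

    import org.apache.spark.streaming.Minutes

    // Sketch only: trackingFunction, StateType and EmittedDataType are assumed to exist,
    // exactly as in the scaladoc example above.
    val spec = StateSpec(trackingFunction)
      .numPartitions(10)
      .timeout(Minutes(30))   // idle keys (no data for 30 minutes) are removed after a final call

    val emittedRecordDStream =
      keyValueDStream.trackStateByKey[StateType, EmittedDataType](spec)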
[GitHub] spark pull request: [SPARK-10371] [SQL] Implement subexpr eliminat...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9480#issuecomment-154570265 **[Test build #1999 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1999/consoleFull)** for PR 9480 at commit [`3a10f7d`](https://github.com/apache/spark/commit/3a10f7d0fac41025ced67c42bd60b8203356379c). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org