[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1297#discussion_r17790219

--- Diff: core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.LongMap
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import IndexedRDD.Id
+
+/**
+ * Contains members that are shared among all variants of IndexedRDD (e.g., IndexedRDD,
+ * VertexRDD).
+ *
+ * @tparam V the type of the values stored in the IndexedRDD
+ * @tparam P the type of the partitions making up the IndexedRDD
+ * @tparam Self the type of the implementing container. This allows transformation methods on any
+ * implementing container to yield a result of the same type.
+ */
+private[spark] trait IndexedRDDLike[
+    @specialized(Long, Int, Double) V,
+    P[X] <: IndexedRDDPartitionLike[X, P],
+    Self[X] <: IndexedRDDLike[X, P, Self]]
+  extends RDD[(Id, V)] {
+
+  /** A generator for ClassTags of the value type V. */
+  protected implicit def vTag: ClassTag[V]
+
+  /** A generator for ClassTags of the partition type P. */
+  protected implicit def pTag[V2]: ClassTag[P[V2]]
+
+  /** Accessor for the IndexedRDD variant that is mixing in this trait. */
+  protected def self: Self[V]
+
+  /** The underlying representation of the IndexedRDD as an RDD of partitions. */
+  def partitionsRDD: RDD[P[V]]
+  require(partitionsRDD.partitioner.isDefined)
+
+  def withPartitionsRDD[V2: ClassTag](partitionsRDD: RDD[P[V2]]): Self[V2]
+
+  override val partitioner = partitionsRDD.partitioner
+
+  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+  override protected def getPreferredLocations(s: Partition): Seq[String] =
+    partitionsRDD.preferredLocations(s)
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    partitionsRDD.persist(newLevel)
+    this
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    partitionsRDD.unpersist(blocking)
+    this
+  }
+
+  override def count(): Long = {
+    partitionsRDD.map(_.size).reduce(_ + _)
+  }
+
+  /** Provides the `RDD[(Id, V)]` equivalent output. */
+  override def compute(part: Partition, context: TaskContext): Iterator[(Id, V)] = {
+    firstParent[P[V]].iterator(part, context).next.iterator
+  }
+
+  /** Gets the value corresponding to the specified key, if any. */
+  def get(k: Id): Option[V] = multiget(Array(k)).get(k)
+
+  /** Gets the values corresponding to the specified keys, if any. */
+  def multiget(ks: Array[Id]): Map[Id, V] = {
+    val ksByPartition = ks.groupBy(k => self.partitioner.get.getPartition(k))
+    val partitions = ksByPartition.keys.toSeq
+    def unionMaps(maps: TraversableOnce[LongMap[V]]): LongMap[V] = {
+      maps.foldLeft(LongMap.empty[V]) {
+        (accum, map) => accum.unionWith(map, (id, a, b) => a)
+      }
+    }
+    // TODO: avoid sending all keys to all partitions by creating and zipping an RDD of keys
--- End diff --

would this be another use of the `bulkMultiget` I suggested in jira?
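For orientation, a sketch of how the lookup API in this diff is meant to be used from a driver program. The `IndexedRDD(...)` factory and the sample values here are assumptions for illustration, not taken from the patch:

```scala
import org.apache.spark.rdd.IndexedRDD

// Build an IndexedRDD from a pair RDD (factory name assumed for illustration).
val indexed = IndexedRDD(sc.parallelize((1L to 1000L).map(id => (id, id * 2))))

// get/multiget route each key only to the partition its hash assigns it,
// using the partitioner that IndexedRDDLike requires to be defined.
val one: Option[Long]     = indexed.get(42L)                    // Some(84)
val some: Map[Long, Long] = indexed.multiget(Array(1L, 2L, 3L))
```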
[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1297#discussion_r17791303

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ImmutableLongOpenHashSet.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect._
+import com.google.common.hash.Hashing
+
+/**
+ * A fast, immutable hash set optimized for insertions and lookups (but not deletions) of `Long`
+ * elements. Because it exposes the position of a key in the underlying array, this is useful as a
+ * building block for higher level data structures such as a hash map (for example,
+ * IndexedRDDPartition).
+ *
+ * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed to explore all
+ * spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
+ */
+private[spark] class ImmutableLongOpenHashSet(
+    /** Underlying array of elements used as a hash table. */
+    val data: ImmutableVector[Long],
+    /** Whether or not there is an element at the corresponding position in `data`. */
+    val bitset: ImmutableBitSet,
+    /**
+     * Position of a focused element. This is useful when returning a modified set along with a
+     * pointer to the location of modification.
+     */
+    val focus: Int,
+    /** Load threshold at which to grow the underlying vectors. */
+    loadFactor: Double
+  ) extends Serializable {
+
+  require(loadFactor < 1.0, "Load factor must be less than 1.0")
+  require(loadFactor > 0.0, "Load factor must be greater than 0.0")
+  require(capacity == nextPowerOf2(capacity), "data capacity must be a power of 2")
+
+  import OpenHashSet.{INVALID_POS, NONEXISTENCE_MASK, POSITION_MASK, Hasher, LongHasher}
+
+  private val hasher: Hasher[Long] = new LongHasher
+
+  private def mask = capacity - 1
+  private def growThreshold = (loadFactor * capacity).toInt
+
+  def withFocus(focus: Int): ImmutableLongOpenHashSet =
+    new ImmutableLongOpenHashSet(data, bitset, focus, loadFactor)
+
+  /** The number of elements in the set. */
+  def size: Int = bitset.cardinality
+
+  /** The capacity of the set (i.e. size of the underlying vector). */
+  def capacity: Int = data.size
+
+  /** Return true if this set contains the specified element. */
+  def contains(k: Long): Boolean = getPos(k) != INVALID_POS
+
+  /**
+   * Nondestructively add an element to the set, returning a new set. If the set is over capacity
+   * after the insertion, grows the set and rehashes all elements.
+   */
+  def add(k: Long): ImmutableLongOpenHashSet = {
+    addWithoutResize(k).rehashIfNeeded(ImmutableLongOpenHashSet.grow, ImmutableLongOpenHashSet.move)
+  }
+
+  /**
+   * Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
+   * The caller is responsible for calling rehashIfNeeded.
+   *
+   * Use (retval.focus & POSITION_MASK) to get the actual position, and
+   * (retval.focus & NONEXISTENCE_MASK) == 0 for prior existence.
+   */
+  def addWithoutResize(k: Long): ImmutableLongOpenHashSet = {
+    var pos = hashcode(hasher.hash(k)) & mask
+    var i = 1
+    var result: ImmutableLongOpenHashSet = null
+    while (result == null) {
+      if (!bitset.get(pos)) {
+        // This is a new key.
+        result = new ImmutableLongOpenHashSet(
+          data.updated(pos, k), bitset.set(pos), pos | NONEXISTENCE_MASK, loadFactor)
+      } else if (data(pos) == k) {
+        // Found an existing key.
+        result = this.withFocus(pos)
+      } else {
+        val delta = i
+        pos = (pos + delta) & mask
+        i += 1
+      }
+    }
+    result
+  }
+
+  /**
+   * Rehash the set if it is overloaded.
+   * @param allocateFunc Callback invoked when we
[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/1297#issuecomment-56199798

This looks great! my comments are minor. I know it's early to be discussing example docs, but I just wanted to mention that I can see caching being an area of confusion. E.g., you wouldn't want to cache each update to an IndexedRDD in serialized form, as each cache would make a full copy and not get the benefits of the ImmutableVectors.
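A sketch of the caching pitfall being described (hedged; `updated` stands in for some IndexedRDD produced by an update, and the sharing claim is the one made in the comment above):

```scala
import org.apache.spark.storage.StorageLevel

// Deserialized caching keeps the JVM objects, so successive versions of the
// IndexedRDD can share structure through the underlying ImmutableVectors:
updated.persist(StorageLevel.MEMORY_ONLY)

// Serialized caching flattens each version into its own byte buffer, so every
// cached update would hold a full copy and the structural sharing is lost:
// updated.persist(StorageLevel.MEMORY_ONLY_SER)   // likely wasteful here
```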
[GitHub] spark pull request: SPARK-4276 fix for two working thread
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3141#issuecomment-62320554

I agree w/ TD, I don't think this change is necessary. I think we should close this and, @svar29, maybe you can discuss the problem you are running into on the spark-user mailing list; hopefully we can help you out there.
[GitHub] spark pull request: [SPARK-4260] Httpbroadcast should set connecti...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3122#issuecomment-62320949

This looks good, but could you also explain what necessitates this change? Did you observe some error? If nothing else, just putting the error you observed in the JIRA would help somebody else find this patch if they run into the error as well.
[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3100#discussion_r20062181

--- Diff: graphx/src/main/scala/org/apache/spark/graphx/TripletFields.scala ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+class TripletFields private (
+    val useSrc: Boolean,
+    val useDst: Boolean,
+    val useEdge: Boolean)
--- End diff --

maybe I'm just missing it, but it seems like `useEdge` is never used.
[GitHub] spark pull request: SPARK-1344 [DOCS] Scala API docs for top metho...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3168#issuecomment-62323279

lgtm
[GitHub] spark pull request: SPARK-971 [DOCS] Link to Confluence wiki from ...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3169#issuecomment-62323284

lgtm
[GitHub] spark pull request: [SPARK-1021] Defer the data-driven computation...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3079#discussion_r20062337

--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -113,8 +117,12 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   private var ordering = implicitly[Ordering[K]]

   // An array of upper bounds for the first (partitions - 1) partitions
-  private var rangeBounds: Array[K] = {
-    if (partitions <= 1) {
+  @volatile private var valRB: Array[K] = null
--- End diff --

`valRB` is a kinda confusing name. I think the convention would be to name it `_rangeBounds`. E.g. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/FutureAction.scala#L111
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3022#discussion_r20063810

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
+import breeze.linalg.{Transpose, det, inv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext}
+import org.apache.spark.SparkContext.DoubleAccumulatorParam
+
+/**
+ * Expectation-Maximization for multivariate Gaussian Mixture Models.
+ *
+ */
+object GMMExpectationMaximization {
+  /**
+   * Trains a GMM using the given parameters
+   *
+   * @param data training points stored as RDD[Vector]
+   * @param k the number of Gaussians in the mixture
+   * @param maxIterations the maximum number of iterations to perform
+   * @param delta change in log-likelihood at which convergence is considered achieved
+   */
+  def train(data: RDD[Vector], k: Int, maxIterations: Int, delta: Double): GaussianMixtureModel = {
+    new GMMExpectationMaximization().setK(k)
+      .setMaxIterations(maxIterations)
+      .setDelta(delta)
+      .run(data)
+  }
+
+  /**
+   * Trains a GMM using the given parameters
+   *
+   * @param data training points stored as RDD[Vector]
+   * @param k the number of Gaussians in the mixture
+   * @param maxIterations the maximum number of iterations to perform
+   */
+  def train(data: RDD[Vector], k: Int, maxIterations: Int): GaussianMixtureModel = {
+    new GMMExpectationMaximization().setK(k).setMaxIterations(maxIterations).run(data)
+  }
+
+  /**
+   * Trains a GMM using the given parameters
+   *
+   * @param data training points stored as RDD[Vector]
+   * @param k the number of Gaussians in the mixture
+   */
+  def train(data: RDD[Vector], k: Int): GaussianMixtureModel = {
+    new GMMExpectationMaximization().setK(k).run(data)
+  }
+}
+
+/**
+ * This class performs multivariate Gaussian expectation maximization. It will
+ * maximize the log-likelihood for a mixture of k Gaussians, iterating until
+ * the log-likelihood changes by less than delta, or until it has reached
+ * the max number of iterations.
+ */
+class GMMExpectationMaximization private (
+    private var k: Int,
+    private var delta: Double,
+    private var maxIterations: Int) extends Serializable {
+
+  // Type aliases for convenience
+  private type DenseDoubleVector = BreezeVector[Double]
+  private type DenseDoubleMatrix = BreezeMatrix[Double]
+
+  // A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold
+  def this() = this(2, 0.01, 100)
+
+  /** Set the number of Gaussians in the mixture model. Default: 2 */
+  def setK(k: Int): this.type = {
+    this.k = k
+    this
+  }
+
+  /** Set the maximum number of iterations to run. Default: 100 */
+  def setMaxIterations(maxIterations: Int): this.type = {
+    this.maxIterations = maxIterations
+    this
+  }
+
+  /**
+   * Set the largest change in log-likelihood at which convergence is
+   * considered to have occurred.
+   */
+  def setDelta(delta: Double): this.type = {
+    this.delta = delta
+    this
+  }
+
+  /** Machine precision value used to ensure matrix conditioning */
+  private val eps = math.pow(2.0, -52)
+
+  /** Perform expectation maximization */
+  def run(data: RDD[Vector]): GaussianMixtureModel = {
+    val ctx = data.sparkContext
+
+    // we will operate on the data as breeze data
+    val breezeData = data.map{ u
[GitHub] spark pull request: [SPARK-4017] show progress bar in console and ...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3029#issuecomment-62330615

this is awesome!
[GitHub] spark pull request: [SPARK-4087] use broadcast for task only when ...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/2933#issuecomment-62330965

I agree with @pwendell . It seems like the right thing to do is just fix Broadcast ... and if we can't, then wouldn't you also want to turn off Broadcast even for big closures?
[GitHub] spark pull request: [SPARK-4017] show progress bar in console and ...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3029#issuecomment-62475673

I was just about to suggest the same thing. So I admit it seemed a lot cooler to have the console keep updating, but I agree with their concerns. As a slight modification of @kayousterhout 's proposal, what if instead of logging for *every* update, you log whenever some unit of time has elapsed (e.g., 1 second) *and* some unit of work has been done (that is, both conditions must be true, not just one of them)? That way the logs don't get clobbered with lots of little updates -- if you have 1000 tasks but the whole thing finishes in under 1 second, you really don't need to monitor the progress in the logs. But by just using the normal logging mechanism, it's still controllable via the normal logging configuration and plays nicely with it.
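A minimal sketch of that both-conditions throttle (class name and default thresholds are illustrative, not from the patch):

```scala
// Log a progress update only when BOTH enough time has elapsed AND enough
// work has completed since the last message.
class ThrottledProgressLogger(minIntervalMs: Long = 1000L, minTasksDelta: Int = 1) {
  private var lastLogTime = 0L
  private var lastLoggedTasks = 0

  def onProgress(stageId: Int, completed: Int, total: Int): Unit = {
    val now = System.currentTimeMillis()
    if (now - lastLogTime >= minIntervalMs &&
        completed - lastLoggedTasks >= minTasksDelta) {
      println(s"[INFO] Stage $stageId: $completed/$total tasks complete")
      lastLogTime = now
      lastLoggedTasks = completed
    }
  }
}
```

A 1000-task job that finishes in under a second then produces at most one line, while a long-running stage still gets roughly one line per second.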
[GitHub] spark pull request: [SPARK-3936] Add aggregateMessages, which supe...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3100#discussion_r20130288

--- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -285,50 +337,126 @@ class EdgePartition[
   }

   /**
-   * Upgrade the given edge iterator into a triplet iterator.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by
+   * scanning all edges sequentially and filtering them with `idPred`.
+   *
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param sendMsgUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
+   * @param sendMsgUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
+   * @param idPred a predicate to filter edges based on their source and destination vertex ids
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  def upgradeIterator(
-      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
-    : Iterator[EdgeTriplet[VD, ED]] = {
-    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  def aggregateMessages[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = new BitSet(vertexAttrs.length)
+
+    var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+    var i = 0
+    while (i < size) {
+      val localSrcId = localSrcIds(i)
+      val srcId = local2global(localSrcId)
+      val localDstId = localDstIds(i)
+      val dstId = local2global(localDstId)
+      if (idPred(srcId, dstId)) {
+        ctx.localSrcId = localSrcId
+        ctx.localDstId = localDstId
+        ctx.srcId = srcId
+        ctx.dstId = dstId
+        ctx.attr = data(i)
+        if (tripletFields.useSrc) { ctx.srcAttr = vertexAttrs(localSrcId) }
+        if (tripletFields.useDst) { ctx.dstAttr = vertexAttrs(localDstId) }
+        sendMsg(ctx)
+      }
+      i += 1
+    }
+
+    bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
   }

   /**
-   * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
-   * iterator is generated using an index scan, so it is efficient at skipping edges that don't
-   * match srcIdPred.
+   * Send messages along edges and aggregate them at the receiving vertices. Implemented by
+   * filtering the source vertex index with `srcIdPred`, then scanning edge clusters and filtering
+   * with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an edge to run.
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
-   */
-  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
-    index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
-
-  /**
-   * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
-   * cluster must start at position `index`.
+   * @param sendMsg generates messages to neighboring vertices of an edge
+   * @param mergeMsg the combiner applied to messages destined to the same vertex
+   * @param srcIdPred a predicate to filter edges based on their source vertex id
+   * @param dstIdPred a predicate to filter edges based on their destination vertex id
    *
-   * Be careful not to keep references to the objects from this iterator. To improve GC performance
-   * the same object is re-used in `next()`.
+   * @return iterator aggregated messages keyed by the receiving vertex id
    */
-  private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
-    private[this] val edge = new Edge[ED]
-    private[this] var pos = index
+  def aggregateMessagesWithIndex[A: ClassTag](
+      sendMsg: EdgeContext[VD, ED, A] => Unit,
+      mergeMsg: (A, A) => A,
+      tripletFields: TripletFields,
+      srcIdPred: VertexId => Boolean,
+      dstIdPred: VertexId => Boolean): Iterator[(VertexId, A)] = {
+    val aggregates = new Array[A](vertexAttrs.length)
+    val bitset = new
[GitHub] spark pull request: [SPARK-4017] show progress bar in console and ...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3029#issuecomment-62499404

I totally see the appeal of the one-progress bar (hence my initial excitement when I tried this out). But if it doesn't play nicely with logging multiple stages, this seems like a very small improvement for the initial user experience, but a big headache for serious users. I don't think it's really that much worse if your example changes to

```
scala> rdd.count()
[INFO] Stage 1 [= ]
[INFO] Stage 1 [==]
[INFO] Stage 1 finished in 2 seconds (med/avg/)
res0: Int =
```

In this case it's just a couple more lines. If the stage took longer, then it would be even more lines, but that seems OK, since it's not that much noise per unit time. If the code were moved to a separate SparkListener implementation, then it could have its own log level, and even be INFO by default (so we leave everything else as WARN). INFO for everything in spark is way too noisy for the average spark user, but that doesn't mean we can't use INFO for a few select classes.
[GitHub] spark pull request: [SPARK-4017] show progress bar in console and ...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3029#issuecomment-63149772

Sorry for my delay in responding ...

(a) I think this DOES add a lot of value over the std INFO logging. One log line per task completion is *much* noisier than what I'm proposing here, for a job with hundreds of tasks that completes in a few seconds (at least for me, a very common case).

(b) I think changing the logging configuration to be INFO for this, and leaving it at WARN for everything else, is pretty easy. The other comments above already request this be moved into a `SparkListener`, so you would just add a line:

```
log4j.logger.org.apache.spark.reporter.JobProgressConsoleReporter=INFO
```

(though I realize now that I actually am not sure where the logging setup for the examples is configured ...)
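For concreteness, a hedged sketch of what such a listener could look like. `SparkListener`, `SparkListenerStageCompleted`, and `SparkContext.addSparkListener` are real Spark 1.x hooks; the class and package here merely mirror the hypothetical log4j line above:

```scala
package org.apache.spark.reporter

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.slf4j.LoggerFactory

// A listener with its own logger name, so log4j can set just this class to
// INFO while the rest of org.apache.spark stays at WARN.
class JobProgressConsoleReporter extends SparkListener {
  private val log = LoggerFactory.getLogger(getClass)

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    log.info(s"Stage ${info.stageId} (${info.name}) finished: ${info.numTasks} tasks")
  }
}
```

It would be registered with `sc.addSparkListener(new JobProgressConsoleReporter)`.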
[GitHub] spark pull request: SPARK-5199. Input metrics should show up for I...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4050#discussion_r23555820

--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -218,13 +219,14 @@ class HadoopRDD[K, V](
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
-        split.inputSplit.value match {
-          case split: FileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
-          case _ => None
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
+        val inputSplit = split.inputSplit.value
+        if (inputSplit.isInstanceOf[FileSplit] || inputSplit.isInstanceOf[CombineFileSplit]) {
--- End diff --

this is fine as is, but fyi you can do the same thing in a pattern match:

```
split.inputSplit.value match {
  case _: FileSplit | _: CombineFileSplit =>
    SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(jobConf)
  case _ => None
}
```
[GitHub] spark pull request: SPARK-4337. [YARN] Add ability to cancel pendi...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4141#discussion_r23557845

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -192,15 +186,32 @@ private[yarn] class YarnAllocator(
   }

   /**
-   * Request numExecutors additional containers from YARN. Visible for testing.
+   * Update the set of container requests that we will sync with the RM based on the number of
+   * executors we have currently running and our target number of executors.
+   *
+   * Visible for testing.
    */
-  def addResourceRequests(numExecutors: Int): Unit = {
-    for (i <- 0 until numExecutors) {
-      val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
-      amClient.addContainerRequest(request)
-      val nodes = request.getNodes
-      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-      logInfo("Container request (host: %s, capability: %s)".format(hostStr, resource))
+  def updateResourceRequests(): Unit = {
--- End diff --

how about making it `private[yarn]`? will still be visible in tests
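For reference, a small sketch of the suggested visibility; it assumes the test suite sits in the same `org.apache.spark.deploy.yarn` package, as Spark's YARN suites do:

```scala
package org.apache.spark.deploy.yarn

private[yarn] class YarnAllocator /* ...existing constructor args... */ {
  // Package-private: callable from a suite in the same package,
  // but not part of the public API surface.
  private[yarn] def updateResourceRequests(): Unit = {
    // ...
  }
}
```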
[GitHub] spark pull request: SPARK-4337. [YARN] Add ability to cancel pendi...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/4141#issuecomment-71523335

just a minor comment, otherwise lgtm
[GitHub] spark pull request: SPARK-4746 make it easy to skip IntegrationTes...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/4048#issuecomment-71512674

I figured out the magic combination to make sbt, scalatest, junit, and the sbt-pom-reader all play nicely together. I had to introduce a new config (or scope or something, sbt terminology still baffles me ...) instead of creating a new task. Now you can run `unit:test` (or any other variant you like, e.g. `~unit:testQuick`) which will exclude everything tagged as an IntegrationTest; see the sketch below. There is both a tag for the scalatest suites and a category for the junit tests. I've still only bothered with the tagging in core. But I think this can be merged as is in any case -- this does the setup so someone more familiar w/ the other projects can figure out what to tag.
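A hedged sketch of that sbt wiring using stock sbt 0.13 idioms; the tag name and where this lands in the build are assumptions, not the actual sbt-pom-reader setup:

```scala
import sbt._
import sbt.Keys._

// A new configuration that inherits everything from Test...
lazy val UnitTest = config("unit") extend Test

// ...plus settings that make unit:test run the Test sources while asking
// ScalaTest ("-l") to exclude anything carrying the integration tag.
lazy val unitTestSettings =
  inConfig(UnitTest)(Defaults.testTasks) ++ Seq(
    testOptions in UnitTest := Seq(
      Tests.Argument(TestFrameworks.ScalaTest, "-l", "org.apache.spark.IntegrationTest"))
  )
```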
[GitHub] spark pull request: [SPARK-3454] Expose JSON representation of dat...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/2333#issuecomment-71532457

Hi @sarutak thanks for your work on this. Josh's other PR https://github.com/apache/spark/pull/2696 has been merged for a while now. I'm gonna take another crack at implementing this on top of the latest changes and getting this integrated into the history server too. Can you please close this PR (unless you are planning on updating it in the very near future)? Hopefully that will let your other PR https://github.com/apache/spark/pull/2342 get merged in -- looks really cool!
[GitHub] spark pull request: [SQL] SPARK-5309: Add support for dictionaries...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/4187#issuecomment-71534196

thanks for all the extra detail @MickDavies. lgtm
[GitHub] spark pull request: SPARK-2450 Adds executor log links to Web UI
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3486#discussion_r23938653

--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -382,7 +382,8 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
-    ("Total Cores" -> executorInfo.totalCores)
+    ("Total Cores" -> executorInfo.totalCores) ~
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
--- End diff --

should this use `Utils.jsonOption` per the comment at the top of `JsonProtocol` when adding new fields? Also I guess there should be a backward compatibility test case added to `JsonProtocolSuite`
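For reference, the kind of backward-compatible read `Utils.jsonOption` enables on the deserializing side, sketched in JsonProtocol's json4s style (`mapFromJson` is JsonProtocol's own helper; the exact wiring into `executorInfoFromJson` is an assumption):

```scala
// Old event logs won't contain "Log Urls"; Utils.jsonOption turns the missing
// field into None instead of an extraction error when replaying them:
val logUrls: Map[String, String] =
  Utils.jsonOption(json \ "Log Urls").map(mapFromJson).getOrElse(Map.empty)
```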
[GitHub] spark pull request: SPARK-2450 Adds executor log links to Web UI
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3486#discussion_r23938252

--- Diff: core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala ---
@@ -26,7 +26,7 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils

 /** Summary information about an executor to display in the UI. */
-private case class ExecutorSummaryInfo(
+private[ui] case class ExecutorSummaryInfo(
--- End diff --

I don't think this visibility change is necessary
[GitHub] spark pull request: [SPARK-3454] [WIP] separate json endpoints for...
GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/4435

[SPARK-3454] [WIP] separate json endpoints for data in the UI

Exposes data available in the UI as json over http. There are some TODOs and the code needs cleanup, but there is enough here to get feedback on the overall direction, I think. Key points:

* new set of public case classes for what is represented in the json. Lots of boilerplate code to copy from other internal representations into these ones, but I figure it's best to keep the public versions completely separate.
* Though the code to serve the json no longer lives in the XyzPage classes, the json is still tied closely to the UI. It's basically pulling data from the various `UIData` classes and massaging it a little.
* json is all constructed by jackson, no manual creation of the json ASTs via json4s.
* there are a bunch of new files in core/src/test/resources/HistoryServerExpectations with the produced json -- both as examples of what is generated and tests to ensure it doesn't change going forward.
* instead of making a new servlet handler per endpoint, as in the page classes, it's all just one servlet handler which has some simple logic to dispatch the requests, see `JsonRequestHandler`

If this direction seems alright it should be straightforward to fill in the missing bits.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark SPARK-3454

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4435.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4435

commit 4b398d0588e25b9861de09e0b31ac99aaca3346e
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-02-06T20:53:21Z

    expose UI data as json in new endpoints

commit d1a8c9253f53541bfc95d6d2df16ad8e176c131d
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-02-06T20:53:50Z

    add sbt-revolved plugin, to make it easier to start stop http servers in sbt

commit b252e7a3752e70de1e986faba159d0a0087480cd
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-02-06T21:30:21Z

    small cleanup of accidental changes
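A minimal sketch of the jackson-based construction described in the bullets above, using jackson-module-scala (which the pom diff below adds as a dependency); the `StageData` shape is an assumption for illustration:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Public, stable case class for the API; jackson serializes it directly,
// with no hand-built json4s AST.
case class StageData(stageId: Int, name: String, numTasks: Int)

val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
val json: String = mapper.writeValueAsString(StageData(1, "count at Foo.scala:12", 8))
// => {"stageId":1,"name":"count at Foo.scala:12","numTasks":8}
```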
[GitHub] spark pull request: [SPARK-3454] [WIP] separate json endpoints for...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4435#discussion_r24285254

--- Diff: core/pom.xml ---
@@ -215,6 +215,11 @@
       <version>3.2.10</version>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-scala_2.10</artifactId>
+      <version>2.3.1</version>
--- End diff --

not exactly what you're asking but this module doesn't always play nicely with different versions of jackson itself. So if somehow jackson gets bumped up to a later version b/c of some transitive dependency, often this needs to get bumped up to go along with it.
[GitHub] spark pull request: assumePartitioned
GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/4449

assumePartitioned

https://issues.apache.org/jira/browse/SPARK-1061

If you partition an RDD, save it to hdfs, then reload it in a separate SparkContext, you've lost the info that the RDD was partitioned. This prevents you from getting the savings of a narrow dependency that you could otherwise get. This is especially painful if you've got some big dataset on hdfs, and you periodically get small updates that need to be joined against it. `assumePartitionedBy` lets you simply assign a partitioner to an RDD, so you can get your narrow dependencies back. It's up to the application to know what the partitioner should be, but it will at least verify the assignment is OK.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark SPARK-1061

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4449.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4449

commit e041155fc332da933f2ac22b311682331e1bc64a
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-02-07T05:54:35Z

    assumePartitioned
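A sketch of the intended usage. Hedged: `assumePartitionedBy` comes from this patch, the paths and partition count are made up, and the narrow-dependency join assumes the reloaded partitions line up one-to-one with the originals (the input-split issue discussed later in this thread):

```scala
import org.apache.spark.HashPartitioner

val p = new HashPartitioner(64)

// Reload data that an earlier job wrote out already hash-partitioned, and
// re-attach the partitioner we know it was written with:
val big = sc.sequenceFile[Long, String]("hdfs:///data/big")
  .assumePartitionedBy(p)

// Small periodic updates, partitioned the same way:
val updates = sc.parallelize(Seq((1L, "new"), (2L, "values"))).partitionBy(p)

// The join can now use a narrow dependency: no shuffle of the big side.
val joined = big.join(updates)
```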
[GitHub] spark pull request: [SPARK-3454] [WIP] separate json endpoints for...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4435#discussion_r24278046

--- Diff: .rat-excludes ---
@@ -65,3 +65,6 @@ logs
 .*scalastyle-output.xml
 .*dependency-reduced-pom.xml
 known_translations
+json_expectation
+local-1422981759269/*
+local-1422981780767/*
--- End diff --

is this OK? or should the apache header go in the test resource files as well?
[GitHub] spark pull request: [SPARK-5574] use given name prefix in dir
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/4344#issuecomment-72752833

whoops, sorry I forgot about the title, just updated.
[GitHub] spark pull request: use given name prefix in dir
GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/4344

use given name prefix in dir

https://issues.apache.org/jira/browse/SPARK-5574

very minor, doesn't affect external behavior at all. Note that after this change, some of these dirs will no longer have "spark" in the name at all. I could change those locations that do pass in a name prefix to also include "spark", e.g. blockmgr -> spark-blockmgr

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark SPARK-5574

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4344.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4344

commit 33a84fe22bb53577450c918b2db4ae7150cc4ab8
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-02-03T22:31:52Z

    use given name prefix in dir
[GitHub] spark pull request: [SPARK-4874] [CORE] Collect record count metri...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4067#discussion_r23942254

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -203,8 +206,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         None
       } else {
+        def getDistributionQuantities(data: Seq[Double]): IndexedSeq[Double] =
--- End diff --

typo: `getDistributionQuantiles`, (with an `l`)
[GitHub] spark pull request: SPARK-2450 Adds executor log links to Web UI
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/3486#issuecomment-72495719

other than adding a test case, I think the code looks good.
[GitHub] spark pull request: [SPARK-4874] [CORE] Collect record count metri...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4067#discussion_r23948987

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -472,12 +512,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         }}
         {if (hasInput) {
           <td sorttable_customkey={inputSortable}>
-            {inputReadable}
+            {s"$inputReadable / $inputRecords"}
--- End diff --

from the way the json serialization is done, I think you'll get something weird here when loading data from old runs that didn't have this field. Each task is assigned a `-1`, so you end up with some negative number at the stage summary. E.g., I just ran a little test and I see "Input Size / Records" as "160.0 B / -8".
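The arithmetic behind that observation, as a tiny sketch: eight old-format tasks each default to `-1` records, and summing the sentinels produces the `-8`. A guard like the last line would treat `-1` as "unknown" rather than a real count:

```scala
val recordCounts = Seq.fill(8)(-1L)               // 8 tasks replayed from an old log
val naiveTotal   = recordCounts.sum               // -8, as observed in the UI
val knownTotal   = recordCounts.filter(_ >= 0).sum // 0: ignore the sentinels
```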
[GitHub] spark pull request: [SPARK-1061] assumePartitioned
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/4449#issuecomment-73449430

@pwendell its a good question, I was wondering the same thing a little bit as I was writing those unit tests and was going to comment on the jira about this a little. It is definitely annoying to have to write a custom input format -- but I only need to do that to turn off splits (see the sketch below). Every once in a while this comes up on the user list too -- should we just add another version of `sc.hadoopFile`, `sc.textFile`, and `sc.sequenceFile` to turn off splits? Unfortunately I don't think it makes any sense to directly pass an `assumedPartitioner` as an argument to those functions, since you really need to put in a map step in the middle to extract the key.

Really this gets to a more general question: when do we add these convenience methods to RDD? Given that this requires application logic to track the partitioner to use, I doubt this will ever be used by other code within spark itself. But I would still make the case for its inclusion, since (a) it leads to a big optimization that is not obvious to most users. By promoting it to a function within spark itself, users are more likely to be aware of it. (b) it's a little tricky to get right -- I think the `verify` step is really important to make sure this doesn't lead to completely wrong results in the user app. And (c) I think it's a common use case. Not so common it would make it into spark tutorials, or even into daily use of an experienced spark user -- but I imagine it has a place in every batch use of spark, where there is some big dataset that lives on hdfs between spark contexts.

OTOH, we could just put this in some general location with spark-examples, and leave it out of spark itself. I guess we only need to make the change to `HadoopRDD` to sort the partitions.
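For reference, the custom-input-format workaround mentioned above boils down to overriding `isSplitable` in the old `mapred` API that `sc.hadoopFile` uses (a sketch, not the PR's code):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.TextInputFormat

// One file => one partition, so the partition layout written by the previous
// job survives the reload instead of being re-split by block size.
class NonSplittingTextInputFormat extends TextInputFormat {
  override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false
}
```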
[GitHub] spark pull request: SPARK-4746 make it easy to skip IntegrationTes...
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/4048#issuecomment-69990018

oh good point Marcelo -- I forgot to add that I've only done this for `core` in this PR. I wanted to ask others whether it's worthwhile to do in other projects or not before I go digging into each one of them.
[GitHub] spark pull request: SPARK-4747 make it easy to skip IntegrationTes...
GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/4048

SPARK-4747 make it easy to skip IntegrationTests

* create an `IntegrationTest` tag
* label all tests in core as an `IntegrationTest` if they use a `local-cluster`
* make a `unit-test` task in sbt so it's easy to skip all the integration tests in local development.

On my laptop, this means that I can run `~unit-test` in my sbt console, which takes ~5 mins on the first run. But since it is calling `test-quick` under the hood, this means that as I make changes, it only re-runs the tests I've affected, so generally I can get updated results on all tests in a second or two. Of course this means it's skipping a bunch of important tests, but hopefully this is a useful subset of tests that can actually be run locally. If you don't skip the IntegrationTests, it's totally impractical to ever get through even the first run of `test-quick` on my laptop. An added bonus is that this set of tests can be run without having to ever do the `mvn package` step, since we are never launching a full cluster as an external process.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark SPARK-4746

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4048.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4048

commit 030cc0c0cb57d35a043e68509a3997fb3f1a3dc1
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-01-13T21:45:38Z

    add IntegrationTest tag, and label a bunch of tests in core

commit 30f4d636387e57e9c104024db5a20afcde1b7cbb
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-01-14T19:36:37Z

    add a unit-test task

commit 3a8503227d53554155e5766ce12d48039854f163
Author: Imran Rashid <iras...@cloudera.com>
Date: 2015-01-14T20:41:07Z

    fix task name
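A sketch of the tagging mechanics described above (the fully-qualified tag name and suite are assumptions for illustration):

```scala
import org.scalatest.{FunSuite, Tag}

// The tag's string name doubles as the identifier that sbt/junit filter on.
object IntegrationTest extends Tag("org.apache.spark.IntegrationTest")

class DistributedSuite extends FunSuite {
  // Tagged individually, so it is excluded from unit-test runs:
  test("caching works on a local-cluster", IntegrationTest) {
    // spins up local-cluster[...] workers, hence "integration"
  }
}
```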
[GitHub] spark pull request: [Minor] Fix tiny typo in BlockManager
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4046#issuecomment-70013728

lgtm
[GitHub] spark pull request: SPARK-5217 Spark UI should report pending stag...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4043#issuecomment-70015231

lgtm. I was going to suggest that pending stages should be sorted with the oldest submission time first, not reversed ... but I guess we want the completed stages sorted with the oldest last, and it probably makes sense to keep those tables consistent with each other.
[GitHub] spark pull request: SPARK-4746 make it easy to skip IntegrationTes...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4048#issuecomment-70008794

@pwendell To be honest, I'd never had the patience to run all the tests on my laptop before. But I just tried them both again: 237 seconds vs. 852 seconds (just for core), so these tests are down to ~25% of the total time. It makes using `test-quick` a reality; while I have no end of complaints about sbt, I really like `test-quick`.

@vanzin yeah, there are also a bunch of tests in other projects; I know there are a bunch in mllib as well. (I guess this is really a WIP at this point -- I wanted to see if folks like the idea before I do it in the other projects.)

@srowen I wasn't familiar with `surefire` or `failsafe` from maven, but I figured there was some other way to support an integration-test separation. I am just more familiar with tagging tests, and I think it buys us some more flexibility. I'll see if I can figure out how to set up a maven task (or whatever it's called) to only run the unit tests, but I know very little about maven ...
[GitHub] spark pull request: [SPARK-5236] Fix ClassCastException in Specifi...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4039#issuecomment-70014846

can you add a unit test for what this fixes? I don't see how this avoids the exceptions; it just seems to push them down into `MutableValue.update`. A test case would help convince me (admittedly I'm really unfamiliar with this code).
[GitHub] spark pull request: SPARK-4746 make it easy to skip IntegrationTes...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4048#issuecomment-70017200

Jenkins, retest this please
[GitHub] spark pull request: SPARK-4746 make it easy to skip IntegrationTes...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4048#issuecomment-70181484

so, this doesn't actually work quite the way I wanted it to. It turns out it's skipping all the JUnit tests as well. The JUnit tests are run if you use `test-only * -- -l`, but as soon as you add a tag, like `test-only * -- -l foo`, all the JUnit tests are skipped. From the [junit-interface docs](https://github.com/sbt/junit-interface):

> Any parameter not starting with - or + is treated as a glob pattern for matching tests.

I will look into a solution for this, but I have a feeling this might mean we can't mix JUnit with the tagging approach, and we have to go to a more standard directory/file-naming approach to separating out integration tests.
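One possible workaround (a sketch, not something this PR does): sbt can scope test arguments per framework, so the ScalaTest exclude flag never reaches junit-interface at all:

```scala
// build.sbt sketch (sbt 0.13-era syntax; the tag name is hypothetical):
testOptions in Test ++= Seq(
  // Only ScalaTest sees the -l exclude flag ...
  Tests.Argument(TestFrameworks.ScalaTest, "-l", "org.apache.spark.IntegrationTest"),
  // ... while junit-interface gets its own arguments (here just verbose output),
  // so no stray non-dash argument is ever parsed as a glob pattern.
  Tests.Argument(TestFrameworks.JUnit, "-v")
)
```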
[GitHub] spark pull request: SPARK-4746 make it easy to skip IntegrationTes...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4048#issuecomment-70292578

@pwendell I like the idea of just getting tests to run faster in general, but I think it's gonna be hard to make that happen. (Not the most exciting task for beginners, and I think the experienced folks have their hands full ...) This was meant to be an easy way to have some useful subset of tests that developers can run locally. It's great that we have jenkins running tests automatically, but I'd really like to improve the local development process. Even if we do speed up the other tests, I think there are always going to be some tests that fall into this bucket that we'd like to run, so I think this will still be useful. But I agree that the tests could be sped up a lot -- I made some notes on the JIRA about how, even after this, eliminating (or speeding up) a small number of tests would have huge gains.
[GitHub] spark pull request: [SPARK-5236] Fix ClassCastException in Specifi...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4039#issuecomment-70036427

btw, while you're mucking around in there ... it might be nice to change the `SpecificMutableRow` constructor to take varargs. Change this constructor:

```
- def this(dataTypes: Seq[DataType]) =
+ def this(dataTypes: DataType*) =
```

and then just get rid of the other constructor:

```
- def this() = this(Seq.empty)
-
```

That way you don't have to wrap the types in a `Seq`; e.g., instead of `new SpecificMutableRow(Seq(StringType [, IntType, ...]))` you can just do `new SpecificMutableRow(StringType [, IntType, ...])`
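A toy illustration of why the varargs form subsumes both of the existing constructors (the class and field names here are stand-ins, not the real Catalyst types):

```scala
// With a varargs primary constructor, the zero-arg and explicit-Seq
// call sites both work without any extra overloads:
class Row(types: String*) {
  val arity: Int = types.length
}

val empty = new Row()                          // replaces `def this() = this(Seq.empty)`
val two   = new Row("StringType", "IntType")   // no Seq(...) wrapper needed
val splat = new Row(Seq("a", "b"): _*)         // an existing Seq can still be passed
```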
[GitHub] spark pull request: [SPARK-733] Add documentation on use of accumu...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4022#issuecomment-70036623

lgtm
[GitHub] spark pull request: [SPARK-5236] Fix ClassCastException in Specifi...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4039#issuecomment-70036236

I think finding and fixing a bug in current behavior is a great reason to add a unit test. Some part of the implementation was confusing enough to allow a bug in the first place; a test will help prevent that bug from cropping up again in future changes. From the description you gave me, this seems like a minimal test case:

```
class SpecificMutableRowTest extends FunSuite with Matchers {
  test("update MutableAny") {
    val row = new SpecificMutableRow(Seq(StringType))
    row.update(0, 1)
    row.getInt(0) should be (1)
  }
}
```

I agree that this does seem kinda suspicious, like maybe there is something deeper going on ... why is a field that is supposed to be an int getting assigned a type of `MutableAny` instead of `MutableInt`, even though `CatalystPrimitiveConverter` has apparently decided to call the `int`-specific methods like `setInt` etc.?
[GitHub] spark pull request: [SPARK-5236] Fix ClassCastException in Specifi...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4039#issuecomment-70037269

Back to the question of something deeper being wrong ... I think we'll need to wait for input from somebody more familiar with this code. @marmbrus? But one helpful thing you could do would be to provide a minimal example that reproduces this. E.g., a tiny parquet file and an example query, maybe? [SPARK-5236](https://issues.apache.org/jira/browse/SPARK-5236) is really short on details. (sorry for splitting this into so many comments, I keep getting distracted ...)
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24642522
--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -81,19 +84,38 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 */
 def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }
+
+ /**
+ * Return the broadcast blocks stored in this block manager.
+ *
+ * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+ * concatenating them together. Much faster alternatives exist for common operations such as
+ * getting the memory, disk, and off-heap memory sizes occupied by this RDD.
--- End diff --
should be "broadcast variable" instead of "RDD"
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24642582
--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -118,8 +140,20 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 } else {
 None
 }
+ case BroadcastBlockId(broadcastId, _) =>
+   // Actually remove the block, if it exists
+   if (_broadcastBlocks.contains(broadcastId)) {
+     val removed = _broadcastBlocks(broadcastId).remove(blockId)
+     // If the given RDD has no more blocks left, remove the RDD
--- End diff --
again, "broadcast variable" instead of "RDD"
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24642955
--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -118,8 +140,20 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 } else {
 None
 }
+ case BroadcastBlockId(broadcastId, _) =>
+   // Actually remove the block, if it exists
+   if (_broadcastBlocks.contains(broadcastId)) {
+     val removed = _broadcastBlocks(broadcastId).remove(blockId)
+     // If the given RDD has no more blocks left, remove the RDD
--- End diff --
actually, now I am thinking there is a lot of copy-pasting that could be cleaned up. You could mostly merge this with the block above if you did something like:

```
case rddOrBlockId @ (_: BroadcastBlockId | _: RDDBlockId) =>
  val (id, blockMap) = getIdAndBlockMap(rddOrBlockId)
  val removed = blockMap(id).remove(blockId)
  ...
```

where the helper function `getIdAndBlockMap` is something like:

```
def getIdAndBlockMap(blockId: BlockId): (Int, Map[BlockId, BlockStatus]) = blockId match {
  case RDDBlockId(rddId, _) => (rddId, _rddBlocks)
  case BroadcastBlockId(broadcastId, _) => (broadcastId, _broadcastBlocks)
}
```

and then you could do a similar thing in a few other places. You could also take this a step further and merge `_rddBlocks` and `_broadcastBlocks` into an `EnumMap[BlockType, Map[BlockId, BlockStatus]]`, if you made a new `public enum BlockType { RDD, Broadcast }` -- but that might not really help much, since at the end of the day you do want separate getter methods for the RDD and broadcast stuff
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24829369
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala ---
@@ -24,7 +24,13 @@ import org.apache.spark.util.ListenerBus
 */
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
+
+ private[spark] var filter: DefaultSparkListenerEventFilter = new DefaultSparkListenerEventFilter
+
 override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
+   if (!filter.validate(event)) {
+     return
--- End diff --
I think the `DefaultSparkListenerEventFilter` is probably adding an abstraction without any really good reason. If we do stick with the new `SparkListenerBlockUpdate` events, I think it's better to just put the filter into the right method on the event logging listener and remove `DefaultSparkListenerEventFilter`.
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24831053
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -522,7 +523,9 @@ private[spark] class BlockManagerInfo(
 logInfo("Removed %s on %s on tachyon (size: %s)".format(
   blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
 }
+ return BlockStatus(storageLevel, 0, 0, 0)
 }
+ null
--- End diff --
you don't need `return` here -- the last expression of each branch is its value. So this could be:

```
if (storageLevel.isValid) {
  ...
  _blocks.get(blockId)
} else if (_blocks.containsKey(blockId)) {
  ...
  BlockStatus(storageLevel, 0, 0, 0)
} else {
  null
}
```
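A self-contained illustration of that point (a toy function, nothing Spark-specific): `if`/`else` chains are expressions in Scala, so each branch's final expression is the result and no `return` keyword is needed:

```scala
def sign(x: Int): Int =
  if (x > 0) 1
  else if (x == 0) 0
  else -1

assert(sign(-5) == -1 && sign(0) == 0 && sign(7) == 1)
```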
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24830276
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala ---
@@ -24,7 +24,13 @@ import org.apache.spark.util.ListenerBus
 */
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
+
+ private[spark] var filter: DefaultSparkListenerEventFilter = new DefaultSparkListenerEventFilter
+
 override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
+   if (!filter.validate(event)) {
+     return
--- End diff --
also, I think there is still one missing piece for getting the JSON into the event logging for the history server: you still need to put in the implementation for `onBlockUpdate` in `EventLoggingListener`. I am suggesting that you just put this filter into that implementation and get rid of the `Filter` abstraction.
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24833405
--- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---
@@ -49,9 +50,40 @@ class RDDInfo(
 }
 }
+
private[spark] object RDDInfo {
 def fromRdd(rdd: RDD[_]): RDDInfo = {
 val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
 new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
 }
}
+
+@DeveloperApi
+class BroadcastInfo(
+   val id: Long,
+   val name: String,
+   val numPartitions: Int) extends Ordered[BroadcastInfo] with InMemoryObjectInfo {
+
+ var memSize = 0L
+ var diskSize = 0L
+ var tachyonSize = 0L
+
+ override def toString = {
+   import Utils.bytesToString
+   ("\"%s\" (%d); " +
+     "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
+     name, id, bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
+ }
+
+ override def compare(that: BroadcastInfo): Int = {
+   if (this.id > that.id) {
+     1
+   } else {
+     if (this.id == that.id) {
+       return 0
+     }
+     -1
--- End diff --
super minor:

```
if (this.id > that.id) {
  1
} else if (this.id == that.id) {
  0
} else {
  -1
}
```

(sorry, I was wrong with the earlier suggestion)
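As an aside (a sketch, not something the PR needs to adopt): since `id` is a `Long`, the whole three-way comparison can also be delegated to the built-in ordering:

```scala
// Equivalent to the if / else-if / else chain above:
override def compare(that: BroadcastInfo): Int = this.id compare that.id
```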
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24833690
--- Diff: core/src/main/scala/org/apache/spark/ui/storage/InMemoryObjectPage.scala ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.storage._
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.util.Utils
+
+import scala.xml.Node
+
+private[ui] abstract class InMemoryObjectPage(pageName: String, parent: StorageTab)
--- End diff --
RDDs aren't necessarily in memory ... maybe a better name would be `StorageDetailPage`?
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24831483
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -522,7 +523,9 @@ private[spark] class BlockManagerInfo(
 logInfo("Removed %s on %s on tachyon (size: %s)".format(
   blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
 }
+ return BlockStatus(storageLevel, 0, 0, 0)
 }
+ null
--- End diff --
actually, can you explain the `null` case? How does that happen, and won't it result in an NPE when it gets to your code in `StorageStatusListener` and in `JsonProtocol`?
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24833566
--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -271,4 +368,19 @@ private[spark] object StorageUtils {
 blockLocations
 }
+
+ /**
+ * Return a mapping from block ID to its locations for each block that belongs to the given RDD.
--- End diff --
broadcast block, not RDD
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24831712
--- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---
@@ -21,13 +21,14 @@
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
+trait InMemoryObjectInfo
--- End diff --
you don't really need this trait at all
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/2851#issuecomment-74698831

Hi @CodingCat, thanks for making all the updates. Sorry, I hadn't realized the subtlety with `Int` vs `Long` on `RDDBlockId` and `BroadcastBlockId`. Still, I think it's a good change for simplifying the code; glad you figured out a way to make it work. And on issue (2), the memory usage of a worker: I think what you have is correct -- it's supposed to be the memory usage of the particular object on the worker. This is on a UI page in the context of a particular object; there is a separate page summarizing the memory usage of the executor overall. (Though I agree the UI is a little confusing ...) I'll make a few more minor comments on the code, but mostly I just want to get another opinion on the right events for passing the block-added updates around, as I mentioned above.
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/2851#discussion_r24835658
--- Diff: core/src/main/scala/org/apache/spark/ui/storage/BroadcastPage.scala ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import org.apache.spark.storage.StorageUtils
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
+
+import scala.xml.Node
+
+private[ui] class BroadcastPage(parent: StorageTab) extends InMemoryObjectPage("broadcast", parent) {
+
+ protected override val workerTableID: String = "broadcast-storage-by-block-table"
+
+ protected override def objectList = listener.broadcastInfoList
+
+ protected override def getBlockTableAndSize(objectId: Any): (Seq[Node], Int) = {
+   val blockLocations = StorageUtils.getBroadcastBlockLocation(objectId.asInstanceOf[Long],
+     storageStatusList)
+   val blocks = listener.storageStatusList
+     .flatMap(_.broadcastBlocksById(objectId.asInstanceOf[Long]))
+     .sortWith(_._1.name < _._1.name)
+     .map { case (blockId, status) =>
+       (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+     }
+   (UIUtils.listingTable(blockHeader, blockRow, blocks, id = Some("rdd-storage-by-block-table")),
+     blocks.size)
+ }
+
+ protected override def generateContent(objectId: Long): (String, Seq[Node]) = {
+   val objectInfo = objectList.find(_.id == objectId).getOrElse {
+     // Rather than crashing, render a "Not Found" page
+     return (objectId.toString, nonFoundErrorInfo)
+   }
+   val (workerTable, workerCount) = getWorkerTableAndSize(objectId)
+
+   val (blockTable, blockCount) = getBlockTableAndSize(objectId)
+
+   val content =
+     <div class="row-fluid">
+       <div class="span12">
+         <ul class="unstyled">
+           <li>
+             <strong>Total Partitions:</strong>
+             {objectInfo.numPartitions}
--- End diff --
`numPartitions` doesn't mean anything for a broadcast var. I think it's always `0` in your code, unless I've missed something.
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/2851#issuecomment-74713997

can you also post a screenshot of the detailed page for a broadcast var? Ideally involving a broadcast var that gets turned into multiple blocks by `TorrentBroadcast` -- I think it just needs to be over 4MB with the default settings.
[GitHub] spark pull request: [SPARK-5785] [PySpark] narrow dependency for c...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4629#discussion_r24852684
--- Diff: python/pyspark/tests.py ---
@@ -740,6 +739,27 @@ def test_multiple_python_java_RDD_conversions(self):
 converted_rdd = RDD(data_python_rdd, self.sc)
 self.assertEqual(2, converted_rdd.count())
+
+def test_narrow_dependency_in_join(self):
+ rdd = self.sc.parallelize(range(10)).map(lambda x: (x, x))
--- End diff --
do these tests actually check for a narrow dependency at all? I think they will pass even without it. I'm not sure of a better suggestion, though. I had to use `getNarrowDependencies` in another PR to check this: https://github.com/apache/spark/pull/4449/files#diff-4bc3643ce90b54113cad7104f91a075bR582 but I don't think that is even exposed in pyspark ...
[GitHub] spark pull request: [SPARK-3957]: show broadcast variable resource...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/2851#issuecomment-74510287

well, I was leaning towards using a `ThreadLocal` to get the broadcast blocks into the task end event ... but I forgot that broadcast blocks are also created by the driver (e.g. [here](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L98) or [here](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala#L52)). It doesn't make any sense to stick those into a task end event. I suppose they could go into a task **start** event, but that is still a little weird; maybe we really do need a new event. @rxin @andrewor14 thoughts?
[GitHub] spark pull request: [SPARK-5785] [PySpark] narrow dependency for c...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4629#discussion_r24861247
--- Diff: python/pyspark/tests.py ---
@@ -740,6 +739,27 @@ def test_multiple_python_java_RDD_conversions(self):
 converted_rdd = RDD(data_python_rdd, self.sc)
 self.assertEqual(2, converted_rdd.count())
+
+def test_narrow_dependency_in_join(self):
+ rdd = self.sc.parallelize(range(10)).map(lambda x: (x, x))
--- End diff --
nice!
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71577982

@koeninger I doubt that we want to go this route in this case, but just in case you're interested: I think a much better way to handle multiple errors gracefully is with [scalactic's `Or`](http://www.scalactic.org/user_guide/OrAndEvery). It's much better than `Either` for this case of building up a set of errors to report back to the user. And scalactic is a nicely designed, small library (e.g., you're not pulling in scalaz). Probably not worth it for this one case, but I thought you might find it interesting :)
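For the curious, a minimal sketch of the accumulating style that `Or` enables (scalactic 2.x names like `Good`, `Bad`, `One`, and `Accumulation.withGood`, per its user guide):

```scala
import org.scalactic._
import Accumulation._

def parseHost(s: String): String Or One[ErrorMessage] =
  if (s.nonEmpty) Good(s) else Bad(One("empty host"))

def parsePort(s: String): Int Or One[ErrorMessage] =
  try Good(s.toInt)
  catch { case _: NumberFormatException => Bad(One(s"bad port: $s")) }

// Unlike Either, withGood accumulates *every* failure instead of
// short-circuiting on the first one:
val result: (String, Int) Or Every[ErrorMessage] =
  withGood(parseHost(""), parsePort("oops")) { (host, port) => (host, port) }
// result == Bad(Many("empty host", "bad port: oops"))
```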
[GitHub] spark pull request: [SPARK-3298][SQL] Add flag control overwrite r...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4175#issuecomment-71577489

I think these failures are real; it looks like you need to do a similar update of the args to `registerTempTable` in the pyspark tests, e.g. [here](https://github.com/apache/spark/blob/9538bfe97eaa5e4cb4e812d5c52ee461151d878b/python/pyspark/tests.py#L863). There are more places too; look in the [test console output](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26119/console) for the full list.
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23652781
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -113,12 +129,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 "Logging directory specified is not a directory: %s".format(logDir))
 }
- checkForLogs()
+ // A task that periodically checks for event log updates on disk.
+ pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS)
- // Disable the background thread during tests.
- if (!conf.contains("spark.testing")) {
--- End diff --
I don't know why this was turned off for testing, but it doesn't seem like you want to change that behavior, do you?
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23652649
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -163,9 +179,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 * applications that haven't been updated since last time the logs were checked.
 */
 private[history] def checkForLogs(): Unit = {
- lastLogCheckTimeMs = getMonotonicTimeMs()
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
--- End diff --
why are you removing the logging?
[GitHub] spark pull request: SPARK-5425: Use synchronised methods in system...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4220#issuecomment-71855275

retest this please

hopefully those test failures were random; let's see. btw, I think that if you want the exact same patch applied to multiple branches, the standard practice is to just open one PR and make a comment that it should be backported to the other branches. It's easy for committers to apply a patch to multiple branches, and it makes the PRs easier to track. Though this does mean we assume that if the tests pass on master, they'll pass on the other branches. (somebody correct me if I'm mistaken here ...)
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23703042
--- Diff: core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import scala.collection.mutable
+
+import org.mockito.Mockito._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter}
+
+import org.apache.spark._
+import org.apache.spark.executor.{TaskMetrics}
+import org.apache.spark.rdd.FakeOutputCommitter
+
+/**
+ * Unit tests for the output commit coordination functionality. Overrides the
+ * SchedulerImpl to just run the tasks directly and send completion or error
+ * messages back to the DAG scheduler.
+ */
+class OutputCommitCoordinatorSuite
+  extends FunSuite
+  with BeforeAndAfter
+  with LocalSparkContext
+  with Timeouts {
+
+  val conf = new SparkConf().set("spark.localExecution.enabled", "true")
--- End diff --
this doesn't have any effect, does it? If I'm following things correctly, your call to `runJob` has `allowLocal = false`, which makes this config irrelevant
[GitHub] spark pull request: SPARK-5425: Use synchronised methods in system...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4220#issuecomment-71923273

I kinda see what is going on with the tests now. A [test case in SparkSubmitSuite](https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala#L91) adds a non-existent jar to the system properties. That property never gets cleared, so later jobs, which should submit successfully, are still trying to load that jar. I don't understand how your change is affecting this, though, or how this ever worked before. Maybe this change is exposing some lurking error -- something which just happened to work before. It seems like having multiple apps futzing with the system properties at the same time is bound to create problems.
[GitHub] spark pull request: [SPARK-5388] Provide a stable application subm...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4216#issuecomment-71924525

I made a comment at one spot in the code, but throughout I find the name "stable" confusing. It implies the other one is unstable, and without the context from the JIRA, "unstable" to me means that it goes down randomly (not that it has binary compatibility issues across versions). Also, if we get rid of the old one, then having the only one be called "stable" will be even stranger. (Especially if somewhere down the road we find that the new one is unstable in some way.)
[GitHub] spark pull request: [SPARK-5388] Provide a stable application subm...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4216#discussion_r23724419
--- Diff: core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import scala.collection.Map
+import scala.collection.JavaConversions._
+
+import org.json4s.jackson.JsonMethods._
+import org.json4s.JsonAST._
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A general message exchanged in the stable application submission REST protocol.
+ *
+ * The message is represented by a set of fields in the form of key value pairs.
+ * Each message must contain an ACTION field, which fully specifies the type of the message.
+ * For compatibility with older versions of Spark, existing fields must not be removed or
+ * modified, though new fields can be added as necessary.
+ */
+private[spark] abstract class SubmitRestProtocolMessage(
+    action: SubmitRestProtocolAction,
+    actionField: ActionField,
+    requiredFields: Seq[SubmitRestProtocolField]) {
+
+  // Maintain the insert order for converting to JSON later
+  private val fields = new java.util.LinkedHashMap[SubmitRestProtocolField, String]
+  val className = Utils.getFormattedClassName(this)
+
+  // Set the action field
+  fields.put(actionField, action.toString)
+
+  /** Return all fields currently set in this message. */
+  def getFields: Map[SubmitRestProtocolField, String] = fields.toMap
--- End diff --
why not just use POJOs for each of the messages, with normal fields, and just let jackson take care of the conversion to/from JSON? E.g. `case class SubmitDriverMessage(action: Action, message: Option[String], ...)`. I don't see the advantage you get from putting everything into `fields` and doing your own manual type checking -- maybe I am missing something fundamental?
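To make the suggestion concrete, a hedged sketch using jackson-module-scala (the message shape is invented for illustration; it is not the PR's actual schema):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// A plain case class stands in for the manual field map:
case class SubmitDriverRequest(action: String, appName: String, jars: Seq[String])

val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

// Jackson handles field names, ordering, and type conversion:
val json = mapper.writeValueAsString(SubmitDriverRequest("SUBMIT_DRIVER", "demo", Seq("a.jar")))
val back = mapper.readValue(json, classOf[SubmitDriverRequest])
assert(back.appName == "demo")
```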
[GitHub] spark pull request: [SPARK-5388] Provide a stable application subm...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4216#discussion_r23735817
--- Diff: core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala ---
@@ -0,0 +1,201 @@ (same hunk as quoted above, ending with:)
+  /** Return all fields currently set in this message. */
+  def getFields: Map[SubmitRestProtocolField, String] = fields.toMap
--- End diff --
are you referring to using java serialization on case classes? That's totally different from going to/from JSON. Even if you need to use some more advanced API from jackson, e.g. adding annotations or something, it still seems like it would make the code a lot simpler. I'm not familiar with jersey, but it sounds like it might be an even better alternative.
[GitHub] spark pull request: [SPARK-5324][SQL] Results of describe can't be...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4249#discussion_r23696141
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---
@@ -63,6 +63,37 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 }
 }
+
+ test("SPARK-5324 Results of describe can't be queried") {
--- End diff --
nit: I'd prefer that the test name say what the *correct* behavior is, rather than the old bug it is checking for. I think it's better when we're looking at test output, or if there is a regression and a dev needs to figure out what is being tested. Maybe I am wrong about this, though -- we haven't applied that universally so far.
[GitHub] spark pull request: SPARK-5300 Add LocalFileSystem which will retu...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4204#discussion_r23694090
--- Diff: core/src/main/scala/org/apache/spark/storage/LocalFileSystem.scala ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.fs.{LocalFileSystem => HadoopLocalFileSystem}
+import org.apache.hadoop.fs.{Path, PathFilter, RemoteIterator, LocatedFileStatus}
+
+/**
+ * FileSystem for Spark that takes file ordering into account. This is because
+ * Hadoop MapReduce doesn't care about file ordering and HDFS may provide the
+ * file parts in order, but native filesystems may not. As Spark has a notion of
+ * ordering in RDDs (e.g. sortByKey), reading partitions out of order destroys
+ * these notions.
+ *
+ * We only need to override listLocatedStatus as this is called from
+ * FileInputFormat.singleThreadedListStatus and LocatedFileStatusFetcher.
+ */
+class LocalFileSystem extends HadoopLocalFileSystem {
+  override
+  def listLocatedStatus(path: Path): RemoteIterator[LocatedFileStatus] = {
+    val listing = super.listLocatedStatus(path)
+    val builder = new ArrayBuffer[LocatedFileStatus]()
+    while (listing.hasNext) {
+      builder += listing.next
+    }
+    val sorted = builder.toArray.sortWith { (lhs, rhs) => {
--- End diff --
It's easy enough to go make an `Ordering[Path]`, given that it's already `Comparable`. Just because the param *could* be an implicit doesn't mean it *has* to be:

```
scala> val a = Array[Path]()
scala> a.sorted(new Ordering[Path] { def compare(x: Path, y: Path): Int = x.compareTo(y) })
res4: Array[org.apache.hadoop.fs.Path] = Array()
```

There is an automatic conversion from `Comparable[T]` to `Ordering[T]` -- I think the problem is that `Path` is an un-parameterized `Comparable`:

```
scala> abstract class Q extends Comparable[Q]
defined class Q

scala> implicitly[Ordering[Q]]
res0: Ordering[Q] = scala.math.LowPriorityOrderingImplicits$$anon$6@48a22925
```

anyhow, it probably doesn't affect the code much in this case
[GitHub] spark pull request: SPARK-5425: Use synchronised methods in system...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4220#discussion_r23776203 --- Diff: core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala --- @@ -42,7 +43,7 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su var oldProperties: Properties = null override def beforeEach(): Unit = { -oldProperties = new Properties(System.getProperties) +oldProperties = SerializationUtils.clone(System.getProperties) --- End diff -- thanks for tracking this down. Can you please put a comment in here explaining why you need a clone? It is really subtle; I can easily see this getting reverted down the road if somebody doesn't know why it's there.
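One way the requested comment could read (a sketch; the wording and trait scaffolding are hypothetical, the `SerializationUtils.clone` call is from the diff):

```scala
import java.util.Properties
import org.apache.commons.lang3.SerializationUtils
import org.scalatest.{BeforeAndAfterEach, Suite}

trait ResetSystemPropertiesExample extends BeforeAndAfterEach { this: Suite =>
  var oldProperties: Properties = null

  override def beforeEach(): Unit = {
    // SerializationUtils.clone makes a deep copy of the current entries.
    // `new Properties(props)` would only install `props` as live *defaults*,
    // so later mutations of the system properties would leak into our saved
    // snapshot and defeat the reset.
    oldProperties = SerializationUtils.clone(System.getProperties)
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    try super.afterEach() finally System.setProperties(oldProperties)
  }
}
```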
[GitHub] spark pull request: SPARK-5425: Use synchronised methods in system...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4220#issuecomment-72079687 lgtm
[GitHub] spark pull request: SPARK-5425: Use synchronised methods in system...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4220#issuecomment-72082093 retest this please
[GitHub] spark pull request: [SPARK-3298][SQL] Add `allowExisting` flag to ...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4271#issuecomment-72083678 ok to test
[GitHub] spark pull request: [SPARK-5388] Provide a stable application subm...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4216#discussion_r23875609 --- Diff: core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolField.scala --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest + +/** + * A field used in [[SubmitRestProtocolMessage]]s. + */ +class SubmitRestProtocolField[T](val name: String) { --- End diff -- I think you don't need `name` anymore -- you end up needing to repeat the field name a lot, now that Jackson is taking care of putting the field name in the JSON. Looks like it's only used in `assertFieldIsSet`, which is only called from `DriverStatusRequest` -- so you could just pass in a message for that one case and DRY up a lot of the code.
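A hedged sketch of the suggested simplification (the class internals here are hypothetical, not the PR's code; only the one call site keeps an explicit name):

```scala
// Drop `name` from the field itself; Jackson already emits the field name.
class SubmitRestProtocolField[T] {
  private var value: Option[T] = None
  def isSet: Boolean = value.isDefined
  def getOption: Option[T] = value
  def setValue(v: T): Unit = { value = Some(v) }
}

// The one caller that needs a name (e.g. DriverStatusRequest) passes a message.
def assertFieldIsSet[T](field: SubmitRestProtocolField[T], name: String): Unit =
  require(field.isSet, s"Required field '$name' is not set")
```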
[GitHub] spark pull request: [SPARK-5388] Provide a stable application subm...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4216#discussion_r23880057 --- Diff: core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequest.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty} +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.util.JsonProtocol + +/** + * A request to submit a driver in the REST application submission protocol. + */ +class SubmitDriverRequest extends SubmitRestProtocolRequest { + private val appName = new SubmitRestProtocolField[String]("appName") + private val appResource = new SubmitRestProtocolField[String]("appResource") + private val mainClass = new SubmitRestProtocolField[String]("mainClass") + private val jars = new SubmitRestProtocolField[String]("jars") + private val files = new SubmitRestProtocolField[String]("files") + private val pyFiles = new SubmitRestProtocolField[String]("pyFiles") + private val driverMemory = new SubmitRestProtocolField[String]("driverMemory") + private val driverCores = new SubmitRestProtocolField[Int]("driverCores") + private val driverExtraJavaOptions = new SubmitRestProtocolField[String]("driverExtraJavaOptions") + private val driverExtraClassPath = new SubmitRestProtocolField[String]("driverExtraClassPath") + private val driverExtraLibraryPath = new SubmitRestProtocolField[String]("driverExtraLibraryPath") + private val superviseDriver = new SubmitRestProtocolField[Boolean]("superviseDriver") + private val executorMemory = new SubmitRestProtocolField[String]("executorMemory") + private val totalExecutorCores = new SubmitRestProtocolField[Int]("totalExecutorCores") + + // Special fields + private val appArgs = new ArrayBuffer[String] + private val sparkProperties = new mutable.HashMap[String, String] + private val envVars = new mutable.HashMap[String, String] + + def getAppName: String = appName.toString + def getAppResource: String = appResource.toString + def getMainClass: String = mainClass.toString + def getJars: String = jars.toString + def getFiles: String = files.toString + def getPyFiles: String = pyFiles.toString + def getDriverMemory: String = driverMemory.toString + def getDriverCores: String = driverCores.toString + def getDriverExtraJavaOptions: String = driverExtraJavaOptions.toString + def getDriverExtraClassPath: String = driverExtraClassPath.toString + def getDriverExtraLibraryPath: String = driverExtraLibraryPath.toString + def getSuperviseDriver: String = superviseDriver.toString + def getExecutorMemory: String = executorMemory.toString + def getTotalExecutorCores: String = totalExecutorCores.toString + + // Special getters required for JSON serialization + @JsonProperty("appArgs") + private def getAppArgsJson: String = arrayToJson(getAppArgs) --- End diff -- I think you're still doing a lot more work to futz with the JSON than you need to. I dug into it a little bit, and I think the problem is that the json4s wrapper around Jackson is lacking a lot of features. To use Jackson well, you need to add in the [scala module](http://search.maven.org/#artifactdetails%7Ccom.fasterxml.jackson.module%7Cjackson-module-scala_2.10%7C2.3.1%7Cbundle) (or just define these simple bean classes in Java). E.g.:

```scala
class FooBar {
  var blah: String = _
  var ooga: Array[String] = _
  var wakka: Map[String, String] = _
}

val f = new FooBar
f.blah = "hi"
f.ooga = Array("flim", "flam", "floop")
f.wakka = Map("bip" -> "bop", "beep" -> "boop")

val mapper = new ObjectMapper()
mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
println(mapper.writeValueAsString(f))
```

prints out `{"blah":"hi","ooga":["flim","flam","floop"],"wakka":{"bip":"bop","beep":"boop"}}`
[GitHub] spark pull request: [SQL] SPARK-5309: Add support for dictionaries...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4187#discussion_r23501217 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala --- @@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter( parent.updateLong(fieldIndex, value) } +/** + * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String. + * Supports dictionaries to reduce Binary to String conversion overhead. + * + * Follows pattern in Parquet of using dictionaries, where supported, for String conversion. + * + * @param parent The parent group converter. + * @param fieldIndex The index inside the record. + */ +private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int) + extends CatalystPrimitiveConverter(parent, fieldIndex) { + + private var dict: Array[String] = null + + override def hasDictionarySupport: Boolean = true + + override def setDictionary(dictionary: Dictionary): Unit = + dict = (for (i <- 0 to dictionary.getMaxId) --- End diff -- random esoteric scala detail: `foreach` on a `Range` is actually just about as good as a while loop. This **only** applies to `Range`, though, nothing else. More [here](https://github.com/scala/scala/commit/4cfc633fc6). I stumbled upon this a while back; if you are interested in reading more: http://imranrashid.com/posts/scala-loop-profiling/
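A quick standalone illustration of that point (made-up names, not code from the PR): `Range` overrides `foreach` with a simple counter loop, so once the JIT inlines the closure these two perform about the same.

```scala
def fillWhile(n: Int): Array[Int] = {
  val a = new Array[Int](n)
  var i = 0
  while (i < n) { a(i) = i * 2; i += 1 }
  a
}

def fillForeach(n: Int): Array[Int] = {
  val a = new Array[Int](n)
  (0 until n).foreach(i => a(i) = i * 2)  // Range.foreach only; other
  a                                       // collections pay closure overhead
}
```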
[GitHub] spark pull request: [SPARK-3298][SQL] Add flag control overwrite r...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4175#issuecomment-71348013 Jenkins this is OK to test
[GitHub] spark pull request: [SQL] SPARK-5309: Add support for dictionaries...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4187#discussion_r23501242 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala --- @@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter( parent.updateLong(fieldIndex, value) } +/** + * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String. + * Supports dictionaries to reduce Binary to String conversion overhead. + * + * Follows pattern in Parquet of using dictionaries, where supported, for String conversion. + * + * @param parent The parent group converter. + * @param fieldIndex The index inside the record. + */ +private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int) + extends CatalystPrimitiveConverter(parent, fieldIndex) { + + private var dict: Array[String] = null --- End diff -- I have a hard time believing this really matters (e.g., if it's actually hot, then the JIT will take care of it), but if we do care about this, should we recommend that people *always* use `private[this]`?? I have a hard time imagining that anyone will ever need `private` instead.
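For context, a small sketch of the semantic difference being debated (hypothetical class, not from the PR): `private` still allows access from other instances of the same class and is compiled behind an accessor method, while `private[this]` is object-private and compiles to a direct field access, which is the basis of the performance argument.

```scala
class Counter {
  private var shared = 0       // reachable from other Counter instances
  private[this] var local = 0  // reachable only via `this`

  def sameAs(other: Counter): Boolean =
    shared == other.shared     // legal: `private` permits cross-instance access
  // `other.local` would not compile: `local` is object-private
}
```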
[GitHub] spark pull request: [SPARK-3298][SQL] Add flag control overwrite r...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4175#issuecomment-71348345 let's try this again ... Jenkins this is OK to test
[GitHub] spark pull request: [SPARK-3298][SQL] Add flag control overwrite r...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4175#issuecomment-71348093 this is mentioned in the jira, but it's worth noting again here that this changes the behavior slightly, since it wouldn't throw an exception before.
[GitHub] spark pull request: [SQL] SPARK-5309: Add support for dictionaries...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4187#discussion_r23505185 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala --- @@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter( parent.updateLong(fieldIndex, value) } +private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int) + extends CatalystPrimitiveConverter(parent, fieldIndex) { + + private var dict: Array[String] = null --- End diff -- thanks @rxin and @markhamstra. good point about `equals`, but I will try to use `private[this]` most of the time.
[GitHub] spark pull request: [SQL] SPARK-5309: Add support for dictionaries...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4187#issuecomment-71323376 Jenkins this is ok to test
[GitHub] spark pull request: [SQL] SPARK-5309: Add support for dictionaries...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4187#issuecomment-71323709 Thanks @MickDavies for investigating and also putting the performance comparison into the jira. I think the code looks fine, but I'm not super-familiar w/ this part of the code so I'd like to get an OK from somebody else. (I'm assuming the tests will pass ...) Out of my own curiosity -- does the dictionary affect the amount of memory needed when reading/writing parquet? Was parquet creating this dictionary in any case, so we always had it sitting around in memory? Or does this change mean that now we're storing more in memory than before?
[GitHub] spark pull request: [SQL] SPARK-5309: Add support for dictionaries...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4187#discussion_r23504700 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala --- @@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter( parent.updateLong(fieldIndex, value) } +private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int) + extends CatalystPrimitiveConverter(parent, fieldIndex) { + + private var dict: Array[String] = null + + override def hasDictionarySupport: Boolean = true + + override def setDictionary(dictionary: Dictionary): Unit = + dict = (for (i <- 0 to dictionary.getMaxId) --- End diff -- oh right, sorry I missed the `yield`.
[GitHub] spark pull request: [SQL] SPARK-5309: Use Dictionary for Binary-S...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4139#issuecomment-71303531 Jenkins retest this please
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23569836 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD, KafkaRDDPartition} +import org.apache.spark.rdd.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ + +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each '''partition''' will accept. + * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.rdd.kafka.KafkaCluster} + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>. + * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler function for translating each message into the desired type + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets + */ +class DeterministicKafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: ClassTag, + T <: Decoder[_]: ClassTag, + R: ClassTag]( + @transient ssc_ : StreamingContext, + val kafkaParams: Map[String, String], + val fromOffsets: Map[TopicAndPartition, Long], + messageHandler: MessageAndMetadata[K, V] => R, + maxRetries: Int = 1 +) extends InputDStream[R](ssc_) with Logging { + + protected[streaming] override val checkpointData = new DeterministicKafkaInputDStreamCheckpointData --- End diff -- looks like this line is too long ...
[GitHub] spark pull request: [SPARK-3298][SQL] Add flag control overwrite r...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4175#issuecomment-71551408 ok to test
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23570025 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -788,6 +788,20 @@ abstract class RDD[T: ClassTag]( } /** + * Applies a function to each partition of this RDD, while tracking the index + * of the original partition. + */ + def foreachPartitionWithIndex( --- End diff -- I disagree -- I think it should stay here. It seems like a pretty obvious omission from the RDD api. Probably it's never been added b/c there wasn't a good use case. Well, now we've got a good use case. IMO no sense in creating another PR that needs review just for that.
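For reference, a sketch of how such a method might look inside `RDD[T]` (an assumed implementation mirroring `foreachPartition` and `mapPartitionsWithIndex`, not the PR's exact code):

```scala
/**
 * Applies a function to each partition of this RDD, while tracking the index
 * of the original partition.
 */
def foreachPartitionWithIndex(f: (Int, Iterator[T]) => Unit): Unit = {
  val cleanedF = sc.clean(f)  // strip the closure of non-serializable refs
  sc.runJob(this, (context: TaskContext, iter: Iterator[T]) =>
    cleanedF(context.partitionId, iter))
}
```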
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71555904 I'm not very knowledgeable about streaming, but from my limited perspective it looks good
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71546698 ok to test
[GitHub] spark pull request: [SPARK-5388] Provide a stable application subm...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4216#discussion_r23709166 --- Diff: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala --- @@ -148,15 +148,22 @@ private[deploy] object DeployMessages { // Master to MasterWebUI - case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo], -activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo], -activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo], -status: MasterState) { + case class MasterStateResponse( + host: String, + port: Int, + stablePort: Option[Int], + workers: Array[WorkerInfo], + activeApps: Array[ApplicationInfo], + completedApps: Array[ApplicationInfo], + activeDrivers: Array[DriverInfo], + completedDrivers: Array[DriverInfo], + status: MasterState) { Utils.checkHost(host, "Required hostname") assert(port > 0) def uri = "spark://" + host + ":" + port + def stableUri: Option[String] = stablePort.map { p => "spark://" + host + ":" + p } --- End diff -- I'm not sure where the right place is, but can you put in some docs or comments on `port` vs `stablePort` and `uri` vs `stableUri`? Or should `stableXXX` be renamed to `restXXX`?
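The requested docs might read something like this (wording and values are hypothetical; the split assumed here is the legacy submission port alongside the new REST submission port from this PR):

```scala
// Hypothetical doc comments on the two ports and URIs.
object MasterEndpointsExample {
  val host = "master.example.com"

  /** Port of the legacy submission protocol. */
  val port: Int = 7077

  /** Port of the stable REST submission protocol, if the master enables it. */
  val stablePort: Option[Int] = Some(6066)

  /** URI for the legacy protocol. */
  def uri: String = s"spark://$host:$port"

  /** URI for the stable REST protocol, when available. */
  def stableUri: Option[String] = stablePort.map(p => s"spark://$host:$p")
}
```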
[GitHub] spark pull request: SPARK-5425: Use synchronised methods in system...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4220#issuecomment-71884820 retest this please
[GitHub] spark pull request: [SPARK-5291][CORE] Add timestamp and reason wh...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4082#discussion_r23708804 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala --- @@ -369,7 +369,7 @@ private[spark] class MesosSchedulerBackend( private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { inClassLoader() { logInfo("Mesos slave lost: " + slaveId.getValue) - removeExecutor(slaveId.getValue) + removeExecutor(slaveId.getValue, reason.toString) --- End diff -- does mesos ever actually give us a useful reason? Though it would be nice to have an actual reason for the executor being removed, it seems we'll end up with the reason for the executor-removed event just being "Executor Lost", which is pretty useless.