[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-189151430 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-189151425 **[Test build #52033 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52033/consoleFull)** for PR 11249 at commit [`0725def`](https://github.com/apache/spark/commit/0725def770316bc1331c7e65cc300cfe0f5edd33). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-189151432 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/52033/ Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-189151197 **[Test build #52033 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52033/consoleFull)** for PR 11249 at commit [`0725def`](https://github.com/apache/spark/commit/0725def770316bc1331c7e65cc300cfe0f5edd33).
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r54212560 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala --- @@ -269,6 +270,33 @@ class StreamingContext private[streaming] ( RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body) } + private[streaming] val recoverableAccuNameToAcc: mutable.Map[String, Accumulable[_, _]] = +mutable.Map.empty + + /** +* Different from accumulator in SparkContext, it will first try to recover from Checkpoint +* if it exist. +* +* @param initialValue initial value of accumulator. It will be ignored when recovering from --- End diff -- OK, and there are still many similar cases in the same file; I will change them all.
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r54170956 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/AccumulableCheckpoint.scala --- @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.{Accumulable, AccumulableParam, SparkException} + +/** + * Store information of Accumulable. We can't checkpoint Accumulable dircectly because the + * "readObject" method of Accumulable to add extra logic. + */ +class AccumulableCheckpoint[R, T] private ( + val name: String, + val value: R, + val param: AccumulableParam[R, T]) extends Serializable{ --- End diff -- Space before `{`
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r54170867 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala --- @@ -269,6 +270,33 @@ class StreamingContext private[streaming] ( RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body) } + private[streaming] val recoverableAccuNameToAcc: mutable.Map[String, Accumulable[_, _]] = +mutable.Map.empty + + /** +* Different from accumulator in SparkContext, it will first try to recover from Checkpoint +* if it exist. +* +* @param initialValue initial value of accumulator. It will be ignored when recovering from --- End diff -- We don't do alignment for scala doc in Spark. ```scala @param initialValue Initial value of accumulator. It will be ignored when recovering from.. @param name The name is required as identity to find corresponding accumulator. ```
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r54170612 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala --- @@ -269,6 +270,33 @@ class StreamingContext private[streaming] ( RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body) } + private[streaming] val recoverableAccuNameToAcc: mutable.Map[String, Accumulable[_, _]] = +mutable.Map.empty + + /** +* Different from accumulator in SparkContext, it will first try to recover from Checkpoint +* if it exist. +* +* @param initialValue initial value of accumulator. It will be ignored when recovering from +* checkpoint +* @param name name is required as identity to find corresponding accumulator. +*/ + def getOrCreateRecoverableAccumulator[T](initialValue: T, name: String) +(implicit param: AccumulatorParam[T]): Accumulator[T] = { + +def registerNewAccumulator(_initialV: T) : Accumulator[T] = { + val acc = sc.accumulator(_initialV, name) + recoverableAccuNameToAcc(name) = acc + acc +} + +val newInitialValue: T = if (isCheckpointPresent) { + _cp.trackedAccs.find(_.name == name).map(_.value).getOrElse(initialValue).asInstanceOf[T] +} else initialValue --- End diff -- Nit: ```scala if (...) { ... } else { ... } ```
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53925855 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId --- End diff -- The exception is about `ssc` (the StreamingContext), probably via the `recoverableAccuNameToId` property. Meanwhile, the calculation of `val trackedAccs = ` should happen on the master node, not on every executor, so I suspect this exception was caused by something else?
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-187582252 Merged build finished. Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-187582244 **[Test build #51746 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51746/consoleFull)** for PR 11249 at commit [`cca11ba`](https://github.com/apache/spark/commit/cca11ba6fe0be4e01e7539ee0ebd9f9fbd283320). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-187582255 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51746/ Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-187581494 **[Test build #51746 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51746/consoleFull)** for PR 11249 at commit [`cca11ba`](https://github.com/apache/spark/commit/cca11ba6fe0be4e01e7539ee0ebd9f9fbd283320).
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53742303 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId + + // initialize from ssc.context after SPARK-13051 + val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele => --- End diff -- The extra logic in Accumulable.readObject is coupled with a bunch of other code, and I am not sure I want to touch it here. Actually, it's a common pattern to extract the necessary parts from an object and recover from that subset; for example, class `Checkpoint` does the same thing for `StreamingContext`. Moreover, the accumulator id is not assignable; it increases automatically when a new one is created, which means the recovered Accumulator is not 100% identical to the original one — at minimum they have different accumulator ids. So I would prefer `AccumulableCheckpoint`, which already contains enough information for recovery.
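The extract-a-serializable-subset pattern described above can be sketched as follows. This is an illustrative toy, not the PR's actual code: `Counter` stands in for `Accumulable` (whose `readObject` carries extra logic, making it unsafe to checkpoint directly), and `CounterCheckpoint` stands in for `AccumulableCheckpoint`; all names are hypothetical.

```scala
// Toy sketch: extract a serializable subset of an object and recover from it.
class Counter(val name: String, var value: Long) // stands in for Accumulable

case class CounterCheckpoint(name: String, value: Long) extends Serializable {
  // Recovery builds a fresh Counter. As noted in the discussion, the recovered
  // object is not 100% identical to the original (a real accumulator would get
  // a new auto-assigned id), but name and value are restored.
  def recover(): Counter = new Counter(name, value)
}

object CheckpointDemo extends App {
  val acc = new Counter("processedRecords", 42L)
  val cp = CounterCheckpoint(acc.name, acc.value) // what the checkpoint stores
  val recovered = cp.recover()                    // what a restart reconstructs
  assert(recovered.name == "processedRecords")
  assert(recovered.value == 42L)
}
```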
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53741073 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId --- End diff -- `ssc` can be used directly, but not inside the `scala.Function1` passed to the `filter` method, because there it would need to be serialized. If I change the code to ```scala val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele => ssc.recoverableAccuNameToId.values.toSet.contains(ele._1)).map{ case (id, weakRef) => new AccumulableCheckpoint(weakRef.get.get) }.toArray ``` I get the exception below: > java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable org.apache.spark.streaming.StreamingContext Serialization stack: - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@381bf51d) - field (class: org.apache.spark.streaming.Checkpoint, name: org$apache$spark$streaming$Checkpoint$$ssc, type: class org.apache.spark.streaming.StreamingContext) - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@622b6f7) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:570)
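The `NotSerializableException` in the comment above is the classic accidental-outer-capture problem: referencing `ssc.recoverableAccuNameToId` inside the `filter` closure drags the whole non-serializable `StreamingContext` into the closure. A minimal sketch of the workaround being used in the PR (copy the field into a local `val` first, so only that value is captured); `Context` and `Snapshot` are illustrative stand-ins, not Spark classes:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Context { // stands in for StreamingContext: deliberately not Serializable
  val wantedIds: Set[Long] = Set(1L, 3L)
}

class Snapshot(ctx: Context) extends Serializable {
  // Copying the field into a local val means the closure below captures only
  // the Set. Writing `ctx.wantedIds.contains(id)` inline would capture `ctx`
  // itself and fail with NotSerializableException, as in the stack trace above.
  private val ids: Set[Long] = ctx.wantedIds
  val tracked: Array[Long] = (1L to 5L).filter(ids.contains).toArray
}

object CaptureDemo extends App {
  val snap = new Snapshot(new Context)
  // Serializes fine because no Context reference is retained in Snapshot.
  new ObjectOutputStream(new ByteArrayOutputStream).writeObject(snap)
  assert(snap.tracked.sameElements(Array(1L, 3L)))
}
```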
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-186051202 Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-186051203 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51511/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-186051114 **[Test build #51511 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51511/consoleFull)** for PR 11249 at commit [`368dc0f`](https://github.com/apache/spark/commit/368dc0f70277188d2d43c21640cda3e32edca052). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53419256 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala --- @@ -269,6 +270,39 @@ class StreamingContext private[streaming] ( RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body) } + private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = { --- End diff -- Yes, it will add strong references; that's why I said we need to reset `StreamingContext.recoverableAccuNameToId` in the `stop` / `shutdown` method.
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53419167 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId --- End diff -- Hmmm, I don't think so; `ssc.` is used everywhere in this class. Can you try it? The exception you met was probably caused by something else.
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53419010 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId + + // initialize from ssc.context after SPARK-13051 + val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele => --- End diff -- OK, I see. Let's see if we can do something inside `Accumulable.readObject`; currently this is more of a hacky workaround.
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53418748 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala --- @@ -269,6 +270,39 @@ class StreamingContext private[streaming] ( RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body) } + private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = { --- End diff -- That's a good point; I will change it as you suggest. It adds a strong reference to the Accumulator and prevents it from being removed by GC.
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53418037 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId + + // initialize from ssc.context after SPARK-13051 + val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele => --- End diff -- `readObject` is called by Java serialization automatically.
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53417927 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId --- End diff -- I may not have described it clearly, sorry. It used to be inside `val trackedAccs = ...`, but that caused a serialization issue: it tried to serialize the StreamingContext when doing the checkpoint. So I moved it out of the `trackedAccs` definition explicitly to break the dependency on `ssc`.
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53417873 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala --- @@ -269,6 +270,39 @@ class StreamingContext private[streaming] ( RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body) } + private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = { --- End diff -- I don't think memory space is the problem, as Java collections always store object references, not duplicated objects. The reason I suggested doing that is to simplify the code at https://github.com/apache/spark/pull/11249/files#diff-f0064bc8820551c338276e29d922e459R47 : we would not need to look up the Accumulator again by accumulator id. In particular, `Accumulators.originals` stores `WeakReference` objects, so the lookup can return null.
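The point about weak references can be sketched as follows (an illustrative toy registry, not Spark's actual `Accumulators.originals` code): once the last strong reference to a registered object is gone and a GC cycle runs, `WeakReference.get` may return null, which is why holding strong references in the streaming context simplifies recovery.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

object WeakRefDemo extends App {
  // Toy registry in the style of Accumulators.originals: ids map to weak
  // references, so entries can vanish once nothing else strongly references
  // the object.
  val registry = mutable.Map.empty[Long, WeakReference[AnyRef]]
  val acc: AnyRef = new Object            // strong reference keeps it alive
  registry(1L) = new WeakReference(acc)

  // While `acc` is strongly held, the lookup succeeds...
  assert(registry(1L).get ne null)

  // ...but a consumer must still code defensively: after the last strong
  // reference is dropped and GC runs, get() can return null.
  val maybeAcc: Option[AnyRef] = Option(registry(1L).get)
  assert(maybeAcc.isDefined)
}
```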
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53417573 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId + + // initialize from ssc.context after SPARK-13051 + val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele => --- End diff -- I just checked the code of `Accumulable.readObject()`; it seems never to be used. Were you hitting a problem with it?
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53417343 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId --- End diff -- I am not talking about the `@transient`; I mean we can move this inside https://github.com/apache/spark/pull/11249/files#diff-f0064bc8820551c338276e29d922e459R48
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-186022669

**[Test build #51511 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51511/consoleFull)** for PR 11249 at commit [`368dc0f`](https://github.com/apache/spark/commit/368dc0f70277188d2d43c21640cda3e32edca052).
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53410683

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
     RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
   }

+  private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = {
+    if (isCheckpointPresent) {
+      // accumulators created by StreamingContext must provide a name, so it's safe to call name.get
+      mutable.Map(_cp.trackedAccs.map(accCP => (accCP.name, -1L)).toSeq: _*)
+    } else {
+      mutable.Map.empty
+    }
+  }
+
+  /**
+   * Different from an accumulator in SparkContext, this will first try to recover from the
+   * Checkpoint if it exists.
+   *
+   * @param initialValue initial value of the accumulator. It is ignored when recovering from
+   *                     a checkpoint.
+   * @param name name is required as the identity used to find the corresponding accumulator.
+   */
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
--- End diff --

Hmm... changing the function name might be a good idea, to explicitly emphasize the recoverable feature. I will change it. About the input parameter, I don't think `createFunc` is necessary here; `initialValue` should be enough.
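The recovery-first lookup being discussed (use the checkpointed value when an accumulator with that name exists, otherwise fall back to `initialValue`) can be sketched roughly as follows. `CheckpointStore` and the `Long`-only value type are illustrative simplifications for this thread, not the PR's actual API:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the values captured in Checkpoint.trackedAccs.
object CheckpointStore {
  private val saved = mutable.Map[String, Long]()

  // Record a value under a name, as a checkpoint would.
  def save(name: String, value: Long): Unit = saved(name) = value

  // Mirrors the documented behavior: initialValue is ignored when a
  // checkpointed value for this name already exists.
  def accumulator(initialValue: Long, name: String): Long =
    saved.getOrElse(name, initialValue)
}
```

Under these assumptions, after `save("events", 42L)` a call to `accumulator(0L, "events")` recovers 42, while an unknown name simply starts from its initial value.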
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53409985

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
     RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
   }

+  private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = {
--- End diff --

All `Accumulator[_, _]` instances are already stored in the `Accumulators.originals` map, and I don't want to duplicate them in memory. That would have two disadvantages: (1) extra effort to keep the two maps consistent, and (2) extra memory space.
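The trade-off being argued here (keep only a lightweight name-to-id index rather than a second map holding the accumulators themselves) can be sketched as below. `Registry` and `lookupByName` are hypothetical names for this illustration, not Spark's API, and the accumulator is stubbed as a `String`:

```scala
import scala.collection.mutable

// Single source of truth plus a lightweight index, instead of two maps
// that both hold accumulator references and must be kept consistent.
object Registry {
  val originals = mutable.Map[Long, String]() // id -> accumulator (stubbed)
  val nameToId  = mutable.Map[String, Long]() // name -> id, no duplication

  def register(id: Long, name: String, acc: String): Unit = {
    originals(id) = acc
    nameToId(name) = id
  }

  // Resolve by name through the id index; only `originals` owns the objects.
  def lookupByName(name: String): Option[String] =
    nameToId.get(name).flatMap(originals.get)
}
```

The index costs one extra `Map` lookup per resolution, but updating or removing an accumulator only ever touches the central registry.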
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53409427

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
+
+  // initialize from ssc.context after SPARK-13051
+  val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele =>
--- End diff --

No, the `readObject` method of `Accumulable` is overridden, and some "hacky" code was added: the value of the `Accumulable` is explicitly set to zero when it is deserialized. That's why we need this class. I also put the reason in a comment.
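The deserialization behavior described here, where an overridden `readObject` zeroes the value and therefore makes a separate checkpoint wrapper necessary, can be illustrated with a minimal Java-serialization sketch. `ResettingCounter` is a toy class mimicking the described behavior, not Spark's actual `Accumulable`:

```scala
import java.io.ObjectInputStream

// Toy class: deserialized copies deliberately lose their accumulated value,
// the way Accumulable's overridden readObject reportedly resets it.
class ResettingCounter(var value: Long) extends Serializable {
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = 0L // reset on deserialization
  }
}
```

Round-tripping a `ResettingCounter(99L)` through an `ObjectOutputStream`/`ObjectInputStream` yields `value == 0`, so a checkpoint that serialized the raw object could never restore 99; a wrapper that records the value in a plain field sidesteps the reset.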
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53408959

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
--- End diff --

No, I made it explicitly a `val` to avoid serialization issues.
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53400290

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
     RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
   }

+  private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = {
--- End diff --

`mutable.Map[String, Accumulator[_,_]]`? And reset the `Map` on shutdown?
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53399663

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
+
+  // initialize from ssc.context after SPARK-13051
+  val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele =>
--- End diff --

I am wondering if we really need `AccumulableCheckpoint` here; can it be of type `Array[Accumulable[_, _]]`?
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53397245

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
--- End diff --

Can this not be a property? It seems to be used only once, in `val trackedAccs: ...`
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r53397085

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
     RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
   }

+  private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = {
+    if (isCheckpointPresent) {
+      // accumulators created by StreamingContext must provide a name, so it's safe to call name.get
+      mutable.Map(_cp.trackedAccs.map(accCP => (accCP.name, -1L)).toSeq: _*)
+    } else {
+      mutable.Map.empty
+    }
+  }
+
+  /**
+   * Different from an accumulator in SparkContext, this will first try to recover from the
+   * Checkpoint if it exists.
+   *
+   * @param initialValue initial value of the accumulator. It is ignored when recovering from
+   *                     a checkpoint.
+   * @param name name is required as the identity used to find the corresponding accumulator.
+   */
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
--- End diff --

`getOrCreateRecoverableAccumulator[T](createFunc: () => T, name: String)(...)`? That would also explicitly tell developers that only accumulators created via this API can be recovered.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-185658066

Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-185658072

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51473/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-185657505

**[Test build #51473 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51473/consoleFull)** for PR 11249 at commit [`0c7e5d7`](https://github.com/apache/spark/commit/0c7e5d71f6c77a225f7409730df3c9ffdef611d3).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11249#issuecomment-185605826

**[Test build #51473 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51473/consoleFull)** for PR 11249 at commit [`0c7e5d7`](https://github.com/apache/spark/commit/0c7e5d71f6c77a225f7409730df3c9ffdef611d3).
GitHub user mwws opened a pull request: https://github.com/apache/spark/pull/11249

[Spark-13374][Streaming][wip] make it possible to create recoverable accumulator for streaming application

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mwws/spark SPARK-Accumulator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11249.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #11249

commit f226750866b79096876314bc673101f37abcfae8
Author: mwws
Date: 2016-02-14T08:55:14Z

    initial checkin for checkpointing accumulator

commit 0c7e5d71f6c77a225f7409730df3c9ffdef611d3
Author: mwws
Date: 2016-02-18T08:29:36Z

    make it possible to create recoverable accumulator for streaming application