[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22275938 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala --- @@ -35,6 +35,15 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { }) --- End diff -- nit: The existing version of registerGauge could have used the new version. Not a big deal, very small amount of duplicate code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
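One way to act on the nit above is to let the value-returning helper delegate to the Option-based one, so the gauge-building logic lives in a single place. This is a minimal sketch, assuming the three-argument `registerGauge` shape quoted elsewhere in this thread. `Listener`, `Gauge`, and the `gauges` map are hypothetical stand-ins that let the snippet run without Spark or the metrics library; they are not the actual `StreamingSource` code.

```scala
import scala.collection.mutable

object DelegatingGauges {
  // Hypothetical stand-ins for the progress listener and the metrics Gauge type.
  final case class Listener(name: String, totalCompletedBatches: Long)
  trait Gauge[T] { def getValue: T }

  val gauges = mutable.Map.empty[String, Gauge[_]]
  var listener: Listener = Listener(null, 0L)

  // Option-based variant: defaultValue is reported when f yields None.
  def registerGaugeWithOption[T](name: String, f: Listener => Option[T], defaultValue: T): Unit =
    gauges(name) = new Gauge[T] {
      override def getValue: T = f(listener).getOrElse(defaultValue)
    }

  // Value-based variant reuses the one above; Option(...) maps a null result to None.
  def registerGauge[T](name: String, f: Listener => T, defaultValue: T): Unit =
    registerGaugeWithOption[T](name, l => Option(f(l)), defaultValue)
}
```

With this shape the null handling of the older helper is preserved, and only `registerGaugeWithOption` touches the registry.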
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22275955
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -55,19 +64,31 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   // Gauge for last completed batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge(lastCompletedBatch_submissionTime,
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge(lastCompletedBatch_processStartTime,
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge(lastCompletedBatch_processEndTime,
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption(lastCompletedBatch_submissionTime,
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption(lastCompletedBatch_processingStartTime,
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption(lastCompletedBatch_processingEndTime,
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last completed batch's delay information.
+  registerGaugeWithOption(lastCompletedBatch_processingDelay,
+    _.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+  registerGaugeWithOption(lastCompletedBatch_schedulingDelay,
+    _.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+  registerGaugeWithOption(lastCompletedBatch_totalDelay,
+    _.lastCompletedBatch.flatMap(_.totalDelay), -1L)
   // Gauge for last received batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge(lastReceivedBatch_submissionTime,
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge(lastReceivedBatch_processStartTime,
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge(lastReceivedBatch_processEndTime,
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption(lastReceivedBatch_submissionTime,
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption(lastReceivedBatch_processingStartTime,
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption(lastReceivedBatch_processingEndTime,
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last received batch records and total received batch records.
+  registerGauge(lastReceivedBatchRecords, _.lastReceivedBatchRecords.values.sum, 0L)
--- End diff --
Isn't it more consistent to name this `lastReceivedBatch_records`?
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22275969
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -55,19 +64,31 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   // Gauge for last completed batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge(lastCompletedBatch_submissionTime,
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge(lastCompletedBatch_processStartTime,
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge(lastCompletedBatch_processEndTime,
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption(lastCompletedBatch_submissionTime,
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption(lastCompletedBatch_processingStartTime,
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption(lastCompletedBatch_processingEndTime,
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last completed batch's delay information.
+  registerGaugeWithOption(lastCompletedBatch_processingDelay,
+    _.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+  registerGaugeWithOption(lastCompletedBatch_schedulingDelay,
+    _.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+  registerGaugeWithOption(lastCompletedBatch_totalDelay,
+    _.lastCompletedBatch.flatMap(_.totalDelay), -1L)
   // Gauge for last received batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge(lastReceivedBatch_submissionTime,
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge(lastReceivedBatch_processStartTime,
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge(lastReceivedBatch_processEndTime,
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption(lastReceivedBatch_submissionTime,
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption(lastReceivedBatch_processingStartTime,
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption(lastReceivedBatch_processingEndTime,
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last received batch records and total received batch records.
+  registerGauge(lastReceivedBatchRecords, _.lastReceivedBatchRecords.values.sum, 0L)
+  registerGauge(totalReceivedBatchRecords, _.numTotalReceivedBatchRecords, 0L)
--- End diff --
Since this is more closely related to the global streaming metrics such as `totalCompletedBatches`, it might be more consistent to put it near them and name it `totalReceivedRecords` (please update the corresponding field in the listener as well if you change this).
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22275976
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -55,19 +64,31 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   // Gauge for last completed batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge(lastCompletedBatch_submissionTime,
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge(lastCompletedBatch_processStartTime,
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge(lastCompletedBatch_processEndTime,
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption(lastCompletedBatch_submissionTime,
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption(lastCompletedBatch_processingStartTime,
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption(lastCompletedBatch_processingEndTime,
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last completed batch's delay information.
+  registerGaugeWithOption(lastCompletedBatch_processingDelay,
+    _.lastCompletedBatch.flatMap(_.processingDelay), -1L)
+  registerGaugeWithOption(lastCompletedBatch_schedulingDelay,
+    _.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
+  registerGaugeWithOption(lastCompletedBatch_totalDelay,
+    _.lastCompletedBatch.flatMap(_.totalDelay), -1L)
   // Gauge for last received batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
-  registerGauge(lastReceivedBatch_submissionTime,
-    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
-  registerGauge(lastReceivedBatch_processStartTime,
-    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
-  registerGauge(lastReceivedBatch_processEndTime,
-    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  registerGaugeWithOption(lastReceivedBatch_submissionTime,
+    _.lastCompletedBatch.map(_.submissionTime), -1L)
+  registerGaugeWithOption(lastReceivedBatch_processingStartTime,
+    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+  registerGaugeWithOption(lastReceivedBatch_processingEndTime,
+    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+
+  // Gauge for last received batch records and total received batch records.
+  registerGauge(lastReceivedBatchRecords, _.lastReceivedBatchRecords.values.sum, 0L)
+  registerGauge(totalReceivedBatchRecords, _.numTotalReceivedBatchRecords, 0L)
--- End diff --
And if it's not too much work, could you add `totalProcessedRecords`? That seems useful. If it is too complicated, then don't worry about it for this PR.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68115893 Just a couple more comments to make the names more consistent with existing ones. Otherwise I approve of how `registerGauge` works now.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68119029 [Test build #24827 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24827/consoleFull) for PR 3466 at commit [`00f5f7f`](https://github.com/apache/spark/commit/00f5f7f17b7aee95b519e17fb79e522277af8474). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68119033 Hi TD, thanks a lot for your comments. I just changed the code style as you suggested and also added one more metric, `totalProcessedRecords`. Would you mind reviewing this again? Thanks a lot, I appreciate your time.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68121187 LGTM. Merging this. Thanks @jerryshao
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3466
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68121546 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24827/ Test PASSed.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68121543 [Test build #24827 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24827/consoleFull) for PR 3466 at commit [`00f5f7f`](https://github.com/apache/spark/commit/00f5f7f17b7aee95b519e17fb79e522277af8474). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22266250
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -28,46 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
-  private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
+  private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T],
       defaultValue: T) {
     metricRegistry.register(MetricRegistry.name(streaming, name), new Gauge[T] {
-      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
--- End diff --
I understand the problem. Good catch, I did not realize that. How about this: let's make two versions of `registerGauge`, one that takes `f: StreamingProgressListener => T` without any default value, and another that takes `f: StreamingProgressListener => Option[T]` together with the default value. Each version will be used where appropriate.
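The two-version proposal above can be sketched as follows. This is only an illustration under stated assumptions: `Listener`, `Gauge`, and the `gauges` map are hypothetical stand-ins for the progress listener and the metrics registry, and the name `registerGaugeWithOption` is borrowed from the later revision of the diff in this thread.

```scala
import scala.collection.mutable

object TwoGaugeVariants {
  // Hypothetical stand-ins; not Spark's listener or the metrics library's Gauge.
  final case class Listener(lastCompletedSubmissionTime: Option[Long],
                            numTotalCompletedBatches: Long)
  trait Gauge[T] { def getValue: T }

  val gauges = mutable.Map.empty[String, Gauge[_]]
  var listener: Listener = Listener(None, 0L)

  // Version 1: f always produces a value, so no default is taken.
  def registerGauge[T](name: String, f: Listener => T): Unit =
    gauges(name) = new Gauge[T] { override def getValue: T = f(listener) }

  // Version 2: f may produce None; the default is specified exactly once, here.
  def registerGaugeWithOption[T](name: String, f: Listener => Option[T], defaultValue: T): Unit =
    gauges(name) = new Gauge[T] {
      override def getValue: T = f(listener).getOrElse(defaultValue)
    }
}
```

Callers that read a plain counter would use the first version, while callers that read from an `Option` pass it through unchanged and let the second version resolve the default.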
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22267558
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -28,46 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
-  private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
+  private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T],
       defaultValue: T) {
     metricRegistry.register(MetricRegistry.name(streaming, name), new Gauge[T] {
-      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
--- End diff --
OK, got it, I will change the code as you suggested.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68087844 [Test build #24808 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24808/consoleFull) for PR 3466 at commit [`44721a6`](https://github.com/apache/spark/commit/44721a601f2d6c5b028de88c775106d5d32bf998). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68090373 [Test build #24808 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24808/consoleFull) for PR 3466 at commit [`44721a6`](https://github.com/apache/spark/commit/44721a601f2d6c5b028de88c775106d5d32bf998). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68090374 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24808/ Test PASSed.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22241115
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -62,6 +62,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   registerGauge(lastCompletedBatch_processEndTime,
     _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  // Gauge for last completed batch's delay information.
+  registerGauge(lastCompletedBatch_processTime,
--- End diff --
It's better to name this `processingTime`.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22241227
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -62,6 +62,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   registerGauge(lastCompletedBatch_processEndTime,
     _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  // Gauge for last completed batch's delay information.
+  registerGauge(lastCompletedBatch_processTime,
+    _.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(0L), -1L)
--- End diff --
Why does the default value inside `getOrElse` differ from the explicit default value? In fact, the default value should not have to be specified twice. Since the `registerGauge` function already takes care of nulls using `Option` and the default value, we should not require Options here.
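To see why the mismatched defaults questioned above matter, here is a small sketch. It assumes a helper with the same `Option(f(...)).getOrElse(defaultValue)` shape as the original `registerGauge`; the `gaugeValue` function and the `processingDelay` parameter are hypothetical names used only for illustration.

```scala
object DefaultSpecifiedTwice {
  // Same shape as the original helper: a null result from f falls back to defaultValue.
  def gaugeValue[T](f: Option[Long] => T, defaultValue: T)(processingDelay: Option[Long]): T =
    Option(f(processingDelay)).getOrElse(defaultValue)

  // No batch has completed yet, so the delay is None.
  // The inner getOrElse(0L) already yields a non-null value, so -1L is never reported.
  val reported: Long = gaugeValue[Long](_.getOrElse(0L), -1L)(None)
}
```

Under these assumptions the gauge reports 0 rather than the documented -1, so the outer default is dead code whenever the inner one is present.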
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22241231
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -62,6 +62,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   registerGauge(lastCompletedBatch_processEndTime,
     _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+  // Gauge for last completed batch's delay information.
+  registerGauge(lastCompletedBatch_processTime,
+    _.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(0L), -1L)
+  registerGauge(lastCompletedBatch_schedulingDelay,
+    _.lastCompletedBatch.flatMap(_.schedulingDelay).getOrElse(0L), -1L)
--- End diff --
Same comment as above.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22241246
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -70,4 +78,8 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
     _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
--- End diff --
I know that this is old code, but could you fix this as well? The default value should not have to be specified twice.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68015296 Thanks a lot for your comments, TD. I will refactor the old code and fix the issues you mentioned above.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68017979 [Test build #24751 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24751/consoleFull) for PR 3466 at commit [`c097ddc`](https://github.com/apache/spark/commit/c097ddcb963bff611221ebc74da4e1f82c87b024). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68018004 Hey TD, I've addressed the problem you mentioned in this way. I'm not sure whether this is what you want; would you mind taking a look at it? Thanks a lot.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22243982
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -28,46 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
-  private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
+  private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T],
       defaultValue: T) {
     metricRegistry.register(MetricRegistry.name(streaming, name), new Gauge[T] {
-      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
--- End diff --
I think it's better to keep the `Option` here (and document that `defaultValue` is used when `f` returns `null`). Other places should then not have to use `Option`. This is safer for anyone to use and also minimizes the changes.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22244053
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala ---
@@ -28,46 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   private val streamingListener = ssc.progressListener
-  private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
+  private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T],
       defaultValue: T) {
     metricRegistry.register(MetricRegistry.name(streaming, name), new Gauge[T] {
-      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
--- End diff --
OK, I will revert it and try a better way.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22245026 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala --- @@ -28,46 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { private val streamingListener = ssc.progressListener - private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, + private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T], defaultValue: T) { metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { - override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) --- End diff -- Hi TD, what do you mean by `And other places should not have to use Option`? Taking this call as an example, if it is changed to
```scala
registerGauge("lastCompletedBatch_submissionTime", _.lastCompletedBatch.map(_.submissionTime).get, -1L)
```
then `get` will throw an exception when there is no completed batch. I'm not sure what you actually mean; sorry if I misunderstood anything.
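To illustrate the concern raised above: calling `get` on an empty `Option` throws, whereas the `getOrElse(-1L)` pattern used at the existing call sites does not. The sketch below is self-contained; `Batch` is a hypothetical, simplified record rather than the real batch info class, and the two helper names are invented for the example.

```scala
// Hypothetical, simplified batch record; the real class has more fields.
case class Batch(submissionTime: Long, processingStartTime: Option[Long])

object OptionGetSketch {
  // Safe: falls back to -1 when there is no completed batch.
  def submissionTimeOrDefault(last: Option[Batch]): Long =
    last.map(_.submissionTime).getOrElse(-1L)

  // The variant from the question: throws NoSuchElementException on None.
  def submissionTimeUnsafe(last: Option[Batch]): Long =
    last.map(_.submissionTime).get
}
```

Note that wrapping the result in `Option(...)` afterwards would not help the unsafe variant, because the exception is thrown before any value is returned.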
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68023279 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24751/ Test PASSed.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-68023274 [Test build #24751 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24751/consoleFull) for PR 3466 at commit [`c097ddc`](https://github.com/apache/spark/commit/c097ddcb963bff611221ebc74da4e1f82c87b024). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r22246102 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala --- @@ -28,46 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { private val streamingListener = ssc.progressListener - private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, + private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T], defaultValue: T) { metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { - override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) --- End diff -- Hi TD, sorry to bother you again. I'm not sure if there's a better way to address this problem; would you mind giving me some hints? Thanks a lot.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user maasg commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r20924293 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala --- @@ -70,4 +78,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) registerGauge("lastReceivedBatch_processEndTime", _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + + // Gauge for last received batch records and total received batch records. + private var totalReceivedBatchRecords: Long = 0L + def getTotalReceivedBatchRecords(listener: StreamingJobProgressListener): Long = { + totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum --- End diff -- Will this counter work? I think that gauge values are collected only on request, so if nobody is consuming the metric, or it is consumed too often, we will get a wrong count.
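The concern above can be reproduced in isolation: if the running total is updated inside the gauge's read path, the result depends on how often the gauge is polled. The following is a minimal sketch under stated assumptions; `ListenerStub` and `PollingTotal` are hypothetical names, and the stub holds only the latest batch's per-stream record counts.

```scala
// Hypothetical stand-in for the listener: holds only the latest batch's counts.
class ListenerStub { var lastReceivedBatchRecords: Map[Int, Long] = Map.empty }

class PollingTotal(listener: ListenerStub) {
  private var total: Long = 0L

  // Mirrors the pattern under review: the total grows on every read,
  // whether or not a new batch has arrived since the previous read.
  def getValue: Long = {
    total += listener.lastReceivedBatchRecords.values.sum
    total
  }
}
```

Polling twice between batches counts the same batch twice, and a batch that arrives and is replaced between two polls is never counted at all.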
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r20931831 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala --- @@ -70,4 +78,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) registerGauge("lastReceivedBatch_processEndTime", _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + + // Gauge for last received batch records and total received batch records. + private var totalReceivedBatchRecords: Long = 0L + def getTotalReceivedBatchRecords(listener: StreamingJobProgressListener): Long = { + totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum --- End diff -- Aha, you're right, so it is hard to collect the total batch records without modifying the `StreamingJobProgressListener` code.
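One way to read the remark above is that the total has to be accumulated where batches are observed, so that reading the metric has no side effects. The sketch below shows that shape only; `CountingListener` and `onBatchReceived` are hypothetical names, not the actual `StreamingJobProgressListener` API, and this is not necessarily the change that was eventually made in the pull request.

```scala
class CountingListener {
  private var total: Long = 0L

  // Hypothetical callback, assumed to be invoked exactly once per received batch.
  def onBatchReceived(recordsPerStream: Map[Int, Long]): Unit = synchronized {
    total += recordsPerStream.values.sum
  }

  // Reading has no side effects, so polling frequency does not affect the value.
  def totalReceivedRecords: Long = synchronized { total }
}
```

A gauge built on `totalReceivedRecords` returns the same value however often it is polled, because the only writes happen in the per-batch callback.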
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/3466#discussion_r20932324 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala --- @@ -70,4 +78,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) registerGauge("lastReceivedBatch_processEndTime", _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + + // Gauge for last received batch records and total received batch records. + private var totalReceivedBatchRecords: Long = 0L + def getTotalReceivedBatchRecords(listener: StreamingJobProgressListener): Long = { + totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum --- End diff -- I will fix this, thanks a lot.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-64749125 [Test build #23913 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23913/consoleFull) for PR 3466 at commit [`02dd44f`](https://github.com/apache/spark/commit/02dd44fbfcf1e771c4263c519b06177ffb73121f). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-64753876 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23913/ Test PASSed.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-64753870 [Test build #23913 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23913/consoleFull) for PR 3466 at commit [`02dd44f`](https://github.com/apache/spark/commit/02dd44fbfcf1e771c4263c519b06177ffb73121f). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
GitHub user jerryshao opened a pull request: https://github.com/apache/spark/pull/3466 [SPARK-4537][Streaming] Expand StreamingSource to add more metrics Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` for counting received records. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerryshao/apache-spark SPARK-4537 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3466.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3466 commit c7a93762cd96581271b6cc4951abe5a92952b5e2 Author: jerryshao saisai.s...@intel.com Date: 2014-11-26T01:51:54Z Expand StreamingSource to add more metrics
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-64509682 [Test build #23868 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23868/consoleFull) for PR 3466 at commit [`c7a9376`](https://github.com/apache/spark/commit/c7a93762cd96581271b6cc4951abe5a92952b5e2). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-64514695 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23868/ Test PASSed.
[GitHub] spark pull request: [SPARK-4537][Streaming] Expand StreamingSource...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3466#issuecomment-64514692 [Test build #23868 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23868/consoleFull) for PR 3466 at commit [`c7a9376`](https://github.com/apache/spark/commit/c7a93762cd96581271b6cc4951abe5a92952b5e2). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.