[GitHub] spark pull request #9875: [SPARK-11662] [YARN]. In Client mode, make sure we...
Github user harishreedharan closed the pull request at: https://github.com/apache/spark/pull/9875 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13843][Streaming]Remove streaming-flume...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/11672#issuecomment-196555837 Thanks @rxin. Opened [SPARK-11806](https://issues.apache.org/jira/browse/SPARK-11806) to discuss this.
[GitHub] spark pull request: [SPARK-13843][Streaming]Remove streaming-flume...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/11672#issuecomment-196553283 Hi @zsxwing, I think the discussion on supporting Kafka 0.9 should happen **if** we decide to keep Kafka in Spark itself. At this point, I think the piece that benefits the most from moving out of Spark is the Kafka integration, since that is the one with the most API compatibility issues. I really think we should discuss moving Kafka out and come to an agreement on that as well.
[GitHub] spark pull request: [SPARK-13843][Streaming]Remove streaming-flume...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/11672#issuecomment-196115717 I agree with @ksakellis on this one. It would be great if we could pull Kafka out as well. I understand that there are a lot of users who might find it difficult, but if you think about it, most people use the plugins via mvn anyway (since we don't actually package them in our assembly). I am not sure what the policy is if we pull it into a different repo and whether we can keep the same groupId and artifactId, but that could be an alternative and most likely would not break too many users.
[GitHub] spark pull request: [SPARK-13478] [yarn] Use real user when fetchi...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/11358#issuecomment-189425272 LGTM
[GitHub] spark pull request: [SPARK-13478] [yarn] Use real user when fetchi...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/11358#issuecomment-188621725 This looks like it might affect HDFS tokens as well, and an error like this might come up during the initial token renewal:

```
WARN UserGroupInformation: PriviledgedActionException as:hari (auth:PROXY) via hdfs@EXAMPLE (auth:KERBEROS) cause:org.apache.hadoop.security.AccessControlException: hari tries to renew a token with renewer hdfs
```

In addition to the code that gets the new tokens, I think the [`getTokenRenewalInterval`](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L580) method also needs to be run as the real user.
[GitHub] spark pull request: [SPARK-13478] [yarn] Use real user when fetchi...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/11358#issuecomment-188615270 So I have not tested the keytab-based login with the proxy user stuff at all. We get delegation tokens there too - does this issue affect that as well?
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51798611

--- Diff: external/kafka-newapi/pom.xml ---
@@ -0,0 +1,122 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-newapi_2.11</artifactId>
--- End diff --

Is it possible to name this module something else? `newapi` is pretty generic - and really refers to Kafka's new API, while the name seems to suggest it is some new API in Spark.
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51799340

--- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.newapi.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ *
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ *   configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ *   starting point of the stream
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+    K: ClassTag,
+    V: ClassTag,
+    R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    @transient val fromOffsets: Map[TopicPartition, Long],
+    messageHandler: ConsumerRecord[K, V] => R
+  ) extends InputDStream[R](ssc_) with Logging {
+
+  val maxRetries = context.sparkContext.getConf.getInt(
+    "spark.streaming.kafka.maxRetries", 1)
+
+  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+  private[streaming] override def name: String = s"Kafka 0.9 direct stream [$id]"
+
+  protected[streaming] override val checkpointData =
+    new DirectKafkaInputDStreamCheckpointData
+
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+   */
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
+      Some(new DirectKafkaRateController(id,
+        RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
--- End diff --

Why use both `ssc` and `ssc_`?
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/10953#issuecomment-179511363 I have a more fundamental question - given that this patch does not add a whole lot of new functionality but just ports the current functionality to the new Kafka API, is there a reason we can't refactor the current `KafkaRDD` and `*KafkaDStream` classes so that the functionality accessing the Kafka API is wrapped in methods (say `fetchFromKafka` or something), while everything else (rate limiting, checkpointing, producing new RDDs, etc.) stays outside? That way the new API usage can simply live in overridden methods (`override def fetchFromKafka`). Also, can we change the package and module names to something other than `newapi` - I mean, `new` is relative; Kafka could end up having a newer API at some point.
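For illustration, the refactor suggested in that comment could look roughly like this. This is a sketch with invented names (`Record`, `BaseKafkaFetcher`, `fetchFromKafka`, `NewApiFetcher`), not Spark's actual classes:

```scala
// Sketch of the proposed refactor: version-independent logic lives in the
// base class, and only the Kafka-API access is an overridable method.
// Record is a stand-in for Kafka's message type.
case class Record(offset: Long, value: String)

abstract class BaseKafkaFetcher {
  // The only method that touches a specific Kafka consumer API.
  protected def fetchFromKafka(from: Long, until: Long): Seq[Record]

  // Everything else (rate limiting, checkpointing, producing RDDs, ...)
  // stays here, shared by old- and new-API implementations.
  def nextBatch(from: Long, until: Long, maxRecords: Int): Seq[Record] =
    fetchFromKafka(from, until).take(maxRecords) // crude stand-in for a rate limit
}

// A "new API" implementation only overrides the fetch method; the shared
// logic in nextBatch is untouched.
class NewApiFetcher(data: Map[Long, String]) extends BaseKafkaFetcher {
  override protected def fetchFromKafka(from: Long, until: Long): Seq[Record] =
    (from until until).collect { case o if data.contains(o) => Record(o, data(o)) }
}
```

The point of the pattern is that a future "even newer" Kafka API would mean one more small subclass rather than a parallel copy of the rate-limiting and checkpointing code.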
[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9875#issuecomment-159085964 Also, to be clear, it would work fine in Cluster mode. In Client mode, #7394 should have taken care of the long-running app issue (though there was one case where, I think, @SaintBacchus mentioned that the `EventLoggingListener` was not able to write to HDFS anymore, even though the tokens were getting updated). So in either case, I am wondering whether in client mode we should simply re-login using the keytab and not bother with tokens in the driver app at all (so the AM would login, update tokens etc., while the client app always just logs in). Do you think that makes sense, @tgravescs?
[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9875#issuecomment-158988545 Yes, that is correct, but this happens even without the tokens expiring. What do you think about doing the re-login in the YarnClientSchedulerBackend? This is messy in client mode, but I don't think we have another option.
[GitHub] spark pull request: [SPARK-11821] Propagate Kerberos keytab for al...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9837#discussion_r45494051

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala ---
@@ -166,7 +168,11 @@ private[hive] class ClientWrapper(
           " specified in spark.yarn.keytab does not exist")
       } else {
         logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
+          s" using principal: ${principalName} and keytab: ${keytabFileName}")
+        val hadoopConfiguration = new Configuration()
+        hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
--- End diff --

Ah, ok. Did you hit this issue when running an app?
[GitHub] spark pull request: [SPARK-11821] Propagate Kerberos keytab for al...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9859#issuecomment-158498642 LGTM. Are there any other configs required? I remember Hadoop security had a bunch of configs. /cc @tgravescs
[GitHub] spark pull request: [SPARK-11662] Call startExecutorDelegationToke...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9635#issuecomment-158561381 I don't think this PR actually fixes the issue. I think the real issue is that once tokens are added to the credentials, Hadoop does not allow the user to get new tokens. So we basically need to login as a different user, get new credentials and add them to the current user's credentials. I opened #9875 to fix this.
[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9875#issuecomment-158563127 Jenkins, test this please
[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...
GitHub user harishreedharan opened a pull request: https://github.com/apache/spark/pull/9875

[SPARK-11662] [YARN] In Client mode, make sure we re-login before attempting to create new delegation tokens if a new SparkContext is created within the same application.

Since Hadoop gives precedence to the delegation tokens, we must make sure we login as a different user, get new tokens and replace the old ones in the current user's credentials cache to avoid not being able to get new ones. /cc @tedyu @tgravescs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/harishreedharan/spark keytab-relogin-restarted-context

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9875.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9875

commit 70f610f855c3fd2b3ea9b0a44f6015162f555f87
Author: Hari Shreedharan <hshreedha...@apache.org>
Date: 2015-11-20T23:42:01Z

[SPARK-11662] [YARN] In Client mode, make sure we re-login before attempting to create new delegation tokens if a new SparkContext is created within the same application. Since Hadoop gives precedence to the delegation tokens, we must make sure we login as a different user, get new tokens and replace the old ones in the current user's credentials cache to avoid not being able to get new ones.
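The Hadoop behavior this PR works around can be modeled abstractly like so. This is a plain-Scala sketch with invented `Token` and `CredentialsCache` types standing in for Hadoop's `Credentials`/`UserGroupInformation` machinery, not the actual Hadoop API:

```scala
import scala.collection.mutable

// Hypothetical model: like Hadoop's credentials cache, an existing token for
// a service takes precedence, even if it has gone stale.
case class Token(service: String, expiry: Long)

class CredentialsCache {
  private val tokens = mutable.Map.empty[String, Token]

  def add(t: Token): Unit = tokens(t.service) = t

  // The existing token wins over a fresh login - this is why simply asking
  // for new tokens under the same user does not help once stale tokens are
  // already in the cache.
  def tokenFor(service: String, freshLogin: => Token): Token =
    tokens.getOrElse(service, freshLogin)

  // The fix sketched in the PR: obtain fresh tokens under a separate login
  // and overwrite the stale entries in the current user's cache.
  def replaceAll(fresh: Seq[Token]): Unit = fresh.foreach(add)
}
```

In this model, once a stale `hdfs` token is cached, `tokenFor` keeps returning it until `replaceAll` overwrites it, which mirrors why the PR re-logs in as a different user and swaps the tokens rather than requesting new ones in place.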
[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9875#issuecomment-158573009 retest this please
[GitHub] spark pull request: [SPARK-11821] Propagate Kerberos keytab for al...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9837#discussion_r45425265

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala ---
@@ -166,7 +168,11 @@ private[hive] class ClientWrapper(
           " specified in spark.yarn.keytab does not exist")
       } else {
         logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
+          s" using principal: ${principalName} and keytab: ${keytabFileName}")
+        val hadoopConfiguration = new Configuration()
+        hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
--- End diff --

This seems wrong. If you are talking to a Hadoop cluster that is secure, your client configuration should already have this. Setting it here alone is not going to help, since all the other Configuration instances used all over Spark will not have this information (where required).
[GitHub] spark pull request: [SPARK-11662] Call startExecutorDelegationToke...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9635#issuecomment-158250401 Hmm, why does stopping the context actually remove the Kerberos login, unless the token renewal interval has passed? The patch looks fine, but have you confirmed that this is an actual problem?
[GitHub] spark pull request: [SPARK-11662] Call startExecutorDelegationToke...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9635#issuecomment-158255349 It is not really the token that is causing the issue. It looks like the UGI of the current user has expired Kerberos tickets. Anyway, this patch does not make things worse; I am just unsure it actually fixes the issue at hand. I am not really sure what the issue is - within the same process this should not happen for a long time.
[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...
Github user harishreedharan closed the pull request at: https://github.com/apache/spark/pull/2994
[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/2994#issuecomment-157908818 I am closing this PR. I have put this up here: https://github.com/cloudera/spark-kafka-writer
[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9707#discussion_r45098059

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -187,16 +187,27 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _

+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
       checkpointTime: Time,
       bytes: Array[Byte],
       clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
+      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+        latestCheckpointTime = checkpointTime
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
-      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
-      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+      // We will do checkpoint when generating a batch and completing a batch. When the processing
+      // time of a batch is greater than the batch interval, checkpointing for completing an old
+      // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
+      // batch actually has the latest information, so we want to recover from it. Therefore, we
+      // also use the latest checkpoint time as the file name, so that we can recover from the
+      // latest checkpoint file.
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
--- End diff --

So the idea here is that if an older batch's completion-checkpoint comes in after a new batch's initial-checkpoint, we overwrite the initial checkpoint (since we would not reset the `latestCheckpointTime`)? This could essentially mean two checkpoints being written to the same files.
[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9707#discussion_r45099938

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -187,16 +187,27 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _

+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
       checkpointTime: Time,
       bytes: Array[Byte],
       clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
+      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+        latestCheckpointTime = checkpointTime
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
-      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
-      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+      // We will do checkpoint when generating a batch and completing a batch. When the processing
+      // time of a batch is greater than the batch interval, checkpointing for completing an old
+      // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
+      // batch actually has the latest information, so we want to recover from it. Therefore, we
+      // also use the latest checkpoint time as the file name, so that we can recover from the
+      // latest checkpoint file.
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
--- End diff --

I don't think you get what I am saying. I am saying that two threads could run at the same time, writing out data to the exact same files. If I am not mistaken, there is a bug here that could lead to two checkpoints running at the same time, writing to the same files:

- Checkpoint 1: completion of batch time t
- Checkpoint 2: start of batch time t+1

Checkpoint 2 starts -> `latestCheckpointTime = t + 1`. Checkpoint 1 starts -> since `latestCheckpointTime != null` and `latestCheckpointTime > checkpointTime`, we would not reset `latestCheckpointTime`, so both checkpoints would use the same file name to write their checkpoints out. Because of this, whichever thread reaches the tempFile creation first would win - which is non-deterministic. The other thread would end up hitting an exception.
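The file-name collision described in that comment can be reproduced with a tiny model of the `latestCheckpointTime` logic. This is an illustrative simplification with invented names, not Spark's actual `CheckpointWriter`:

```scala
// Model of the patch's naming logic: a handler for an older batch that runs
// after a newer batch's handler resolves to the newer batch's file name.
class CheckpointNameModel {
  @volatile private var latestCheckpointTime: Long = -1L

  def fileNameFor(checkpointTime: Long): String = {
    if (latestCheckpointTime < checkpointTime) {
      latestCheckpointTime = checkpointTime
    }
    s"checkpoint-$latestCheckpointTime"
  }
}
```

With a 1000 ms batch interval, say the start-checkpoint of batch 2000 runs first and sets `latestCheckpointTime` to 2000; the late completion-checkpoint of batch 1000 then also maps to `checkpoint-2000`, so both handlers target the same file name.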
[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9707#discussion_r45102167

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -187,16 +187,27 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _

+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
       checkpointTime: Time,
       bytes: Array[Byte],
       clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
+      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+        latestCheckpointTime = checkpointTime
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
-      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
-      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+      // We will do checkpoint when generating a batch and completing a batch. When the processing
+      // time of a batch is greater than the batch interval, checkpointing for completing an old
+      // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
+      // batch actually has the latest information, so we want to recover from it. Therefore, we
+      // also use the latest checkpoint time as the file name, so that we can recover from the
+      // latest checkpoint file.
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
--- End diff --

Ok, then we are fine. Can you put in a comment where the executor is being created, so we don't end up introducing a bug later due to this class not being thread-safe.
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
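The race discussed above (a late-finishing checkpoint for an old batch overwriting a newer one) is avoided by always naming the file after the newest checkpoint time seen so far. A minimal Python sketch of that idea, with hypothetical names and a lock standing in for the Scala `@volatile` field:

```python
import threading

class CheckpointWriter:
    """Sketch: always write under the newest checkpoint time seen so far,
    even if an older batch's checkpoint handler runs late.
    (Class and method names are illustrative, not Spark's API.)"""

    def __init__(self):
        self._lock = threading.Lock()
        self.latest_checkpoint_time = None
        self.files_written = []

    def write_checkpoint(self, checkpoint_time):
        with self._lock:
            if (self.latest_checkpoint_time is None
                    or self.latest_checkpoint_time < checkpoint_time):
                self.latest_checkpoint_time = checkpoint_time
            # Name the file after the latest time, not this handler's own time.
            self.files_written.append("checkpoint-%d" % self.latest_checkpoint_time)

w = CheckpointWriter()
w.write_checkpoint(2000)   # the new batch checkpoints first...
w.write_checkpoint(1000)   # ...then the old batch's checkpoint arrives late
print(w.files_written)     # both writes land under time 2000
```

The second write carries the freshest state (the old batch completed last), so writing it under the newest time means recovery always picks up the most recent information.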
[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9707#issuecomment-157469418 LGTM. Thanks @zsxwing!
[GitHub] spark pull request: [SPARK-11731][STREAMING] Enable batching on Dr...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9695#issuecomment-156583976 +1
[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9373#issuecomment-155257362 @brkyvz Sounds good, sir. The issue you saw seems to be a protobuf incompatibility - did you compile and run against the same hadoop-2 version (2.2+)? This patch now LGTM.
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44185733

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
@@ -488,7 +491,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
       context.reply(successful)
     case AddBlock(receivedBlockInfo) =>
-      context.reply(addBlock(receivedBlockInfo))
+      if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
+        val f = Future(addBlock(receivedBlockInfo))(walBatchingThreadPool)
+        f.onComplete(result => context.reply(result.get))(walBatchingThreadPool)

--- End diff --

Actually, would it even run? Since the thread pool is now dead.
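The concern above - whether a callback scheduled on an already-stopped pool ever runs - can be reproduced outside Spark. A minimal Python sketch, with `ThreadPoolExecutor` standing in for `walBatchingThreadPool`: once the pool is shut down, newly submitted work is rejected outright rather than silently executed.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
pool.submit(lambda: 1 + 1).result()   # fine while the pool is alive
pool.shutdown()

try:
    pool.submit(lambda: 2 + 2)        # rejected once the pool is shut down
    rejected = False
except RuntimeError:
    rejected = True

print(rejected)
```

JVM executors behave analogously (a shut-down `ExecutorService` rejects new tasks), which is the reason a reply callback chained onto a dead pool would never fire.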
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44057628

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
+
+  private val batchedWriterThread = startBatchedWriterThread()
+
+  /**
+   * Write a byte buffer to the log file. This method adds the byteBuffer to a queue and blocks
+   * until the record is properly written by the parent.
+   */
+  override def write(byteBuffer: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
+    val promise = Promise[WriteAheadLogRecordHandle]()
+    walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise))
+    try {
+      Await.result(promise.future.recover { case _ => null }(ThreadUtils.sameThread),
+        WAL_WRITE_STATUS_TIMEOUT.milliseconds)
+    } catch {
+      case e: TimeoutException =>
+        logWarning(s"Write to Write Ahead Log promise timed out after " +
+          s"$WAL_WRITE_STATUS_TIMEOUT millis for record.")
+        null
+    }
+  }
+
+  /**
+   * Read a segment from an existing Write Ahead Log. The data may be aggregated, and the user
+   * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]]
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+    parent.read(segment)
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def readAll(): JIterator[ByteBuffer] = {
+    parent.readAll().asScala.flatMap(deaggregate).asJava

--- End diff --

`flatMap` does not, but the `asScala` and `asJava` calls are allocating new objects. They may be lazily collecting the objects being iterated over, but they are still creating two new objects, no?

In a loop, the only new object being created is the list itself. The others are basically references to already-created objects returned from the `deaggregate()` call. It might be more code, but I never found loops hard to read - sometimes I find loops clearer. I don't feel that strongly about it - I just feel that a couple of Scala-to-Java (and vice versa) conversions just for a `flatMap()` call are not really worth it. Being young-gen GC'd, they are not that bad, I guess.
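The trade-off debated above (conversion wrappers plus `flatMap` versus an explicit loop) is easy to see in miniature. A Python sketch with a hypothetical `deaggregate` stand-in, showing that the functional flatten and the loop produce the same result:

```python
from itertools import chain

def deaggregate(batch):
    # Hypothetical stand-in for BatchedWriteAheadLog.deaggregate: splits an
    # aggregated batch back into individual records.
    return list(batch)

batches = [[b"r1", b"r2"], [b"r3"], [b"r4", b"r5"]]

# Functional style, analogous to flatMap over a wrapped iterator.
flat_functional = list(chain.from_iterable(deaggregate(b) for b in batches))

# Explicit loop: the only new allocation is the result list itself.
flat_loop = []
for b in batches:
    for record in deaggregate(b):
        flat_loop.append(record)

print(flat_functional == flat_loop)
```

Both forms are O(n); the discussion is purely about the cost of the wrapper objects versus the readability of each style.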
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44084903

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44085335

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44085133

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44069967

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: WriteAheadLog)

--- End diff --

+1 for `wrappedLog`
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44070214

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
+  /**
+   * Respond to any promises that may have been left in the queue, to unblock receivers during
+   * shutdown.
+   */
+  private def fulfillPromises(): Unit = {

--- End diff --

Can
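The `fulfillPromises()` idea quoted in this diff - on shutdown, drain the write queue and fail every promise still waiting, so blocked writers wake up instead of hanging - can be sketched as follows. This is a hedged Python analogue, not the Scala implementation; `Future` stands in for Scala's `Promise`:

```python
import queue
from concurrent.futures import Future

# Queue of pending write promises, analogous to walWriteQueue.
wal_write_queue = queue.Queue()

pending = [Future() for _ in range(3)]
for f in pending:
    wal_write_queue.put(f)

def fulfill_promises():
    # Fail anything still queued so callers blocked on promise.result() wake up.
    while not wal_write_queue.empty():
        promise = wal_write_queue.get()
        promise.set_exception(RuntimeError("write ahead log closed"))

fulfill_promises()
print(all(f.done() for f in pending))
```

Completing the promises with a failure (rather than leaving them pending) is what makes `close()` safe: any receiver blocked in `write()` observes the failure immediately instead of waiting out its timeout.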
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44069852

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
@@ -157,9 +166,12 @@ private[streaming] class ReceivedBlockTracker(
     require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
-    writeToLog(BatchCleanupEvent(timesToCleanup))
-    timeToAllocatedBlocks --= timesToCleanup
-    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
+    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
+      timeToAllocatedBlocks --= timesToCleanup
+      writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))

--- End diff --

Ah, right. For whatever reason I got confused. Ignore my comment.
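The change under review makes the in-memory cleanup conditional on the WAL write succeeding. A minimal Python sketch of that "log first, then apply" ordering, with names standing in for the Scala ones:

```python
def write_to_log(event, log):
    # Stand-in for the durable WAL write; returns False on failure.
    try:
        log.append(event)
        return True
    except IOError:
        return False

# Batch time -> allocated block ids, analogous to timeToAllocatedBlocks.
time_to_allocated_blocks = {1000: ["blk1"], 2000: ["blk2"], 3000: ["blk3"]}
log = []

cleanup_thresh = 2500
times_to_cleanup = [t for t in time_to_allocated_blocks if t < cleanup_thresh]

# Only mutate in-memory state once the cleanup event is durably logged;
# on a failed write, the state stays consistent with the log.
if write_to_log(("BatchCleanupEvent", times_to_cleanup), log):
    for t in times_to_cleanup:
        del time_to_allocated_blocks[t]

print(sorted(time_to_allocated_blocks))  # only the batch past the threshold remains
```

The point of guarding the mutation is crash consistency: if the event never reaches the log, a recovered tracker still holds the blocks it thinks it holds.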
[GitHub] spark pull request: [SPARK-11457][Streaming][YARN] Fix incorrect A...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9412#issuecomment-154256737 Where is this being set into the SparkConf for a normal app? I am not sure of the parameters, but setting the new values looks good (not sure if `spark.ui.filters` and the other params are the right ones). @vanzin - if they are, this LGTM
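For context, the YARN AM IP filter is normally injected into the UI through `spark.ui.filters` plus per-filter parameters. A hedged sketch of what such a configuration typically looks like - the host names and proxy values here are placeholders, and the exact parameter names come from Hadoop's `AmIpFilter` and may differ across versions:

```properties
spark.ui.filters=org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS=rm-host
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES=http://rm-host:8088/proxy/<app-id>
```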
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r43963503 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import java.nio.ByteBuffer +import java.util.concurrent.{LinkedBlockingQueue, TimeoutException} +import java.util.{Iterator => JIterator} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{Await, ExecutionContext, Promise} +import scala.concurrent.duration._ +import scala.util.control.NonFatal + +import org.apache.spark.Logging +import org.apache.spark.util.{Utils, ThreadUtils} + +/** + * A wrapper for a WriteAheadLog that batches records before writing data. All other methods will + * be passed on to the wrapped class. + * + * Parent exposed for testing. 
+ */ +private[streaming] class BatchedWriteAheadLog(private[util] val parent: WriteAheadLog) + extends WriteAheadLog with Logging { + + import BatchedWriteAheadLog._ + + // exposed for tests + protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]() + + private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds + + // Whether the writer thread is active + @volatile private var active: Boolean = true + protected val buffer = new ArrayBuffer[RecordBuffer]() + + private val batchedWriterThread = startBatchedWriterThread() + + /** + * Write a byte buffer to the log file. This method adds the byteBuffer to a queue and blocks + * until the record is properly written by the parent. + */ + override def write(byteBuffer: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { +val promise = Promise[WriteAheadLogRecordHandle]() +walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise)) +try { + Await.result(promise.future.recover { case _ => null }(ThreadUtils.sameThread), +WAL_WRITE_STATUS_TIMEOUT.milliseconds) +} catch { + case e: TimeoutException => +logWarning(s"Write to Write Ahead Log promise timed out after " + + s"$WAL_WRITE_STATUS_TIMEOUT millis for record.") +null +} + } + + /** + * Read a segment from an existing Write Ahead Log. The data may be aggregated, and the user + * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]] + * + * This method is handled by the parent WriteAheadLog. + */ + override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = { +parent.read(segment) + } + + /** + * Read all the existing logs from the log directory. + * + * This method is handled by the parent WriteAheadLog. + */ + override def readAll(): JIterator[ByteBuffer] = { +parent.readAll().asScala.flatMap(deaggregate).asJava --- End diff -- This is really unnecessary. We should really not have to convert to a scala iterator, just to do a flatMap and then back to a Java one. Just having a loop adding all of the records is not that horrible.
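A minimal sketch of the loop-based alternative suggested in the comment above, assuming a stand-in `deaggregate` (the real one lives in `BatchedWriteAheadLog`'s companion object) and `Int` payloads in place of `ByteBuffer` to keep the example self-contained:

```scala
import java.util.{ArrayList => JArrayList, Iterator => JIterator}

object ReadAllSketch {
  // Stand-in for BatchedWriteAheadLog.deaggregate: unpacks one batched
  // record into its constituent records.
  def deaggregate(batch: Seq[Int]): Seq[Int] = batch

  // Walks the parent's Java iterator directly and collects de-aggregated
  // records into a Java list -- no asScala/flatMap/asJava round-trip.
  def readAll(parent: JIterator[Seq[Int]]): JIterator[Int] = {
    val out = new JArrayList[Int]()
    while (parent.hasNext) {
      deaggregate(parent.next()).foreach(r => out.add(r))
    }
    out.iterator()
  }
}
```

The only allocations here are the output list and the de-aggregated records themselves, which matches the argument made in the review.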
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r43964017 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r43964535 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala --- + override def readAll(): JIterator[ByteBuffer] = { +parent.readAll().asScala.flatMap(deaggregate).asJava --- End diff -- I really don't think we are dealing with all that much memory. I feel we are over-using the conversions here with very little additional benefit. All we are doing is a flatMap. Moreover, the flatMap is going to allocate a new collection/seq - so a loop is probably going to use less memory, because you are adding only the newly generated `ByteBuffers`. There are no new objects allocated other than the ones required (in the last comment when I said records, I meant the Buffers that were created as a result of the deaggregation, not all of the data from the `readAll`)
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r43964698 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r43963794 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala --- + // Whether the writer thread is active + @volatile private var active: Boolean = true + protected val buffer = new ArrayBuffer[RecordBuffer]() --- End diff -- It looks like there are a bunch of `asJava` calls on this buffer. Why not just use a Java List instead?
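A sketch of the suggestion above, with hypothetical names and a `String` standing in for `RecordBuffer`: declaring the buffer as a `java.util.List` from the start removes the repeated `asJava` wrapping at call sites.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

class BatchBufferSketch {
  // Already a Java collection, so call sites that need a java.util.List
  // can use it directly -- no converter at each use.
  private val buffer: JList[String] = new JArrayList[String]()

  def add(record: String): Unit = buffer.add(record)

  // Hands the accumulated records to a Java-API consumer and resets.
  def drainTo(sink: JList[String]): Int = {
    val n = buffer.size()
    sink.addAll(buffer)
    buffer.clear()
    n
  }
}
```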
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r43961907 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala --- @@ -157,9 +166,12 @@ private[streaming] class ReceivedBlockTracker( require(cleanupThreshTime.milliseconds < clock.getTimeMillis()) val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq logInfo("Deleting batches " + timesToCleanup) -writeToLog(BatchCleanupEvent(timesToCleanup)) -timeToAllocatedBlocks --= timesToCleanup -writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion)) +if (writeToLog(BatchCleanupEvent(timesToCleanup))) { + timeToAllocatedBlocks --= timesToCleanup + writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion)) --- End diff -- This is not actually new, but I think we should clean first and then remove the times from the `timeToAllocatedBlocks` map. Otherwise, failed cleanups can lead to the logs never getting cleaned up at all.
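The ordering argued for above can be sketched as follows (names are illustrative, not the actual `ReceivedBlockTracker` code): run the fallible log clean first, and drop the in-memory entries only once it succeeds, so a failed cleanup is retried on the next pass instead of being silently forgotten.

```scala
import scala.collection.mutable

class CleanupSketch(cleanLog: Long => Unit) {
  val timeToAllocatedBlocks = mutable.Map[Long, Seq[Int]]()

  def cleanupOldBatches(threshTime: Long): Unit = {
    val timesToCleanup = timeToAllocatedBlocks.keys.filter(_ < threshTime).toSeq
    // Clean the log first; if this throws, the map is left untouched and
    // the same batches are picked up again on the next cleanup attempt.
    cleanLog(threshTime)
    timeToAllocatedBlocks --= timesToCleanup
  }
}
```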
[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r43962050 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala --- @@ -439,6 +439,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false private val submitJobThreadPool = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("submit-job-thead-pool")) +private val walBatchingThreadPool = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("wal-batching-thead-pool")) --- End diff -- "thread-pool"
[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9373#issuecomment-152893856 Did you try HDFS? I am assuming we'd get similar speed-ups there too, but in that case there are far fewer files, so the cost to set up the streams is paid only a handful of times. What I am wondering is whether we'd actually ever have to deal with that many files in the non-S3 case. This adds the additional cost for HDFS or any other FS, no? In those cases the number of files usually would be pretty small, which may result in this being more expensive. If this adds only a small cost or if it becomes faster, then let's keep this.

On Sunday, November 1, 2015, Burak Yavuz <notificati...@github.com> wrote:
> @harishreedharan <https://github.com/harishreedharan> Here are some benchmark results:
> For reference, the driver was a r3.2xlarge EC2 instance.
> [image] <https://cloud.githubusercontent.com/assets/5243515/10871515/54c14846-809e-11e5-91e6-2ac3605d98b7.png>
>
> Num Threads | Rate (ms / file) | Speed-up
> 50          | 5.556101934      | 9.004997951
> 25          | 5.99898194       | 8.340196225
> 8           | 8.692144733      | 5.756080699
> 4           | 14.1162362       | 3.544336169
> 1           | 50.03268653      | 1
>
> Reply to this email directly or view it on GitHub <https://github.com/apache/spark/pull/9373#issuecomment-152867985>.

-- Thanks, Hari
[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9373#discussion_r43578480 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala --- @@ -126,11 +127,11 @@ private[streaming] class FileBasedWriteAheadLog( val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) -logFilesToRead.iterator.map { file => +logFilesToRead.par.map { file => --- End diff -- This is an expensive operation - you'd end up running an O(n) operation to create a copy (in addition to the copy cost). Do we really need this? I am not entirely sure the copying adds a whole lot of value, considering that this array is not going to be very huge. Also note the additional cost to spin up threads (if the context does not already have them spun up).
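The trade-off in the comment above can be made concrete with an explicitly bounded pool (a hedged sketch, not the PR's code): `.par` first copies the sequence into a parallel collection and may spin up pool threads, which for a handful of log files can cost more than the parallel reads save, whereas an explicit pool keeps both costs visible.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelRecoverSketch {
  // Reads each "file" with the supplied function on a fixed-size pool,
  // so the thread-creation cost is explicit and bounded by `parallelism`.
  def readAll[A](files: Seq[String], parallelism: Int)(read: String => A): Seq[A] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = files.map(f => Future(read(f)))
      futures.map(f => Await.result(f, 30.seconds))
    } finally pool.shutdown()
  }
}
```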
[GitHub] spark pull request: [SPARK-11424] Guard against double-close() of ...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9382#issuecomment-152637344 +1.
[GitHub] spark pull request: [SPARK-11324][STREAMING] Flag for closing Writ...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9285#issuecomment-151282234 If there is no other option, this LGTM
[GitHub] spark pull request: [SPARK-11324] Flag for closing Write Ahead Log...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9285#issuecomment-151281257 Wouldn't this be really expensive?
[GitHub] spark pull request: [SPARK-11324][STREAMING] Flag for closing Writ...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9285#discussion_r43071619 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala --- @@ -39,6 +39,7 @@ private[streaming] object WriteAheadLogUtils extends Logging { val DEFAULT_ROLLING_INTERVAL_SECS = 60 val DEFAULT_MAX_FAILURES = 3 + val WAL_CLOSE_AFTER_WRITE = "spark.streaming.writeAheadLog.closeAfterWrite" --- End diff -- Wouldn't the config in that case be something like `spark.streaming.driver.writeAheadLog.closeFileAfterWrite` ?
[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9174#issuecomment-149442366 /cc @tdas @tgravescs @vanzin
[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...
GitHub user harishreedharan opened a pull request: https://github.com/apache/spark/pull/9174 [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE is set before SparkHadoopUtil.get is called. If `StreamingContext.getOrCreate` is used in `yarn-client` mode, a `ClassCastException` gets thrown when `Client.scala` is initialized and `YarnSparkHadoopUtil.get` is called. This fails because the `SparkHadoopUtil.get` method is called in the `getOrCreate` method default argument, before SPARK_YARN_MODE is set. You can merge this pull request into a Git repository by running: $ git pull https://github.com/harishreedharan/spark streaming-yarn-client Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9174.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9174 commit 4a48bf02706e703fca6a4047f9b68ba67f816706 Author: Hari Shreedharan <hshreedha...@apache.org> Date: 2015-10-20T05:58:11Z [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE is set before SparkHadoopUtil.get is called. If `StreamingContext.getOrCreate` is used in `yarn-client` mode, a `ClassCastException` gets thrown when `Client.scala` is initialized and `YarnSparkHadoopUtil.get` is called. This fails because the `SparkHadoopUtil.get` method is called in the `getOrCreate` method default argument, before SPARK_YARN_MODE is set.
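The failure mode the PR describes comes down to Scala evaluating default arguments at the call site, before the method body runs. A stripped-down illustration (all names hypothetical, with a boolean standing in for the SPARK_YARN_MODE property):

```scala
object DefaultArgSketch {
  var yarnMode = false // stands in for the SPARK_YARN_MODE system property

  // Stands in for SparkHadoopUtil.get, which picks an implementation
  // based on the flag at the moment it is called.
  def currentUtil(): String =
    if (yarnMode) "YarnSparkHadoopUtil" else "SparkHadoopUtil"

  // The default argument is computed BEFORE the body sets the flag,
  // mirroring how getOrCreate evaluated SparkHadoopUtil.get too early.
  def getOrCreate(util: String = currentUtil()): String = {
    yarnMode = true // too late: `util` was already chosen
    util
  }
}
```

On the first call the default resolves against the unset flag, which is exactly why the wrong `SparkHadoopUtil` subclass was picked in `yarn-client` mode.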
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-149442481 Agreed - synchronization is painful and we could end up missing events.
[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9174#issuecomment-149651474 Looks like SPARK-10812 would actually take care of this - since the check happens at run time. +1 on backporting that to the 1.5 branch. I will close this one.
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-149666851 So this is my theory (I don't have anything to back this up really). My assumption is based on the fact that if we don't set `hadoop.fs.hdfs.impl.disable.cache=true`, then the update of tokens seems to fail on the cached `FileSystem` instance (we needed to add that config in a PR at some point to ensure the update of tokens worked). So if that config needed to be set for the `FileSystem.get()` method to work, it likely means (again, my theory) that a `FileSystem` object created using older tokens does not seem to know about the new ones. I can't be sure of this, but that could explain why even updating the tokens locally using the `ExecutorDelegationTokenRenewer` does not fix the event log writes.
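A sketch of the workaround mentioned above (assuming Hadoop's `fs.<scheme>.impl.disable.cache` configuration pattern; the namenode URI is a placeholder): with the cache enabled, `FileSystem.get()` can keep returning an instance built with the old delegation tokens, so disabling it forces a fresh instance that picks up renewed credentials.

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
// Bypass the process-wide FileSystem cache so each get() constructs a new
// instance carrying the caller's current credentials.
conf.setBoolean("fs.hdfs.impl.disable.cache", true)
val fs = FileSystem.get(new URI("hdfs://namenode:8020"), conf)
```

The obvious cost is that every `get()` now builds (and should eventually `close()`) its own instance, which is why this is a targeted workaround rather than a default.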
[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...
Github user harishreedharan closed the pull request at: https://github.com/apache/spark/pull/9174
[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9174#discussion_r42516999 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1025,9 +1025,6 @@ object Client extends Logging { "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") } -// Set an env variable indicating we are running in YARN mode. -// Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes -System.setProperty("SPARK_YARN_MODE", "true") --- End diff -- OK, to be safe I will revert the last commit where I removed the other places where it was being set, and add a comment there. On Tuesday, October 20, 2015, Imran Rashid <notificati...@github.com> wrote: > In yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala > <https://github.com/apache/spark/pull/9174#discussion_r42515407>: > > > @@ -1025,9 +1025,6 @@ object Client extends Logging { > > "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") > > } > > > > -// Set an env variable indicating we are running in YARN mode. > > -// Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes > > -System.setProperty("SPARK_YARN_MODE", "true") > > to @tgravescs <https://github.com/tgravescs> point, would it hurt to leave this in here? and then change the comment to indicate that it should normally be set by SparkSubmit but left here in case anybody is still using this. -- Thanks, Hari
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-149787894 If the current user's UGI is what is used by the FileSystem cache, this should not really be an issue, no? Because we actually do update the current user's credentials. I am OK with doing something like this, but I'd rather know why before adding it. Why would new tokens not work? That seems like an HDFS issue, no? Let me test this out with @SaintBacchus's other PR. In client mode, that would really mean that tokens are used by the executors and the keytab is used by the AM and the driver. I am of half a mind to suggest not supporting long-running apps in client mode on secure HDFS.
[GitHub] spark pull request: [SPARK-11182] HDFS Delegation Token will be ex...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9168#discussion_r42421617 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala --- @@ -177,6 +177,7 @@ private[yarn] class AMDelegationTokenRenewer( }) // Add the temp credentials back to the original ones. UserGroupInformation.getCurrentUser.addCredentials(tempCreds) +SparkHadoopUtil.get.updateCurrentUserHDFSDelegationToken() --- End diff -- So I don't understand this. We are already getting new tokens for what I assume is the primary namenode using the `obtainTokensForNamenodes` method call, correct? Is this only to get the tokens for the standby? I don't think this will work though - You'd need to do the same thing we do for the primary (add the tokens to a new UGI and copy the credentials from there). From what I can remember, when you add new tokens for a service that already has tokens - the tokens won't get overwritten, even if they are expired - which is why we went with creating a new UGI and adding those credentials over (which would overwrite the old tokens). Are you sure the `HAUtil.cloneDelegationTokenForLogicalUri` will overwrite the tokens?
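The "new UGI" pattern referred to in this comment can be sketched roughly as follows. This is a hedged paraphrase, not the actual `AMDelegationTokenRenewer` code; the principal, keytab path, and the token-obtaining step are placeholders:

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object NewUgiTokenSketch {
  // Log in fresh from the keytab, obtain tokens under that identity, then
  // copy the resulting credentials onto the current user. Copying a whole
  // Credentials set back is what lets the freshly obtained tokens replace
  // the expired ones for the same service.
  def refreshTokens(): Unit = {
    val keytabUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      "principal@EXAMPLE.COM", "/path/to/app.keytab") // placeholders

    val tempCreds = keytabUgi.doAs(new PrivilegedExceptionAction[Credentials] {
      override def run(): Credentials = {
        val creds = new Credentials()
        // ... obtain fresh delegation tokens into `creds` here,
        // e.g. via something like obtainTokensForNamenodes ...
        creds
      }
    })

    // Add the temp credentials back to the original (current) user.
    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
  }
}
```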
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-149344858 Hmm, I think the real issue is that the event logging does not doAs. I think in `yarn-cluster`, since the SparkContext is created in the AM, the updated credentials actually are in the cache of the user writing to the event logs (since we are already running as that user and don't do a doAs). In `yarn-client` though, because we don't do a `doAs` - is it possible that the new tokens are not being used to write to the event log? @SaintBacchus Let me open a PR that does the doAs and combine it with your previous one #8867 - can you test it and see if it works? Or you can do it yourself - just add a `doAs` here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L66 , https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L143 and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L48 (basically anywhere HDFS is accessed)
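The suggested `doAs` wrapping can be sketched like this. It is an illustrative example, not the actual `EventLoggingListener` change; the method name and path are hypothetical:

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  // Run an HDFS write inside doAs so it executes with the current user's
  // (token-refreshed) credentials rather than whatever identity the calling
  // thread happens to carry.
  def writeWithDoAs(fs: FileSystem, path: Path): Unit = {
    UserGroupInformation.getCurrentUser.doAs(
      new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = {
          val out = fs.create(path) // any HDFS access goes inside run()
          out.close()
        }
      })
  }
}
```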
[GitHub] spark pull request: [SPARK-11182] HDFS Delegation Token will be ex...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9168#issuecomment-149338159 /cc @tgravescs
[GitHub] spark pull request: [SPARK-11182] HDFS Delegation Token will be ex...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9168#discussion_r42420896 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -130,6 +132,20 @@ class SparkHadoopUtil extends Logging { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } + def updateCurrentUserHDFSDelegationToken(): Unit = { +val conf = new Configuration() +val nsId = DFSUtil.getNamenodeNameServiceId(conf) +val isHaEnabled = HAUtil.isHAEnabled(conf, nsId) + +if(isHaEnabled){ + val ugi = UserGroupInformation.getCurrentUser + val uri = FileSystem.getDefaultUri(conf) + val map = DFSUtil.getHaNnRpcAddresses(conf) --- End diff -- @steveloughran If this is not a good option, is there another way of doing this? Maybe look up a specific configuration? Considering the fact that the configuration params are not likely to change (since it would break compat), we could just directly use those?
[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8867#issuecomment-149372740 OK, I think I know the issue - the reason is probably that the credentials are cached in the `FileSystem` instance using which the write happens. Since we are replacing the credentials but not the `FileSystem` instance itself, this might not work, which is why #8942 works. We can go with either that approach, or we can replace the `FileSystem` instance, which would require a close and reopen of the file.
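Replacing the `FileSystem` instance, as suggested here, could look roughly like this sketch (the URI is a placeholder; `FileSystem.newInstance` is the real Hadoop API that bypasses the cache):

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object NewInstanceSketch {
  // FileSystem.newInstance skips the shared cache entirely, so the returned
  // instance is built with the caller's current credentials. Swapping in a
  // fresh instance for the event log would require closing the open log
  // file on the old instance and reopening it on the new one.
  def freshFileSystem(conf: Configuration): FileSystem =
    FileSystem.newInstance(new URI("hdfs://namenode:8020"), conf) // placeholder URI
}
```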
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-149440116 Here it is: OK, I think I know the issue - the reason is probably that the credentials are cached in the FileSystem instance using which the write happens. Since we are replacing the credentials but not the FileSystem instance itself, this might not work, which is why #8942 works. We can go with either that approach, or we can replace the FileSystem instance, which would require a close and reopen of the file.
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-149439989 Actually, that is not right... I posted an explanation on your other PR.
[GitHub] spark pull request: [SPARK-11109] [CORE] Move FsHistoryProvider of...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9144#issuecomment-148717590 LGTM
[GitHub] spark pull request: [SPARK-11109] [CORE] Move FsHistoryProvider of...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9144#issuecomment-148563344 Does this class exist in older versions of Hadoop, like 1.1 etc?
[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9043#discussion_r41785632 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString) + override def getConfig(): Map[String, String] = { +val safeMode = if (isFsInSafeMode()) { + Map("HDFS State" -> "In safe mode.") --- End diff -- OK. In that case, could you make this message a bit more detailed? Something like "HDFS is in safe mode, so the history server will be unavailable"?
[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9043#issuecomment-147552208 +1
[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9043#discussion_r41669645 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val NOT_STARTED = "" + // Interval between safemode checks. + private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds( +"spark.history.fs.safemodeCheck.interval", "5s") --- End diff -- Do we use camel-case in other configs? If we do, then ok.
[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9043#discussion_r41670365 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -585,6 +652,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + /** + * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2, + * so we have to resort to ugly reflection (as usual...). + * + * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons + * makes it more public than not. + */ + private[history] def isFsInSafeMode(): Boolean = { +if (!fs.isInstanceOf[DistributedFileSystem]) { + return false +} +isFsInSafeMode(fs.asInstanceOf[DistributedFileSystem]) + } + + // For testing. + private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = { +val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction" +val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction" +val actionClass: Class[_] = + try { +getClass().getClassLoader().loadClass(hadoop2Class) + } catch { +case _: ClassNotFoundException => + getClass().getClassLoader().loadClass(hadoop1Class) + } + +val action = actionClass.getField("SAFEMODE_GET").get(null) +val method = dfs.getClass().getMethod("setSafeMode", action.getClass()) +method.invoke(dfs, action).asInstanceOf[Boolean] --- End diff -- I am assuming this piece of code is correct. I have no idea why a set method returns a `Boolean`, but who am I to argue with the Hadoop API
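For reference, on a Hadoop-2-only build the reflection in the diff above collapses to a direct call. This sketch is an illustration of what the reflective code resolves to, not code from the PR; the `Boolean` return makes sense because `setSafeMode` with `SAFEMODE_GET` is a query, not a mutation:

```scala
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction

object SafeModeSketch {
  // Equivalent of the reflective call for Hadoop 2: asks the namenode
  // whether it is currently in safe mode and returns the answer.
  def isInSafeMode(dfs: DistributedFileSystem): Boolean =
    dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)
}
```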
[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/9043#discussion_r41670523 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString) + override def getConfig(): Map[String, String] = { +val safeMode = if (isFsInSafeMode()) { + Map("HDFS State" -> "In safe mode.") --- End diff -- This is only for testing I assume. Can you please add a comment stating that?
[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9043#issuecomment-146970575 LGTM
[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...
GitHub user harishreedharan opened a pull request: https://github.com/apache/spark/pull/9041 [SPARK-11019][streaming][flume] Gracefully shutdown Flume receiver threads. Wait for a minute for the receiver threads to shutdown before interrupting them. You can merge this pull request into a Git repository by running: $ git pull https://github.com/harishreedharan/spark flume-graceful-shutdown Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9041.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9041 commit dba200fc912ae7b29b8082dd77490bcb504dcf74 Author: Hari Shreedharan <hshreedha...@apache.org> Date: 2015-10-09T00:04:42Z [SPARK-11019][streaming][flume] Gracefully shutdown Flume receiver threads. Wait for a minute for the receiver threads to shutdown before interrupting them.
[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9041#issuecomment-146730016 This basically makes testing a bit more predictable. Sometimes we end up hitting a situation where the last transaction is still not completed and the remaining data just stays stuck in Flume. Also, it removes some unneeded `InterruptedException`s from the logs.
[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9041#issuecomment-146724990 /cc @tdas
[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9041#issuecomment-146730211 Jenkins, test this please
[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9041#issuecomment-146730398 In most cases, less than 5-10 seconds - just for the last in-flight batch to be done. This is not going to slow our unit test runs down. I mostly hit it in our internal end-to-end tests.
[GitHub] spark pull request: [SPARK-10987] [yarn] Workaround for missing ne...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/9021#issuecomment-146420692 LGTM. It worries me, though, that there are other similar bugs lurking in the background. I would expect the new RPC to behave exactly like the old one (minus any Akka-related issues we saw).
[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8998#issuecomment-145968060 Actually looking at this again, I think our only option is to log a message. It is possible that the `SparkContext` was already created and passed to us, in which case we can't do anything about dynamic allocation.
[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...
GitHub user harishreedharan opened a pull request: https://github.com/apache/spark/pull/8998 [streaming] SPARK-10955. Disable dynamic allocation for Streaming applications. Dynamic allocation can be painful for streaming apps and can lose data. The one drawback though is that apps which run Streaming and non-streaming jobs from the same context will also end up not being able to do dynamic allocation. Another option would be to log a warning. You can merge this pull request into a Git repository by running: $ git pull https://github.com/harishreedharan/spark ss-log-error Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8998.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #8998 commit a4a521273114d3ac485592f5c4408adcc11bfc45 Author: Hari Shreedharan <hshreedha...@apache.org> Date: 2015-10-06T18:58:39Z [streaming] SPARK-10955. Disable dynamic allocation for Streaming applications. Dynamic allocation can be painful for streaming apps and can lose data. The one drawback though is that apps which run Streaming and non-streaming jobs from the same context will also end up not being able to do dynamic allocation.
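The kind of guard discussed in this PR can be sketched as follows. This is a hypothetical illustration, not the PR's actual code: `spark.dynamicAllocation.enabled` is the real Spark config key, but the method name is made up, and whether to throw or merely warn was exactly the open question in this thread:

```scala
import org.apache.spark.{SparkConf, SparkException}

object DynamicAllocationGuardSketch {
  // Refuse to start a streaming application when dynamic allocation is on,
  // since removing executors that hold received blocks can lose data.
  def checkDynamicAllocation(conf: SparkConf): Unit = {
    if (conf.getBoolean("spark.dynamicAllocation.enabled", defaultValue = false)) {
      throw new SparkException(
        "Dynamic allocation is not supported for Spark Streaming applications; " +
          "executors holding received blocks may be removed, causing data loss.")
    }
  }
}
```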
[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8998#issuecomment-145966395 /cc @vanzin @tdas
[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8998#issuecomment-145996934 Added a config parameter so the user can enable dynamic allocation if they really want it.
[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8998#issuecomment-146012980 I don't have a strong preference for the config name. @vanzin, @andrewor14 - do you like the current name or the one @markgrover suggested? Vote please :)
[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8998#issuecomment-146017855 As I said, I don't mind either. But in this case, we need to be conservative. We can add additional checks to see if the WAL is enabled, but it is not possible to actually check what DStreams are being used (it could be a custom one which does not break if executors go away), so that case still exists. How about we print the config param out, or document it, in that case?
[GitHub] spark pull request: [SPARK-10812] [yarn] Fix shutdown of token ren...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8996#issuecomment-145951831 LGTM
[GitHub] spark pull request: [SPARK-10955][streaming] Disable dynamic alloc...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8998#issuecomment-146033142 @tdas What do you think about the above? If you still think we should just make it a warn, I will make the change.
[GitHub] spark pull request: [SPARK-10916] Set perm gen size when launching...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8970#issuecomment-145689866 LGTM too.
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-144268884 Why would #8867 not be sufficient? It looks like that should be enough.
[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8942#issuecomment-144273051 Hmm, this might be due to the cached token being missed? So it looks like the token got replaced alright, but it seems like the file could not be written with the new token? @tgravescs might know more about this. I am not sure why this would cause an issue. It looks like the new token cannot be used to write an old file?
[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/8867#discussion_r40708505

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -544,6 +545,7 @@ private[spark] class Client(
       logInfo(s"Credentials file set to: $credentialsFile")
       val renewalInterval = getTokenRenewalInterval(stagingDirPath)
       sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
+      SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(sparkConf)
--- End diff --

This depends on the lines immediately above this though. `setupCredentials` gets called way before this I think.
[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/8867#discussion_r40705866

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala ---
@@ -76,7 +76,10 @@ private[spark] class ExecutorDelegationTokenUpdater(
       SparkHadoopUtil.get.getTimeFromNowToRenewal(
         sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
     if (timeFromNowToRenewal <= 0) {
-      executorUpdaterRunnable.run()
+      // If `timeFromNowToRenewal` equals 0, it is better to wait 1 minute before
+      // scheduling, to avoid a call cycle that causes a StackOverflowError.
--- End diff --

We end up scheduling the next run of the `executorUpdaterRunnable` from within this class. If the executor renewal interval is set to 0, we end up calling it again from within the same thread (indefinitely). I am not sure why it would be 0 here, though (since this is started only if the login is from a keytab).
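A minimal sketch of the safer pattern being discussed (hypothetical object and method names, not the actual Spark code): clamp a zero or negative renewal delay to a minimum before handing it to the scheduler, instead of calling `run()` inline. Inline invocation re-enters the same code path on the same thread and can recurse until a StackOverflowError.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object RenewalSchedulingSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Returns the delay actually used, clamped to at least one minute so a
  // zero/negative value can never trigger immediate same-thread re-entry.
  def scheduleRenewal(runnable: Runnable, timeFromNowToRenewalMs: Long): Long = {
    val delayMs = math.max(timeFromNowToRenewalMs, TimeUnit.MINUTES.toMillis(1))
    scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS)
    delayMs
  }
}
```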
[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/2994#issuecomment-143944627 I will get back to this one in a week or so
[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8867#issuecomment-143322902 @SaintBacchus Right. That is what I was talking about above. I think your fix takes care of this issue.
[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8867#issuecomment-143012421 @tgravescs So it seems like this would fix that issue.
[GitHub] spark pull request: [SPARK-10772][Streaming][Scala]: NullPointerEx...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8881#issuecomment-142656310 Looking at it again, @srowen is right. Returning `None` makes `getOrCompute` think that no RDDs have been generated for a given time (an artifact of the fact that `None` is returned when a map has no value for a key). So this really is a case of returning an empty RDD from your function.
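The map semantics in that comment can be illustrated with plain Scala (an analogy, not the `getOrCompute` code itself): `Map#get` returns `None` when a key is absent, so a `None` result is indistinguishable from "no RDD was generated for this time", whereas an empty collection is still a present value.

```scala
// Batch time -> generated elements; an empty batch is still a *present* entry.
val generatedRDDs = Map(1000L -> Seq(1, 2, 3), 2000L -> Seq.empty[Int])

val absent = generatedRDDs.get(3000L) // None: nothing generated for this time
val empty = generatedRDDs.get(2000L)  // Some(Seq()): an empty but valid result
```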
[GitHub] spark pull request: [SPARK-10772][Streaming][Scala]: NullPointerEx...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8881#issuecomment-142520996 LGTM.
[GitHub] spark pull request: [SPARK-10772][Streaming][Scala]: NullPointerEx...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/8881#discussion_r40177949

--- Diff: streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala ---
@@ -210,6 +210,18 @@ class BasicOperationsSuite extends TestSuiteBase {
       input.map(_.map(_.toString))
     )
   }
+
+  test("transform with NULL") {
+    val input = Seq(1 to 4, Seq(), 5 to 8, 9 to 12)
+    testOperation(
+      input,
+      (r: DStream[Int]) => r.transform(rdd => { if (rdd != null && !rdd.isEmpty()) rdd.map(_.toString) else null }), // RDD.map in transform
+      input.filter(!_.isEmpty).map(_.map(_.toString)),
+      4,
+      false
+    )
+  }
--- End diff --

nit: The formatting here is pretty weird. Maybe take the transformation out and separate it into a `def`, so this method call can fit on a single line:

```
def transformMethod(r: DStream[Int]): DStream[String] = {
  r.transform { rdd =>
    if (rdd != null && !rdd.isEmpty()) rdd.map(_.toString) else null
  }
}

testOperation(input, transformMethod, input.filter(!_.isEmpty).map(_.map(_.toString)), 4, false)
```
[GitHub] spark pull request: [Spark-10692][Streaming] Expose failureReasons...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/8892#issuecomment-142781672 Is this for a different branch? Why a new PR for the same thing?