[GitHub] spark pull request #9875: [SPARK-11662] [YARN]. In Client mode, make sure we...

2016-07-12 Thread harishreedharan
Github user harishreedharan closed the pull request at:

https://github.com/apache/spark/pull/9875


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13843][Streaming]Remove streaming-flume...

2016-03-14 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/11672#issuecomment-196555837
  
Thanks @rxin. Opened 
[SPARK-11806](https://issues.apache.org/jira/browse/SPARK-11806) to discuss this.





[GitHub] spark pull request: [SPARK-13843][Streaming]Remove streaming-flume...

2016-03-14 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/11672#issuecomment-196553283
  
Hi @zsxwing, I think the discussion on supporting Kafka 0.9 should happen 
**if** we decide to keep Kafka in Spark itself. 
At this point, I think the piece that benefits the most from moving out 
of Spark is the Kafka integration - since that is where most of the API 
compatibility issues are.

I really think we should discuss moving Kafka out and come to an agreement 
on that as well.  





[GitHub] spark pull request: [SPARK-13843][Streaming]Remove streaming-flume...

2016-03-13 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/11672#issuecomment-196115717
  
I agree with @ksakellis on this one. It would be great if we can pull Kafka 
out as well. I understand that there are a lot of users who might find it 
difficult, but if you think about it, most people use the plugins via mvn 
anyway (since we don't actually package them in our assembly). I am not sure 
what the policy is if we pull it into a different repo and if we can keep the 
same groupId and artifactId, but that could be an alternative and most likely 
will not break too many users.





[GitHub] spark pull request: [SPARK-13478] [yarn] Use real user when fetchi...

2016-02-26 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/11358#issuecomment-189425272
  
LGTM





[GitHub] spark pull request: [SPARK-13478] [yarn] Use real user when fetchi...

2016-02-24 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/11358#issuecomment-188621725
  
This looks like it might affect HDFS tokens as well, and an error that looks 
like this might come up during the initial token renewal:
```
WARN UserGroupInformation: PriviledgedActionException as:hari (auth:PROXY) 
via hdfs@EXAMPLE (auth:KERBEROS) 
cause:org.apache.hadoop.security.AccessControlException: hari tries to renew a 
token with renewer hdfs
```

In addition to the code that gets the new tokens, I think the 
[`getTokenRenewalInterval`](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L580)
 method also needs to be run as the real user. 
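To make the suggestion concrete, here is a minimal, hypothetical sketch of the pattern (the helper name `runAsRealUser` is made up; the actual change belongs in `Client.scala`). When running as a proxy user, Hadoop's `UserGroupInformation.getRealUser` exposes the authenticated user, and token-related calls can be wrapped in its `doAs`:

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object TokenRenewalSketch {
  // Illustration of the fix being suggested: when the current user is a
  // proxy user, Hadoop calls that fetch or renew delegation tokens must
  // run as the real (Kerberos-authenticated) user, not the proxied one.
  def runAsRealUser[T](body: => T): T = {
    val currentUser = UserGroupInformation.getCurrentUser
    // getRealUser returns null when we are not running as a proxy user
    val realUser = Option(currentUser.getRealUser).getOrElse(currentUser)
    realUser.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = body
    })
  }
}
```

Outside a proxy-user context this simply runs the body as the current user, so the wrapper is safe to apply unconditionally.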





[GitHub] spark pull request: [SPARK-13478] [yarn] Use real user when fetchi...

2016-02-24 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/11358#issuecomment-188615270
  
So I have not tested using the keytab-based login with proxy user stuff at 
all. We get delegation tokens even there - does this issue affect that as well? 





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-03 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51798611
  
--- Diff: external/kafka-newapi/pom.xml ---
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-newapi_2.11</artifactId>
--- End diff --

Is it possible to name this module something else? `newapi` is pretty 
generic - and really refers to Kafka's new API, while the name seems to suggest 
it is some new API in Spark.





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-03 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51799340
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.newapi.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ *
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ *configuration parameters</a>.
+ *Requires "metadata.broker.list" or 
"bootstrap.servers" to be set
+ *with Kafka broker(s),
+ *NOT zookeeper servers, specified in 
host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+ *starting point of the stream
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+@transient val fromOffsets: Map[TopicPartition, Long],
+messageHandler: ConsumerRecord[K, V] => R
+  ) extends InputDStream[R](ssc_) with Logging {
+
+  val maxRetries = context.sparkContext.getConf.getInt(
+"spark.streaming.kafka.maxRetries", 1)
+
+  // Keep this consistent with how other streams are named (e.g. "Flume 
polling stream [2]")
+  private[streaming] override def name: String = s"Kafka 0.9 direct stream 
[$id]"
+
+  protected[streaming] override val checkpointData =
+new DirectKafkaInputDStreamCheckpointData
+
+
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver 
through the receiver tracker.
+   */
+  override protected[streaming] val rateController: Option[RateController] 
= {
+if (RateController.isBackPressureEnabled(ssc.conf)) {
+  Some(new DirectKafkaRateController(id,
+RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
--- End diff --

Why use both `ssc` and `ssc_`?





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-03 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/10953#issuecomment-179511363
  
I have a more fundamental question - given that this patch does not add a 
whole lot of new functionality but just ports the current functionality to 
use the new Kafka API, is there a reason we can't refactor the current 
`KafkaRDD` and `*KafkaDStream` classes such that the functionality accessing 
the Kafka API is wrapped in methods (say `fetchFromKafka` or something), while 
everything else (rate limiting, checkpointing, producing new RDDs, etc.) is 
"outside"? That way the new API usage can simply live in overridden methods 
(`override def fetchFromKafka`). 

Also, can we change the package name and module names to something other 
than `newapi`? I mean, `new` is relative - Kafka could end up having a newer 
API at some point.
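A rough sketch of the refactoring being proposed, with all names (`BaseKafkaDStream`, `fetchFromKafka`, and the record-supplier parameter) hypothetical:

```scala
// Keep rate limiting, checkpointing and RDD production in a shared base
// class; isolate all Kafka-consumer-API access behind a single method.
abstract class BaseKafkaDStream[K, V, R] {
  // Only this method touches the Kafka consumer API.
  protected def fetchFromKafka(untilOffsets: Map[Int, Long]): Iterator[R]

  // Everything else (rate limiting, checkpointing, producing RDDs) would
  // live here, shared by the old-API and new-API implementations.
  final def compute(untilOffsets: Map[Int, Long]): Seq[R] =
    fetchFromKafka(untilOffsets).toSeq
}

// The new-API port then reduces to overriding the fetch method.
class NewApiKafkaDStream[K, V, R](records: Map[Int, Long] => Iterator[R])
    extends BaseKafkaDStream[K, V, R] {
  override protected def fetchFromKafka(
      untilOffsets: Map[Int, Long]): Iterator[R] =
    records(untilOffsets)
}
```

The point of the sketch is the shape, not the signatures: only the `fetchFromKafka` override would differ between the two Kafka API versions.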





[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...

2015-11-23 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9875#issuecomment-159085964
  
Also, to be clear: it would work fine in Cluster mode. In Client mode, 
#7394 should have taken care of the long-running app issue (though there was 
one case where, I think, @SaintBacchus mentioned that the `EventLoggingListener` 
was not able to write to HDFS anymore, even though the tokens were getting 
updated). 

So in either case, I am wondering whether in client mode we should simply 
re-login using the keytab and not bother with tokens on the driver app at all (so 
the AM would login, update tokens etc., while the client app always just logs 
in). Do you think that makes sense, @tgravescs?





[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...

2015-11-23 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9875#issuecomment-158988545
  
Yes, that is correct, but this happens even without the tokens expiring. 

What do you think about doing the relogin in the 
`YarnClientSchedulerBackend`? This is messy with client mode, but I don't think 
we have another option.





[GitHub] spark pull request: [SPARK-11821] Propagate Kerberos keytab for al...

2015-11-20 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9837#discussion_r45494051
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala ---
@@ -166,7 +168,11 @@ private[hive] class ClientWrapper(
   " specified in spark.yarn.keytab does not exist")
   } else {
 logInfo("Attempting to login to Kerberos" +
-  s" using principal: ${principalName} and keytab: 
${keytabFileName}")
+s" using principal: ${principalName} and keytab: 
${keytabFileName}")
+val hadoopConfiguration = new Configuration()
+
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
--- End diff --

Ah, ok. Did you hit this issue when running an app?





[GitHub] spark pull request: [SPARK-11821] Propagate Kerberos keytab for al...

2015-11-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9859#issuecomment-158498642
  
LGTM. Are there any other configs required? I remember Hadoop security had 
a bunch of configs.

/cc @tgravescs 





[GitHub] spark pull request: [SPARK-11662] Call startExecutorDelegationToke...

2015-11-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9635#issuecomment-158561381
  
I don't think this PR actually fixes the issue. I think the real issue is 
that once tokens are added to the credentials, Hadoop does not allow the user 
to get new tokens. So we basically need to login as a different user, get new 
credentials and add them to the current user's credentials. 

I opened #9875 to fix this. 






[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...

2015-11-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9875#issuecomment-158563127
  
Jenkins, test this please





[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...

2015-11-20 Thread harishreedharan
GitHub user harishreedharan opened a pull request:

https://github.com/apache/spark/pull/9875

[SPARK-11662] [YARN]. In Client mode, make sure we re-login before at…

…tempting to create new delegation tokens if a new SparkContext is 
created within the same application.

Since Hadoop gives precedence to the delegation tokens, we must make sure 
we login as a different user, get new tokens and replace the old ones in the 
current user's credentials cache to avoid not being able to get new ones.
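A hypothetical sketch of the approach just described, using Hadoop's `UserGroupInformation` API (the helper name `refreshTokens` and the placeholder token-fetching step are illustrative, not the actual patch):

```scala
import org.apache.hadoop.security.UserGroupInformation

object ReloginSketch {
  // Log in again from the keytab as a *fresh* UGI, so that the stale
  // delegation tokens already sitting in the current user's credential
  // cache don't take precedence; then merge the new credentials back in.
  def refreshTokens(principal: String, keytab: String): Unit = {
    val freshUgi =
      UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    // ... obtain new delegation tokens while running as freshUgi ...
    UserGroupInformation.getCurrentUser.addCredentials(freshUgi.getCredentials)
  }
}
```

`loginUserFromKeytabAndReturnUGI` performs the Kerberos login without replacing the process-wide login user, which is what lets the new tokens be fetched under a clean credentials cache.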

/cc @tedyu @tgravescs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/harishreedharan/spark 
keytab-relogin-restarted-context

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9875.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9875


commit 70f610f855c3fd2b3ea9b0a44f6015162f555f87
Author: Hari Shreedharan <hshreedha...@apache.org>
Date:   2015-11-20T23:42:01Z

[SPARK-11662] [YARN]. In Client mode, make sure we re-login before 
attempting to create new delegation tokens if a new SparkContext is created 
within the same application.

Since Hadoop gives precedence to the delegation tokens, we must make sure 
we login as a different user, get new tokens and replace the old ones in the 
current user's credentials cache to avoid not being able to get new ones.







[GitHub] spark pull request: [SPARK-11662] [YARN]. In Client mode, make sur...

2015-11-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9875#issuecomment-158573009
  
retest this please





[GitHub] spark pull request: [SPARK-11821] Propagate Kerberos keytab for al...

2015-11-19 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9837#discussion_r45425265
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala ---
@@ -166,7 +168,11 @@ private[hive] class ClientWrapper(
   " specified in spark.yarn.keytab does not exist")
   } else {
 logInfo("Attempting to login to Kerberos" +
-  s" using principal: ${principalName} and keytab: 
${keytabFileName}")
+s" using principal: ${principalName} and keytab: 
${keytabFileName}")
+val hadoopConfiguration = new Configuration()
+
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
--- End diff --

This seems wrong. If you are talking to a secure Hadoop cluster, your 
client configuration should already have this. Setting it here alone is not 
going to help, since all the other `Configuration` instances used across Spark 
will not have this information (where required).





[GitHub] spark pull request: [SPARK-11662] Call startExecutorDelegationToke...

2015-11-19 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9635#issuecomment-158250401
  
Hmm, why does stopping the context actually remove the kerberos login, 
unless the token renewal interval has passed? The patch looks fine, but have 
you confirmed if this is an actual problem?





[GitHub] spark pull request: [SPARK-11662] Call startExecutorDelegationToke...

2015-11-19 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9635#issuecomment-158255349
  
It is not really the token that is causing the issue. It looks like the UGI of 
the current user has expired Kerberos tickets. Anyway, this patch does not make 
things worse; I am just unsure it actually fixes the issue at hand. I am not 
really sure what the issue is - within the same process this should not happen 
for a long time.





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

2015-11-18 Thread harishreedharan
Github user harishreedharan closed the pull request at:

https://github.com/apache/spark/pull/2994





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

2015-11-18 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2994#issuecomment-157908818
  
I am closing this PR. I have put this up here: 
https://github.com/cloudera/spark-kafka-writer






[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...

2015-11-17 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9707#discussion_r45098059
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -187,16 +187,27 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
   checkpointTime: Time,
   bytes: Array[Byte],
   clearCheckpointDataLater: Boolean) extends Runnable {
 def run() {
+  if (latestCheckpointTime == null || latestCheckpointTime < 
checkpointTime) {
+latestCheckpointTime = checkpointTime
+  }
   var attempts = 0
   val startTime = System.currentTimeMillis()
   val tempFile = new Path(checkpointDir, "temp")
-  val checkpointFile = Checkpoint.checkpointFile(checkpointDir, 
checkpointTime)
-  val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, 
checkpointTime)
+  // We will do checkpoint when generating a batch and completing a 
batch. When the processing
+  // time of a batch is greater than the batch interval, checkpointing 
for completing an old
+  // batch may run after checkpointing of a new batch. If this 
happens, checkpoint of an old
+  // batch actually has the latest information, so we want to recovery 
from it. Therefore, we
+  // also use the latest checkpoint time as the file name, so that we 
can recovery from the
+  // latest checkpoint file.
+  val checkpointFile = Checkpoint.checkpointFile(checkpointDir, 
latestCheckpointTime)
--- End diff --

So the idea here is that if an older batch's completion-checkpoint comes in 
after a new batch's initial-checkpoint, we overwrite the initial checkpoint 
(since we would not reset `latestCheckpointTime`)? 

This could essentially mean two checkpoints being written to the 
same files.





[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...

2015-11-17 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9707#discussion_r45099938
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -187,16 +187,27 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
   checkpointTime: Time,
   bytes: Array[Byte],
   clearCheckpointDataLater: Boolean) extends Runnable {
 def run() {
+  if (latestCheckpointTime == null || latestCheckpointTime < 
checkpointTime) {
+latestCheckpointTime = checkpointTime
+  }
   var attempts = 0
   val startTime = System.currentTimeMillis()
   val tempFile = new Path(checkpointDir, "temp")
-  val checkpointFile = Checkpoint.checkpointFile(checkpointDir, 
checkpointTime)
-  val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, 
checkpointTime)
+  // We will do checkpoint when generating a batch and completing a 
batch. When the processing
+  // time of a batch is greater than the batch interval, checkpointing 
for completing an old
+  // batch may run after checkpointing of a new batch. If this 
happens, checkpoint of an old
+  // batch actually has the latest information, so we want to recovery 
from it. Therefore, we
+  // also use the latest checkpoint time as the file name, so that we 
can recovery from the
+  // latest checkpoint file.
+  val checkpointFile = Checkpoint.checkpointFile(checkpointDir, 
latestCheckpointTime)
--- End diff --

I don't think you get what I am saying. I am saying that two threads could
run at the same time, writing out data to the exact same files.

If I am not mistaken, there is a bug here that could lead to two checkpoints
running at the same time, writing to the same files:
-- Checkpoint 1: completion of batch time t
-- Checkpoint 2: start of batch time t+1

Checkpoint 2 starts -> `latestCheckpoint = t + 1`
Checkpoint 1 starts -> since `latestCheckpoint != null` and
`latestCheckpoint > checkpointTime`, we would not reset `latestCheckpoint`, so
both checkpoints would use the same file name to write their checkpoints out.

Because of this, whichever thread reaches the tempFile creation first wins -
which is non-deterministic. The other thread would end up hitting an exception.
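To make that interleaving concrete, here is a minimal sketch replayed sequentially so the hazard is deterministic. All names are hypothetical stand-ins: `-1L` stands in for the `null` initial value, and a `String` file name stands in for `Checkpoint.checkpointFile`.

```scala
// Hypothetical stand-in for the patch's shared state. The check-then-act
// below is not atomic, so two concurrent checkpoints can both observe the
// same "latest" time and target the same file name.
object CheckpointNameSketch {
  @volatile private var latestCheckpointTime: Long = -1L  // -1 stands in for null

  def checkpointFileName(checkpointTime: Long): String = {
    if (latestCheckpointTime == -1L || latestCheckpointTime < checkpointTime) {
      latestCheckpointTime = checkpointTime
    }
    s"checkpoint-$latestCheckpointTime"
  }
}

// Replay the interleaving from the comment: the checkpoint of batch t+1
// resolves its name first, then the checkpoint of batch t runs.
val nameForNewerBatch = CheckpointNameSketch.checkpointFileName(101L)  // t + 1
val nameForOlderBatch = CheckpointNameSketch.checkpointFileName(100L)  // t
// Both resolve to "checkpoint-101": the two writers collide on one file.
```

The older batch's write does not reset the shared time, so both calls yield the same name, which is exactly the collision described above.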


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...

2015-11-17 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9707#discussion_r45102167
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -187,16 +187,27 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
       checkpointTime: Time,
       bytes: Array[Byte],
       clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
+      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+        latestCheckpointTime = checkpointTime
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
-      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
-      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+      // We will do checkpoint when generating a batch and completing a batch. When the processing
+      // time of a batch is greater than the batch interval, checkpointing for completing an old
+      // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
+      // batch actually has the latest information, so we want to recover from it. Therefore, we
+      // also use the latest checkpoint time as the file name, so that we can recover from the
+      // latest checkpoint file.
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
--- End diff --

Ok, then we are fine. Can you put in a comment where the executor is created,
so we don't end up introducing a bug later due to this class not being
thread-safe?
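As a side note, confinement to a single-threaded executor is what makes a non-thread-safe class safe to use. A small sketch with a hypothetical `Counter` (unsynchronized mutable state):

```scala
import java.util.concurrent.{Callable, Executors}

// A deliberately non-thread-safe class: plain unsynchronized mutable state.
class Counter {
  private var n = 0
  def inc(): Int = { n += 1; n }
}

val single = Executors.newSingleThreadExecutor()
val counter = new Counter

// Every access goes through the one worker thread, so the increments are
// serialized in submission order and none are lost.
val results = (1 to 100).map { _ =>
  single.submit(new Callable[Int] { def call(): Int = counter.inc() })
}.map(_.get())

single.shutdown()
// results is 1 to 100: each increment is observed exactly once, in order.
```

As long as every caller goes through that one executor, no external synchronization is needed; the danger is exactly the case discussed here, where a second code path touches the object from another thread.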





[GitHub] spark pull request: [SPARK-11740][Streaming]Fix the race condition...

2015-11-17 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9707#issuecomment-157469418
  
LGTM. Thanks @zsxwing !





[GitHub] spark pull request: [SPARK-11731][STREAMING] Enable batching on Dr...

2015-11-13 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9695#issuecomment-156583976
  
+1





[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

2015-11-09 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9373#issuecomment-155257362
  
@brkyvz Sounds good, sir. The issue you saw looks like a protobuf
incompatibility - did you compile and run against the same hadoop-2 version
(2.2+)?
This patch now LGTM.





[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-06 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44185733
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
@@ -488,7 +491,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
         context.reply(successful)
       case AddBlock(receivedBlockInfo) =>
-        context.reply(addBlock(receivedBlockInfo))
+        if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
+          val f = Future(addBlock(receivedBlockInfo))(walBatchingThreadPool)
+          f.onComplete(result => context.reply(result.get))(walBatchingThreadPool)
--- End diff --

Actually, would it even run, since the threadpool is now dead?
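For reference, a JVM `ExecutorService` that has been shut down rejects new tasks up front; with the default policy, `submit` throws `RejectedExecutionException` rather than silently running or dropping the task. A quick sketch:

```scala
import java.util.concurrent.{Executors, RejectedExecutionException}

val pool = Executors.newFixedThreadPool(1)
pool.shutdown()

// Work handed to a pool after shutdown() is rejected: the default
// rejection policy throws RejectedExecutionException.
val rejected =
  try {
    pool.submit(new Runnable { def run(): Unit = () })
    false
  } catch {
    case _: RejectedExecutionException => true
  }
// rejected is true
```

So a callback scheduled on the dead pool would never execute; the submission itself fails.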





[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44057628
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
+
+  private val batchedWriterThread = startBatchedWriterThread()
+
+  /**
+   * Write a byte buffer to the log file. This method adds the byteBuffer to a queue and blocks
+   * until the record is properly written by the parent.
+   */
+  override def write(byteBuffer: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
+    val promise = Promise[WriteAheadLogRecordHandle]()
+    walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise))
+    try {
+      Await.result(promise.future.recover { case _ => null }(ThreadUtils.sameThread),
+        WAL_WRITE_STATUS_TIMEOUT.milliseconds)
+    } catch {
+      case e: TimeoutException =>
+        logWarning(s"Write to Write Ahead Log promise timed out after " +
+          s"$WAL_WRITE_STATUS_TIMEOUT millis for record.")
+        null
+    }
+  }
+
+  /**
+   * Read a segment from an existing Write Ahead Log. The data may be aggregated, and the user
+   * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]]
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+    parent.read(segment)
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def readAll(): JIterator[ByteBuffer] = {
+    parent.readAll().asScala.flatMap(deaggregate).asJava
--- End diff --

`flatMap` does not, but the `asScala` and `asJava` calls are allocating new
objects. They may be lazily collecting the objects being iterated over, but
they still create two new wrapper objects, no? In a loop, the only new object
being created is the list itself. The others are basically references to
already created objects returned from the `deaggregate()` call.

It might be more code, but I never found loops hard to read - sometimes I
find loops clearer.

I don't feel that strongly about it - I just feel that a couple of
Scala-to-Java (and vice versa) conversions just for a `flatMap()` call are not
really worth it. Being young-gen garbage, they are not that bad, I guess.
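For comparison, a sketch of the two shapes under discussion, with a hypothetical `deaggregate` standing in for `BatchedWriteAheadLog.deaggregate`:

```scala
import java.util.{ArrayList => JArrayList, Iterator => JIterator}
import scala.collection.JavaConverters._

// Hypothetical stand-in for BatchedWriteAheadLog.deaggregate.
def deaggregate(record: String): Seq[String] = record.split(",").toSeq

// Converter version: asScala/asJava each allocate a thin wrapper object,
// and the flatMap is evaluated lazily as the returned iterator is consumed.
def viaConverters(records: JIterator[String]): JIterator[String] =
  records.asScala.flatMap(deaggregate).asJava

// Loop version: no wrappers; only the result list itself is allocated,
// holding references to the objects deaggregate() already produced.
def viaLoop(records: JIterator[String]): JIterator[String] = {
  val out = new JArrayList[String]()
  while (records.hasNext) {
    deaggregate(records.next()).foreach(s => out.add(s))
  }
  out.iterator()
}
```

Both produce the same elements; the trade-off is two short-lived wrapper allocations and laziness versus an eagerly materialized list.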



[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44084903
  

[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44085335
  

[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44085133
  

[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44069967
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: WriteAheadLog)
--- End diff --

+1 for `wrappedLog`





[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44070214
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
+
+  private val batchedWriterThread = startBatchedWriterThread()
+
+  /**
+   * Write a byte buffer to the log file. This method adds the byteBuffer to a queue and blocks
+   * until the record is properly written by the parent.
+   */
+  override def write(byteBuffer: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
+    val promise = Promise[WriteAheadLogRecordHandle]()
+    walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise))
+    try {
+      Await.result(promise.future.recover { case _ => null }(ThreadUtils.sameThread),
+        WAL_WRITE_STATUS_TIMEOUT.milliseconds)
+    } catch {
+      case e: TimeoutException =>
+        logWarning(s"Write to Write Ahead Log promise timed out after " +
+          s"$WAL_WRITE_STATUS_TIMEOUT millis for record.")
+        null
+    }
+  }
+
+  /**
+   * Read a segment from an existing Write Ahead Log. The data may be aggregated, and the user
+   * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]]
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+    parent.read(segment)
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def readAll(): JIterator[ByteBuffer] = {
+    parent.readAll().asScala.flatMap(deaggregate).asJava
+  }
+
+  /**
+   * Delete the log files that are older than the threshold time.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
+    parent.clean(threshTime, waitForCompletion)
+  }
+
+
+  /**
+   * Stop the batched writer thread, fulfill promises with failures and close parent writer.
+   */
+  override def close(): Unit = {
+    logInfo("BatchedWriteAheadLog shutting down.")
+    active = false
+    batchedWriterThread.interrupt()
+    batchedWriterThread.join()
+    fulfillPromises()
+    parent.close()
+  }
+
+  /**
+   * Respond to any promises that may have been left in the queue, to unblock receivers during
+   * shutdown.
+   */
+  private def fulfillPromises(): Unit = {
--- End diff --

Can

[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r44069852
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
@@ -157,9 +166,12 @@ private[streaming] class ReceivedBlockTracker(
     require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
-    writeToLog(BatchCleanupEvent(timesToCleanup))
-    timeToAllocatedBlocks --= timesToCleanup
-    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
+    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
+      timeToAllocatedBlocks --= timesToCleanup
+      writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
--- End diff --

Ah, right. For whatever reason I got confused. Ignore my comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11457][Streaming][YARN] Fix incorrect A...

2015-11-05 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9412#issuecomment-154256737
  
Where is this being set into the SparkConf for a normal app? I am not sure 
of the parameters, but setting the new values looks good (not sure if 
`spark.ui.filters` and the other params are the right ones). @vanzin - if they 
are, this LGTM





[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-04 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r43963503
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. 
All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: 
WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
+
+  private val batchedWriterThread = startBatchedWriterThread()
+
+  /**
+   * Write a byte buffer to the log file. This method adds the byteBuffer 
to a queue and blocks
+   * until the record is properly written by the parent.
+   */
+  override def write(byteBuffer: ByteBuffer, time: Long): 
WriteAheadLogRecordHandle = {
+val promise = Promise[WriteAheadLogRecordHandle]()
+walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise))
+try {
+  Await.result(promise.future.recover { case _ => null 
}(ThreadUtils.sameThread),
+WAL_WRITE_STATUS_TIMEOUT.milliseconds)
+} catch {
+  case e: TimeoutException =>
+logWarning(s"Write to Write Ahead Log promise timed out after " +
+  s"$WAL_WRITE_STATUS_TIMEOUT millis for record.")
+null
+}
+  }
+
+  /**
+   * Read a segment from an existing Write Ahead Log. The data may be 
aggregated, and the user
+   * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]]
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+parent.read(segment)
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def readAll(): JIterator[ByteBuffer] = {
+parent.readAll().asScala.flatMap(deaggregate).asJava
--- End diff --

This is really unnecessary. We should not have to convert to a Scala 
iterator just to do a flatMap and then back to a Java one. Just having a loop 
adding all of the records is not that horrible. 
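A minimal sketch of the loop-based alternative suggested above. This is not the Spark implementation: `deaggregate` here is a hypothetical identity stand-in for `BatchedWriteAheadLog.deaggregate`, and the method name is made up. The point is only that a plain `while` loop over the parent's Java iterator avoids the `asScala`/`asJava` round trip.

```scala
import java.nio.ByteBuffer
import java.util.{ArrayList => JArrayList, Iterator => JIterator}

object ReadAllLoopSketch {
  // Hypothetical stand-in for BatchedWriteAheadLog.deaggregate; the real
  // method splits one aggregated buffer into its record buffers.
  def deaggregate(buf: ByteBuffer): Seq[ByteBuffer] = Seq(buf)

  // Loop-based readAll: iterate the parent's Java iterator directly and
  // append deaggregated buffers into a Java list -- no asScala/asJava.
  def readAllWithLoop(parentIter: JIterator[ByteBuffer]): JIterator[ByteBuffer] = {
    val out = new JArrayList[ByteBuffer]()
    while (parentIter.hasNext) {
      deaggregate(parentIter.next()).foreach(out.add)
    }
    out.iterator()
  }

  def main(args: Array[String]): Unit = {
    val in = new JArrayList[ByteBuffer]()
    in.add(ByteBuffer.wrap(Array[Byte](1, 2)))
    in.add(ByteBuffer.wrap(Array[Byte](3)))
    val it = readAllWithLoop(in.iterator())
    var n = 0
    while (it.hasNext) { it.next(); n += 1 }
    println(n)  // prints 2: one output buffer per input with the identity deaggregate
  }
}
```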





[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-04 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r43964017
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. 
All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: 
WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
+
+  private val batchedWriterThread = startBatchedWriterThread()
+
+  /**
+   * Write a byte buffer to the log file. This method adds the byteBuffer 
to a queue and blocks
+   * until the record is properly written by the parent.
+   */
+  override def write(byteBuffer: ByteBuffer, time: Long): 
WriteAheadLogRecordHandle = {
+val promise = Promise[WriteAheadLogRecordHandle]()
+walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise))
+try {
+  Await.result(promise.future.recover { case _ => null 
}(ThreadUtils.sameThread),
+WAL_WRITE_STATUS_TIMEOUT.milliseconds)
+} catch {
+  case e: TimeoutException =>
+logWarning(s"Write to Write Ahead Log promise timed out after " +
+  s"$WAL_WRITE_STATUS_TIMEOUT millis for record.")
+null
+}
+  }
+
+  /**
+   * Read a segment from an existing Write Ahead Log. The data may be 
aggregated, and the user
+   * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]]
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+parent.read(segment)
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def readAll(): JIterator[ByteBuffer] = {
+parent.readAll().asScala.flatMap(deaggregate).asJava
+  }
+
+  /**
+   * Delete the log files that are older than the threshold time.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = 
{
+parent.clean(threshTime, waitForCompletion)
+  }
+
+
+  /**
+   * Stop the batched writer thread, fulfill promises with failures and 
close parent writer.
+   */
+  override def close(): Unit = {
+logInfo("BatchedWriteAheadLog shutting down.")
+active = false
+batchedWriterThread.interrupt()
+batchedWriterThread.join()
+fulfillPromises()
+parent.close()
+  }
+
+  /**
+   * Respond to any promises that may have been left in the queue, to 
unblock receivers during
+   * shutdown.
+   */
+  private def fulfillPromises(): Unit = {
+while (!walWrit

[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-04 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r43964535
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. 
All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: 
WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
+
+  private val batchedWriterThread = startBatchedWriterThread()
+
+  /**
+   * Write a byte buffer to the log file. This method adds the byteBuffer 
to a queue and blocks
+   * until the record is properly written by the parent.
+   */
+  override def write(byteBuffer: ByteBuffer, time: Long): 
WriteAheadLogRecordHandle = {
+val promise = Promise[WriteAheadLogRecordHandle]()
+walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise))
+try {
+  Await.result(promise.future.recover { case _ => null 
}(ThreadUtils.sameThread),
+WAL_WRITE_STATUS_TIMEOUT.milliseconds)
+} catch {
+  case e: TimeoutException =>
+logWarning(s"Write to Write Ahead Log promise timed out after " +
+  s"$WAL_WRITE_STATUS_TIMEOUT millis for record.")
+null
+}
+  }
+
+  /**
+   * Read a segment from an existing Write Ahead Log. The data may be 
aggregated, and the user
+   * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]]
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+parent.read(segment)
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def readAll(): JIterator[ByteBuffer] = {
+parent.readAll().asScala.flatMap(deaggregate).asJava
--- End diff --

I really don't think we are dealing with all that much memory. I feel we 
are over-using the conversions here with very little additional benefit. All we 
are doing is a flatMap. 

Moreover, the flatMap is going to allocate a new collection/seq - so a loop 
is probably going to use less memory, because you are adding only the newly 
generated `ByteBuffers`. There are no new objects allocated other than the ones 
required (in the last comment when I said records, I meant the Buffers that 
were created as a result of the deaggregation, not all of the data from the 
`readAll`)



[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-04 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r43964698
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. 
All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: 
WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
+
+  private val batchedWriterThread = startBatchedWriterThread()
+
+  /**
+   * Write a byte buffer to the log file. This method adds the byteBuffer 
to a queue and blocks
+   * until the record is properly written by the parent.
+   */
+  override def write(byteBuffer: ByteBuffer, time: Long): 
WriteAheadLogRecordHandle = {
+val promise = Promise[WriteAheadLogRecordHandle]()
+walWriteQueue.offer(RecordBuffer(byteBuffer, time, promise))
+try {
+  Await.result(promise.future.recover { case _ => null 
}(ThreadUtils.sameThread),
+WAL_WRITE_STATUS_TIMEOUT.milliseconds)
+} catch {
+  case e: TimeoutException =>
+logWarning(s"Write to Write Ahead Log promise timed out after " +
+  s"$WAL_WRITE_STATUS_TIMEOUT millis for record.")
+null
+}
+  }
+
+  /**
+   * Read a segment from an existing Write Ahead Log. The data may be 
aggregated, and the user
+   * should de-aggregate using [[BatchedWriteAheadLog.deaggregate]]
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+parent.read(segment)
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def readAll(): JIterator[ByteBuffer] = {
+parent.readAll().asScala.flatMap(deaggregate).asJava
+  }
+
+  /**
+   * Delete the log files that are older than the threshold time.
+   *
+   * This method is handled by the parent WriteAheadLog.
+   */
+  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = 
{
+parent.clean(threshTime, waitForCompletion)
+  }
+
+
+  /**
+   * Stop the batched writer thread, fulfill promises with failures and 
close parent writer.
+   */
+  override def close(): Unit = {
+logInfo("BatchedWriteAheadLog shutting down.")
+active = false
+batchedWriterThread.interrupt()
+batchedWriterThread.join()
+fulfillPromises()
+parent.close()
+  }
+
+  /**
+   * Respond to any promises that may have been left in the queue, to 
unblock receivers during
+   * shutdown.
+   */
+  private def fulfillPromises(): Unit = {
+while (!walWrit

[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-04 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r43963794
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
+import java.util.{Iterator => JIterator}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Promise}
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+import org.apache.spark.util.{Utils, ThreadUtils}
+
+/**
+ * A wrapper for a WriteAheadLog that batches records before writing data. 
All other methods will
+ * be passed on to the wrapped class.
+ *
+ * Parent exposed for testing.
+ */
+private[streaming] class BatchedWriteAheadLog(private[util] val parent: 
WriteAheadLog)
+  extends WriteAheadLog with Logging {
+
+  import BatchedWriteAheadLog._
+
+  // exposed for tests
+  protected val walWriteQueue = new LinkedBlockingQueue[RecordBuffer]()
+
+  private val WAL_WRITE_STATUS_TIMEOUT = 5000 // 5 seconds
+
+  // Whether the writer thread is active
+  @volatile private var active: Boolean = true
+  protected val buffer = new ArrayBuffer[RecordBuffer]()
--- End diff --

It looks like there are a bunch of `asJava` calls on this buffer. Why not 
just use a Java List instead?
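A small sketch of what the suggestion above buys. This is an illustration, not Spark code: if the drain buffer is a `java.util.ArrayList`, it can be handed straight to Java APIs such as `LinkedBlockingQueue.drainTo` with no `asJava` wrapping.

```scala
import java.util.{ArrayList => JArrayList}
import java.util.concurrent.LinkedBlockingQueue

object JavaBufferSketch {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String]()
    queue.offer("record-1")
    queue.offer("record-2")
    // A Java ArrayList can be passed directly to drainTo and to any other
    // Java-API consumer, so no asJava conversion is needed anywhere.
    val buffer = new JArrayList[String]()
    queue.drainTo(buffer)
    println(buffer.size())  // prints 2
  }
}
```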





[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-04 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r43961907
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 ---
@@ -157,9 +166,12 @@ private[streaming] class ReceivedBlockTracker(
 require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
 val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < 
cleanupThreshTime }.toSeq
 logInfo("Deleting batches " + timesToCleanup)
-writeToLog(BatchCleanupEvent(timesToCleanup))
-timeToAllocatedBlocks --= timesToCleanup
-writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, 
waitForCompletion))
+if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
+  timeToAllocatedBlocks --= timesToCleanup
+  writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, 
waitForCompletion))
--- End diff --

This is not actually new, but I think we should clean first and then remove 
the times from the `timeToAllocatedBlocks` map. Otherwise, failed cleanups can 
lead to the logs never getting cleaned up at all.
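A hedged sketch of the ordering the comment argues for, with hypothetical names (`cleanLogs` stands in for `writeAheadLogOption.foreach(_.clean(...))`): clean the on-disk logs first, and only drop the in-memory batch times once cleaning succeeded, so a failed cleanup can be retried.

```scala
import scala.collection.mutable

object CleanupOrderSketch {
  // Batch time -> allocated blocks (values simplified to strings here).
  val timeToAllocatedBlocks = mutable.HashMap(1000L -> "blocksA", 2000L -> "blocksB")

  // Hypothetical stand-in for the WAL clean call; returns whether the
  // on-disk cleanup succeeded.
  def cleanLogs(threshTime: Long): Boolean = true

  def cleanupOldBatches(threshTime: Long, timesToCleanup: Seq[Long]): Unit = {
    // Clean the logs first, then forget the batch times. A failed clean
    // leaves the map intact, so the next cleanup pass picks them up again.
    if (cleanLogs(threshTime)) {
      timeToAllocatedBlocks --= timesToCleanup
    }
  }

  def main(args: Array[String]): Unit = {
    cleanupOldBatches(1500L, Seq(1000L))
    println(timeToAllocatedBlocks.keys.toSeq.sorted.mkString(","))  // prints 2000
  }
}
```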





[GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...

2015-11-04 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9143#discussion_r43962050
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 ---
@@ -439,6 +439,9 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
 private val submitJobThreadPool = ExecutionContext.fromExecutorService(
   ThreadUtils.newDaemonCachedThreadPool("submit-job-thead-pool"))
 
+private val walBatchingThreadPool = 
ExecutionContext.fromExecutorService(
+  ThreadUtils.newDaemonCachedThreadPool("wal-batching-thead-pool"))
--- End diff --

"thread-pool"





[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

2015-11-01 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9373#issuecomment-152893856
  
Did you try HDFS? I am assuming we'd get similar speed-ups there too, but in
that case there are far fewer files, so the cost to set up the
streams is paid only a handful of times.

What I am wondering is if we'd actually ever have to deal with that many
files in the non-S3 case. This adds the additional cost for HDFS or any
other FS, no? In those cases the number of files usually would be pretty
small, which may result in this being more expensive.

If this adds only a small cost or if it becomes faster, then let's keep
this.


> On Sunday, November 1, 2015, Burak Yavuz <notificati...@github.com> wrote:

> @harishreedharan <https://github.com/harishreedharan> Here are some
> benchmark results:
> For reference, the driver was a r3.2xlarge EC2 instance.
>
> [image: image]
> 
<https://cloud.githubusercontent.com/assets/5243515/10871515/54c14846-809e-11e5-91e6-2ac3605d98b7.png>
> Num Threads | Rate (ms / file) | Speed-up
> 50          | 5.556101934      | 9.004997951
> 25          | 5.99898194       | 8.340196225
> 8           | 8.692144733      | 5.756080699
> 4           | 14.1162362       | 3.544336169
> 1           | 50.03268653      | 1
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/spark/pull/9373#issuecomment-152867985>.
>


-- 

Thanks,
Hari






[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

2015-10-31 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9373#discussion_r43578480
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 ---
@@ -126,11 +127,11 @@ private[streaming] class FileBasedWriteAheadLog(
 val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
 logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
 
-logFilesToRead.iterator.map { file =>
+logFilesToRead.par.map { file =>
--- End diff --

This is an expensive operation - `.par` runs an O(n) operation to create a 
parallel copy of the collection (in addition to the memory cost of that copy). Do 
we really need this? I am not entirely sure the copying adds a whole lot of value, 
considering that this array is not going to be very huge. Also note the additional 
cost of spinning up threads (if the context does not already have them spun up). 
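One alternative to `.par` is to make the parallelism explicit with a bounded thread pool, which avoids copying the collection and keeps the thread count under control. This is a sketch under assumptions, not the patch itself: `readFile` is a hypothetical stand-in for reading one WAL file.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelReadSketch {
  // Hypothetical stand-in for reading one write-ahead-log file.
  def readFile(path: String): Seq[String] = Seq(s"record-from-$path")

  def main(args: Array[String]): Unit = {
    // A fixed pool of 4 threads: no .par copy, no implicit global pool,
    // and the parallelism is visible at the call site.
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val logFilesToRead = Seq("log-1", "log-2", "log-3")
    val futures = logFilesToRead.map(f => Future(readFile(f)))
    val records = Await.result(Future.sequence(futures), 10.seconds).flatten
    println(records.size)  // prints 3: one record per file with the stand-in reader
    pool.shutdown()
  }
}
```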





[GitHub] spark pull request: [SPARK-11424] Guard against double-close() of ...

2015-10-30 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9382#issuecomment-152637344
  
+1.





[GitHub] spark pull request: [SPARK-11324][STREAMING] Flag for closing Writ...

2015-10-26 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9285#issuecomment-151282234
  
If there is no other option, this LGTM





[GitHub] spark pull request: [SPARK-11324] Flag for closing Write Ahead Log...

2015-10-26 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9285#issuecomment-151281257
  
Wouldn't this be really expensive? 





[GitHub] spark pull request: [SPARK-11324][STREAMING] Flag for closing Writ...

2015-10-26 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9285#discussion_r43071619
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
 ---
@@ -39,6 +39,7 @@ private[streaming] object WriteAheadLogUtils extends 
Logging {
 
   val DEFAULT_ROLLING_INTERVAL_SECS = 60
   val DEFAULT_MAX_FAILURES = 3
+  val WAL_CLOSE_AFTER_WRITE = 
"spark.streaming.writeAheadLog.closeAfterWrite"
--- End diff --

Wouldn't the config in that case be something like 
`spark.streaming.driver.writeAheadLog.closeFileAfterWrite` ? 





[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...

2015-10-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9174#issuecomment-149442366
  
/cc @tdas @tgravescs @vanzin 





[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...

2015-10-20 Thread harishreedharan
GitHub user harishreedharan opened a pull request:

https://github.com/apache/spark/pull/9174

[SPARK-11201] [YARN] Make sure SPARK_YARN_MODE is set before SparkHadoopUtil.get is called.

If `StreamingContext.getOrCreate` is used in `yarn-client` mode, a 
`ClassCastException` gets thrown when `Client.scala` is initialized and 
`YarnSparkHadoopUtil.get` is called. This fails because the 
`SparkHadoopUtil.get` method is called in the `getOrCreate` method default 
argument, before SPARK_YARN_MODE is set.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/harishreedharan/spark streaming-yarn-client

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9174.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9174


commit 4a48bf02706e703fca6a4047f9b68ba67f816706
Author: Hari Shreedharan <hshreedha...@apache.org>
Date:   2015-10-20T05:58:11Z

[SPARK-11201] [YARN] Make sure SPARK_YARN_MODE is set before 
SparkHadoopUtil.get is called.

If `StreamingContext.getOrCreate` is used in `yarn-client` mode, a 
`ClassCastException` gets thrown when `Client.scala` is initialized and 
`YarnSparkHadoopUtil.get` is called. This fails because the 
`SparkHadoopUtil.get` method is called in the `getOrCreate` method default 
argument, before SPARK_YARN_MODE is set.







[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-10-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-149442481
  
Agreed - synchronization is painful and we could end up missing events.





[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...

2015-10-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9174#issuecomment-149651474
  
Looks like SPARK-10812 would actually take care of this, since the check happens at run time.

+1 on backporting that to 1.5 branch. I will close this one.





[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-10-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-149666851
  
So this is my theory (I don't have anything to back this up really). My 
assumption is based on the fact that if we don't set 
`hadoop.fs.hdfs.impl.disable.cache=true`, then the update of tokens seems to 
fail on the cached `FileSystem` instance (we needed to add that config in a PR 
at some point to ensure the update of tokens worked). So if that config needed 
to be set for the `FileSystem.get()` method to work, it likely means (again my
theory), that a FileSystem object created using older tokens does not seem to 
know about the new ones. I can't be sure of this but that could explain why 
even updating the tokens locally using the `ExecutorDelegationTokenRenewer` 
does not fix the event log writes.
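The theory above hinges on the `FileSystem` cache handing back an instance that captured its credentials at creation time, so renewed tokens never reach it unless caching is disabled. A minimal Python simulation of that behaviour (the class, function, and cache key here are illustrative stand-ins, not the real Hadoop API):

```python
# Hypothetical model of a FileSystem cache keyed by (scheme, authority, user).
# A cached instance keeps whatever tokens it snapshotted when it was created,
# which is the failure mode described in the comment above.

class FileSystemSim:
    def __init__(self, user, tokens):
        # Snapshot the tokens at creation time, like a FileSystem capturing
        # the credentials of the UGI it was created under.
        self.tokens = dict(tokens)

_cache = {}

def get_filesystem(user, current_tokens, disable_cache=False):
    key = ("hdfs", "nn-host", user)
    if disable_cache:
        # Analogous to fs.hdfs.impl.disable.cache=true: always build fresh.
        return FileSystemSim(user, current_tokens)
    if key not in _cache:
        _cache[key] = FileSystemSim(user, current_tokens)
    return _cache[key]

# First call caches an instance holding the old token.
fs1 = get_filesystem("spark", {"HDFS_DELEGATION_TOKEN": "old"})
# Tokens are renewed, but the cached instance still holds the old one.
fs2 = get_filesystem("spark", {"HDFS_DELEGATION_TOKEN": "new"})
assert fs2.tokens["HDFS_DELEGATION_TOKEN"] == "old"
# With the cache disabled, a fresh instance sees the renewed token.
fs3 = get_filesystem("spark", {"HDFS_DELEGATION_TOKEN": "new"}, disable_cache=True)
assert fs3.tokens["HDFS_DELEGATION_TOKEN"] == "new"
```

This would explain why updating tokens locally does not help: the writer keeps using the stale cached instance.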







[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...

2015-10-20 Thread harishreedharan
Github user harishreedharan closed the pull request at:

https://github.com/apache/spark/pull/9174





[GitHub] spark pull request: [SPARK-11201] [YARN] Make sure SPARK_YARN_MODE...

2015-10-20 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9174#discussion_r42516999
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1025,9 +1025,6 @@ object Client extends Logging {
 "future version of Spark. Use ./bin/spark-submit with \"--master 
yarn\"")
 }
 
-// Set an env variable indicating we are running in YARN mode.
-// Note that any env variable with the SPARK_ prefix gets propagated 
to all (remote) processes
-System.setProperty("SPARK_YARN_MODE", "true")
--- End diff --

Ok, to be safe I will revert the last commit where I removed the other
places where it was being set and add a comment there

On Tuesday, October 20, 2015, Imran Rashid <notificati...@github.com> wrote:

> In yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
> <https://github.com/apache/spark/pull/9174#discussion_r42515407>:
>
> > @@ -1025,9 +1025,6 @@ object Client extends Logging {
> >  "future version of Spark. Use ./bin/spark-submit with 
\"--master yarn\"")
> >  }
> >
> > -// Set an env variable indicating we are running in YARN mode.
> > -// Note that any env variable with the SPARK_ prefix gets 
propagated to all (remote) processes
> > -System.setProperty("SPARK_YARN_MODE", "true")
>
> to @tgravescs <https://github.com/tgravescs> point, would it hurt to
> leave this in here? and then change the comment to indicate that it should
> normally be set by SparkSubmit but left here in case anybody is still 
using
> this.
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/spark/pull/9174/files#r42515407>.
>


-- 

Thanks,
Hari






[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-10-20 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-149787894
  
If the current user's ugi is what is used by the FileSystem cache, this 
should not really be an issue no? Because we actually do update the current 
user's credentials. I am ok with doing something like this, but I'd rather know 
why before adding this. Why would new tokens not work? That seems like an HDFS 
issue no? Let me test this out with @SaintBacchus's other PR.

In client mode, that would really mean that tokens are used by the executors and the keytab by the AM and the driver. I am of half a mind to suggest not supporting long-running apps in client mode on secure HDFS.





[GitHub] spark pull request: [SPARK-11182] HDFS Delegation Token will be ex...

2015-10-19 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9168#discussion_r42421617
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala 
---
@@ -177,6 +177,7 @@ private[yarn] class AMDelegationTokenRenewer(
 })
 // Add the temp credentials back to the original ones.
 UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+SparkHadoopUtil.get.updateCurrentUserHDFSDelegationToken()
--- End diff --

So I don't understand this. We are already getting new tokens for what I 
assume is the primary namenode using the `obtainTokensForNamenodes` method 
call, correct? Is this only to get the tokens for the standby? 

I don't think this will work though - you'd need to do the same thing we 
do for the primary (add the tokens to a new UGI and copy the credentials from 
there). From what I can remember, when you add new tokens for a service that 
already has tokens, the old tokens won't get overwritten, even if they are expired 
- which is why we went with creating a new UGI and adding its credentials 
over (which does overwrite the old tokens). Are you sure 
`HAUtil.cloneDelegationTokenForLogicalUri` will overwrite the tokens? 
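The workaround described above can be sketched in a few lines. This is a hedged simulation of the behaviour the comment describes (add-does-not-overwrite, replace-via-fresh-container); the class and method names are made up and do not reflect the real `UserGroupInformation`/`Credentials` API:

```python
# Model the described pattern: adding a renewed token into an existing
# credential set keeps the stale token for that service, so renewed tokens
# are staged in a fresh set and then copied over wholesale.

class CredentialsSim:
    def __init__(self):
        self._tokens = {}

    def add_token(self, service, token):
        # Model "adding a token does not overwrite an existing one
        # for the same service", as described in the comment above.
        self._tokens.setdefault(service, token)

    def overwrite_with(self, other):
        # Copying credentials over replaces tokens service-by-service.
        self._tokens.update(other._tokens)

    def token(self, service):
        return self._tokens[service]

current = CredentialsSim()
current.add_token("ha-hdfs:nameservice1", "expired-token")

# Naive add: the expired token for the service is kept.
current.add_token("ha-hdfs:nameservice1", "fresh-token")
assert current.token("ha-hdfs:nameservice1") == "expired-token"

# Workaround: stage renewed tokens in a fresh set, then copy them over.
temp = CredentialsSim()
temp.add_token("ha-hdfs:nameservice1", "fresh-token")
current.overwrite_with(temp)
assert current.token("ha-hdfs:nameservice1") == "fresh-token"
```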





[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-10-19 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-149344858
  
Hmm, I think the real issue is that the event logging does not doAs.

I think in `yarn-cluster`, since the SparkContext is created in the AM, the 
updated credentials actually are in the cache of the user writing to the event 
logs (since we are already running as that user and don't do a doAs).

In `yarn-client` though, because we don't do a `doAs`, is it possible that 
the new tokens are not being used to write to the event log? 

@SaintBacchus Let me open a PR that does the doAs and combine it with your 
previous one #8867 and can you test it and see if it works? Or you can do it 
yourself - just add a `doAs` here: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L66
 , 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L143
 and 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L48
(basically anywhere HDFS is accessed)
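The suggestion above amounts to wrapping each HDFS access so it runs under the user whose credentials are kept up to date. A minimal sketch of that idea, where `UgiSim` and `do_as` are illustrative stand-ins for `UserGroupInformation.doAs`, not the real API:

```python
# Sketch: route every filesystem operation through a doAs-style wrapper so
# the operation consults the wrapped user's *current* credentials, which
# keep being refreshed, rather than a snapshot taken at startup.

class UgiSim:
    def __init__(self, user):
        self.user = user
        self.credentials = {}

    def do_as(self, action):
        # A real doAs runs the action under this user's security context;
        # here we simply hand the UGI to the action so it reads the
        # credentials that are current at call time.
        return action(self)

ugi = UgiSim("spark")
ugi.credentials["HDFS_DELEGATION_TOKEN"] = "t1"

def write_event_log(current_ugi):
    # Anywhere HDFS is accessed, the current credentials are consulted.
    return current_ugi.credentials["HDFS_DELEGATION_TOKEN"]

assert ugi.do_as(write_event_log) == "t1"
# After the tokens are renewed, a doAs-wrapped call sees the new token.
ugi.credentials["HDFS_DELEGATION_TOKEN"] = "t2"
assert ugi.do_as(write_event_log) == "t2"
```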





[GitHub] spark pull request: [SPARK-11182] HDFS Delegation Token will be ex...

2015-10-19 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9168#issuecomment-149338159
  
/cc @tgravescs





[GitHub] spark pull request: [SPARK-11182] HDFS Delegation Token will be ex...

2015-10-19 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9168#discussion_r42420896
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -130,6 +132,20 @@ class SparkHadoopUtil extends Logging {
 UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }
 
+  def updateCurrentUserHDFSDelegationToken(): Unit = {
+val conf = new Configuration()
+val nsId = DFSUtil.getNamenodeNameServiceId(conf)
+val isHaEnabled = HAUtil.isHAEnabled(conf, nsId)
+
+if(isHaEnabled){
+  val ugi = UserGroupInformation.getCurrentUser
+  val uri = FileSystem.getDefaultUri(conf)
+  val map = DFSUtil.getHaNnRpcAddresses(conf)
--- End diff --

@steveloughran If this is not a good option, is there another way of doing 
this? Maybe look up a specific configuration? Considering the fact that the 
configuration params are not likely to change (since it would break compat), we 
could just directly use those?





[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...

2015-10-19 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8867#issuecomment-149372740
  
OK, I think I know the issue - the reason is probably that the credentials 
are cached in the `FileSystem` instance using which the write happens. Since we 
are replacing the credentials but not the `FileSystem` instance itself this 
might not work, which is why #8942 works. We can go with either that approach, 
or we can replace the `FileSystem` instance, which would require a close and 
reopen of the file. 





[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-10-19 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-149440116
  
Here it is:

OK, I think I know the issue - the reason is probably that the credentials 
are cached in the FileSystem instance using which the write happens. Since we 
are replacing the credentials but not the FileSystem instance itself this might 
not work, which is why #8942 works. We can do with either that approach or we 
can replace the FileSystem instance which would require a close and reopen of 
the file.





[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-10-19 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-149439989
  
Actually that is not right..I posted an explanation on your other PR.





[GitHub] spark pull request: [SPARK-11109] [CORE] Move FsHistoryProvider of...

2015-10-16 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9144#issuecomment-148717590
  
LGTM





[GitHub] spark pull request: [SPARK-11109] [CORE] Move FsHistoryProvider of...

2015-10-15 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9144#issuecomment-148563344
  
Does this class exist in older versions of Hadoop, like 1.1 etc?





[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...

2015-10-12 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9043#discussion_r41785632
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
-  override def getConfig(): Map[String, String] = Map("Event log 
directory" -> logDir.toString)
+  override def getConfig(): Map[String, String] = {
+val safeMode = if (isFsInSafeMode()) {
+  Map("HDFS State" -> "In safe mode.")
--- End diff --

OK. In that case, could you make this message a bit more detailed? 
Something like "HDFS is in safe mode, so the history server will be 
unavailable"?





[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...

2015-10-12 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9043#issuecomment-147552208
  
+1





[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...

2015-10-09 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9043#discussion_r41669645
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 
   private val NOT_STARTED = ""
 
+  // Interval between safemode checks.
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
+"spark.history.fs.safemodeCheck.interval", "5s")
--- End diff --

Do we use camel-case in other configs? If we do, then ok.





[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...

2015-10-09 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9043#discussion_r41670365
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -585,6 +652,37 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
+  /**
+   * Checks whether HDFS is in safe mode. The API is slightly different 
between hadoop 1 and 2,
+   * so we have to resort to ugly reflection (as usual...).
+   *
+   * Note that DistributedFileSystem is a `@LimitedPrivate` class, which 
for all practical reasons
+   * makes it more public than not.
+   */
+  private[history] def isFsInSafeMode(): Boolean = {
+if (!fs.isInstanceOf[DistributedFileSystem]) {
+  return false
+}
+isFsInSafeMode(fs.asInstanceOf[DistributedFileSystem])
+  }
+
+  // For testing.
+  private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean 
= {
+val hadoop1Class = 
"org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
+val hadoop2Class = 
"org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
+val actionClass: Class[_] =
+  try {
+getClass().getClassLoader().loadClass(hadoop2Class)
+  } catch {
+case _: ClassNotFoundException =>
+  getClass().getClassLoader().loadClass(hadoop1Class)
+  }
+
+val action = actionClass.getField("SAFEMODE_GET").get(null)
+val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
+method.invoke(dfs, action).asInstanceOf[Boolean]
--- End diff --

I am assuming this piece of code is correct. I have no idea why a set 
method returns a `Boolean`, but who am I to argue with the Hadoop API.
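The Scala diff above tries to load the Hadoop 2 `SafeModeAction` class and falls back to the Hadoop 1 name on `ClassNotFoundException`. The same "try the newer name, fall back to the older one" pattern can be sketched with Python's `importlib`; the first module name below is made up so the fallback path is exercised, and `json` stands in for the older class that does exist:

```python
# Reflection-style fallback: attempt each name in order and return the
# first one that loads, mirroring the hadoop2Class/hadoop1Class lookup above.

import importlib

def load_first_available(*names):
    for name in names:
        try:
            return importlib.import_module(name)
        except ImportError:
            # Not present in this environment; try the next candidate.
            pass
    raise ImportError("none of %s could be loaded" % (names,))

# The first (hypothetical) name fails to load, so the fallback is used.
mod = load_first_available("hadoop2_hdfs_constants", "json")
assert mod.__name__ == "json"
```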





[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...

2015-10-09 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/9043#discussion_r41670523
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
-  override def getConfig(): Map[String, String] = Map("Event log 
directory" -> logDir.toString)
+  override def getConfig(): Map[String, String] = {
+val safeMode = if (isFsInSafeMode()) {
+  Map("HDFS State" -> "In safe mode.")
--- End diff --

This is only for testing I assume. Can you please add a comment stating 
that?





[GitHub] spark pull request: [SPARK-11020] [core] Wait for HDFS to leave sa...

2015-10-09 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9043#issuecomment-146970575
  
LGTM





[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...

2015-10-08 Thread harishreedharan
GitHub user harishreedharan opened a pull request:

https://github.com/apache/spark/pull/9041

[SPARK-11019][streaming][flume] Gracefully shutdown Flume receiver threads.

Wait for a minute for the receiver threads to shutdown before interrupting 
them.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/harishreedharan/spark flume-graceful-shutdown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9041.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9041


commit dba200fc912ae7b29b8082dd77490bcb504dcf74
Author: Hari Shreedharan <hshreedha...@apache.org>
Date:   2015-10-09T00:04:42Z

[SPARK-11019][streaming][flume] Gracefully shutdown Flume receiver threads.

Wait for a minute for the receiver threads to shutdown before interrupting 
them.







[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...

2015-10-08 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9041#issuecomment-146730016
  
This basically makes testing a bit more predictable. Sometimes, we end up 
hitting a situation where the last transaction is still not completed and the 
remaining data just stays stuck in Flume.

Also, it removes some unneeded `InterruptedException`s from the logs.





[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...

2015-10-08 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9041#issuecomment-146724990
  
/cc @tdas 





[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...

2015-10-08 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9041#issuecomment-146730211
  
Jenkins, test this please





[GitHub] spark pull request: [SPARK-11019][streaming][flume] Gracefully shu...

2015-10-08 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9041#issuecomment-146730398
  
In most cases, less than 5-10 seconds - just for the last in-flight batch 
to be done. This is not going to slow our unit test runs down. I mostly hit it 
in our internal end-to-end tests.





[GitHub] spark pull request: [SPARK-10987] [yarn] Workaround for missing ne...

2015-10-07 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/9021#issuecomment-146420692
  
LGTM. 

This worries me, though: there may be other similar bugs lurking in the 
background. I would expect the new RPC to behave exactly like the old one 
(minus any Akka-related issues we saw). 





[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...

2015-10-06 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8998#issuecomment-145968060
  
Actually looking at this again, I think our only option is to log a 
message. It is possible that the `SparkContext` was already created and passed 
to us, in which case we can't do anything about dynamic allocation.





[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...

2015-10-06 Thread harishreedharan
GitHub user harishreedharan opened a pull request:

https://github.com/apache/spark/pull/8998

[streaming] SPARK-10955. Disable dynamic allocation for Streaming app…

…lications.

Dynamic allocation can be painful for streaming apps and can lose data. The 
one drawback, though,
is that apps which run streaming and non-streaming jobs from the same context 
will also end up
unable to use dynamic allocation.

Another option would be to log a warning.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/harishreedharan/spark ss-log-error

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/8998.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #8998


commit a4a521273114d3ac485592f5c4408adcc11bfc45
Author: Hari Shreedharan <hshreedha...@apache.org>
Date:   2015-10-06T18:58:39Z

[streaming] SPARK-10955. Disable dynamic allocation for Streaming 
applications.

Dynamic allocation can be painful for streaming apps and can lose data. The 
one drawback, though,
is that apps which run streaming and non-streaming jobs from the same context 
will also end up
unable to use dynamic allocation.







[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...

2015-10-06 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8998#issuecomment-145966395
  
/cc @vanzin @tdas 





[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...

2015-10-06 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8998#issuecomment-145996934
  
Added a config parameter to enable it, if the user really wants dynamic 
allocation.
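A rough sketch of the conf-gated check being proposed. This is illustrative only: the config keys, the `SparkConfLite` helper, and the method name are all made up here (the actual config name was still being voted on in this thread); the real change would hook into `SparkConf` inside the streaming context setup.

```scala
// Illustrative only -- config keys and names are placeholders, not the
// ones the thread eventually settles on.
case class SparkConfLite(settings: Map[String, String]) {
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

// Returns Some(message) when streaming should refuse to start because
// dynamic allocation is on and the user has not explicitly opted in.
def checkDynamicAllocation(conf: SparkConfLite): Option[String] = {
  val dynAlloc = conf.getBoolean("spark.dynamicAllocation.enabled", false)
  val force    = conf.getBoolean("spark.streaming.dynamicAllocation.force", false)
  if (dynAlloc && !force)
    Some("Dynamic allocation can lose data in streaming apps; set the " +
      "override config to force-enable it.")
  else
    None // either disabled, or the user explicitly opted in
}
```

The `force` flag models the escape hatch discussed above: conservative by default, but the user who knows their DStreams tolerate lost executors can opt back in.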





[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...

2015-10-06 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8998#issuecomment-146012980
  
I don't have a strong preference for the config name. @vanzin, @andrewor14 
- do you like the current name or the one @markgrover suggested? Vote please :)





[GitHub] spark pull request: [streaming] SPARK-10955. Disable dynamic alloc...

2015-10-06 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8998#issuecomment-146017855
  
As I said, I don't mind either, but in this case we need to be 
conservative. We can add checks to see if the WAL is enabled, but it is not 
possible to actually check which DStreams are being used (it could be a custom 
one that does not break if executors go away), so that case still exists. 

How about we print the config param out, or document it, in that case? 





[GitHub] spark pull request: [SPARK-10812] [yarn] Fix shutdown of token ren...

2015-10-06 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8996#issuecomment-145951831
  
LGTM





[GitHub] spark pull request: [SPARK-10955][streaming] Disable dynamic alloc...

2015-10-06 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8998#issuecomment-146033142
  
@tdas What do you think about the above? If you still think we should just 
make it a warn, I will make the change.





[GitHub] spark pull request: [SPARK-10916] Set perm gen size when launching...

2015-10-05 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8970#issuecomment-145689866
  
LGTM too.





[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-09-29 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-144268884
  
Why would #8867 not be sufficient? It looks like that should be enough.





[GitHub] spark pull request: [SPARK-10473][YARN]Login again in the driver t...

2015-09-29 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8942#issuecomment-144273051
  
Hmm, this might be due to the cached token being missed? It looks like 
the token got replaced all right, but the file could not be written 
with the new token.

@tgravescs might know more about this. I am not sure why this would cause 
an issue. It looks like the new token cannot be used to write an old file?





[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...

2015-09-29 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/8867#discussion_r40708505
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -544,6 +545,7 @@ private[spark] class Client(
   logInfo(s"Credentials file set to: $credentialsFile")
   val renewalInterval = getTokenRenewalInterval(stagingDirPath)
   sparkConf.set("spark.yarn.token.renewal.interval", 
renewalInterval.toString)
+  SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(sparkConf)
--- End diff --

This depends on the lines immediately above this, though. I think 
`setupCredentials` gets called way before this.





[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...

2015-09-29 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/8867#discussion_r40705866
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
 ---
@@ -76,7 +76,10 @@ private[spark] class ExecutorDelegationTokenUpdater(
 SparkHadoopUtil.get.getTimeFromNowToRenewal(
   sparkConf, 0.8, 
UserGroupInformation.getCurrentUser.getCredentials)
   if (timeFromNowToRenewal <= 0) {
-executorUpdaterRunnable.run()
+// If `timeFromNowToRenewal` is 0, wait one minute before the next run
+// to avoid a call cycle that causes a StackOverflowError.
--- End diff --

We end up scheduling the next run of the `executorUpdaterRunnable` from 
within this class. If the executor renewal interval is set to 0, we end up 
calling it again from within the same thread (indefinitely). I am not sure 
why it would be 0 here, though (since this is started only if the login is 
from a keytab). 
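A minimal, Spark-free sketch of the pattern under discussion. The names (`RenewalScheduling`, the one-minute floor) are illustrative, not Spark's actual `ExecutorDelegationTokenUpdater` internals: if a delay of 0 makes the runnable re-invoke itself inline, the calls nest on one thread until a `StackOverflowError`; clamping the delay and always going through the scheduler lets the stack unwind between runs.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative sketch only -- not Spark's actual token updater.
object RenewalScheduling {
  val MinDelayMs: Long = 60 * 1000L // wait at least one minute

  // A delay of 0 (or less) is raised to the floor so the next run is
  // never executed synchronously on the current thread.
  def clampDelay(timeToRenewalMs: Long): Long =
    math.max(timeToRenewalMs, MinDelayMs)

  def scheduleNext(timeToRenewalMs: Long)(body: => Unit): Unit = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    // Never call body directly here, even when the delay is <= 0 --
    // handing it to the scheduler breaks the same-thread recursion.
    pool.schedule(new Runnable { def run(): Unit = body },
      clampDelay(timeToRenewalMs), TimeUnit.MILLISECONDS)
    pool.shutdown() // an already-queued task still runs after shutdown
  }
}
```

The fix in the diff above amounts to replacing the inline `run()` call with the scheduled path.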





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

2015-09-28 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/2994#issuecomment-143944627
  
I will get back to this one in a week or so.





[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...

2015-09-25 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8867#issuecomment-143322902
  
@SaintBacchus Right. That is what I was talking about above. I think your 
fix takes care of this issue. 





[GitHub] spark pull request: [SPARK-10755][YARN]Set driver also update the ...

2015-09-24 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8867#issuecomment-143012421
  
@tgravescs So seems like this would fix that issue. 





[GitHub] spark pull request: [SPARK-10772][Streaming][Scala]: NullPointerEx...

2015-09-23 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8881#issuecomment-142656310
  
Looking at it again, @srowen is right. Returning `None` makes 
`getOrCompute` think that no RDDs have been generated for the given time 
(an artifact of the fact that `None` is returned when a map has no value for a 
key). So this really is a case for returning an empty RDD from your function.
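A tiny, Spark-free analogue of that behaviour (the names are illustrative; this is not Spark's actual `getOrCompute`): wrapping a possibly-null result in `Option` collapses `null` to `None`, which then reads as "no batch generated", while an empty collection is still a generated (empty) batch.

```scala
// Illustrative only -- models how a null from the user's transform
// function becomes None and is then indistinguishable from "no RDD
// was generated for this batch time".
def compute(batch: Seq[Int], f: Seq[Int] => Seq[String]): Option[Seq[String]] =
  Option(f(batch)) // Option(null) == None

// Returning null for an empty batch makes it look like a missing batch:
val returnsNull: Seq[Int] => Seq[String] =
  b => if (b.isEmpty) null else b.map(_.toString)

// Returning an empty collection keeps the batch visible, just empty:
val returnsEmpty: Seq[Int] => Seq[String] = b => b.map(_.toString)
```

This is why the advice above is to return an empty RDD rather than `null` from a `transform` function.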







[GitHub] spark pull request: [SPARK-10772][Streaming][Scala]: NullPointerEx...

2015-09-23 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8881#issuecomment-142520996
  
LGTM. 





[GitHub] spark pull request: [SPARK-10772][Streaming][Scala]: NullPointerEx...

2015-09-23 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/8881#discussion_r40177949
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala 
---
@@ -210,6 +210,18 @@ class BasicOperationsSuite extends TestSuiteBase {
   input.map(_.map(_.toString))
 )
   }
+  
+  test("transform with NULL") {
+val input = Seq(1 to 4, Seq(), 5 to 8, 9 to 12)
+testOperation(
+  input,
+  (r: DStream[Int]) => r.transform(rdd => {if (rdd != null && 
!rdd.isEmpty()) rdd.map(_.toString) else null}),   // RDD.map in transform
+  input.filter(!_.isEmpty).map(_.map(_.toString)),
+  4, 
+  false
+)
+  }
--- End diff --

nit: The formatting here is pretty weird. Maybe take the transformation 
method out and separate it out as a `def`, so this method call can be in a 
single line:

```
def transformMethod(r: DStream[Int]): DStream[String] = {
  r.transform { rdd =>
    if (rdd != null && !rdd.isEmpty()) rdd.map(_.toString) else null
  }
}

testOperation(input, transformMethod, input.filter(!_.isEmpty).map(_.map(_.toString)), 4, false)
```





[GitHub] spark pull request: [Spark-10692][Streaming] Expose failureReasons...

2015-09-23 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/8892#issuecomment-142781672
  
Is this for a different branch? Why a new PR for the same thing?




