Re: Low Level Kafka Consumer for Spark
Thanks Jonathan. Yes, until non-ZK based offset management is available in Kafka, I need to maintain the offsets in ZK. And yes, in both cases an explicit commit is necessary. I modified the Low Level Kafka Spark Consumer a little so that the Receiver spawns a thread for every partition of the topic and performs the 'store' operation from multiple threads. It would be good if the receiver.store methods were made thread safe, which they presently are not. Waiting for TD's comment on this Kafka Spark low level consumer. Regards, Dibyendu

On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote: Hi Yan, That is a good suggestion. I believe non-Zookeeper offset management will be a feature in the upcoming Kafka 0.8.2 release, tentatively scheduled for September. https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management That should make this fairly easy to implement, but it will still require explicit offset commits to avoid data loss, which is different from the current KafkaUtils implementation. Jonathan

On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote: Another suggestion that may help: you can consider using Kafka to store the latest offsets instead of Zookeeper. There are at least two benefits: 1) it lowers the workload on ZK, and 2) it supports replay from a certain offset. This is how Samza (http://samza.incubator.apache.org/) deals with Kafka offsets; the doc is here: http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com wrote: I'll let TD chime in on this one, but I'm guessing this would be a welcome addition. It's great to see community effort on adding new streams/receivers; adding a Java API for receivers was something we did specifically to allow this :) - Patrick

On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, I have implemented a Low Level Kafka Consumer for Spark Streaming using the Kafka Simple Consumer API. This API gives better control over Kafka offset management and recovery from failures. As the present Spark KafkaUtils uses the high-level Kafka Consumer API, I wanted better control over offset management, which is not possible with the Kafka high-level consumer. The project is available in the repo below: https://github.com/dibbhatt/kafka-spark-consumer I have implemented a custom receiver, consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses the low level Kafka Consumer API (implemented in the consumer.kafka packages) to fetch messages from Kafka and 'store' them in Spark. The logic detects the number of partitions for a topic and spawns that many threads (individual instances of consumers). The Kafka consumer uses Zookeeper to store the latest offset for each partition, which helps recovery in case of failure. The Kafka consumer logic is tolerant to ZK failures, Kafka partition-leader changes, Kafka broker failures, recovery from offset errors and other fail-over aspects. consumer.kafka.client.Consumer is a sample consumer which uses these Kafka receivers to generate DStreams from Kafka and apply an output operation to every message of the RDD. We are planning to use this Kafka Spark Consumer to perform near-real-time indexing of Kafka messages into a target search cluster and also near-real-time aggregation into a target NoSQL store. Kindly let me know your views.
Also, if this looks good, can I contribute it to the Spark Streaming project? Regards, Dibyendu
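For readers following along, a minimal sketch of the receiver pattern described above - one fetcher thread per Kafka partition, each pushing messages into Spark with store() - could look roughly like this. It uses the Spark 1.x Receiver API; the class name, the fetchMessages placeholder and all details of the fetch loop are illustrative, not the actual kafka-spark-consumer code.

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  // Illustrative sketch only: one thread per partition, each calling store().
  // Offset tracking, ZK commits and error handling are deliberately omitted.
  class PerPartitionKafkaReceiver(topic: String, numPartitions: Int)
    extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_SER) {

    @volatile private var threads: Seq[Thread] = Seq.empty

    override def onStart(): Unit = {
      threads = (0 until numPartitions).map { partition =>
        val t = new Thread(s"kafka-fetcher-$topic-$partition") {
          override def run(): Unit = {
            while (!isStopped()) {
              // Placeholder for a Kafka SimpleConsumer fetch against this partition.
              val batch = fetchMessages(topic, partition)
              // store() is invoked concurrently from several threads here, which is
              // why thread safety of the receiver's store methods matters.
              batch.foreach(msg => store(msg))
              if (batch.isEmpty) Thread.sleep(100) // avoid a busy spin in this sketch
            }
          }
        }
        t.start()
        t
      }
    }

    override def onStop(): Unit = threads.foreach(_.interrupt())

    // Placeholder only; a real implementation issues SimpleConsumer fetch requests.
    private def fetchMessages(topic: String, partition: Int): Seq[Array[Byte]] = Seq.empty
  }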
Re: -1s on pull requests?
Just came across this mail, thanks for initiating this discussion Kay. To add, another issue which recurs is very rapid commits: before most contributors have had a chance to even look at the changes proposed, there is not much prior discussion on the JIRA or PR, and the time between submitting the PR and committing it is under 12 hours. This is particularly relevant when contributors are not in US timezones and/or colocated; I have raised this a few times before when the commit had other side effects that were not considered. On the flip side, we have PRs which have been languishing for weeks with little or no activity from the committers' side, making the contribution stale; so too long a delay is definitely not the direction to take either! Regards, Mridul

On Tue, Jul 22, 2014 at 2:14 AM, Kay Ousterhout k...@eecs.berkeley.edu wrote: Hi all, As the number of committers / contributors on Spark has increased, there are cases where pull requests get merged before all the review comments have been addressed. This happens, say, when one committer points out a problem with the pull request, and another committer doesn't see the earlier comment and merges the PR before the comment has been addressed. This is especially tricky for pull requests with a large number of comments, because it can be difficult to notice early comments describing blocking issues. This also happens when something accidentally gets merged after the tests have started but before tests have passed. Do folks have ideas on how we can handle this issue? Are there other projects that have good ways of handling this? It looks like for Hadoop, people can -1 / +1 on the JIRA. -Kay
RE: Low Level Kafka Consumer for Spark
Hi, I think this is an awesome feature for the Spark Streaming Kafka interface, offering users control over partition offsets so that more kinds of applications can be built on top of it. What concerns me is that if we want to do offset management, fault-tolerance-related control and the rest, we have to take on the role that the current ZookeeperConsumerConnector plays, and that is a big area to look after - for example, when a node fails, how do we hand its partitions over to another consumer, among other things. I'm not sure what your thoughts are? Thanks, Jerry

From: Dibyendu Bhattacharya Sent: Tuesday, August 05, 2014 5:15 PM To: Jonathan Hodges; dev@spark.apache.org Cc: user Subject: Re: Low Level Kafka Consumer for Spark [...]
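To make the offset-management concern concrete, here is a hedged sketch of committing offsets to ZooKeeper only after the corresponding messages have been stored, and reading them back on restart. The class name, path layout and ZkClient usage are illustrative, not the project's actual code; Kafka's own tooling additionally expects ZKStringSerializer-encoded data at these paths.

  import org.I0Itec.zkclient.ZkClient

  // Illustrative sketch only: commit the next offset to read for a topic/partition
  // to ZooKeeper, using the path convention of Kafka's high-level consumer. A real
  // implementation would also configure a string serializer on the client and
  // handle connection loss and retries.
  class ZkOffsetCommitter(zkConnect: String, groupId: String) {
    private val zkClient = new ZkClient(zkConnect)

    def commit(topic: String, partition: Int, nextOffset: Long): Unit = {
      val path = s"/consumers/$groupId/offsets/$topic/$partition"
      if (!zkClient.exists(path)) {
        zkClient.createPersistent(path, true) // create parent znodes as needed
      }
      zkClient.writeData(path, nextOffset.toString)
    }

    def lastCommitted(topic: String, partition: Int): Option[Long] = {
      val path = s"/consumers/$groupId/offsets/$topic/$partition"
      Option(zkClient.readData[String](path, true)).map(_.toLong)
    }
  }

The important ordering is commit-after-store: the offset is only advanced in ZK once Spark has accepted the messages, so a crash before the commit leads to replay rather than loss.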
any interest in something like rdd.parent[T](n) (equivalent to firstParent[T] for n==0) ?
Not that rdd.dependencies(n).rdd.asInstanceOf[RDD[T]] is terrible, but rdd.parent[T](n) better captures the intent.
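For what it's worth, a minimal sketch of the idea - written here as an enrichment so it compiles outside of Spark; inside RDD.scala it would simply mirror firstParent[U] - might be:

  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD

  // Illustrative sketch: rdd.parent[U](n), equivalent to firstParent[U] when n == 0.
  // The ClassTag context bound just mirrors firstParent's signature.
  object RDDParentOps {
    implicit class RichRDD[T](rdd: RDD[T]) {
      def parent[U: ClassTag](n: Int): RDD[U] =
        rdd.dependencies(n).rdd.asInstanceOf[RDD[U]]
    }
  }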
Re: -1s on pull requests?
1. Include the commit hash in the "tests have started/completed" messages
FYI: Looks like Xiangrui's already got a JIRA issue for this. SPARK-2622: Add Jenkins build numbers to SparkQA messages https://issues.apache.org/jira/browse/SPARK-2622
2. Pin a message to the start or end of the PR
Should new JIRA issues for this item fall under the following umbrella issue? SPARK-2230: Improvements to Jenkins QA Harness https://issues.apache.org/jira/browse/SPARK-2230
Nick
Spark maven project with the latest Spark jars
Hi, I'm trying to create a Maven project that references the latest build of Spark.
1) Downloaded the sources and compiled the latest version of Spark.
2) Added the new spark-core jar to a new local Maven repo.
3) Created a Scala Maven project with net.alchim31.maven (scala-archetype-simple v1.5).
4) Added a dependency on the new spark-core inside the pom.xml.
5) Created a SparkContext in the code of this project: val sc = new SparkContext("local", "test")
6) When I run it, I get the error: Error:scalac: bad symbolic reference. A signature in RDD.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling RDD.class.
This problem doesn't occur if I reference the spark-core from the Maven repo. What am I doing wrong? Best regards, Alexander
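One hedged guess at the cause: a "term io in package org.apache.hadoop ... is not available" message usually means the Hadoop classes that spark-core was compiled against are missing from the compile classpath, so adding a hadoop-client dependency matching the Hadoop version Spark was built with may resolve it. The versions below are placeholders, not a verified fix:

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.0-SNAPSHOT</version> <!-- placeholder: the locally built version -->
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.4.0</version> <!-- placeholder: match the Hadoop version Spark was compiled against -->
  </dependency>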
Re: Using mllib-1.1.0-SNAPSHOT on Spark 1.0.1
I created the assembly file but it still wants to pick up the mllib from the cluster: jar tf ./target/ml-0.0.1-SNAPSHOT-jar-with-dependencies.jar | grep QuadraticMinimizer org/apache/spark/mllib/optimization/QuadraticMinimizer$$anon$1.class /Users/v606014/dist-1.0.1/bin/spark-submit --master spark://TUSCA09LMLVT00C.local:7077 --class ALSDriver ./target/ml-0.0.1-SNAPSHOT-jar-with-dependencies.jar inputPath outputPath Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.mllib.recommendation.ALS.setLambdaL1(D)Lorg/apache/spark/mllib/recommendation/ALS; Now if I force it to use the jar that I gave using spark.files.userClassPathFirst, it fails on some serialization issues... A simple solution is to cherry-pick the files I need from the Spark branch into the application branch, but I am not sure that's the right thing to do... The way userClassPathFirst is behaving, there might be bugs in it... Any suggestions will be appreciated. Thanks. Deb

On Sat, Aug 2, 2014 at 11:12 AM, Xiangrui Meng men...@gmail.com wrote: Yes, that should work. spark-mllib-1.1.0 should be compatible with spark-core-1.0.1.

On Sat, Aug 2, 2014 at 10:54 AM, Debasish Das debasish.da...@gmail.com wrote: Let me try it... Will this be fixed if I generate an assembly file with the mllib-1.1.0-SNAPSHOT jar and other dependencies along with the rest of the application code?

On Sat, Aug 2, 2014 at 10:46 AM, Xiangrui Meng men...@gmail.com wrote: You can try enabling spark.files.userClassPathFirst. But I'm not sure whether it could solve your problem. -Xiangrui

On Sat, Aug 2, 2014 at 10:13 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, I have deployed stable Spark 1.0.1 on the cluster, but I have new code that I added in mllib-1.1.0-SNAPSHOT. I am trying to access the new code using spark-submit as follows: spark-job --class com.verizon.bda.mllib.recommendation.ALSDriver --executor-memory 16g --total-executor-cores 16 --jars spark-mllib_2.10-1.1.0-SNAPSHOT.jar,scopt_2.10-3.2.0.jar sag-core-0.0.1-SNAPSHOT.jar --rank 25 --numIterations 10 --lambda 1.0 --qpProblem 2 inputPath outputPath I can see the jars are getting added to the httpServer as expected: 14/08/02 12:50:04 INFO SparkContext: Added JAR file:/vzhome/v606014/spark-glm/spark-mllib_2.10-1.1.0-SNAPSHOT.jar at http://10.145.84.20:37798/jars/spark-mllib_2.10-1.1.0-SNAPSHOT.jar with timestamp 1406998204236 14/08/02 12:50:04 INFO SparkContext: Added JAR file:/vzhome/v606014/spark-glm/scopt_2.10-3.2.0.jar at http://10.145.84.20:37798/jars/scopt_2.10-3.2.0.jar with timestamp 1406998204237 14/08/02 12:50:04 INFO SparkContext: Added JAR file:/vzhome/v606014/spark-glm/sag-core-0.0.1-SNAPSHOT.jar at http://10.145.84.20:37798/jars/sag-core-0.0.1-SNAPSHOT.jar with timestamp 1406998204238 But the job still can't access code from the mllib-1.1.0-SNAPSHOT jar... I think it's picking up the mllib from the cluster, which is at 1.0.1... Please help. I will ask for a PR tomorrow, but internally we want to generate results from the new code. Thanks. Deb
Re: -1s on pull requests?
I think the build number is included in the SparkQA message, for example: https://github.com/apache/spark/pull/1788 The build number 17941 is in the URL https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17941/consoleFull. Just need to be careful to match the number. Another solution is to kill running Jenkins jobs if there is a code change.

On Tue, Aug 5, 2014 at 8:48 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: [...]
Re: Using mllib-1.1.0-SNAPSHOT on Spark 1.0.1
If you cannot change the Spark jar deployed on the cluster, an easy solution would be renaming ALS in your jar. If userClassPathFirst doesn't work, could you create a JIRA and attach the log? Thanks! -Xiangrui

On Tue, Aug 5, 2014 at 9:10 AM, Debasish Das debasish.da...@gmail.com wrote: [...]
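For reference, a sketch of enabling the setting mentioned above from application code. The property name spark.files.userClassPathFirst is the one discussed in this thread, and ALSDriver is the class name from the earlier messages; everything else is illustrative:

  import org.apache.spark.{SparkConf, SparkContext}

  // Illustrative only: prefer classes from the user-supplied jars over the ones
  // deployed on the cluster. The setting is experimental in Spark 1.0.x/1.1.x.
  val conf = new SparkConf()
    .setAppName("ALSDriver")
    .set("spark.files.userClassPathFirst", "true")
  val sc = new SparkContext(conf)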
Tiny curiosity question on closing the jdbc connection
Within its compute.close method, the JdbcRDD class has this interesting logic for closing the jdbc connection:

try {
  if (null != conn && !stmt.isClosed()) conn.close()
  logInfo("closed connection")
} catch {
  case e: Exception => logWarning("Exception closing connection", e)
}

Notice that the second check is on the statement having been closed - not on the connection. I would wager this was not a simple oversight and there was some motivation for this logic - curious if anyone would be able to shed some light? My particular interest is that I have written custom ORMs in jdbc since the late 90's and never did it this way.
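For comparison, a hedged sketch of what a more conventional cleanup could look like - the statement and the connection each closed under its own guard - using the same stmt, conn, logInfo and logWarning names that appear inside JdbcRDD. This is only an illustration, not necessarily the eventual fix:

  try {
    // close the statement first, guarded by its own check
    if (null != stmt && !stmt.isClosed()) stmt.close()
  } catch {
    case e: Exception => logWarning("Exception closing statement", e)
  }
  try {
    // then close the connection, checking the connection itself
    if (null != conn && !conn.isClosed()) conn.close()
    logInfo("closed connection")
  } catch {
    case e: Exception => logWarning("Exception closing connection", e)
  }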
Re: Tiny curiosity question on closing the jdbc connection
I'm pretty sure it is an oversight. Would you like to submit a pull request to fix that?

On Tue, Aug 5, 2014 at 12:14 PM, Stephen Boesch java...@gmail.com wrote: [...]
Re: Tiny curiosity question on closing the jdbc connection
Thanks Reynold. Ted Yu did mention it offline and I have already put in a JIRA. Another small concern: there appears to be no exception handling from the creation of the prepared statement (line 74) through to the executeQuery (line 86). In case of an error/exception it would seem to be leaking connections (/statements). If that is the case then I would include a small patch for the exception trapping in that section of code as well. BTW I was looking at this code for another reason, not intending to be a bother ;)

2014-08-05 13:03 GMT-07:00 Reynold Xin r...@databricks.com: [...]
Re: Tiny curiosity question on closing the jdbc connection
Thanks. Those are definitely great problems to fix!

On Tue, Aug 5, 2014 at 1:11 PM, Stephen Boesch java...@gmail.com wrote: [...]
Re: Tiny curiosity question on closing the jdbc connection
The stmt.isClosed just looks like stupidity on my part, no secret motivation :) Thanks for noticing it. As for the leaking in the case of malformed statements, isn't that addressed by context.addOnCompleteCallback { () => closeIfNeeded() } or am I misunderstanding?

On Tue, Aug 5, 2014 at 3:15 PM, Reynold Xin r...@databricks.com wrote: [...]
Re: Tiny curiosity question on closing the jdbc connection
Hi, yes, that callback takes care of it. Thanks!

2014-08-05 13:58 GMT-07:00 Cody Koeninger c...@koeninger.org: [...]
Re: Tiny curiosity question on closing the jdbc connection
Yes it is. I actually commented on it: https://github.com/apache/spark/pull/1792/files#r15840899

On Tue, Aug 5, 2014 at 1:58 PM, Cody Koeninger c...@koeninger.org wrote: [...]
Re: Tiny curiosity question on closing the jdbc connection
The existing callback does take care of it: within the DAGScheduler there is a finally block to ensure the callbacks are executed.

try {
  val result = job.func(taskContext, rdd.iterator(split, taskContext))
  job.listener.taskSucceeded(0, result)
} finally {
  taskContext.executeOnCompleteCallbacks()
}

So I have removed that exception handling code from the PR and updated the JIRA.

2014-08-05 14:01 GMT-07:00 Reynold Xin r...@databricks.com: [...]
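To make the callback mechanism concrete, here is a rough sketch of the pattern in a JdbcRDD-style compute, using the Spark 1.x TaskContext API; createConnection, sql and rowIterator are illustrative placeholders rather than the RDD's actual members:

  // Illustrative sketch: register cleanup on the TaskContext so that the statement
  // and connection are released even if the task fails or the iterator is never
  // fully consumed.
  override def compute(part: Partition, context: TaskContext): Iterator[T] = {
    val conn = createConnection()
    val stmt = conn.prepareStatement(sql)
    // Registered work runs from the scheduler's finally block quoted above,
    // so the resources are released whether the task succeeds or throws.
    context.addOnCompleteCallback { () =>
      try { if (!stmt.isClosed) stmt.close() } finally { conn.close() }
    }
    rowIterator(stmt.executeQuery())
  }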
Hello All
I'm new to the Spark community. I am actively working on the Hadoop ecosystem (more specifically YARN). I'm very keen on getting my hands dirty with Spark. Please let me know any pointers to start with. Thanks in advance. Best regards, Guru Yeleswarapu
Re: Hello All
Hi Guru, Take a look at: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark It has all the information you need on how to contribute to Spark. Also take a look at: https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel where there is a list of issues that need fixing. You can also request or propose new additions to Spark. Happy coding! Burak

- Original Message - From: Gurumurthy Yeleswarapu guru...@yahoo.com.INVALID To: dev@spark.apache.org Sent: Tuesday, August 5, 2014 2:43:04 PM Subject: Hello All [...]
Unit test best practice for Spark-derived projects
Hello, I've been switching Mahout from Spark 0.9 to Spark 1.0.x [1] and noticed that tests now run much slower compared to 0.9, with the CPU idle most of the time. I had to conclude that most of that time is spent tearing down/resetting the Spark context, which apparently now takes significantly longer in local mode than before.
Q1 -- Is there a way to mitigate long session startup times with a local context?
Q2 -- Our unit tests are basically mixing in a rip-off of LocalSparkContext, and we are using local[3]. Looking into the 1.0.x code, I noticed that a lot of Spark unit test code has switched to SharedSparkContext (i.e. no context reset between individual tests). Is that now the recommended practice for writing Spark-based unit tests?
Q3 -- Any other reasons that I may have missed for the degraded test performance?
[1] https://github.com/apache/mahout/pull/40
Thank you in advance. -Dmitriy
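In case it helps as a starting point, here is a hedged sketch of the shared-context pattern (one SparkContext per suite instead of per test), along the lines of Spark's own SharedSparkContext but with illustrative names:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.scalatest.{BeforeAndAfterAll, Suite}

  // Illustrative sketch: create the SparkContext once per suite and stop it after
  // all tests have run, so the expensive setup/teardown is not repeated per test.
  trait SharedLocalSparkContext extends BeforeAndAfterAll { self: Suite =>
    @transient private var _sc: SparkContext = _
    def sc: SparkContext = _sc

    override def beforeAll(): Unit = {
      super.beforeAll()
      _sc = new SparkContext(new SparkConf().setMaster("local[3]").setAppName(suiteName))
    }

    override def afterAll(): Unit = {
      try {
        if (_sc != null) _sc.stop()
        _sc = null
      } finally {
        super.afterAll()
      }
    }
  }

The trade-off is isolation: tests that rely on a fresh context (cached RDDs, accumulators, SparkConf changes) would still need a per-test context, but for mostly functional tests this removes most of the startup/teardown cost.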