proposal / discuss: multiple Serializers within a SparkContext?
Hey all,

Was messing around with Spark and Google FlatBuffers for fun, and it got me thinking about Spark and serialization. I know there's been work / talk about in-memory columnar formats for Spark SQL, so maybe there are ways to provide this flexibility already that I've missed? Either way, my thoughts:

Java and Kryo serialization are really nice in that they require almost no extra work on the part of the user. They can also represent complex object graphs with cycles etc. There are situations where other serialization frameworks are more efficient:

* A Hadoop Writable-style format that delineates key-value boundaries and allows for raw comparisons can greatly speed up some shuffle operations by entirely avoiding deserialization until the object hits user code. Writables also probably ser / deser faster than Kryo.
* No-deserialization formats like FlatBuffers and Cap'n Proto address the tradeoff between (1) Java objects that offer fast access but take lots of space and stress GC and (2) Kryo-serialized buffers that are more compact but take time to deserialize.

The drawbacks of these frameworks are that they require more work from the user to define types, and that they're more restrictive in the reference graphs they can represent.

In large applications, there are probably a few points where a specialized serialization format is useful, but requiring Writables everywhere because they're needed in a particularly intense shuffle is cumbersome. In light of that, would it make sense to enable varying Serializers within an app? It could make sense to choose a serialization framework based both on the objects being serialized and on what they're being serialized for (caching vs. shuffle). It might be possible to implement this underneath the Serializer interface with some sort of multiplexing serializer that chooses between subserializers.

Nothing urgent here, but curious to hear others' opinions.

-Sandy
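The multiplexing idea at the end could be sketched roughly as below. All names here (SubSerializer, MultiplexingSerializer, etc.) are invented for illustration and are not Spark's org.apache.spark.serializer API; a one-byte tag records which sub-serializer wrote each record so deserialization can route back:

```scala
import java.io._
import java.nio.charset.StandardCharsets

// Pluggable sub-serializer interface (simplified: real serializers stream).
trait SubSerializer {
  def serialize(obj: Any): Array[Byte]
  def deserialize(bytes: Array[Byte]): Any
}

// Fallback: plain JDK serialization.
object JavaSub extends SubSerializer {
  def serialize(obj: Any): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray
  }
  def deserialize(bytes: Array[Byte]): Any =
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject()
}

// A specialized per-type format, standing in for Writables / FlatBuffers.
object StringSub extends SubSerializer {
  def serialize(obj: Any): Array[Byte] =
    obj.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)
  def deserialize(bytes: Array[Byte]): Any =
    new String(bytes, StandardCharsets.UTF_8)
}

// The multiplexer: a routing function picks a sub-serializer per object,
// and a one-byte tag is prepended so deserialization picks the same one.
class MultiplexingSerializer(subs: IndexedSeq[SubSerializer], route: Any => Int) {
  def serialize(obj: Any): Array[Byte] = {
    val idx = route(obj)
    idx.toByte +: subs(idx).serialize(obj)
  }
  def deserialize(bytes: Array[Byte]): Any =
    subs(bytes(0).toInt).deserialize(bytes.drop(1))
}
```

Usage would look something like: new MultiplexingSerializer(IndexedSeq(JavaSub, StringSub), { case _: String => 1; case _ => 0 }). Routing could just as well key on the operation (caching vs. shuffle) rather than the object's class.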
Re: proposal / discuss: multiple Serializers within a SparkContext?
Technically you can already use a custom serializer for each shuffle operation (it is part of ShuffledRDD). I've seen Matei suggest on JIRA issues (or GitHub) in the past a storage policy in which you can specify how data should be stored. I think that would be a great API to have in the long run. Designing it won't be trivial, though.

On Fri, Nov 7, 2014 at 1:05 AM, Sandy Ryza sandy.r...@cloudera.com wrote:
Hey all, Was messing around with Spark and Google FlatBuffers for fun, and it got me thinking about Spark and serialization. [...]
How spark/*/Storage/BlockManagerMaster.askDriverWithReply() responds to various query messages
Hi, I am trying to understand how spark/*/storage/BlockManagerMaster.askDriverWithReply() works.

  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
    if (result.length != numPeers) {
      throw new SparkException(
        "Error getting peers, only got " + result.size + " instead of " + numPeers)
    }
    result
  }

Here, getPeers calls askDriverWithReply():

  private def askDriverWithReply[T](message: Any): T = {
    // TODO: Consider removing multiple attempts
    if (driverActor == null) {
      throw new SparkException("Error sending message to BlockManager as driverActor is null " +
        "[message = " + message + "]")
    }
    var attempts = 0
    var lastException: Exception = null
    while (attempts < AKKA_RETRY_ATTEMPTS) {
      attempts += 1
      try {
        val future = driverActor.ask(message)(timeout)
        val result = Await.result(future, timeout)
        if (result == null) {
          throw new SparkException("BlockManagerMaster returned null")
        }
        return result.asInstanceOf[T]
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
      }
      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
    }
    throw new SparkException(
      "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
  }

Here, the getPeers method calls askDriverWithReply() with the message GetPeers(), and the driver returns the BlockManagerIds:

  val future = driverActor.ask(message)(timeout)
  val result = Await.result(future, timeout)

Here, we obtain the result. But I couldn't find the definition of ask() that processes the GetPeers() message. Can someone please tell me how/where the 'result' is being constructed?

Thank you!!
Karthik
Re: How spark/*/Storage/BlockManagerMaster.askDriverWithReply() responds to various query messages
ask() is a method available on every Akka actor reference. It comes from the Akka library, which Spark uses for a lot of the communication between its various components. There is some documentation on ask() here (go to the section on "Send messages"): http://doc.akka.io/docs/akka/2.2.3/scala/actors.html though if you are totally new to it, you might want to work through a simple Akka tutorial first, before diving into the docs.

On Fri, Nov 7, 2014 at 4:11 AM, rapelly kartheek kartheek.m...@gmail.com wrote:
Hi, I am trying to understand how spark/*/storage/BlockManagerMaster.askDriverWithReply() works. [...]
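As a toy model of the ask pattern (not Akka itself), the caller gets a Future and the actor on the other side completes it with the reply; in Spark 1.x the reply to GetPeers is produced by the driver-side actor's receive block, which pattern-matches on the message. The names below are invented for illustration:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Messages the "driver actor" understands.
sealed trait Message
case class GetPeers(blockManagerId: String, numPeers: Int) extends Message

// A stand-in for the driver-side actor: ask() hands over a message and
// returns a Future that the receiving side completes with its reply.
class ToyDriverActor {
  private val peers = Map("bm-1" -> Seq("bm-2", "bm-3"))

  def ask(message: Message): Future[Any] = {
    val promise = Promise[Any]()
    // This match plays the role of the actor's receive block: it inspects
    // the message and constructs the result that Await.result() will see.
    message match {
      case GetPeers(id, n) =>
        promise.success(peers.getOrElse(id, Seq.empty).take(n))
    }
    promise.future
  }
}

val driver = new ToyDriverActor
val future = driver.ask(GetPeers("bm-1", 2))
val result = Await.result(future, 1.second)
```

In real Akka the completion happens asynchronously on the actor's thread, and in Spark 1.x the receive block that answers GetPeers lives in the BlockManagerMaster's driver-side actor.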
Re: Appropriate way to add a debug flag
(Whoops, forgot to copy dev@ in my original reply; adding it back)

Yeah, the GraphViz part was mostly for fun and for understanding cyclic object graphs. In general, an object graph might contain cycles, so for understanding the overall structure it's handy to have a picture. The GraphViz thing is actually pretty fun to play with in an interactive notebook environment, since even fairly simple programs can produce really interesting object graphs.

For the purposes of debugging serialization errors, though, I guess you only need to know about some path of non-transient fields that leads from the target object to an unserializable object. For that case, you might be able to add a try-catch block that performs an object graph traversal to find a path to a non-serializable object if a serialization error occurs. Would logging this path be sufficient to debug the most common serialization issues, such as unexpected over-capture of non-serializable objects in closures?

I guess that some users might also want a more general object graph printer / debugger to help debug performance issues related to over-captures that do not lead to errors, but that might be a lower priority / could happen in a separate PR. Another option would be to do something like http://blog.crazybob.org/2007/02/debugging-serialization.html to print a trace from the serializer's point of view, but the output from that might be hard to understand since it could obscure the chain of references / fields that led to the error.

- Josh

On Thu, Nov 6, 2014 at 12:01 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
Hi Josh – I think this could be useful for visualizing references in RDDs, but I actually wasn't sure that this was what the original issue wanted in terms of a solution. I assumed the more useful output would be a string output, e.g.:

RDD
 - Child 1
 -- Child 1.1
 -- Child 1.2
 - Child 2
 - Child 3

So that it's readily integrated with the Spark logs. Would you agree?
I like the SparkConf idea, I will look into that.

From: Josh Rosen rosenvi...@gmail.com
Date: Thursday, November 6, 2014 at 2:42 PM
To: Ganelin, Ilya ilya.gane...@capitalone.com
Subject: Re: Appropriate way to add a debug flag

This is timely, since I've actually been hacking on some related stuff in order to debug whether unexpected objects are being pulled into closures. Here's some code to print a GraphViz DOT file that shows the graph of non-transient, non-primitive objects reachable from a given object: https://gist.github.com/JoshRosen/d6a8972c2e97d040 For enabling / disabling automatic logging of this, I suppose that you could add a configuration option to SparkConf.

On November 5, 2014 at 8:02:35 AM, Ganelin, Ilya (ilya.gane...@capitalone.com) wrote:
Hello all – I am working on https://issues.apache.org/jira/browse/SPARK-3694 and would like to understand the appropriate mechanism by which to check for a debug flag before printing a graph traversal of the dependencies of an RDD or Task. I understand that I can use the logging utility and use logDebug to actually print the output, but the graph traversal should not be executed unless debug output is enabled. The code changes I will be making will be in the DAGScheduler and TaskSetManager classes. Modifying the function interfaces does not seem like the appropriate approach. Is there an existing debug flag that is set somehow within the Spark config?

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited.
If you have received this communication in error, please contact the sender and delete the material from your computer.
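The traversal Josh describes could be sketched as follows: walk the non-transient, non-static, non-primitive fields reachable from a root object and report the first field path that reaches a value which is not java.io.Serializable. All names here are invented for illustration; this is not Spark code, and it simplifies in several ways noted in the comments:

```scala
import java.lang.reflect.Modifier

object SerializationPathFinder {
  def findNonSerializablePath(root: AnyRef): Option[List[String]] = {
    def visit(obj: AnyRef, path: List[String], seen: List[AnyRef]): Option[List[String]] = {
      if (obj == null || seen.exists(_ eq obj)) None // null or already-visited (cycle)
      else if (!obj.isInstanceOf[java.io.Serializable]) Some(path.reverse)
      // Treat JDK / Scala library internals as leaves rather than reflecting
      // into them (reflective access to them is blocked on modern JVMs anyway).
      else if (obj.getClass.getName.startsWith("java.") ||
               obj.getClass.getName.startsWith("scala.")) None
      else {
        val fields = obj.getClass.getDeclaredFields.filterNot { f =>
          Modifier.isTransient(f.getModifiers) ||
          Modifier.isStatic(f.getModifiers) ||
          f.getType.isPrimitive ||
          f.isSynthetic // skips compiler-generated fields such as $outer; a real
                        // tool would follow $outer, since closures over-capture through it
        }
        fields.foldLeft(Option.empty[List[String]]) { (found, f) =>
          found.orElse {
            f.setAccessible(true)
            visit(f.get(obj), f.getName :: path, obj :: seen)
          }
        }
      }
    }
    visit(root, Nil, Nil)
  }
}
```

On a serialization failure this could be called from a catch block and the returned field path logged, giving the kind of string output Ilya describes rather than a GraphViz picture.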
Re: [VOTE] Designating maintainers for some Spark components
+1 (binding)

I see this as a way to increase transparency and efficiency around a process that already informally exists, with benefits to both new contributors and committers. For new contributors, it makes clear who they should ping about a pending patch. For committers, it's a good reference for who to rope in if they're reviewing a change that touches code they're unfamiliar with. I've often found myself in that situation when doing a review; for me, having this list would be quite helpful.

-Kay

On Thu, Nov 6, 2014 at 10:00 AM, Josh Rosen rosenvi...@gmail.com wrote:
+1 (binding). (Our pull request browsing tool is open-source, by the way; contributions welcome: https://github.com/databricks/spark-pr-dashboard)

On Thu, Nov 6, 2014 at 9:28 AM, Nick Pentreath nick.pentre...@gmail.com wrote:
+1 (binding) — Sent from Mailbox

On Thu, Nov 6, 2014 at 6:52 PM, Debasish Das debasish.da...@gmail.com wrote:
+1 The app to track PRs based on component is a great idea...

On Thu, Nov 6, 2014 at 8:47 AM, Sean McNamara sean.mcnam...@webtrends.com wrote:
+1 Sean

On Nov 5, 2014, at 6:32 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
Hi all,

I wanted to share a discussion we've been having on the PMC list, as well as call for an official vote on it on a public list. Basically, as the Spark project scales up, we need to define a model to make sure there is still great oversight of key components (in particular internal architecture and public APIs), and to this end I've proposed implementing a maintainer model for some of these components, similar to other large projects.

As background on this, Spark has grown a lot since joining Apache. We've had over 80 contributors/month for the past 3 months, which I believe makes us the most active project in contributors/month at Apache, as well as over 500 patches/month. The codebase has also grown significantly, with new libraries for SQL, ML, graphs and more.

In this kind of large project, one common way to scale development is to assign maintainers to oversee key components, where each patch to that component needs to get sign-off from at least one of its maintainers. Most existing large projects do this -- at Apache, some large ones with this model are CloudStack (the second-most active project overall), Subversion, and Kafka, and other examples include Linux and Python. This is also by-and-large how Spark operates today -- most components have a de-facto maintainer.

IMO, adopting this model would have two benefits:

1) Consistent oversight of design for that component, especially regarding architecture and API. This process would ensure that the component's maintainers see all proposed changes and consider them to fit together in a good way.

2) More structure for new contributors and committers -- in particular, it would be easy to look up who's responsible for each module and ask them for reviews, etc, rather than having patches slip between the cracks.

We'd like to start in a light-weight manner, where the model only applies to certain key components (e.g. scheduler, shuffle) and user-facing APIs (MLlib, GraphX, etc). Over time, as the project grows, we can expand it if we deem it useful. The specific mechanics would be as follows:

- Some components in Spark will have maintainers assigned to them, where one of the maintainers needs to sign off on each patch to the component.
- Each component with maintainers will have at least 2 maintainers.
- Maintainers will be assigned from the most active and knowledgeable committers on that component by the PMC. The PMC can vote to add / remove maintainers, and maintained components, through consensus.
- Maintainers are expected to be active in responding to patches for their components, though they do not need to be the main reviewers for them (e.g. they might just sign off on architecture / API).
- To prevent inactive maintainers from blocking the project, if a maintainer isn't responding in a reasonable time period (say 2 weeks), other committers can merge the patch, and the PMC will want to discuss adding another maintainer.

If you'd like to see examples for this model, check out the following projects:
- CloudStack: https://cwiki.apache.org/confluence/display/CLOUDSTACK/CloudStack+Maintainers+Guide
- Subversion: https://subversion.apache.org/docs/community-guide/roles.html

Finally, I wanted to list our current proposal for initial components and maintainers. It would be good to get feedback on other components we might add, but please
Re: Implementing TinkerPop on top of GraphX
Who here would be interested in helping to work on an implementation of the TinkerPop3 Gremlin API for Spark? Is this something that should continue in the Spark discussion group, or should it migrate to the Gremlin message group?

Reynold is right that there will be inherent mismatches in the APIs, and there will need to be some discussions with the GraphX group about the best way to go. One example would be edge ids. GraphX has vertex ids, but no explicit edge ids, while Gremlin has both. Edge ids could be put into the attr field, but then that means the user would have to explicitly subclass their edge attribute to the edge attribute interface. Is that worth doing, versus adding an id to everyone's edges?

Kyle

On Thu, Nov 6, 2014 at 7:24 PM, Reynold Xin r...@databricks.com wrote:
Some form of graph querying support would be great to have. This can be a great community project hosted outside of Spark initially, both due to the maturity of the component itself as well as the maturity of query language standards (there isn't really a dominant standard for graph QL). One thing is that the GraphX API will need to evolve and probably needs to provide more primitives in order to support the new QL implementation. There might also be inherent mismatches in the way the external API is defined vs. what GraphX can support. We should discuss those on a case-by-case basis.

On Thu, Nov 6, 2014 at 5:42 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
I think it's best to look to an existing standard rather than try to make your own. Of course small additions would need to be made to make it valuable for the Spark community, like a method similar to Gremlin's 'table' function that produces an RDD instead. But there may be a lot of extra code and data structures that would need to be added to make it work, and those may not be directly applicable to all GraphX users. I think it would be best run as a separate module/project that builds directly on top of GraphX.

Kyle

On Thu, Nov 6, 2014 at 4:39 PM, York, Brennon brennon.y...@capitalone.com wrote:
My personal 2c is that, since GraphX is just beginning to provide a full-featured graph API, I think it would be better to align with the TinkerPop group rather than roll our own. In my mind the benefits outweigh the detriments as follows:

Benefits:
* GraphX gains the ability to become another core tenant within the TinkerPop community, allowing a more diverse group of users into the Spark ecosystem.
* TinkerPop can continue to maintain and own a solid / feature-rich graph API that has already been accepted by a wide audience, relieving the pressure of "one off" API additions from the GraphX team.
* GraphX can demonstrate its ability to be a key player in the GraphDB space, sitting inline with other major distributions (Neo4j, Titan, etc.).
* Allows for the abstract graph traversal logic (query API) to be owned and maintained by a group already proven on the topic.

Drawbacks:
* GraphX doesn't own the API for its graph query capability. This could be seen as good or bad, but it might make GraphX-specific implementation additions more tricky (possibly). Also, GraphX will need to maintain the features described within the TinkerPop API as that might change in the future.

From: Kushal Datta kushal.da...@gmail.com
Date: Thursday, November 6, 2014 at 4:00 PM
To: York, Brennon brennon.y...@capitalone.com
Cc: Kyle Ellrott kellr...@soe.ucsc.edu, Reynold Xin r...@databricks.com, dev@spark.apache.org, Matthias Broecheler matth...@thinkaurelius.com
Subject: Re: Implementing TinkerPop on top of GraphX

Before we dive into the implementation details, what are the high-level thoughts on Gremlin/GraphX? Scala already provides the procedural way to query graphs in GraphX today. So, today I can run g.vertices().filter().join() queries as OLAP in GraphX just like TinkerPop3 Gremlin, of course sans the useful operators that Gremlin offers such as outE, inE, loop, as, dedup, etc. In that case, is mapping Gremlin operators to GraphX APIs a better approach, or should we extend the existing set of transformations/actions that GraphX already offers with the useful operators from Gremlin? For example, we add as(), loop() and dedup() methods in VertexRDD and EdgeRDD. Either way we get a desperately needed graph query interface in GraphX.

On Thu, Nov 6, 2014 at 3:25 PM, York, Brennon brennon.y...@capitalone.com wrote:
This was my thought exactly with the TinkerPop3 release. Looks like, to move this forward, we'd need to implement gremlin-core per http://www.tinkerpop.com/docs/3.0.0.M1/#_implementing_gremlin_core. The real question lies in whether GraphX can only support the OLTP functionality, or if we can bake into it the OLAP requirements as well. At a first glance I believe we could create an entire OLAP system. If so, I believe we could do this in a set of parallel subtasks, those being the
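One hedged option for the edge-id mismatch raised in this thread: rather than asking every GraphX user to change their edge attribute type, a Gremlin layer could wrap attributes in a carrier type that adds the id. The names below (EdgeWithId, syntheticEdgeId) are invented for illustration and are not part of GraphX:

```scala
// Carrier type: pairs a user's edge attribute with a Gremlin-visible id,
// so the user's own attribute class stays untouched.
final case class EdgeWithId[A](id: Long, attr: A)

object EdgeIds {
  // For graphs that never had explicit edge ids, derive a deterministic
  // synthetic id from the endpoint vertex ids. Parallel edges between the
  // same endpoints would collide; a real implementation would also fold in
  // a per-pair sequence number.
  def syntheticEdgeId(srcId: Long, dstId: Long): Long =
    (srcId << 32) ^ (dstId & 0xFFFFFFFFL)

  def withId[A](srcId: Long, dstId: Long, attr: A): EdgeWithId[A] =
    EdgeWithId(syntheticEdgeId(srcId, dstId), attr)
}
```

In GraphX terms the wrapping would be a single mapTriplets / mapEdges pass over the graph, leaving the vertex side unchanged.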
Re: Implementing TinkerPop on top of GraphX
I’m definitely onboard to help / take a portion of this work. I too am wondering what the proper discussion venue should be moving forward, given Reynold’s remarks on a community project hosted outside Spark. If I’m understanding correctly, my take would be:

1. Find a core group of developers to take on this work (Kyle, myself, ???)
2. Build an initial implementation
3. Iterate / discuss with the Spark community as we find discrepancies between the GraphX and Gremlin3 APIs
4. Contribute back to the Spark community when complete

Does that seem like a sound plan, or am I way off base here? Itching to work on this :)

From: Kyle Ellrott kellr...@soe.ucsc.edu
Date: Friday, November 7, 2014 at 10:59 AM
To: Reynold Xin r...@databricks.com
Cc: York, Brennon brennon.y...@capitalone.com, Kushal Datta kushal.da...@gmail.com, dev@spark.apache.org, Matthias Broecheler matth...@thinkaurelius.com
Subject: Re: Implementing TinkerPop on top of GraphX

Who here would be interested in helping to work on an implementation of the TinkerPop3 Gremlin API for Spark? Is this something that should continue in the Spark discussion group, or should it migrate to the Gremlin message group? [...]
Re: Implementing TinkerPop on top of GraphX
I think if we are going to use GraphX as the query engine in TinkerPop3, then the TinkerPop3 community is the right platform to further the discussion. The reason I asked the question about improving APIs in GraphX is that it's not only Gremlin: any graph DSL can exploit the GraphX APIs. Cypher has some good subgraph-matching query interfaces which I believe can be distributed using the GraphX APIs.

An edge ID is an internal attribute of the edge, generated automatically and mostly hidden from the user. That's why adding it as an edge property might not be a good idea. There are several little differences like this. E.g. in the TinkerPop3 Gremlin implementation for Giraph, only vertex programs are executed in Giraph directly; the side-effect operators are mapped to Map-Reduce functions. In the implementation we are talking about, all of these operations can be done within GraphX.

I will be interested to co-develop the query engine. @Reynold, I agree. And as I said earlier, the APIs should be designed in such a way that they can be used in any graph DSL.

On Fri, Nov 7, 2014 at 10:59 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
Who here would be interested in helping to work on an implementation of the TinkerPop3 Gremlin API for Spark? Is this something that should continue in the Spark discussion group, or should it migrate to the Gremlin message group? [...]
Bind exception while running FlumeEventCount
Hi, I have installed spark-1.1.0 and Apache Flume 1.4 to run the streaming example FlumeEventCount. Previously the code was working fine; now I am facing the issues mentioned below. Flume itself is running properly and is able to write the file. The command I use is:

bin/run-example org.apache.spark.examples.streaming.FlumeEventCount 172.29.17.178 65001

14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
14/11/07 23:19:23 INFO flume.FlumeReceiver: Flume receiver stopped
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
14/11/07 23:19:23 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:119)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:74)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:68)
        at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
        at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:344)
        at sun.nio.ch.Net.bind(Net.java:336)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:199)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
        at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        ... 3 more
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
14/11/07 23:19:23 INFO receiver.BlockGenerator: Stopping BlockGenerator
14/11/07 23:19:23 INFO util.RecurringTimer: Stopped timer for BlockGenerator after time 1415382563200
14/11/07 23:19:23 INFO receiver.BlockGenerator: Waiting for block pushing thread
14/11/07 23:19:23 INFO receiver.BlockGenerator: Pushing out the last 0 blocks
14/11/07 23:19:23 INFO receiver.BlockGenerator: Stopped block pushing thread
14/11/07 23:19:23 INFO receiver.BlockGenerator: Stopped BlockGenerator
14/11/07 23:19:23 INFO receiver.ReceiverSupervisorImpl: Waiting for executor stop is over
14/11/07 23:19:23 ERROR receiver.ReceiverSupervisorImpl: Stopped executor with error: org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
14/11/07 23:19:23 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.jboss.netty.channel.ChannelException: Failed to bind to: /172.29.17.178:65001
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:119)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:74)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:68)
        at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
        at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171) at
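The root cause is the "Caused by: java.net.BindException: Address already in use" at the bottom of the trace: as the frames show (FlumeReceiver.initServer starting an Avro NettyServer), the FlumeEventCount receiver opens its own server on the given host:port, so that port must be free on the machine where the receiver runs. A common culprit is a previous run, or Flume itself, still bound to the same port. A minimal sketch to check whether a port is already held (the host/port values are the ones from the report above):

```python
import socket

def port_is_free(host: str, port: int) -> bool:
    """Try to bind host:port; True means no other process currently holds it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
            return True
        except OSError:  # e.g. EADDRINUSE, the error underlying the Netty ChannelException
            return False

if __name__ == "__main__":
    print(port_is_free("172.29.17.178", 65001))
```

If this prints False on the receiver's machine, find and stop the process holding the port (or pick a different one) before re-running the example.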
Replacing Spark's native scheduler with Sparrow
I just watched Kay's talk from 2013 on Sparrow https://www.youtube.com/watch?v=ayjH_bG-RC0. Is replacing Spark's native scheduler with Sparrow still on the books? The Sparrow repo https://github.com/radlab/sparrow hasn't been updated recently, and I don't see any JIRA issues about it. It would be good to at least have a JIRA issue to track progress on this if it's a long-term goal. Nick
Re: [VOTE] Designating maintainers for some Spark components
-1 (not binding; +1 for maintainers, -1 for sign-off) I agree with Greg and Vinod. In the beginning everything is better (more efficient, more focused), but after some time fighting begins. Code style is the hottest topic to fight over (we have already seen it in some PRs). If two committers (one of them the maintainer) cannot reach agreement on code style: before this process, they would ask other committers for comments; after this process, the maintainer's -1 carries higher priority, so the maintainer can keep his/her personal preference and it becomes hard to reach agreement. Eventually, different components will end up with different code styles (among other things). Right now, maintainers act as first or best contacts, the best people to review PRs in their components. We could announce that, so new contributors can easily find the right reviewer. My 2 cents. Davies On Thu, Nov 6, 2014 at 11:43 PM, Vinod Kumar Vavilapalli vino...@apache.org wrote: With the maintainer model, the process is as follows: - Any committer could review the patch and merge it, but they would need to forward it to me (or another core API maintainer) to make sure we also approve - At any point during this process, I could come in and -1 it, or give feedback - In addition, any other committer beyond me is still allowed to -1 this patch The only change in this model is that committers are responsible to forward patches in these areas to certain other committers. If every committer had perfect oversight of the project, they could have also seen every patch to their component on their own, but this list ensures that they see it even if they somehow overlooked it. Having done the job of playing an informal 'maintainer' of a project myself, this is what I think you really need: The so-called 'maintainers' do one of the below - Actively poll the lists and watch over contributions. And follow what is repeated often around here: Trust but verify. 
- Set up automated mechanisms to send all bug-tracker updates of a specific component to a list that people can subscribe to And/or - Individual contributors send review requests to unofficial 'maintainers' over dev-lists or through tools. Like many projects do with review boards and other tools. Note that none of the above is a required step. It must not be, that's the point. But once set as a convention, they will all help you address your concerns with project scalability. Anything else that you add is bestowing privileges on a select few and forming dictatorships. And contrary to what the proposal claims, this is neither scalable nor conforming to Apache governance rules. +Vinod - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Replacing Spark's native scheduler with Sparrow
Hi Nick, This hasn't yet been directly supported by Spark because of a lack of demand. The last time I ran a throughput test on the default Spark scheduler (~1 year ago, so this may have changed), it could launch approximately 1500 tasks / second. If, for example, you have a cluster of 100 machines, this means the scheduler can launch 150 tasks per machine per second. I don't know of any existing Spark clusters that have a large enough number of machines or short enough tasks to justify the added complexity of distributing the scheduler. Eventually I hope to see Spark used on much larger clusters, such that Sparrow will be necessary! -Kay On Fri, Nov 7, 2014 at 3:05 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I just watched Kay's talk from 2013 on Sparrow https://www.youtube.com/watch?v=ayjH_bG-RC0. Is replacing Spark's native scheduler with Sparrow still on the books?
Re: [VOTE] Designating maintainers for some Spark components
+1 (binding) I agree with the proposal; it just formalizes what we have been doing until now, and it will increase the efficiency and focus of the review process. To address Davies' concern: I agree coding style is often a hot topic of contention. But that is just an indication that our processes are not perfect and we have much room to improve (which is what this proposal is all about). Regarding the specific case of coding style, we should all get together, discuss, and make our coding style guide more comprehensive so that such concerns can be dealt with once rather than recurring. And that guide will override anyone's personal preference, be it the maintainer's or a new committer's. TD On Fri, Nov 7, 2014 at 3:18 PM, Davies Liu dav...@databricks.com wrote: -1 (not binding, +1 for maintainer, -1 for sign off) 
Re: [VOTE] Designating maintainers for some Spark components
Sorry for my last email; I misunderstood the proposal here: all committers still have an equal -1 on all code changes. Also, as mentioned in the proposal, the sign-off only applies to public APIs and architecture; discussions about things like code style remain the same as today. So I'd like to revert my vote to +1. Sorry for this. Davies On Fri, Nov 7, 2014 at 3:18 PM, Davies Liu dav...@databricks.com wrote: -1 (not binding, +1 for maintainer, -1 for sign off) 
Re: Replacing Spark's native scheduler with Sparrow
If, for example, you have a cluster of 100 machines, this means the scheduler can launch 150 tasks per machine per second. Did you mean 15 tasks per machine per second here? Or alternatively, 10 machines? I don't know of any existing Spark clusters that have a large enough number of machines or short enough tasks to justify the added complexity of distributing the scheduler. Actually, this was the reason I took interest in Sparrow--specifically, the idea of a Spark cluster handling many very short (< 50 ms) tasks. At the recent Spark Committer Night http://www.meetup.com/Spark-NYC/events/209271842/ in NYC, I asked Michael whether he thought Spark SQL could eventually completely fill the need for the very low-latency queries currently served by MPP databases like Redshift or Vertica. If I recall correctly, he said that the main obstacle to that was simply task startup time, which is on the order of 100 ms. Is there interest in (or perhaps an existing initiative related to) improving task startup times to the point where one could legitimately look at Spark SQL as a low-latency database that can serve many users or applications at once? That would probably make a good use case for Sparrow, no? Nick
Re: Replacing Spark's native scheduler with Sparrow
On Fri, Nov 7, 2014 at 6:20 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: If, for example, you have a cluster of 100 machines, this means the scheduler can launch 150 tasks per machine per second. Did you mean 15 tasks per machine per second here? Or alternatively, 10 machines? Yes -- sorry for the terrible math there! I don't know of any existing Spark clusters that have a large enough number of machines or short enough tasks to justify the added complexity of distributing the scheduler. Actually, this was the reason I took interest in Sparrow--specifically, the idea of a Spark cluster handling many very short (< 50 ms) tasks. At the recent Spark Committer Night http://www.meetup.com/Spark-NYC/events/209271842/ in NYC, I asked Michael if he thought that Spark SQL could eventually completely fill the need for very low latency queries currently served by MPP databases like Redshift or Vertica. If I recall correctly, he said that the main obstacle to that was simply task startup time, which is on the order of 100 ms. Is there interest in (or perhaps an existing initiative related to) improving task startup times to the point where one could legitimately look at Spark SQL as a low latency database that can serve many users or applications at once? That would probably make a good use case for Sparrow, no? Shorter tasks would indeed be a good use case for Sparrow, and was the motivation behind the Sparrow work. When evaluating Sparrow, we focused on running SQL workloads where tasks were in the 50-100ms range (detailed in the paper http://people.csail.mit.edu/matei/papers/2013/sosp_sparrow.pdf). I know Evan, who I added here, has been looking at task startup times in the context of ML workloads; this motivated some recent work (e.g., https://issues.apache.org/jira/browse/SPARK-3984) to improve metrics shown in the UI to describe task launch overhead. 
For jobs we've looked at, task startup time was at most tens of milliseconds (I also remember this being the case when we ran short tasks on Sparrow). Decreasing this seems like it would be widely beneficial, especially if there are cases where it's more like 100ms, as Michael alluded. Hopefully some of the improved UI reporting will help to understand the degree to which this is (or is not) an issue. I'm not sure how much Evan is attempting to quantify the overhead versus fix it -- so I'll let him chime in here. Nick
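As a quick sanity check on the throughput figures discussed above, the corrected arithmetic behind Kay's estimate is:

```python
# Scheduler throughput figures from Kay's (~1-year-old) measurement.
scheduler_throughput = 1500  # tasks launched per second, whole cluster
machines = 100

per_machine = scheduler_throughput / machines
print(per_machine)  # 15.0 tasks per machine per second, as Nick pointed out
```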
Re: Replacing Spark's native scheduler with Sparrow
Sounds good. I'm looking forward to tracking improvements in this area. Also, just to connect some more dots here, I just remembered that there is currently an initiative to add an IndexedRDD https://issues.apache.org/jira/browse/SPARK-2365 interface. Some interesting use cases mentioned there include (emphasis added): To address these problems, we propose IndexedRDD, an efficient key-value store built on RDDs. IndexedRDD would extend RDD[(Long, V)] by enforcing key uniqueness and pre-indexing the entries for efficient joins and *point lookups, updates, and deletions*. GraphX would be the first user of IndexedRDD, since it currently implements a limited form of this functionality in VertexRDD. We envision a variety of other uses for IndexedRDD, including *streaming updates* to RDDs, *direct serving* from RDDs, and as an execution strategy for Spark SQL. Maybe some day we'll have Spark clusters directly serving up point lookups or updates. I imagine the tasks running on clusters like that would be tiny and would benefit from very low task startup times and scheduling latency. Am I painting that picture correctly? Anyway, thanks for explaining the current status of Sparrow. Nick
Re: Replacing Spark's native scheduler with Sparrow
On Fri, Nov 7, 2014 at 8:04 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Maybe some day we'll have Spark clusters directly serving up point lookups or updates. I imagine the tasks running on clusters like that would be tiny and would benefit from very low task startup times and scheduling latency. Am I painting that picture correctly? Yeah - we painted a similar picture in a short paper last year titled The Case for Tiny Tasks in Compute Clusters http://shivaram.org/publications/tinytasks-hotos13.pdf
Re: Replacing Spark's native scheduler with Sparrow
Hmm, relevant quote from section 3.3: newer frameworks like Spark [35] reduce the overhead to 5ms. To support tasks that complete in hundreds of milliseconds, we argue for reducing task launch overhead even further to 1ms so that launch overhead constitutes at most 1% of task runtime. By maintaining an active thread pool for task execution on each worker node and caching binaries, task launch overhead can be reduced to the time to make a remote procedure call to the slave machine to launch the task. Today’s datacenter networks easily allow a RPC to complete within 1ms. In fact, recent work showed that 10μs RPCs are possible in the short term [26]; thus, with careful engineering, we believe task launch overheads of 50μs are attainable. 50μs task launch overheads would enable even smaller tasks that could read data from in-memory or from flash storage in order to complete in milliseconds. So it looks like I misunderstood the current cost of task initialization. It's already as low as 5ms (and not 100ms)? Nick On Fri, Nov 7, 2014 at 11:15 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: 
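As a worked check on the paper's numbers: with a fixed launch overhead, and a budget that overhead be at most 1% of task runtime, the minimum viable task duration is overhead / 1%. A small sketch (the function name is just illustrative):

```python
def min_task_runtime_ms(overhead_ms, max_overhead_pct=1):
    """Shortest task for which launch overhead stays within max_overhead_pct of runtime."""
    return overhead_ms * 100 / max_overhead_pct

print(min_task_runtime_ms(5))     # 500.0 -> today's ~5 ms overhead needs >= 500 ms tasks
print(min_task_runtime_ms(1))     # 100.0 -> the paper's 1 ms target suits hundreds-of-ms tasks
print(min_task_runtime_ms(0.05))  # a 50 us target enables millisecond-scale tasks
```

This matches the quote: at 1 ms overhead, hundreds-of-millisecond tasks stay within the 1% budget, and 50 μs overheads would admit millisecond-scale tasks.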
Re: Replacing Spark's native scheduler with Sparrow
I think Kay might be able to give a better answer. The most recent benchmark I remember had the number at somewhere between 8.6ms and 14.6ms depending on the Spark version (https://github.com/apache/spark/pull/2030#issuecomment-52715181). Another point to note is that this is the total time to run a null job, so it includes scheduling + task launch + time to send back results, etc. Shivaram On Fri, Nov 7, 2014 at 9:23 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, relevant quote from section 3.3: newer frameworks like Spark [35] reduce the overhead to 5ms. 
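For reference, a minimal version of the kind of null-job microbenchmark Shivaram describes might look like the sketch below. The timing helper is generic; the commented PySpark call is the standard way to run one trivial task. This is only a sketch under those assumptions, not the benchmark used in the linked PR.

```python
import time

def median_ms(run, trials=100):
    """Call run() repeatedly and return the median wall-clock time in milliseconds."""
    times = []
    for _ in range(trials):
        start = time.perf_counter()
        run()
        times.append((time.perf_counter() - start) * 1000.0)
    times.sort()
    return times[len(times) // 2]

# With a live SparkContext `sc`, a "null job" is a single trivial task:
#   null_job_ms = median_ms(lambda: sc.parallelize([0], numSlices=1).count())
# As noted above, the measured time covers scheduling + task launch + result return.
```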
Re: [VOTE] Designating maintainers for some Spark components
+1 (binding) On 8 Nov 2014 07:26, Davies Liu dav...@databricks.com wrote: Sorry for my last email, I misunderstood the proposal here, all the committer still have equal -1 to all the code changes. 
Re: Replacing Spark's native scheduler with Sparrow
I don't have much more info than what Shivaram said. My sense is that, over time, task launch overhead with Spark has slowly grown as Spark supports more and more functionality. However, I haven't seen it be as high as the 100ms Michael quoted (maybe this was for jobs with tasks that have much larger objects that take a long time to deserialize?). Fortunately, the UI now quantifies this: if you click Show Additional Metrics, the scheduler delay (which basically represents the overhead of shipping the task to the worker and getting the result back), the task deserialization time, and the result serialization time all represent parts of the task launch overhead. So, you can use the UI to get a sense of what this overhead is for the workload you're considering and whether it's worth optimizing. -Kay On Fri, Nov 7, 2014 at 9:43 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: I think Kay might be able to give a better answer. 
In fact, recent work showed that 10μs RPCs are possible in the short term [26]; thus, with careful engineering, we believe task launch overheads of 50μs are attainable. 50μs task launch overheads would enable even smaller tasks that could read data from in-memory or from flash storage in order to complete in milliseconds.

So it looks like I misunderstood the current cost of task initialization. It's already as low as 5ms (and not 100ms)? Nick

On Fri, Nov 7, 2014 at 11:15 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote:
On Fri, Nov 7, 2014 at 8:04 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:
Sounds good. I'm looking forward to tracking improvements in this area. Also, just to connect some more dots here, I just remembered that there is currently an initiative to add an IndexedRDD https://issues.apache.org/jira/browse/SPARK-2365 interface. Some interesting use cases mentioned there include (emphasis added):

To address these problems, we propose IndexedRDD, an efficient key-value store built on RDDs. IndexedRDD would extend RDD[(Long, V)] by enforcing key uniqueness and pre-indexing the entries for efficient joins and *point lookups, updates, and deletions*. GraphX would be the first user of IndexedRDD, since it currently implements a limited form of this functionality in VertexRDD. We envision a variety of other uses for IndexedRDD, including *streaming updates* to RDDs, *direct serving* from RDDs, and as an execution strategy for Spark SQL.

Maybe some day we'll have Spark clusters directly serving up point lookups or updates. I imagine the tasks running on clusters like that would be tiny and would benefit from very low task startup times and scheduling latency. Am I painting that picture correctly?

Yeah - we painted a similar picture in a short paper last year titled The Case for Tiny Tasks in Compute Clusters http://shivaram.org/publications/tinytasks-hotos13.pdf

Anyway, thanks for explaining the current status of Sparrow. Nick
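The "active thread pool" point in the quoted passage is easy to get a feel for locally: dispatching a no-op task to a pre-warmed pool and collecting its result takes on the order of microseconds, which is why the paper treats the RPC, not the launch itself, as the floor. A rough local sketch (Python stands in for illustration; nothing here is Spark API, and this measures only local dispatch, not the RPC or deserialization costs discussed above):

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def noop():
    return None

# Warm up the pool first so one-time thread creation isn't counted.
for _ in range(100):
    pool.submit(noop).result()

# Time round-trip dispatch of no-op tasks to the already-running pool.
n = 1000
start = time.perf_counter()
for _ in range(n):
    pool.submit(noop).result()
per_task_us = (time.perf_counter() - start) / n * 1e6

print(f"per-task dispatch overhead: {per_task_us:.1f} microseconds")
pool.shutdown()
```

On typical hardware this lands far below the millisecond scale, consistent with the paper's argument that pre-warmed executors push the overhead floor down to the network round trip.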
Re: [MLlib] Contributing Algorithm for Outlier Detection
We should take a vector instead, giving the user flexibility to decide the data source/type. What do you mean by vector datatype exactly? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi

On Wed, Nov 5, 2014 at 6:45 AM, slcclimber anant.a...@gmail.com wrote:
Ashutosh, I still see a few issues.
1. On line 112 you are counting using a counter. Since this will happen in an RDD, the counter will cause issues. Also, it is not good functional style to use a filter function with a side effect. You could use randomSplit instead. This does the same thing without the side effect.
2. The similar shared usage of j on line 102 is going to be an issue as well. Also, the hash seed does not need to be sequential; it could be randomly generated or hashed on the values.
3. The compute function and trim scores still run on a comma-separated RDD. We should take a vector instead, giving the user flexibility to decide the data source/type. What if we want data from Hive tables or Parquet or JSON or Avro formats? This is a very restrictive format. With vectors, the user has the choice of taking in whatever data format and converting it to vectors, instead of reading JSON files, creating a CSV file, and then working on that.
4. The similar use of counters on lines 54 and 65 is an issue. Basically, shared-state counters are a huge issue that does not scale, since the processing of RDDs is distributed while the value j lives on the master.
Anant

On Tue, Nov 4, 2014 at 7:22 AM, Ashutosh [via Apache Spark Developers List] ml-node+s1001551n9083...@n3.nabble.com wrote:
Anant, I got rid of those increment/decrement functions and now the code is much cleaner. Please check. All your comments have been looked after.
https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala
_Ashu

-- *From:* slcclimber [via Apache Spark Developers List] ml-node+[hidden email] *Sent:* Friday, October 31, 2014 10:09 AM *To:* Ashutosh Trivedi (MT2013030) *Subject:* Re: [MLlib] Contributing Algorithm for Outlier Detection
You should create a jira ticket to go with it as well. Thanks

On Oct 30, 2014 10:38 PM, Ashutosh [via Apache Spark Developers List] [hidden email] wrote:
Okay. I'll try it and post it soon with a test case. After that I think we can go ahead with the PR.

-- *From:* slcclimber [via Apache Spark Developers List] ml-node+[hidden email] *Sent:* Friday, October 31, 2014 10:03 AM *To:* Ashutosh Trivedi (MT2013030) *Subject:* Re: [MLlib] Contributing Algorithm for Outlier Detection
Ashutosh, A vector would be a good idea; vectors are used very frequently. Test data is usually stored in the spark/data/mllib folder.

On Oct 30, 2014 10:31 PM, Ashutosh [via Apache Spark Developers List] [hidden email] wrote:
Hi Anant, sorry for my late reply. Thank you for taking the time to review it. I have a few comments on the first issue. You are correct on the string (csv) part. But we cannot take input of the type you mentioned. We calculate frequency in our function. Otherwise the user has to do all this computation.
I realize that taking an RDD[Vector] would be general enough for all. What do you say? I agree on all the rest of the issues. I will correct them soon and post the update. I have a doubt about test cases: where should I put data for the test scripts? Or should I generate synthetic data for testing within the scripts? How does this work?
Regards, Ashutosh
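On Anant's point 1 above: RDD.randomSplit(weights, seed) avoids the shared-counter problem because each element's assignment is a pure function of the seed and the data layout, so no cross-partition mutable state is needed. A minimal local sketch of that idea (plain Python standing in for the RDD machinery; seeded_split is an illustrative name, not a Spark API):

```python
import random

def seeded_split(data, fraction, seed):
    """Split data deterministically: each element's fate is derived
    from the seed plus its index, so any partition could decide
    independently, with no shared counter on the master."""
    left, right = [], []
    for i, x in enumerate(data):
        # A per-element RNG keyed on (seed, index); no state is shared
        # across elements, mirroring per-partition seeding in Spark.
        rng = random.Random(seed * 1_000_003 + i)
        (left if rng.random() < fraction else right).append(x)
    return left, right

data = list(range(1000))
train, test = seeded_split(data, 0.8, seed=42)
print(len(train), len(test))  # roughly an 80/20 split
```

Because the split is deterministic in (seed, index), rerunning it reproduces exactly the same partitioning, which is also what makes recomputation-on-failure safe in the distributed setting.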
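For reference while the API settles, the AVF (Attribute Value Frequency) scoring at the heart of OutlierWithAVFModel is small enough to sketch locally: score each row by the mean frequency of its attribute values, and treat the lowest-scoring rows as outlier candidates. A non-distributed sketch under those assumptions (plain Python lists stand in for the proposed RDD[Vector] input; the function name is illustrative):

```python
from collections import Counter

def avf_scores(rows):
    """Score each row by the mean frequency of its attribute values;
    rows with rare values score low and are outlier candidates."""
    m = len(rows[0])
    # Frequency table per attribute position.
    freq = [Counter(row[j] for row in rows) for j in range(m)]
    return [sum(freq[j][row[j]] for j in range(m)) / m for row in rows]

rows = [
    ("a", "x"), ("a", "x"), ("a", "x"),
    ("b", "y"),  # rare values in both attributes -> lowest score
]
print(avf_scores(rows))  # -> [3.0, 3.0, 3.0, 1.0]
```

The per-attribute frequency tables are what a distributed version would compute with a map/reduceByKey pass, which is why the vector-based input matters: the scoring itself never needs to parse CSV.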
Should new YARN shuffle service work with yarn-alpha?
I noticed that this doesn't compile:

mvn -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

[error] warning: [options] bootstrap class path not set in conjunction with -source 1.6
[error] /Users/srowen/Documents/spark/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:26: error: cannot find symbol
[error] import org.apache.hadoop.yarn.server.api.AuxiliaryService;
[error]   symbol:   class AuxiliaryService
[error]   location: package org.apache.hadoop.yarn.server.api
[error] /Users/srowen/Documents/spark/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:27: error: cannot find symbol
[error] import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
...

Should it work? If not, shall I propose to enable the service only with -Pyarn?

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
Re: Should new YARN shuffle service work with yarn-alpha?
I bet it doesn't work. +1 on isolating its inclusion to only the newer YARN APIs.

- Patrick

On Fri, Nov 7, 2014 at 11:43 PM, Sean Owen so...@cloudera.com wrote:
I noticed that this doesn't compile:

mvn -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

[error] warning: [options] bootstrap class path not set in conjunction with -source 1.6
[error] /Users/srowen/Documents/spark/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:26: error: cannot find symbol
[error] import org.apache.hadoop.yarn.server.api.AuxiliaryService;
[error]   symbol:   class AuxiliaryService
[error]   location: package org.apache.hadoop.yarn.server.api
[error] /Users/srowen/Documents/spark/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:27: error: cannot find symbol
[error] import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
...

Should it work? If not, shall I propose to enable the service only with -Pyarn?
Re: Should new YARN shuffle service work with yarn-alpha?
Hm. Problem is, core depends directly on it:

[error] /Users/srowen/Documents/spark/core/src/main/scala/org/apache/spark/SecurityManager.scala:25: object sasl is not a member of package org.apache.spark.network
[error] import org.apache.spark.network.sasl.SecretKeyHolder
[error] /Users/srowen/Documents/spark/core/src/main/scala/org/apache/spark/SecurityManager.scala:147: not found: type SecretKeyHolder
[error] private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
[error] /Users/srowen/Documents/spark/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala:29: object RetryingBlockFetcher is not a member of package org.apache.spark.network.shuffle
[error] import org.apache.spark.network.shuffle.{RetryingBlockFetcher, BlockFetchingListener, OneForOneBlockFetcher}
[error] /Users/srowen/Documents/spark/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala:23: object sasl is not a member of package org.apache.spark.network
[error] import org.apache.spark.network.sasl.SaslRpcHandler
...
[error] /Users/srowen/Documents/spark/core/src/main/scala/org/apache/spark/storage/BlockManager.scala:124: too many arguments for constructor ExternalShuffleClient: (x$1: org.apache.spark.network.util.TransportConf, x$2: String)org.apache.spark.network.shuffle.ExternalShuffleClient
[error] new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
[error] /Users/srowen/Documents/spark/core/src/main/scala/org/apache/spark/storage/BlockManager.scala:39: object protocol is not a member of package org.apache.spark.network.shuffle
[error] import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
[error] /Users/srowen/Documents/spark/core/src/main/scala/org/apache/spark/storage/BlockManager.scala:214: not found: type ExecutorShuffleInfo
[error] val shuffleConfig = new ExecutorShuffleInfo(
...
More refactoring needed? Either to support YARN alpha as a separate shuffle module, or to sever this dependency? Of course, this goes away when yarn-alpha goes away too.

On Sat, Nov 8, 2014 at 7:45 AM, Patrick Wendell pwend...@gmail.com wrote:
I bet it doesn't work. +1 on isolating its inclusion to only the newer YARN APIs.
- Patrick

On Fri, Nov 7, 2014 at 11:43 PM, Sean Owen so...@cloudera.com wrote:
I noticed that this doesn't compile:

mvn -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

[error] warning: [options] bootstrap class path not set in conjunction with -source 1.6
[error] /Users/srowen/Documents/spark/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:26: error: cannot find symbol
[error] import org.apache.hadoop.yarn.server.api.AuxiliaryService;
[error]   symbol:   class AuxiliaryService
[error]   location: package org.apache.hadoop.yarn.server.api
[error] /Users/srowen/Documents/spark/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:27: error: cannot find symbol
[error] import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
...

Should it work? If not, shall I propose to enable the service only with -Pyarn?
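To make the -Pyarn-only proposal concrete: since Maven profiles can contribute modules, one option is to list network/yarn only under the yarn profile, so yarn-alpha builds never compile against AuxiliaryService. A rough sketch of what that profile entry might look like (the module path is taken from the compile error above; the surrounding pom structure and any other profile contents are assumed, not copied from the actual build):

```xml
<!-- Sketch: pull network/yarn into the reactor only when -Pyarn is
     active. Placement within the parent pom's <profiles> section is
     assumed. -->
<profile>
  <id>yarn</id>
  <modules>
    <module>network/yarn</module>
  </modules>
</profile>
```

Note this only scopes the new YARN shuffle service itself; the direct dependency of core on network/shuffle shown in the errors above would still need the refactoring discussed in this thread.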