[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13171047#comment-13171047 ]

Claudio Martella commented on GIRAPH-45:

I must have explained the idea badly. The output format is basically:

(key0, value2) (key0, value1) (key0, value4) ...
(key1, value4) (key1, value2) (key1, value7) ...

That is, the output file is sorted by key, but the messages for a key are not sorted; they end up in the file in the order they arrived. The index is a tree holding a sample of the vertices (i.e. #totalVertices % N), each with its position inside that file. So if you want to look up the messages sent to a particular vertex, i.e. at the beginning of that vertex's superstep, you look in the tree for floorEntry(key) and scan that chunk until you reach the beginning of the block where your messages are stored (and you do this for each file we flushed). To minimize disk I/O, I'm using a Bloom filter so that I don't look into files where I can reasonably know there aren't going to be messages for my vertex. At that point you can just iterate over the messages one after the other and feed them to the message iterator passed to the compute() method. So the index doesn't have one entry per vertex, but only entries for a subset of the vertices (and no entries at all for the messages).

An easier way would be to go as you suggest: store all the messages in different files, flush them unsorted as they reach a certain threshold, and then load them up entirely at the beginning of a partition's superstep. This would let us avoid the sorting, but we'd need to be able to keep all the messages for all the vertices belonging to one partition in memory at the same time. Maybe that's acceptable; I was going for the approach that would require less in-memory contention.
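The lookup described above can be sketched as follows. This is only an illustration under stated assumptions, not the code from the patch: an in-memory list stands in for the key-sorted spill file, every sampleRate-th distinct key is sampled into a TreeMap, and the Bloom filter check is left out. The names SparseIndexSketch, Entry, build and lookup are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of a sparse index over a key-sorted file; not Giraph code. */
final class SparseIndexSketch {
    /** One (destination vertex id, message) record of the flushed file. */
    static final class Entry {
        final int key;
        final double message;
        Entry(int key, double message) { this.key = key; this.message = message; }
    }

    /** Samples every sampleRate-th distinct key and records where its block starts. */
    static TreeMap<Integer, Integer> build(List<Entry> sortedFile, int sampleRate) {
        TreeMap<Integer, Integer> index = new TreeMap<>();
        int distinct = 0;
        for (int pos = 0; pos < sortedFile.size(); pos++) {
            boolean blockStart =
                pos == 0 || sortedFile.get(pos - 1).key != sortedFile.get(pos).key;
            if (blockStart && distinct++ % sampleRate == 0) {
                index.put(sortedFile.get(pos).key, pos);
            }
        }
        return index;
    }

    /** Scans forward from the nearest sampled key at or below the requested key. */
    static List<Double> lookup(List<Entry> sortedFile,
                               TreeMap<Integer, Integer> index, int key) {
        List<Double> messages = new ArrayList<>();
        Map.Entry<Integer, Integer> floor = index.floorEntry(key);
        if (floor == null) {
            return messages; // key is smaller than every key in the file
        }
        for (int pos = floor.getValue(); pos < sortedFile.size(); pos++) {
            Entry e = sortedFile.get(pos);
            if (e.key > key) {
                break; // past the block for this key: stop scanning
            }
            if (e.key == key) {
                messages.add(e.message); // messages keep their arrival order
            }
        }
        return messages;
    }
}
```

With a larger sampleRate the index shrinks but each lookup scans a longer chunk, which is the memory versus disk trade-off the comment is weighing.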
Another optimization I was going to look into is to change the format to

(key0, #bytes, [message2, message1, message4])
(key1, #bytes, [message4, message2, message7])

This would allow us, when scanning the file looking for our block, to skip straight to the next key instead of deserializing all the messages in between. It requires serializing the message list to a byte[] first, so that I can write #bytes before writing the messages to disk. I'll implement this after the naive version is ready. Does this make sense to you?

Improve the way to keep outgoing messages

Key: GIRAPH-45
URL: https://issues.apache.org/jira/browse/GIRAPH-45
Project: Giraph
Issue Type: Improvement
Components: bsp
Reporter: Hyunsik Choi
Assignee: Hyunsik Choi

As discussed in GIRAPH-12 (http://goo.gl/CE32U), I think there is a potential out-of-memory problem when the rate of message generation is higher than the rate of message flush (or the network bandwidth). To overcome this problem, we need a more eager strategy for message flushing or some approach to spill messages to disk. The link below is Dmitriy's suggestion.
https://issues.apache.org/jira/browse/GIRAPH-12?focusedCommentId=13116253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13116253

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13171073#comment-13171073 ]

Avery Ching commented on GIRAPH-45:

I think that reading messages from disk one vertex at a time will reduce memory pressure more than the partition-based storage. I'm assuming that key=vertex_id and value=message_list in your explanation. How do you keep the keys together in the file? For instance, suppose that you get the following tuples:

vertex_id, message_list
0, 2.0, 3.0
3, 1.0
7, 34.0
4, 23.0
3, 20.0

In a bad scenario, you have to spill to disk after each tuple. The files are totally out of order and your index (vertex, byte offset) looks something like:

0, 0
3, 24
7, 40
4, 56

But if I'm understanding this scheme, wouldn't each vertex need to scan the entire file if the vertices keep coming and are totally random?

I suppose that another way to do this is to use the partition-based method with a small change. If the partition is deemed too large to load into memory and sort, it could be read and re-dumped into n files, where n is chosen such that there is a good chance that each resulting file is small enough to fit in memory on its own. This can be done recursively.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13171404#comment-13171404 ]

Avery Ching commented on GIRAPH-45:

Ah, thank you for clarifying that. The only minor downside is that a sorted map typically uses a bit more memory than a non-sorted one. But it's probably not too big a deal. Sounds like an idea certainly worth trying out, Claudio =).
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13170563#comment-13170563 ]

Claudio Martella commented on GIRAPH-45:

Great, we're converging. The BTree is for indexing destination vertices: you want to have all the messages sent to the same vertex stored together. It's just a TreeMap with a sample of the keyset in the flushed file, something similar to the HBase index. The code is complete; I'm chasing a nasty bug and I'll work on integration after that.

The question is: do you know a good way to detect memory pressure? I've tried Runtime.freeMemory() but it won't work. I was considering using a SoftReference as a monitor. Any suggestions?
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13170613#comment-13170613 ]

Avery Ching commented on GIRAPH-45:

You might not need the BTree for indexing the destination vertices, I think. Couldn't we use files to group the messages sent to the same partition? If you simply dump all the received (vertex id, messages) tuples to a file that is specific to a partition, we can load all the tuples for a single partition prior to computing on the worker and assign them to their destinations. I'm a little concerned that using an in-memory data structure to keep the message indices might be expensive (i.e. one BTree per vertex in your model, if I'm understanding correctly).

Regarding the streaming, I am not proposing to change the BSP model. I'm talking about sending the messages as we go along during the computation. Currently the messages are bulk sent at the end of the superstep. So rather than a bulk send, allow every worker to stream out a batch of messages when under pressure, rather than everything at the end.

As far as detecting memory pressure goes, Runtime seems to do an okay job. If anyone knows anything better, that's cool too. You can look at MemoryUtils#getRuntimeMemoryStats() for a Runtime example. We'll need to define limits for memory pressure.
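As a rough sketch of a Runtime-based check, assuming "pressure" is defined as the used fraction of the maximum heap crossing a threshold: Runtime.freeMemory() only reports free space inside the heap that is currently allocated (totalMemory()), and that heap may still grow towards maxMemory(), which is one plausible reason the raw value is misleading on its own. The class name, method names and the idea of a single threshold are assumptions for illustration; this does not show what MemoryUtils#getRuntimeMemoryStats() does.

```java
/** Hypothetical helper; names and the threshold-based definition are assumptions. */
final class MemoryPressureSketch {
    /** Fraction of the maximum heap in use, computed from raw Runtime numbers. */
    static double usedFraction(long max, long total, long free) {
        long used = total - free;    // bytes occupied inside the currently allocated heap
        return (double) used / max;  // relative to the ceiling the heap may grow to
    }

    /** True when the used fraction of the maximum heap reaches the given threshold. */
    static boolean underPressure(double threshold) {
        Runtime rt = Runtime.getRuntime();
        return usedFraction(rt.maxMemory(), rt.totalMemory(), rt.freeMemory()) >= threshold;
    }
}
```

Note that the used figure includes garbage that has not been collected yet, so a check like this can report pressure that a collection would relieve; picking the limits is the part that still needs to be defined.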
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13169756#comment-13169756 ]

Avery Ching commented on GIRAPH-45:

I've been thinking about this a bit more. I don't think we actually need a database if we use a disk-friendly approach and take advantage of the knowledge of our system. Here is a rough proposal:

There are two ways we can save memory here: an out-of-core graph and out-of-core messages. In this way, we can use the memory as a cache rather than as a totally in-memory database and messaging system.

Here's how we can do the out-of-core graph:

Workers already do the computation by partition. All partitions that are owned by the worker need to be processed, and we want to minimize the amount of data loaded from and stored to local disk (i.e. superstep.worker id.partition #.vertices). Local disk should be used here because it will be faster and no remote worker needs to directly access this data. Therefore the general algorithm would be:

for (partition : all in memory partitions)
  partition.computeAndGenerateOutgoingMessages()
  if (memoryPressure)
    partition.storeToFileSystem()
for (partition : remaining in file system partitions)
  partition.loadFromFileSystem()
  partition.computeAndGenerateOutgoingMessages()
  if (memoryPressure)
    partition.storeToFileSystem()

This should keep our partition cache as full as possible and require a minimal amount of loading/storing for partitions that can't fit in memory.

Here's how we can do the out-of-core messaging:

As the partitions are being processed by the workers, outgoing messages are currently kept in memory. They are flushed if a message list grows to a certain size. Otherwise, the messages are bulk sent at the end of the computation. What we can do is wait for a sendMessageReq and check for memory pressure. If memory pressure is an issue, then dump all the outgoing messages to HDFS files (i.e. superstep.worker id.partition #.outgoingMessages).
Future sendMessageReq calls may be kept in memory or dumped to the same HDFS files if memory pressure is an issue. These HDFS files are closed prior to the flush. During the flush, the worker sends the in-memory messages to the destinations as normal, and also sends the filenames of the out-of-core messages to their respective owners. Note that the files are stored in HDFS so that a remote worker can load the messages as it sees fit. Maybe reduce the replication factor to 2 by default for these files? This tactic should reduce memory usage on the destination worker as well, since the destination workers don't need to load the HDFS files until they are actually doing the computation for that partition.

Checkpoints should be able to point to the out-of-core data as well, to reduce the amount of data to store.

Still, there is one more remaining piece: loading the graph. This can also run out of memory. Currently vertex lists are batched and sent to destination workers by partition. Partitions should have the ability to be incrementally dumped to local files on the destination if there is memory pressure. Then, prior to the first superstep, each partition can be assembled (local files + any vertices still in memory) and can use the out-of-core graph algorithm indicated above.

This proposal should take advantage of large reads/writes so that we don't need a database. I will require out-of-core storage in the very near future, as the graph I need to load will have billions of edges and I probably won't have enough nodes and memory to keep it all in core.

Please let me know your thoughts on this approach.
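The out-of-core graph loop from the proposal could look roughly like this in Java. It is a sketch under assumptions rather than an implementation: the Partition interface simply mirrors the method names used in the pseudocode, memory pressure is supplied as a BooleanSupplier, and LoggingPartition is a stand-in that only records the order of calls. None of these types exist in Giraph under these names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the two-pass partition loop; not Giraph code. */
final class OutOfCoreSuperstepSketch {
    /** Mirrors the method names used in the pseudocode. */
    interface Partition {
        void loadFromFileSystem();
        void computeAndGenerateOutgoingMessages();
        void storeToFileSystem();
    }

    /** Stand-in partition that only records which calls were made, in order. */
    static final class LoggingPartition implements Partition {
        private final String name;
        private final List<String> log;
        LoggingPartition(String name, List<String> log) { this.name = name; this.log = log; }
        @Override public void loadFromFileSystem() { log.add(name + ":load"); }
        @Override public void computeAndGenerateOutgoingMessages() { log.add(name + ":compute"); }
        @Override public void storeToFileSystem() { log.add(name + ":store"); }
    }

    static void runSuperstep(List<? extends Partition> inMemory,
                             List<? extends Partition> onDisk,
                             BooleanSupplier memoryPressure) {
        // First pass: partitions already cached in memory need no load.
        for (Partition p : inMemory) {
            p.computeAndGenerateOutgoingMessages();
            if (memoryPressure.getAsBoolean()) {
                p.storeToFileSystem();
            }
        }
        // Second pass: partitions that only exist on local disk.
        for (Partition p : onDisk) {
            p.loadFromFileSystem();
            p.computeAndGenerateOutgoingMessages();
            if (memoryPressure.getAsBoolean()) {
                p.storeToFileSystem();
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        runSuperstep(Arrays.asList(new LoggingPartition("p0", log)),
                     Arrays.asList(new LoggingPartition("p1", log)),
                     () -> false);
        System.out.println(log);
    }
}
```

Passing the pressure check in as a supplier keeps the loop independent of how pressure is eventually detected, which is still an open question in this thread.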
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13158734#comment-13158734 ]

Jakob Homan commented on GIRAPH-45:

It's certainly an intriguing idea to go with something like leveldb. This is obviously an area for lots of exploration and experimentation. As such, probably the best idea is to make the interface pluggable and keep a good-enough Java version as the default. It's probably time for a giraph-site.xml file to track these configuration possibilities.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13158777#comment-13158777 ]

Dmitriy V. Ryaboy commented on GIRAPH-45:

Do we need a full-blown db? Wouldn't a queue per destination work? There are plenty of good JVM-based queue implementations out there.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13158806#comment-13158806 ]

Claudio Martella commented on GIRAPH-45:

Dmitriy, could you provide an example? We're looking for disk-based implementations that provide the usual append-only write performance and quick scan-based reads. For this reason I thought about a sequence-file based approach. I don't know of any open source project doing this without doing writes in place (Tokyo/Kyoto Cabinet).

I'm writing something like this (memstore + sequencefile + compaction) in pure Java, which is at alpha stage now. I will speed up development in the future, but of course it won't be bullet-proof [https://github.com/claudiomartella/sketches]. On this front there's also an open-source Java implementation of leveldb, but I don't know if it's stable enough for us (we could take it, or sketches, under our umbrella though) [https://github.com/dain/leveldb].

Probably making the default something fully Java (Derby or something you're going to suggest) and making it pluggable is the best option. I'll work on this from next week (I'm moving to Amsterdam next week); I'm finishing GIRAPH-10 right now.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13158837#comment-13158837 ]

Dmitriy V. Ryaboy commented on GIRAPH-45:

Sorry, you are right: all the queues I am thinking of log to disk for persistence / recoverability, but are mostly meant for datasets that fit in RAM. I did find this, though it doesn't look like it's being used in many places, and I didn't read the code: http://code.google.com/p/ashes-queue/
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13158131#comment-13158131 ]

Hyunsik Choi commented on GIRAPH-45:

Claudio,

Thanks for the nice suggestion. That seems like a cool idea. However, I am concerned about the platform dependency of leveldb. leveldb is written in C++, which may make distributing Giraph more of a burden. What does everyone else think?
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13157588#comment-13157588 ]

Claudio Martella commented on GIRAPH-45:

I was thinking: what about storing the out-of-core messages with leveldb? Writing them as they come should be quite efficient, as it writes data to an append-only commit log. It keeps data sorted, so reading with an iterator should be very fast. Each worker could have one leveldb database per superstep, and each message could have a key composed like this: destination vertex id:uuid, with the message kept in the payload. The db could be destroyed and re-created at each superstep by each worker.

As far as combiners are concerned, they could be called (on the second endpoint) after each get() from the db at compute() time. We're trying to save memory space, not disk. They would be run before each vertex's compute(), and you'd be required to have all the messages sent to that vertex (in that superstep) in memory, because combiners can't work incrementally.

What do you guys think?
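To illustrate why a "destination vertex id:uuid" key groups a vertex's messages in a sorted store, here is a small sketch in which a java.util.TreeMap stands in for the ordered key space. It does not use the leveldb API at all; the class name, the zero-padded id format and the double-valued messages are assumptions made for the example, and vertex ids are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TreeMap;
import java.util.UUID;

/** Hypothetical stand-in for a sorted key-value store; this is not the leveldb API. */
final class CompositeKeySketch {
    private final TreeMap<String, Double> store = new TreeMap<>();

    /** Zero-padded so that lexicographic order matches numeric order (ids >= 0 assumed). */
    static String prefix(long vertexId) {
        return String.format(Locale.ROOT, "%019d:", vertexId);
    }

    /** The uuid suffix makes every key unique, so messages never overwrite each other. */
    void put(long destVertexId, double message) {
        store.put(prefix(destVertexId) + UUID.randomUUID(), message);
    }

    /** Range scan over every key that starts with this vertex's prefix. */
    List<Double> messagesFor(long destVertexId) {
        String from = prefix(destVertexId);
        // ';' is the character right after ':', so [from, to) covers exactly this prefix.
        String to = from.substring(0, from.length() - 1) + ';';
        return new ArrayList<>(store.subMap(from, to).values());
    }
}
```

One consequence worth noting: within a vertex the messages come back in uuid order, not arrival order, which should only matter if an algorithm depends on message ordering.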
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13152360#comment-13152360 ]

Jakob Homan commented on GIRAPH-45:

On a side note, is it worth considering messages to be immutable (or providing a separate annotation for those that are)? This would help with message de-duplication, which could be a significant help in some algorithms. One would only need to keep one copy of a message going to a particular worker, regardless of the number of vertices it is bound for.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13152477#comment-13152477 ]

Claudio Martella commented on GIRAPH-45:

I was struggling with this, and the annotation could actually be the elegant solution. Avery asked for a review about this a month ago. As a general approach, we could clone the messages that are not annotated.
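A minimal sketch of the "share annotated messages, clone the rest" idea, assuming a runtime-retained marker annotation. The annotation name @ImmutableMessage, the example message classes and the forDelivery helper are all hypothetical; nothing here is an existing Giraph API.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.UnaryOperator;

/** Hypothetical illustration of annotation-driven message sharing; not Giraph code. */
final class ImmutableMessageSketch {
    /** Marker for message types whose instances may be shared between vertices. */
    @Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
    @Target(ElementType.TYPE)
    @interface ImmutableMessage {}

    /** Example message that is safe to share: all state is final. */
    @ImmutableMessage
    static final class Ping {
        final int hops;
        Ping(int hops) { this.hops = hops; }
    }

    /** Example message that is mutable, so every recipient needs its own copy. */
    static final class Counter {
        int value;
        Counter(int value) { this.value = value; }
    }

    /** Returns the same instance for annotated types and a clone for everything else. */
    static <M> M forDelivery(M message, UnaryOperator<M> cloner) {
        boolean shareable = message.getClass().isAnnotationPresent(ImmutableMessage.class);
        return shareable ? message : cloner.apply(message);
    }
}
```

The annotation is only a promise by the message author; nothing in this sketch verifies that an annotated class really is immutable.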
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150732#comment-13150732 ] Jakob Homan commented on GIRAPH-45:
Yeah, this would involve changing the "save up all the messages and send them at the end of the superstep" model to a more streaming model for outgoing messages, which I think may provide more throughput overall. If we send messages as they are generated (or in bunches intermittently), we'll decrease the time spent shuffling them at the end of the step. This is important because the shuffle is a peer-to-peer process, with every worker potentially sending something to every other worker, and in a chatty computation it may put pressure on the network.
I'm skeptical about how often combiners will be both possible and actually implemented. If a combiner is not defined, is there any benefit at all to not immediately sending out the non-local messages? One approach is, when memory starts getting tight (or on a regular schedule), to run the combiner and then ship the non-local-bound messages out to their workers. Similarly, the receiving worker can run its combiner on the messages as they come in, spilling when necessary. It will probably be quite expensive to un-spill from disk to re-run the combiner, assuming the combiner managed to have an effect.
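The "combine, then ship when memory gets tight" idea in the comment above could be sketched roughly as follows. This is only an illustration under stated assumptions: `StreamingOutbox`, `flushThreshold`, and the `shipped` list (standing in for the network) are hypothetical names, not Giraph classes, and the sketch treats every message as non-local and uses a simple message count in place of a real memory check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical sketch, not Giraph's API: buffers outgoing messages per target
// vertex and ships them once a threshold is reached, combining first if possible.
final class StreamingOutbox {
    private final int flushThreshold;             // assumed knob: buffered messages before a flush
    private final BinaryOperator<Long> combiner;  // null when no combiner is defined
    private final Map<String, List<Long>> buffer = new TreeMap<>();
    private final List<String> shipped = new ArrayList<>(); // stand-in for the network
    private int buffered = 0;

    StreamingOutbox(int flushThreshold, BinaryOperator<Long> combiner) {
        this.flushThreshold = flushThreshold;
        this.combiner = combiner;
    }

    void send(String targetVertex, long message) {
        buffer.computeIfAbsent(targetVertex, k -> new ArrayList<>()).add(message);
        buffered++;
        if (buffered >= flushThreshold) {
            flush();
        }
    }

    // Runs on the threshold; a caller would also invoke it at the end of the superstep.
    void flush() {
        for (Map.Entry<String, List<Long>> entry : buffer.entrySet()) {
            List<Long> messages = entry.getValue();
            if (combiner != null) {
                // One combined message per target vertex.
                long combined = messages.stream().reduce(combiner).get();
                shipped.add(entry.getKey() + "=" + combined);
            } else {
                // No combiner: every message goes out individually.
                for (long message : messages) {
                    shipped.add(entry.getKey() + "=" + message);
                }
            }
        }
        buffer.clear();
        buffered = 0;
    }

    List<String> shipped() {
        return shipped;
    }
}
```

With a sum combiner (`Long::sum`) and a threshold of 3, sending 1 and 2 to one vertex and 5 to another ships two combined messages instead of three; with a `null` combiner the buffer only delays sending, which is the case the comment questions.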
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150753#comment-13150753 ] Claudio Martella commented on GIRAPH-45:
I'm not sure whether sending messages in a streamy way would actually diminish any kind of memory pressure. As messages need the current superstep to be finished before they can be consumed, I guess this would just transfer the pressure to the nodes they are transferred to. In certain scenarios, this can actually mean putting more pressure on the cumulative memory consumed (the total memory of the nodes in the cluster).
Suppose vertex A sends a message to vertices B, C, D and E. B and C are on the same node as A, D is on a second node and E is on a third node. B and C share the message sent by A, as they live in the same JVM (ignoring semantics where the message needs to be cloned before it is sent). In this scenario we would have #nodes copies of the same message across the cluster. Topology-based graph partitioning would allow these messages to be sent mostly to vertices living in the same JVM (supposing the communication pattern of the vertices follows the graph topology) and would alleviate this problem.
It feels like keeping messages out-of-core is the best option we have right now, and if we manage to save the messages in the same order in which their destination vertices are processed, we could even get a scan-based computation that would give quite good throughput. Does it make sense?
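The copy count in the A, B, C, D, E example can be checked with a few lines. This is a minimal sketch under the comment's own assumption that vertices in the same JVM share one copy of a message; `MessageCopies` and the vertex-to-worker map are hypothetical, introduced only for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: counts how many copies of one message exist in the
// cluster, assuming all target vertices on the same worker share one copy.
final class MessageCopies {
    static int copiesInCluster(List<String> targets, Map<String, Integer> workerOf) {
        Set<Integer> workers = new HashSet<>();
        for (String target : targets) {
            workers.add(workerOf.get(target)); // one copy per distinct worker
        }
        return workers.size();
    }
}
```

With B and C on worker 0, D on worker 1 and E on worker 2, the count is 3, one per node. A partitioning that placed all four targets on one worker would bring it down to 1.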
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150755#comment-13150755 ] Jakob Homan commented on GIRAPH-45:
@Jake I'm suggesting that messages should only be spilled to disk on their target's local disk. Spilling to disk would still be the main way to relieve memory pressure (along with combiners, of course). The question is: should you be responsible for spilling messages that you won't be processing in the next superstep? If so, we're spending more time writing and reading the messages on the originator (and then, possibly, writing and reading them again on the destination worker).
> But now this is a much more complex computation model: to send out messages, you typically need to know what you've received in the previous step.
I'm afraid I don't follow. This is still consistent with the BSP model. In a compute iteration, I can send a message to Z and it can either be held until everyone else is done computing, or immediately sent to the worker responsible for Z. Either way it gets stored in a container until the next superstep starts. No actual processing is done on the message itself.
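The point that a message is only stored in a container until the next superstep, whenever it arrives, might be pictured with a double-buffered inbox. This is a sketch under assumptions: `SuperstepInbox` and its method names are hypothetical and not part of Giraph.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: messages delivered during superstep S become visible
// to their target vertex only once superstep S+1 starts.
final class SuperstepInbox {
    private Map<String, List<String>> current = new HashMap<>(); // readable now
    private Map<String, List<String>> next = new HashMap<>();    // filling up

    // May be called at any time during the superstep, early or late.
    void deliver(String vertex, String message) {
        next.computeIfAbsent(vertex, k -> new ArrayList<>()).add(message);
    }

    // What compute() for this vertex would see in the current superstep.
    List<String> messagesFor(String vertex) {
        return current.getOrDefault(vertex, Collections.emptyList());
    }

    // Barrier: swap the buffers and start collecting for the following step.
    void startNextSuperstep() {
        current = next;
        next = new HashMap<>();
    }
}
```

Whether `deliver` is called as soon as the message is generated or only at the end of the superstep, `messagesFor("Z")` returns the same thing after the barrier, which is why eager sending stays within BSP.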
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150757#comment-13150757 ] Jakob Homan commented on GIRAPH-45:
> I'm not sure whether sending messages in a streamy way would actually diminish any kind of memory pressure.
Right. As above, the total number of bytes that would have to be stored *somewhere* would be the same (absent any advantage we get from having more combine-able messages in one place to be combined). We'd potentially be saving (very slow) disk bandwidth by putting them on the network immediately; at the destination, depending on its memory situation, it may or may not make sense to spill to disk, to combine, etc.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150767#comment-13150767 ] Claudio Martella commented on GIRAPH-45:
Yes, exactly. Given that you'd have to hit the network anyway (so let's ignore that cost), the problem is whether the other end would store them on disk or not. If so, it can make sense for the sender to hit the disk and then send them to the other end at the beginning of the next superstep, and that node would have to process them as they come. This would mean the disk is hit once, on the node generating the messages, but then the other end would have to implement a pull-model. The other option, the push-model, would still require the second end-point to hit the disk twice.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150778#comment-13150778 ] Claudio Martella commented on GIRAPH-45:
Sorry, I'll clarify my last sentence. The other option, the push-model, would still require the second end-point to hit the disk (as it would not be possible to keep the messages in memory, otherwise we wouldn't be discussing this at all), and that would be the second time in total. Maybe a third option would be a push-model where the end-point controls the amount of data and the timing of the push. (Something like: "hey, I'm ready, give me the first 5 messages", then "hey, I'm ready for the next 5", and so on.)
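The third option, where the receiving end decides how much is transferred and when, might look like the sketch below. `PacedSender` and `request` are hypothetical names; the in-memory queue stands in for whatever the sender keeps on disk, and no real network transfer is modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: the sender holds messages until the receiver asks for
// a batch, so the receiver controls both the amount and the timing.
final class PacedSender {
    private final Deque<String> pending = new ArrayDeque<>();

    void enqueue(String message) {
        pending.addLast(message);
    }

    // "Hey, I'm ready, give me the next max messages."
    List<String> request(int max) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < max && !pending.isEmpty()) {
            batch.add(pending.pollFirst());
        }
        return batch;
    }

    boolean isDrained() {
        return pending.isEmpty();
    }
}
```

A receiver that can hold five messages at a time would call `request(5)` repeatedly, processing each batch before asking for the next, until `isDrained()` returns true.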
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150784#comment-13150784 ] Jake Mannix commented on GIRAPH-45:
@Jakob My comment about avoiding disk *and* having too-large message sets is that, in this combined case, your only option is to break BSP somehow if you can't properly use Combiners. I think spilling to disk on the target side is a good idea, as it allows Combiners to be attempted on both the sender side and the receiver side, and if you *still* have too much to hold in memory, you spill.
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150847#comment-13150847 ] Jakob Homan commented on GIRAPH-45:
yes, it does. I'm trying to learn from the mistakes of the past, not repeat them. :)
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150864#comment-13150864 ] Avery Ching commented on GIRAPH-45:
It's going to be a bit complicated to store the messages on the destination worker. You will basically have to keep an out-of-core map. I still think storing partitions to disk will be easier than the messages. You only load up the ones you are computing. No need to worry about ordering either, very simple out-of-core structure.
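The "store partitions to disk, load only the one being computed" structure could be as simple as one file per partition. The sketch below is an assumption-laden illustration: `DiskPartitionStore`, the file naming, and the one-vertex-per-line text format are all made up for the example and say nothing about how Giraph would serialize a partition.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical sketch: each partition lives in its own file and is read back
// only when that partition is about to be computed. No ordering is required.
final class DiskPartitionStore {
    private final Path dir;

    private DiskPartitionStore(Path dir) {
        this.dir = dir;
    }

    static DiskPartitionStore inTempDir() {
        try {
            return new DiskPartitionStore(Files.createTempDirectory("partitions"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes one vertex per line (illustrative format only).
    void store(int partitionId, List<String> vertices) {
        try {
            Files.write(fileFor(partitionId), vertices, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Loads a single partition; the others stay on disk.
    List<String> load(int partitionId) {
        try {
            return Files.readAllLines(fileFor(partitionId), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private Path fileFor(int partitionId) {
        return dir.resolve("partition-" + partitionId + ".txt");
    }
}
```

A worker would call `load(p)` just before computing partition `p` and `store(p, ...)` afterwards, so at most one partition's vertices need to be in memory at a time.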
[jira] [Commented] (GIRAPH-45) Improve the way to keep outgoing messages
[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13151038#comment-13151038 ] Hyunsik Choi commented on GIRAPH-45:
I'm in another time zone. I'm sad to miss the hot party.
I see this problem as a choice between Giraph becoming slower but still working, and Giraph being unable to deal with some problems or data, when the volume of generated messages exceeds the memory capacity. As you mentioned, spilling data to disk is apparently the simplest way to solve this problem. In addition, it does not affect the usual cases if spilling only starts when memory is getting tight.
Anyway, has the discussion concluded as follows?
- Each worker sends outgoing messages in an eager manner (immediately or periodically).
- The receiving side spills incoming messages to disk only when memory is getting tight.
Avery, I also agree that storing partitions to disk is a good way to mitigate the memory problem. I also think that the two approaches are compatible and have different effects. Storing partitions is more efficient if the volume of graph data is very large. Later, if Giraph lets users choose among the options (i.e., spilling messages, storing partitions, or both), users can pick according to their programs.
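The second point of the summary, spilling on the receiving side only when memory gets tight, might be sketched as below. Everything here is hypothetical: `SpillingInbox` is not a Giraph class, a fixed message count stands in for a real memory check, and messages are assumed to be single-line strings so that one line in the spill file is one message.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: incoming messages stay in memory up to a limit and
// are appended to a spill file only beyond it.
final class SpillingInbox {
    private final int maxInMemory;  // assumed stand-in for "memory is getting tight"
    private final Path spillFile = newSpillFile();
    private final List<String> inMemory = new ArrayList<>();
    private int spilled = 0;

    SpillingInbox(int maxInMemory) {
        this.maxInMemory = maxInMemory;
    }

    private static Path newSpillFile() {
        try {
            return Files.createTempFile("inbox-spill", ".txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void receive(String message) {
        if (inMemory.size() < maxInMemory) {
            inMemory.add(message); // the usual case never touches the disk
            return;
        }
        try {
            Files.write(spillFile, Collections.singletonList(message),
                    StandardCharsets.UTF_8, StandardOpenOption.APPEND);
            spilled++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // At the start of the next superstep: in-memory messages first, then spilled ones.
    List<String> drain() {
        List<String> all = new ArrayList<>(inMemory);
        try {
            all.addAll(Files.readAllLines(spillFile, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return all;
    }

    int spilledCount() {
        return spilled;
    }
}
```

As long as fewer than `maxInMemory` messages arrive, `spilledCount()` stays at 0 and no disk I/O happens on receive, which matches the observation that spilling need not affect the usual case.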